Move codec2 functions to codec2 interface

This commit is contained in:
Pedro 2023-11-25 23:14:49 +01:00
parent bfaa284837
commit f96c566223
6 changed files with 75 additions and 65 deletions

View file

@ -425,3 +425,46 @@ class resampler:
self.filter_mem8 = in8_mem[: self.MEM8]
return out48
def open_instance(mode: int) -> ctypes.c_void_p:
"""
Return a codec2 instance of the type `mode`
:param mode: Type of codec2 instance to return
:type mode: Union[int, str]
:return: C-function of the requested codec2 instance
:rtype: ctypes.c_void_p
"""
if mode in [FREEDV_MODE.fsk_ldpc_0.value]:
return ctypes.cast(
api.freedv_open_advanced(
FREEDV_MODE.fsk_ldpc.value,
ctypes.byref(api.FREEDV_MODE_FSK_LDPC_0_ADV),
),
ctypes.c_void_p,
)
if mode in [FREEDV_MODE.fsk_ldpc_1.value]:
return ctypes.cast(
api.freedv_open_advanced(
FREEDV_MODE.fsk_ldpc.value,
ctypes.byref(api.FREEDV_MODE_FSK_LDPC_1_ADV),
),
ctypes.c_void_p,
)
return ctypes.cast(api.freedv_open(mode), ctypes.c_void_p)
def get_bytes_per_frame(mode: int) -> int:
"""
Provide bytes per frame information for accessing from data handler
:param mode: Codec2 mode to query
:type mode: int or str
:return: Bytes per frame of the supplied codec2 data mode
:rtype: int
"""
freedv = open_instance(mode)
# TODO add close session
# get number of bytes per frame for mode
return int(api.freedv_get_bits_per_modem_frame(freedv) / 8)

View file

@ -17,7 +17,7 @@ class TxCommand():
def get_name(self):
return type(self).__name__
def emit_event(self):
def emit_event(self, event_queue):
pass
def log_message(self):
@ -26,14 +26,17 @@ class TxCommand():
def build_frame(self):
pass
def get_c2_mode(self):
c2_mode = FREEDV_MODE.fsk_ldpc_0.value if self.config.enable_fsk else FREEDV_MODE.sig0.value
return c2_mode
def transmit(self, tx_frame_queue):
frame = self.build_frame()
c2_mode = FREEDV_MODE.fsk_ldpc_0.value if self.config.enable_fsk else FREEDV_MODE.sig0.value
tx_queue_item = [c2_mode, 1, 0, frame]
tx_queue_item = [self.get_c2_mode(), 1, 0, frame]
tx_frame_queue.put(tx_queue_item)
def run(self, tx_frame_queue: queue.Queue):
self.emit_event()
def run(self, event_queue: queue.Queue, tx_frame_queue: queue.Queue):
self.emit_event(event_queue)
self.logger.info(self.log_message)
self.transmit(tx_frame_queue)
pass

View file

@ -6,7 +6,7 @@ class PingCommand(TxCommand):
self.dxcall = apiParams['dxcall']
return super().setParamsFromApi()
def execute(self, modem_state, tx_frame_queue):
def run(self, modem_state, tx_frame_queue):
frame = self.frame_factory.build_ping(self.dxcall)

View file

@ -5,14 +5,21 @@ import codec2
class DataFrameFactory:
def __init__(self):
self.modem_config = modem.config
self.modem_state = modem.state
self.myfullcall = f"{self.modem_config['STATION']['mycall']}-{self.modem_config['STATION']['myssid']}"
def build(self):
build_method = getattr(self, self.type.name)
return build_method()
def get_bytes_per_frame(mode: int) -> int:
"""
Provide bytes per frame information for accessing from data handler
:param mode: Codec2 mode to query
:type mode: int or str
:return: Bytes per frame of the supplied codec2 data mode
:rtype: int
"""
freedv = codec2.open_instance(mode)
# TODO add close session
# get number of bytes per frame for mode
return int(codec2.api.freedv_get_bits_per_modem_frame(freedv) / 8)
def build_ping(self, dxcallsign):
ping_frame = bytearray(self.length_sig0_frame)
@ -50,7 +57,7 @@ class DataFrameFactory:
beacon_frame[7:11] = helpers.encode_grid(self.mygrid)
return beacon_frame
def build_fec_wakeup(self):
def build_fec_wakeup(self, mode):
mode_int_wakeup = codec2.freedv_get_mode_value_by_name("sig0")
payload_per_wakeup_frame = self.modem.get_bytes_per_frame(mode_int_wakeup) - 2
fec_wakeup_frame = bytearray(payload_per_wakeup_frame)

View file

@ -120,8 +120,8 @@ class DISPATCHER():
"""Dispatch incoming UI instructions for transmitting operations"""
while True:
command = self.data_queue_transmit.get()
command.execute(MODEM_TRANSMIT_QUEUE)
next
command.execute(self.event_queue, MODEM_TRANSMIT_QUEUE)
continue
# Dispatch commands known to command_dispatcher
if data[0] in self.command_dispatcher:

View file

@ -963,13 +963,13 @@ class RF:
)
# INIT TX MODES - here we need all modes.
self.freedv_datac0_tx = open_codec2_instance(codec2.FREEDV_MODE.datac0.value)
self.freedv_datac1_tx = open_codec2_instance(codec2.FREEDV_MODE.datac1.value)
self.freedv_datac3_tx = open_codec2_instance(codec2.FREEDV_MODE.datac3.value)
self.freedv_datac4_tx = open_codec2_instance(codec2.FREEDV_MODE.datac4.value)
self.freedv_datac13_tx = open_codec2_instance(codec2.FREEDV_MODE.datac13.value)
self.freedv_ldpc0_tx = open_codec2_instance(codec2.FREEDV_MODE.fsk_ldpc_0.value)
self.freedv_ldpc1_tx = open_codec2_instance(codec2.FREEDV_MODE.fsk_ldpc_1.value)
self.freedv_datac0_tx = codec2.open_instance(codec2.FREEDV_MODE.datac0.value)
self.freedv_datac1_tx = codec2.open_instance(codec2.FREEDV_MODE.datac1.value)
self.freedv_datac3_tx = codec2.open_instance(codec2.FREEDV_MODE.datac3.value)
self.freedv_datac4_tx = codec2.open_instance(codec2.FREEDV_MODE.datac4.value)
self.freedv_datac13_tx = codec2.open_instance(codec2.FREEDV_MODE.datac13.value)
self.freedv_ldpc0_tx = codec2.open_instance(codec2.FREEDV_MODE.fsk_ldpc_0.value)
self.freedv_ldpc1_tx = codec2.open_instance(codec2.FREEDV_MODE.fsk_ldpc_1.value)
def init_codec2_mode(self, mode, adv):
"""
@ -1470,49 +1470,6 @@ class RF:
def set_FFT_stream(self, enable: bool):
self.enable_fft = enable
def open_codec2_instance(mode: int) -> ctypes.c_void_p:
"""
Return a codec2 instance of the type `mode`
:param mode: Type of codec2 instance to return
:type mode: Union[int, str]
:return: C-function of the requested codec2 instance
:rtype: ctypes.c_void_p
"""
if mode in [codec2.FREEDV_MODE.fsk_ldpc_0.value]:
return ctypes.cast(
codec2.api.freedv_open_advanced(
codec2.FREEDV_MODE.fsk_ldpc.value,
ctypes.byref(codec2.api.FREEDV_MODE_FSK_LDPC_0_ADV),
),
ctypes.c_void_p,
)
if mode in [codec2.FREEDV_MODE.fsk_ldpc_1.value]:
return ctypes.cast(
codec2.api.freedv_open_advanced(
codec2.FREEDV_MODE.fsk_ldpc.value,
ctypes.byref(codec2.api.FREEDV_MODE_FSK_LDPC_1_ADV),
),
ctypes.c_void_p,
)
return ctypes.cast(codec2.api.freedv_open(mode), ctypes.c_void_p)
def get_bytes_per_frame(mode: int) -> int:
"""
Provide bytes per frame information for accessing from data handler
:param mode: Codec2 mode to query
:type mode: int or str
:return: Bytes per frame of the supplied codec2 data mode
:rtype: int
"""
freedv = open_codec2_instance(mode)
# TODO add close session
# get number of bytes per frame for mode
return int(codec2.api.freedv_get_bits_per_modem_frame(freedv) / 8)
def set_audio_volume(datalist: np.ndarray, dB: float) -> np.ndarray:
"""
Scale values for the provided audio samples by dB.