WIP ARQ - fixed tests

This commit is contained in:
DJ2LS 2023-12-16 12:54:16 +01:00
parent 6d80d15859
commit 6f7166201f
6 changed files with 53 additions and 27 deletions

View file

@ -48,3 +48,9 @@ class TxCommand():
self.emit_event(event_queue) self.emit_event(event_queue)
self.logger.info(self.log_message()) self.logger.info(self.log_message())
self.transmit(modem) self.transmit(modem)
def test(self, event_queue: queue.Queue):
self.emit_event(event_queue)
self.logger.info(self.log_message())
frame = self.build_frame()
return frame

View file

@ -93,7 +93,6 @@ class DISPATCHER():
self.states, self.states,
self.event_manager, self.event_manager,
self.modem) self.modem)
handler.handle(deconstructed_frame, snr, frequency_offset, freedv, bytes_per_frame) handler.handle(deconstructed_frame, snr, frequency_offset, freedv, bytes_per_frame)
def get_id_from_frame(self, data): def get_id_from_frame(self, data):

View file

@ -6,6 +6,8 @@ import structlog
import time, uuid import time, uuid
from codec2 import FREEDV_MODE from codec2 import FREEDV_MODE
TESTMODE = False
class FrameHandler(): class FrameHandler():
def __init__(self, name: str, config, states: StateManager, event_manager: EventManager, def __init__(self, name: str, config, states: StateManager, event_manager: EventManager,
@ -87,14 +89,13 @@ class FrameHandler():
self.event_manager.broadcast(event_data) self.event_manager.broadcast(event_data)
def get_tx_mode(self): def get_tx_mode(self):
return ( return FREEDV_MODE.signalling.value
FREEDV_MODE.fsk_ldpc_0.value
if self.config['MODEM']['enable_fsk']
else FREEDV_MODE.sig0.value
)
def transmit(self, frame): def transmit(self, frame):
self.modem.transmit(self.get_tx_mode(), 1, 0, frame) if not TESTMODE:
self.modem.transmit(self.get_tx_mode(), 1, 0, frame)
else:
self.event_manager.broadcast(frame)
def follow_protocol(self): def follow_protocol(self):
pass pass

View file

@ -322,9 +322,10 @@ def check_callsign(callsign: str, crc_to_check: bytes, ssid_list):
#call_with_ssid.extend(str(ssid).encode("utf-8")) #call_with_ssid.extend(str(ssid).encode("utf-8"))
callsign_crc = get_crc_24(call_with_ssid) callsign_crc = get_crc_24(call_with_ssid)
callsign_crc = callsign_crc.hex()
if callsign_crc == crc_to_check: if callsign_crc == crc_to_check:
log.debug("[HLP] check_callsign matched:", call_with_ssid=call_with_ssid) log.debug("[HLP] check_callsign matched:", call_with_ssid=call_with_ssid, checksum=crc_to_check)
return [True, call_with_ssid.decode()] return [True, call_with_ssid.decode()]
return [False, b''] return [False, b'']

View file

@ -26,6 +26,8 @@ import event_manager
import beacon import beacon
import demodulator import demodulator
TESTMODE = False
class RF: class RF:
"""Class to encapsulate interactions between the audio device and codec2""" """Class to encapsulate interactions between the audio device and codec2"""
@ -108,13 +110,14 @@ class RF:
self.tci_module.push_audio(audio_48k) self.tci_module.push_audio(audio_48k)
def start_modem(self): def start_modem(self):
result = False # testmode: We need to call the modem without audio parts for running protocol tests
if self.radiocontrol not in ["tci"]: if self.radiocontrol not in ["tci"]:
result = self.init_audio() result = self.init_audio() if not TESTMODE else True
if not result: if not result:
raise RuntimeError("Unable to init audio devices") raise RuntimeError("Unable to init audio devices")
self.demodulator.start(self.sd_input_stream) if not TESTMODE:
self.demodulator.start(self.sd_input_stream)
else: else:
result = self.init_tci() result = self.init_tci()
@ -128,7 +131,8 @@ class RF:
# init data thread # init data thread
self.init_data_threads() self.init_data_threads()
atexit.register(self.sd_input_stream.stop) if not TESTMODE:
atexit.register(self.sd_input_stream.stop)
# init beacon # init beacon
self.beacon.start() self.beacon.start()
@ -225,6 +229,7 @@ class RF:
daemon=True, daemon=True,
) )
tci_tx_callback_thread.start() tci_tx_callback_thread.start()
return True
def audio_auto_tune(self): def audio_auto_tune(self):
# enable / disable AUDIO TUNE Feature / ALC correction # enable / disable AUDIO TUNE Feature / ALC correction
@ -267,6 +272,10 @@ class RF:
frames: frames:
""" """
if TESTMODE:
return
self.demodulator.reset_data_sync() self.demodulator.reset_data_sync()
# get freedv instance by mode # get freedv instance by mode
mode_transition = { mode_transition = {
@ -438,6 +447,7 @@ class RF:
end_of_transmission = time.time() end_of_transmission = time.time()
transmission_time = end_of_transmission - start_of_transmission transmission_time = end_of_transmission - start_of_transmission
self.log.debug("[MDM] ON AIR TIME", time=transmission_time) self.log.debug("[MDM] ON AIR TIME", time=transmission_time)
return True
def transmit_morse(self, repeats, repeat_delay, frames): def transmit_morse(self, repeats, repeat_delay, frames):
self.states.waitForTransmission() self.states.waitForTransmission()

View file

@ -10,6 +10,8 @@ from state_manager import StateManager
from command_ping import PingCommand from command_ping import PingCommand
from command_cq import CQCommand from command_cq import CQCommand
import modem import modem
import frame_handler
class TestProtocols(unittest.TestCase): class TestProtocols(unittest.TestCase):
@ -26,15 +28,16 @@ class TestProtocols(unittest.TestCase):
cls.modem_transmit_queue = queue.Queue() cls.modem_transmit_queue = queue.Queue()
cls.modem = modem.RF(cls.config, cls.event_queue, queue.Queue(), queue.Queue(), cls.state_manager) cls.modem = modem.RF(cls.config, cls.event_queue, queue.Queue(), queue.Queue(), cls.state_manager)
modem.TESTMODE = True
frame_handler.TESTMODE = True
#cls.modem.start_modem()
cls.frame_dispatcher = DISPATCHER(cls.config, cls.frame_dispatcher = DISPATCHER(cls.config,
cls.event_queue, cls.event_queue,
cls.state_manager, cls.state_manager,
cls.modem) cls.modem)
def shortcutTransmission(self, frame_bytes):
def shortcutTransmission(self):
transmission_item = self.modem_transmit_queue.get()
frame_bytes = bytes(transmission_item['frame'])
self.frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0) self.frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0)
def assertEventReceivedType(self, event_type): def assertEventReceivedType(self, event_type):
@ -45,30 +48,36 @@ class TestProtocols(unittest.TestCase):
def testPingWithAck(self): def testPingWithAck(self):
# Run ping command # Run ping command
api_params = { "dxcall": "XX1XXX-7" } api_params = { "dxcall": "XX1XXX-7"}
ping_cmd = PingCommand(self.config, self.state_manager, self.event_queue, api_params) ping_cmd = PingCommand(self.config, self.state_manager, self.event_queue, api_params)
ping_cmd.run(self.event_queue, self.modem_transmit_queue) #ping_cmd.run(self.event_queue, self.modem)
frame = ping_cmd.test(self.event_queue)
# Shortcut the transmit queue directly to the frame dispatcher # Shortcut the transmit queue directly to the frame dispatcher
self.shortcutTransmission() self.shortcutTransmission(frame)
self.assertEventReceivedType('PING') self.assertEventReceivedType('PING')
event_frame = self.event_queue.get()
# Check ACK # Check ACK
self.shortcutTransmission() self.shortcutTransmission(event_frame)
self.assertEventReceivedType('PING_ACK') self.assertEventReceivedType('PING_ACK')
print("PING/PING ACK CHECK SUCCESSFULLY")
def testCQWithQRV(self): def testCQWithQRV(self):
self.config['MODEM']['respond_to_cq'] = True self.config['MODEM']['respond_to_cq'] = True
api_params = {} api_params = {}
cmd = CQCommand(self.config, self.state_manager, self.event_queue, api_params) cmd = CQCommand(self.config, self.state_manager, self.event_queue, api_params)
cmd.run(self.event_queue, self.modem_transmit_queue) #cmd.run(self.event_queue, self.modem)
frame = cmd.test(self.event_queue)
self.shortcutTransmission() self.shortcutTransmission(frame)
self.assertEventReceivedType('CQ') self.assertEventReceivedType('CQ')
self.shortcutTransmission() event_frame = self.event_queue.get()
# Check QRV
self.shortcutTransmission(event_frame)
self.assertEventReceivedType('QRV') self.assertEventReceivedType('QRV')
print("CQ/QRV CHECK SUCCESSFULLY")
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()