2024-01-21 19:34:01 +00:00
|
|
|
import sys
|
|
|
|
sys.path.append('modem')
|
|
|
|
|
|
|
|
import unittest
|
2024-01-27 11:07:07 +00:00
|
|
|
import queue
|
2024-01-30 08:30:13 +00:00
|
|
|
from arq_data_type_handler import ARQDataTypeHandler, ARQ_SESSION_TYPES
|
2024-01-27 11:07:07 +00:00
|
|
|
from event_manager import EventManager
|
2024-02-02 18:50:16 +00:00
|
|
|
from state_manager import StateManager
|
2024-01-21 19:34:01 +00:00
|
|
|
|
|
|
|
class TestDispatcher(unittest.TestCase):
    """Round-trip tests for ``ARQDataTypeHandler``.

    Each test passes a payload through ``prepare()`` and feeds the result
    back into ``dispatch()``, asserting that the original bytes come out.
    """

    @classmethod
    def setUpClass(cls):
        """Create the handler under test once and share it across all tests."""
        cls.event_queue = queue.Queue()
        cls.state_queue = queue.Queue()
        cls.event_manager = EventManager([cls.event_queue])
        cls.state_manager = StateManager([cls.state_queue])
        cls.arq_data_type_handler = ARQDataTypeHandler(cls.event_manager, cls.state_manager)

    def _assert_round_trip(self, session_type):
        """Assert that prepare() followed by dispatch() returns the input bytes.

        Args:
            session_type: The ``ARQ_SESSION_TYPES`` member passed to
                ``prepare()`` to select how the payload is formatted.
        """
        example_data = b"Hello FreeDATA!"
        # prepare() yields the formatted payload plus the type byte that
        # dispatch() needs to route the payload back.
        formatted_data, type_byte = self.arq_data_type_handler.prepare(example_data, session_type)
        dispatched_data = self.arq_data_type_handler.dispatch(type_byte, formatted_data, statistics={})
        self.assertEqual(example_data, dispatched_data)

    def testDataTypeHandlerRaw(self):
        """Round-trip a payload using ``ARQ_SESSION_TYPES.raw``."""
        self._assert_round_trip(ARQ_SESSION_TYPES.raw)

    def testDataTypeHandlerLZMA(self):
        """Round-trip a payload using ``ARQ_SESSION_TYPES.raw_lzma``."""
        self._assert_round_trip(ARQ_SESSION_TYPES.raw_lzma)

    def testDataTypeHandlerGZIP(self):
        """Round-trip a payload using ``ARQ_SESSION_TYPES.raw_gzip``."""
        self._assert_round_trip(ARQ_SESSION_TYPES.raw_gzip)
|
|
|
|
|
|
|
|
|
|
|
|
# Run the tests only when this file is executed directly, not on import.
if __name__ == "__main__":
    unittest.main()
|