Add MessageP2P attachment encoding/decoding

This commit is contained in:
Pedro 2024-01-20 14:41:51 +01:00
parent 53fcc6cc56
commit ac77e1edbd
2 changed files with 34 additions and 15 deletions

View file

@ -31,28 +31,41 @@ class MessageP2P:
if 'attachments' in params: if 'attachments' in params:
for a in params['attachments']: for a in params['attachments']:
api_validations.validate_message_attachment(a) api_validations.validate_message_attachment(a)
attachments.append({ attachments.append(cls.__decode_attachment__(a))
'name': a['name'],
'type': a['type'],
'data': base64.decode(a['data']),
})
return cls(origin, dxcall, body, attachments) return cls(origin, dxcall, body, attachments)
@classmethod
def from_payload(cls, payload):
json_string = str(lzma.decompress(payload), 'utf-8')
payload_message = json.loads(json_string)
attachments = list(map(cls.__decode_attachment__, payload_message['attachments']))
return cls(payload_message['origin'], payload_message['destination'],
payload_message['body'], attachments)
def get_id(self) -> str: def get_id(self) -> str:
return f"{self.origin}.{self.destination}.{self.timestamp}" return f"{self.origin}.{self.destination}.{self.timestamp}"
def __encode_attachment__(self, binary_attachment: dict):
encoded_attachment = binary_attachment.copy()
encoded_attachment['data'] = str(base64.b64encode(binary_attachment['data']), 'utf-8')
return encoded_attachment
def __decode_attachment__(encoded_attachment: dict):
decoded_attachment = encoded_attachment.copy()
decoded_attachment['data'] = base64.b64decode(encoded_attachment['data'])
return decoded_attachment
def to_dict(self): def to_dict(self):
"""Make a dictionary out of the message data """Make a dictionary out of the message data
""" """
message = { return {
'id': self.get_id(), 'id': self.get_id(),
'origin': self.origin, 'origin': self.origin,
'destination': self.destination, 'destination': self.destination,
'body': self.body, 'body': self.body,
'attachments': self.attachments, 'attachments': list(map(self.__encode_attachment__, self.attachments)),
} }
return message
def to_payload(self): def to_payload(self):
"""Make a byte array ready to be sent out of the message data""" """Make a byte array ready to be sent out of the message data"""

View file

@ -1,5 +1,6 @@
import sys import sys
sys.path.append('modem') sys.path.append('modem')
import numpy as np
import unittest import unittest
from config import CONFIG from config import CONFIG
@ -23,15 +24,20 @@ class TestDataFrameFactory(unittest.TestCase):
self.assertEqual(message.destination, api_params['dxcall']) self.assertEqual(message.destination, api_params['dxcall'])
self.assertEqual(message.body, api_params['body']) self.assertEqual(message.body, api_params['body'])
def testToPayload(self): def testToPayloadWithAttachment(self):
api_params = { attachment = {
'dxcall': 'DJ2LS-3', 'name': 'test.gif',
'body': 'Hello World!', 'type': 'image/gif',
'data': np.random.bytes(1024)
} }
message = MessageP2P.from_api_params(self.mycall, api_params) message = MessageP2P(self.mycall, 'DJ2LS-3', 'Hello World!', [attachment])
payload = message.to_payload() payload = message.to_payload()
self.assertGreater(len(payload), 0)
self.assertIsInstance(payload, bytes) received_message = MessageP2P.from_payload(payload)
self.assertEqual(message.origin, received_message.origin)
self.assertEqual(message.destination, received_message.destination)
self.assertCountEqual(message.attachments, received_message.attachments)
self.assertEqual(attachment['data'], received_message.attachments[0]['data'])
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()