esp32_bluetooth_classic_sni.../libs/scapy/contrib/automotive/uds.py
Matheus Eduardo Garbelini 86890704fd initial commit
todo: add documentation & wireshark dissector
2021-08-31 19:51:03 +08:00

1412 lines
47 KiB
Python
Executable file

# This file is part of Scapy
# See http://www.secdev.org/projects/scapy for more information
# Copyright (C) Nils Weiss <nils@we155.de>
# This program is published under a GPLv2 license
# scapy.contrib.description = Unified Diagnostic Service (UDS)
# scapy.contrib.status = loads
import struct
from itertools import product
from scapy.fields import ByteEnumField, StrField, ConditionalField, \
BitEnumField, BitField, XByteField, FieldListField, \
XShortField, X3BytesField, XIntField, ByteField, \
ShortField, ObservableDict, XShortEnumField, XByteEnumField
from scapy.packet import Packet, bind_layers, NoPayload
from scapy.config import conf
from scapy.error import log_loading
from scapy.utils import PeriodicSenderThread
from scapy.contrib.isotp import ISOTP
"""
UDS
"""
# Ensure conf.contribs['UDS'] carries the
# 'treat-response-pending-as-answer' option, defaulting it to False.
try:
    # Probe only: a missing 'UDS' section or a missing option raises KeyError.
    conf.contribs['UDS']['treat-response-pending-as-answer']
except KeyError:
    log_loading.info("Specify \"conf.contribs['UDS'] = "
                     "{'treat-response-pending-as-answer': True}\" to treat "
                     "a negative response 'requestCorrectlyReceived-"
                     "ResponsePending' as answer of a request. \n"
                     "The default value is False.")
    # Add only the missing option instead of replacing the whole section,
    # so other 'UDS' settings configured before loading are preserved.
    conf.contribs.setdefault('UDS', {})[
        'treat-response-pending-as-answer'] = False
class UDS(ISOTP):
    """ISOTP subclass whose only own field is the one-byte service id."""

    # Service identifier -> service name, used to render the 'service' field.
    services = ObservableDict(
        {0x10: 'DiagnosticSessionControl',
         0x11: 'ECUReset',
         0x14: 'ClearDiagnosticInformation',
         0x19: 'ReadDTCInformation',
         0x22: 'ReadDataByIdentifier',
         0x23: 'ReadMemoryByAddress',
         0x24: 'ReadScalingDataByIdentifier',
         0x27: 'SecurityAccess',
         0x28: 'CommunicationControl',
         0x2A: 'ReadDataPeriodicIdentifier',
         0x2C: 'DynamicallyDefineDataIdentifier',
         0x2E: 'WriteDataByIdentifier',
         0x2F: 'InputOutputControlByIdentifier',
         0x31: 'RoutineControl',
         0x34: 'RequestDownload',
         0x35: 'RequestUpload',
         0x36: 'TransferData',
         0x37: 'RequestTransferExit',
         0x3D: 'WriteMemoryByAddress',
         0x3E: 'TesterPresent',
         0x50: 'DiagnosticSessionControlPositiveResponse',
         0x51: 'ECUResetPositiveResponse',
         0x54: 'ClearDiagnosticInformationPositiveResponse',
         0x59: 'ReadDTCInformationPositiveResponse',
         0x62: 'ReadDataByIdentifierPositiveResponse',
         0x63: 'ReadMemoryByAddressPositiveResponse',
         0x64: 'ReadScalingDataByIdentifierPositiveResponse',
         0x67: 'SecurityAccessPositiveResponse',
         0x68: 'CommunicationControlPositiveResponse',
         0x6A: 'ReadDataPeriodicIdentifierPositiveResponse',
         0x6C: 'DynamicallyDefineDataIdentifierPositiveResponse',
         0x6E: 'WriteDataByIdentifierPositiveResponse',
         0x6F: 'InputOutputControlByIdentifierPositiveResponse',
         0x71: 'RoutineControlPositiveResponse',
         0x74: 'RequestDownloadPositiveResponse',
         0x75: 'RequestUploadPositiveResponse',
         0x76: 'TransferDataPositiveResponse',
         0x77: 'RequestTransferExitPositiveResponse',
         0x7D: 'WriteMemoryByAddressPositiveResponse',
         0x7E: 'TesterPresentPositiveResponse',
         0x83: 'AccessTimingParameter',
         0x84: 'SecuredDataTransmission',
         0x85: 'ControlDTCSetting',
         0x86: 'ResponseOnEvent',
         0x87: 'LinkControl',
         0xC3: 'AccessTimingParameterPositiveResponse',
         0xC4: 'SecuredDataTransmissionPositiveResponse',
         0xC5: 'ControlDTCSettingPositiveResponse',
         0xC6: 'ResponseOnEventPositiveResponse',
         0xC7: 'LinkControlPositiveResponse',
         0x7f: 'NegativeResponse'})
    name = 'UDS'
    fields_desc = [
        XByteEnumField('service', 0, services)
    ]

    def answers(self, other):
        """Return whether this packet is a response to the packet *other*.

        Packets of a different class never match. A negative response
        (service 0x7f) delegates to its payload. Otherwise the service must
        equal the other service plus 0x40; the payloads are then compared,
        or the packet lengths when either side has no payload.
        """
        if other.__class__ != self.__class__:
            return False
        if self.service == 0x7f:
            return self.payload.answers(other)
        if self.service != (other.service + 0x40):
            return False
        payload_missing = isinstance(self.payload, NoPayload) or \
            isinstance(other.payload, NoPayload)
        if payload_missing:
            return len(self) <= len(other)
        return self.payload.answers(other.payload)

    def hashret(self):
        """Return a one-byte key shared by a request and its response."""
        if self.service == 0x7f:
            # Negative responses carry the id of the rejected request.
            key = self.requestServiceId
        else:
            # Clearing bit 0x40 maps a positive response onto its request id.
            key = self.service & ~0x40
        return struct.pack('B', key)
# ########################DSC###################################
class UDS_DSC(Packet):
    """DiagnosticSessionControl request layer (UDS service 0x10)."""

    # Session type value -> name; also used by the positive response layer.
    diagnosticSessionTypes = ObservableDict({
        0x00: 'ISOSAEReserved',
        0x01: 'defaultSession',
        0x02: 'programmingSession',
        0x03: 'extendedDiagnosticSession',
        0x04: 'safetySystemDiagnosticSession',
        0x7F: 'ISOSAEReserved'})
    name = 'DiagnosticSessionControl'
    fields_desc = [
        ByteEnumField('diagnosticSessionType', 0, diagnosticSessionTypes)
    ]

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and formatted session type."""
        service = pkt.sprintf("%UDS.service%")
        session_type = pkt.sprintf("%UDS_DSC.diagnosticSessionType%")
        return service, session_type


bind_layers(UDS, UDS_DSC, service=0x10)
class UDS_DSCPR(Packet):
    """DiagnosticSessionControl positive response layer (service 0x50)."""

    name = 'DiagnosticSessionControlPositiveResponse'
    fields_desc = [
        ByteEnumField('diagnosticSessionType', 0,
                      UDS_DSC.diagnosticSessionTypes),
        StrField('sessionParameterRecord', b"")
    ]

    def answers(self, other):
        """Match a UDS_DSC request that asked for the same session type."""
        if other.__class__ != UDS_DSC:
            return False
        return other.diagnosticSessionType == self.diagnosticSessionType

    @staticmethod
    def modifies_ecu_state(pkt, ecu):
        """Store the session type of *pkt* in ``ecu.current_session``."""
        ecu.current_session = pkt.diagnosticSessionType

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and formatted session type."""
        service = pkt.sprintf("%UDS.service%")
        session_type = pkt.sprintf("%UDS_DSCPR.diagnosticSessionType%")
        return service, session_type


bind_layers(UDS, UDS_DSCPR, service=0x50)
# #########################ER###################################
class UDS_ER(Packet):
    """ECUReset request layer (UDS service 0x11)."""

    # Reset type value -> name; also used by the positive response layer.
    resetTypes = {
        0x00: 'ISOSAEReserved',
        0x01: 'hardReset',
        0x02: 'keyOffOnReset',
        0x03: 'softReset',
        0x04: 'enableRapidPowerShutDown',
        0x05: 'disableRapidPowerShutDown',
        0x7F: 'ISOSAEReserved'}
    name = 'ECUReset'
    fields_desc = [
        ByteEnumField('resetType', 0, resetTypes)
    ]

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and formatted reset type."""
        service = pkt.sprintf("%UDS.service%")
        reset_type = pkt.sprintf("%UDS_ER.resetType%")
        return service, reset_type


bind_layers(UDS, UDS_ER, service=0x11)
class UDS_ERPR(Packet):
    """ECUReset positive response layer (UDS service 0x51)."""

    name = 'ECUResetPositiveResponse'
    fields_desc = [
        ByteEnumField('resetType', 0, UDS_ER.resetTypes),
        # powerDownTime is only present for resetType 0x04.
        ConditionalField(ByteField('powerDownTime', 0),
                         lambda pkt: pkt.resetType == 0x04)
    ]

    def answers(self, other):
        """Match any UDS_ER request."""
        return other.__class__ == UDS_ER

    @staticmethod
    def modifies_ecu_state(_, ecu):
        """Call ``ecu.reset()``; the packet itself is not inspected."""
        ecu.reset()

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and formatted reset type.

        The reset type is read from this response layer (UDS_ERPR); the
        request layer UDS_ER is not part of a response packet.
        """
        return pkt.sprintf("%UDS.service%"), \
            pkt.sprintf("%UDS_ERPR.resetType%")


bind_layers(UDS, UDS_ERPR, service=0x51)
# #########################SA###################################
class UDS_SA(Packet):
    """SecurityAccess request layer (UDS service 0x27)."""

    name = 'SecurityAccess'
    fields_desc = [
        ByteField('securityAccessType', 0),
        # Odd access types carry a data record, even ones carry a key.
        ConditionalField(StrField('securityAccessDataRecord', b""),
                         lambda pkt: pkt.securityAccessType % 2 == 1),
        ConditionalField(StrField('securityKey', b""),
                         lambda pkt: pkt.securityAccessType % 2 == 0)
    ]

    @staticmethod
    def get_log(pkt):
        """Return (formatted service, (access type, key or None)).

        The key is only reported for even access types.
        """
        if pkt.securityAccessType % 2 == 1:
            detail = (pkt.securityAccessType, None)
        else:
            detail = (pkt.securityAccessType, pkt.securityKey)
        return pkt.sprintf("%UDS.service%"), detail


bind_layers(UDS, UDS_SA, service=0x27)
class UDS_SAPR(Packet):
    """SecurityAccess positive response layer (UDS service 0x67)."""

    name = 'SecurityAccessPositiveResponse'
    fields_desc = [
        ByteField('securityAccessType', 0),
        # The seed is only present for odd access types.
        ConditionalField(StrField('securitySeed', b""),
                         lambda pkt: pkt.securityAccessType % 2 == 1),
    ]

    def answers(self, other):
        """Match a UDS_SA request with the same access type."""
        if other.__class__ != UDS_SA:
            return False
        return other.securityAccessType == self.securityAccessType

    @staticmethod
    def modifies_ecu_state(pkt, ecu):
        """Record an even access type as ``ecu.current_security_level``."""
        if pkt.securityAccessType % 2 != 0:
            return
        ecu.current_security_level = pkt.securityAccessType

    @staticmethod
    def get_log(pkt):
        """Return (formatted service, (access type, seed or None)).

        The seed is only reported for odd access types.
        """
        if pkt.securityAccessType % 2 == 0:
            detail = (pkt.securityAccessType, None)
        else:
            detail = (pkt.securityAccessType, pkt.securitySeed)
        return pkt.sprintf("%UDS.service%"), detail


bind_layers(UDS, UDS_SAPR, service=0x67)
# #########################CC###################################
class UDS_CC(Packet):
    """CommunicationControl request layer (UDS service 0x28)."""

    # Control type value -> name; also used by the positive response layer.
    controlTypes = {
        0x00: 'enableRxAndTx',
        0x01: 'enableRxAndDisableTx',
        0x02: 'disableRxAndEnableTx',
        0x03: 'disableRxAndTx'
    }
    name = 'CommunicationControl'
    fields_desc = [
        ByteEnumField('controlType', 0, controlTypes),
        # The three bit fields below add up to one byte (2 + 2 + 4 bits).
        BitEnumField('communicationType0', 0, 2,
                     {0: 'ISOSAEReserved',
                      1: 'normalCommunicationMessages',
                      2: 'networkManagmentCommunicationMessages',
                      3: 'networkManagmentCommunicationMessages and '
                         'normalCommunicationMessages'}),
        BitField('communicationType1', 0, 2),
        BitEnumField('communicationType2', 0, 4,
                     {0: 'Disable/Enable specified communication Type',
                      1: 'Disable/Enable specific subnet',
                      2: 'Disable/Enable specific subnet',
                      3: 'Disable/Enable specific subnet',
                      4: 'Disable/Enable specific subnet',
                      5: 'Disable/Enable specific subnet',
                      6: 'Disable/Enable specific subnet',
                      7: 'Disable/Enable specific subnet',
                      8: 'Disable/Enable specific subnet',
                      9: 'Disable/Enable specific subnet',
                      10: 'Disable/Enable specific subnet',
                      11: 'Disable/Enable specific subnet',
                      12: 'Disable/Enable specific subnet',
                      13: 'Disable/Enable specific subnet',
                      14: 'Disable/Enable specific subnet',
                      15: 'Disable/Enable network'})
    ]

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and formatted control type."""
        service = pkt.sprintf("%UDS.service%")
        control_type = pkt.sprintf("%UDS_CC.controlType%")
        return service, control_type


bind_layers(UDS, UDS_CC, service=0x28)
class UDS_CCPR(Packet):
    """CommunicationControl positive response layer (UDS service 0x68)."""

    name = 'CommunicationControlPositiveResponse'
    fields_desc = [
        ByteEnumField('controlType', 0, UDS_CC.controlTypes)
    ]

    def answers(self, other):
        """Match a UDS_CC request with the same control type."""
        if other.__class__ != UDS_CC:
            return False
        return other.controlType == self.controlType

    @staticmethod
    def modifies_ecu_state(pkt, ecu):
        """Store the control type of *pkt* in ``ecu.communication_control``."""
        ecu.communication_control = pkt.controlType

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and formatted control type."""
        service = pkt.sprintf("%UDS.service%")
        control_type = pkt.sprintf("%UDS_CCPR.controlType%")
        return service, control_type


bind_layers(UDS, UDS_CCPR, service=0x68)
# #########################TP###################################
class UDS_TP(Packet):
    """TesterPresent request layer (UDS service 0x3E)."""

    name = 'TesterPresent'
    fields_desc = [
        ByteField('subFunction', 0)
    ]

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and raw subFunction value."""
        service = pkt.sprintf("%UDS.service%")
        return service, pkt.subFunction


bind_layers(UDS, UDS_TP, service=0x3E)
class UDS_TPPR(Packet):
    """TesterPresent positive response layer (UDS service 0x7E)."""

    name = 'TesterPresentPositiveResponse'
    fields_desc = [
        ByteField('zeroSubFunction', 0)
    ]

    def answers(self, other):
        """Match any UDS_TP request."""
        return other.__class__ == UDS_TP

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and raw zeroSubFunction."""
        service = pkt.sprintf("%UDS.service%")
        return service, pkt.zeroSubFunction


bind_layers(UDS, UDS_TPPR, service=0x7E)
# #########################ATP###################################
class UDS_ATP(Packet):
    """AccessTimingParameter request layer (UDS service 0x83)."""

    # Access type value -> name; also used by the positive response layer.
    timingParameterAccessTypes = {
        0: 'ISOSAEReserved',
        1: 'readExtendedTimingParameterSet',
        2: 'setTimingParametersToDefaultValues',
        3: 'readCurrentlyActiveTimingParameters',
        4: 'setTimingParametersToGivenValues'
    }
    name = 'AccessTimingParameter'
    fields_desc = [
        ByteEnumField('timingParameterAccessType', 0,
                      timingParameterAccessTypes),
        # The request record is only present for access type 0x4.
        ConditionalField(StrField('timingParameterRequestRecord', b""),
                         lambda pkt: pkt.timingParameterAccessType == 0x4)
    ]


bind_layers(UDS, UDS_ATP, service=0x83)
class UDS_ATPPR(Packet):
    """AccessTimingParameter positive response layer (service 0xC3)."""

    name = 'AccessTimingParameterPositiveResponse'
    fields_desc = [
        ByteEnumField('timingParameterAccessType', 0,
                      UDS_ATP.timingParameterAccessTypes),
        # The response record is only present for access type 0x3.
        ConditionalField(StrField('timingParameterResponseRecord', b""),
                         lambda pkt: pkt.timingParameterAccessType == 0x3)
    ]

    def answers(self, other):
        """Match a UDS_ATP request with the same access type."""
        if other.__class__ != UDS_ATP:
            return False
        requested = other.timingParameterAccessType
        return requested == self.timingParameterAccessType


bind_layers(UDS, UDS_ATPPR, service=0xC3)
# #########################SDT###################################
class UDS_SDT(Packet):
    """SecuredDataTransmission request layer (UDS service 0x84)."""

    name = 'SecuredDataTransmission'
    fields_desc = [
        StrField('securityDataRequestRecord', b"")
    ]

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and raw request record."""
        service = pkt.sprintf("%UDS.service%")
        return service, pkt.securityDataRequestRecord


bind_layers(UDS, UDS_SDT, service=0x84)
class UDS_SDTPR(Packet):
    """SecuredDataTransmission positive response layer (service 0xC4)."""

    name = 'SecuredDataTransmissionPositiveResponse'
    fields_desc = [
        StrField('securityDataResponseRecord', b"")
    ]

    def answers(self, other):
        """Match any UDS_SDT request."""
        return other.__class__ == UDS_SDT

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and raw response record."""
        service = pkt.sprintf("%UDS.service%")
        return service, pkt.securityDataResponseRecord


bind_layers(UDS, UDS_SDTPR, service=0xC4)
# #########################CDTCS###################################
class UDS_CDTCS(Packet):
    """ControlDTCSetting request layer (UDS service 0x85)."""

    # Setting type value -> name; also used by the positive response layer.
    DTCSettingTypes = {
        0: 'ISOSAEReserved',
        1: 'on',
        2: 'off'
    }
    name = 'ControlDTCSetting'
    fields_desc = [
        ByteEnumField('DTCSettingType', 0, DTCSettingTypes),
        StrField('DTCSettingControlOptionRecord', b"")
    ]

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and formatted setting type."""
        service = pkt.sprintf("%UDS.service%")
        setting_type = pkt.sprintf("%UDS_CDTCS.DTCSettingType%")
        return service, setting_type


bind_layers(UDS, UDS_CDTCS, service=0x85)
class UDS_CDTCSPR(Packet):
    """ControlDTCSetting positive response layer (UDS service 0xC5)."""

    name = 'ControlDTCSettingPositiveResponse'
    fields_desc = [
        ByteEnumField('DTCSettingType', 0, UDS_CDTCS.DTCSettingTypes)
    ]

    def answers(self, other):
        """Match any UDS_CDTCS request."""
        return other.__class__ == UDS_CDTCS

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and formatted setting type."""
        service = pkt.sprintf("%UDS.service%")
        setting_type = pkt.sprintf("%UDS_CDTCSPR.DTCSettingType%")
        return service, setting_type


bind_layers(UDS, UDS_CDTCSPR, service=0xC5)
# #########################ROE###################################
# TODO: improve this protocol implementation
class UDS_ROE(Packet):
    """ResponseOnEvent request layer (UDS service 0x86)."""

    # Event type value -> name; also used by the positive response layer.
    eventTypes = {
        0: 'doNotStoreEvent',
        1: 'storeEvent'
    }
    name = 'ResponseOnEvent'
    fields_desc = [
        ByteEnumField('eventType', 0, eventTypes),
        ByteField('eventWindowTime', 0),
        StrField('eventTypeRecord', b"")
    ]


bind_layers(UDS, UDS_ROE, service=0x86)
class UDS_ROEPR(Packet):
    """ResponseOnEvent positive response layer (UDS service 0xC6)."""

    name = 'ResponseOnEventPositiveResponse'
    fields_desc = [
        ByteEnumField('eventType', 0, UDS_ROE.eventTypes),
        ByteField('numberOfIdentifiedEvents', 0),
        ByteField('eventWindowTime', 0),
        StrField('eventTypeRecord', b"")
    ]

    def answers(self, other):
        """Match a UDS_ROE request with the same event type."""
        if other.__class__ != UDS_ROE:
            return False
        return other.eventType == self.eventType


bind_layers(UDS, UDS_ROEPR, service=0xC6)
# #########################LC###################################
class UDS_LC(Packet):
    """LinkControl request layer (UDS service 0x87)."""

    # Link control type value -> name; also used by the response layer.
    linkControlTypes = {
        0: 'ISOSAEReserved',
        1: 'verifyBaudrateTransitionWithFixedBaudrate',
        2: 'verifyBaudrateTransitionWithSpecificBaudrate',
        3: 'transitionBaudrate'
    }
    name = 'LinkControl'
    fields_desc = [
        ByteEnumField('linkControlType', 0, linkControlTypes),
        # Type 0x1 carries a one-byte baudrate identifier.
        ConditionalField(ByteField('baudrateIdentifier', 0),
                         lambda pkt: pkt.linkControlType == 0x1),
        # Type 0x2 carries the baudrate as three separate bytes.
        ConditionalField(ByteField('baudrateHighByte', 0),
                         lambda pkt: pkt.linkControlType == 0x2),
        ConditionalField(ByteField('baudrateMiddleByte', 0),
                         lambda pkt: pkt.linkControlType == 0x2),
        ConditionalField(ByteField('baudrateLowByte', 0),
                         lambda pkt: pkt.linkControlType == 0x2)
    ]

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and formatted link control type.

        linkControlType is declared on UDS_LC, not on UDS, so the format
        directive names this layer, as the other get_log helpers do for
        their own fields.
        """
        return pkt.sprintf("%UDS.service%"), \
            pkt.sprintf("%UDS_LC.linkControlType%")


bind_layers(UDS, UDS_LC, service=0x87)
class UDS_LCPR(Packet):
    """LinkControl positive response layer (UDS service 0xC7)."""

    name = 'LinkControlPositiveResponse'
    fields_desc = [
        ByteEnumField('linkControlType', 0, UDS_LC.linkControlTypes)
    ]

    def answers(self, other):
        """Match a UDS_LC request with the same link control type."""
        return other.__class__ == UDS_LC \
            and other.linkControlType == self.linkControlType

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and formatted link control type.

        linkControlType is declared on UDS_LCPR, not on UDS, so the format
        directive names this layer, as the other get_log helpers do for
        their own fields.
        """
        return pkt.sprintf("%UDS.service%"), \
            pkt.sprintf("%UDS_LCPR.linkControlType%")


bind_layers(UDS, UDS_LCPR, service=0xC7)
# #########################RDBI###################################
class UDS_RDBI(Packet):
    """ReadDataByIdentifier request layer (UDS service 0x22)."""

    # Identifier value -> name; starts empty and is shared with other layers
    # in this file that reference UDS_RDBI.dataIdentifiers.
    dataIdentifiers = ObservableDict()
    name = 'ReadDataByIdentifier'
    fields_desc = [
        FieldListField("identifiers", [0],
                       XShortEnumField('dataIdentifier', 0,
                                       dataIdentifiers))
    ]

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and formatted identifiers."""
        service = pkt.sprintf("%UDS.service%")
        identifiers = pkt.sprintf("%UDS_RDBI.identifiers%")
        return service, identifiers


bind_layers(UDS, UDS_RDBI, service=0x22)
class UDS_RDBIPR(Packet):
    """ReadDataByIdentifier positive response layer (UDS service 0x62)."""

    name = 'ReadDataByIdentifierPositiveResponse'
    fields_desc = [
        XShortEnumField('dataIdentifier', 0,
                        UDS_RDBI.dataIdentifiers),
    ]

    def answers(self, other):
        """Match a UDS_RDBI request that lists this data identifier."""
        if other.__class__ != UDS_RDBI:
            return False
        return self.dataIdentifier in other.identifiers

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and formatted identifier."""
        service = pkt.sprintf("%UDS.service%")
        identifier = pkt.sprintf("%UDS_RDBIPR.dataIdentifier%")
        return service, identifier


bind_layers(UDS, UDS_RDBIPR, service=0x62)
# #########################RMBA###################################
class UDS_RMBA(Packet):
    """ReadMemoryByAddress request layer (UDS service 0x23)."""

    name = 'ReadMemoryByAddress'
    fields_desc = [
        # Two 4-bit lengths select which address/size field is present.
        BitField('memorySizeLen', 0, 4),
        BitField('memoryAddressLen', 0, 4),
        ConditionalField(XByteField('memoryAddress1', 0),
                         lambda pkt: pkt.memoryAddressLen == 1),
        ConditionalField(XShortField('memoryAddress2', 0),
                         lambda pkt: pkt.memoryAddressLen == 2),
        ConditionalField(X3BytesField('memoryAddress3', 0),
                         lambda pkt: pkt.memoryAddressLen == 3),
        ConditionalField(XIntField('memoryAddress4', 0),
                         lambda pkt: pkt.memoryAddressLen == 4),
        ConditionalField(XByteField('memorySize1', 0),
                         lambda pkt: pkt.memorySizeLen == 1),
        ConditionalField(XShortField('memorySize2', 0),
                         lambda pkt: pkt.memorySizeLen == 2),
        ConditionalField(X3BytesField('memorySize3', 0),
                         lambda pkt: pkt.memorySizeLen == 3),
        ConditionalField(XIntField('memorySize4', 0),
                         lambda pkt: pkt.memorySizeLen == 4),
    ]

    @staticmethod
    def get_log(pkt):
        """Return (formatted service, (address, size)).

        Address and size are read from the field selected by the length
        nibbles. Only lengths 1 to 4 have a matching field; for any other
        length (including the default 0) the entry is None instead of
        raising AttributeError.
        """
        addr = getattr(pkt, "memoryAddress%d" % pkt.memoryAddressLen, None)
        size = getattr(pkt, "memorySize%d" % pkt.memorySizeLen, None)
        return pkt.sprintf("%UDS.service%"), (addr, size)


bind_layers(UDS, UDS_RMBA, service=0x23)
class UDS_RMBAPR(Packet):
    """ReadMemoryByAddress positive response layer (UDS service 0x63)."""

    name = 'ReadMemoryByAddressPositiveResponse'
    fields_desc = [
        StrField('dataRecord', None, fmt="B")
    ]

    def answers(self, other):
        """Match any UDS_RMBA request."""
        return other.__class__ == UDS_RMBA

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and raw data record."""
        service = pkt.sprintf("%UDS.service%")
        return service, pkt.dataRecord


bind_layers(UDS, UDS_RMBAPR, service=0x63)
# #########################RSDBI###################################
class UDS_RSDBI(Packet):
    """ReadScalingDataByIdentifier request layer (UDS service 0x24)."""

    name = 'ReadScalingDataByIdentifier'
    # Identifier value -> name; starts empty, shared with the response layer.
    dataIdentifiers = ObservableDict()
    fields_desc = [
        XShortEnumField('dataIdentifier', 0, dataIdentifiers)
    ]


bind_layers(UDS, UDS_RSDBI, service=0x24)
# TODO: Implement correct scaling here, instead of using just the dataRecord
class UDS_RSDBIPR(Packet):
    """ReadScalingDataByIdentifier positive response layer (service 0x64)."""

    name = 'ReadScalingDataByIdentifierPositiveResponse'
    fields_desc = [
        XShortEnumField('dataIdentifier', 0, UDS_RSDBI.dataIdentifiers),
        ByteField('scalingByte', 0),
        StrField('dataRecord', None, fmt="B")
    ]

    def answers(self, other):
        """Match a UDS_RSDBI request with the same data identifier."""
        if other.__class__ != UDS_RSDBI:
            return False
        return other.dataIdentifier == self.dataIdentifier


bind_layers(UDS, UDS_RSDBIPR, service=0x64)
# #########################RDBPI###################################
class UDS_RDBPI(Packet):
    """ReadDataByPeriodicIdentifier request layer (UDS service 0x2A)."""

    # Transmission mode value -> name.
    transmissionModes = {
        0: 'ISOSAEReserved',
        1: 'sendAtSlowRate',
        2: 'sendAtMediumRate',
        3: 'sendAtFastRate',
        4: 'stopSending'
    }
    name = 'ReadDataByPeriodicIdentifier'
    fields_desc = [
        ByteEnumField('transmissionMode', 0, transmissionModes),
        ByteField('periodicDataIdentifier', 0),
        # Default is an empty byte string; the previous integer 0 is not a
        # byte-string value for a string field.
        StrField('furtherPeriodicDataIdentifier', b"", fmt="B")
    ]


bind_layers(UDS, UDS_RDBPI, service=0x2A)
# TODO: Implement correct scaling here, instead of using just the dataRecord
class UDS_RDBPIPR(Packet):
    """ReadDataByPeriodicIdentifier positive response layer (service 0x6A)."""

    name = 'ReadDataByPeriodicIdentifierPositiveResponse'
    fields_desc = [
        ByteField('periodicDataIdentifier', 0),
        StrField('dataRecord', None, fmt="B")
    ]

    def answers(self, other):
        """Match a UDS_RDBPI request with the same periodic identifier."""
        if other.__class__ != UDS_RDBPI:
            return False
        return other.periodicDataIdentifier == self.periodicDataIdentifier


bind_layers(UDS, UDS_RDBPIPR, service=0x6A)
# #########################DDDI###################################
# TODO: Implement correct interpretation here,
# instead of using just the dataRecord
class UDS_DDDI(Packet):
    """DynamicallyDefineDataIdentifier request layer (UDS service 0x2C)."""

    name = 'DynamicallyDefineDataIdentifier'
    # Sub-function value -> name; also used by the positive response layer.
    subFunctions = {0x1: "defineByIdentifier",
                    0x2: "defineByMemoryAddress",
                    0x3: "clearDynamicallyDefinedDataIdentifier"}
    fields_desc = [
        ByteEnumField('subFunction', 0, subFunctions),
        # Default is an empty byte string; the previous integer 0 is not a
        # byte-string value for a string field.
        StrField('dataRecord', b"", fmt="B")
    ]


bind_layers(UDS, UDS_DDDI, service=0x2C)
class UDS_DDDIPR(Packet):
    """DynamicallyDefineDataIdentifier positive response (service 0x6C)."""

    name = 'DynamicallyDefineDataIdentifierPositiveResponse'
    fields_desc = [
        ByteEnumField('subFunction', 0, UDS_DDDI.subFunctions),
        XShortField('dynamicallyDefinedDataIdentifier', 0)
    ]

    def answers(self, other):
        """Match a UDS_DDDI request with the same sub-function."""
        if other.__class__ != UDS_DDDI:
            return False
        return other.subFunction == self.subFunction


bind_layers(UDS, UDS_DDDIPR, service=0x6C)
# #########################WDBI###################################
class UDS_WDBI(Packet):
    """WriteDataByIdentifier request layer (UDS service 0x2E)."""

    name = 'WriteDataByIdentifier'
    fields_desc = [
        # Identifier names come from the dictionary declared on UDS_RDBI.
        XShortEnumField('dataIdentifier', 0,
                        UDS_RDBI.dataIdentifiers)
    ]

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and formatted identifier."""
        service = pkt.sprintf("%UDS.service%")
        identifier = pkt.sprintf("%UDS_WDBI.dataIdentifier%")
        return service, identifier


bind_layers(UDS, UDS_WDBI, service=0x2E)
class UDS_WDBIPR(Packet):
    """WriteDataByIdentifier positive response layer (UDS service 0x6E)."""

    name = 'WriteDataByIdentifierPositiveResponse'
    fields_desc = [
        # Identifier names come from the dictionary declared on UDS_RDBI.
        XShortEnumField('dataIdentifier', 0,
                        UDS_RDBI.dataIdentifiers),
    ]

    def answers(self, other):
        """Match a UDS_WDBI request with the same data identifier."""
        if other.__class__ != UDS_WDBI:
            return False
        return other.dataIdentifier == self.dataIdentifier

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and formatted identifier."""
        service = pkt.sprintf("%UDS.service%")
        identifier = pkt.sprintf("%UDS_WDBIPR.dataIdentifier%")
        return service, identifier


bind_layers(UDS, UDS_WDBIPR, service=0x6E)
# #########################WMBA###################################
class UDS_WMBA(Packet):
    """WriteMemoryByAddress request layer (UDS service 0x3D)."""

    name = 'WriteMemoryByAddress'
    fields_desc = [
        # Two 4-bit lengths select which address/size field is present.
        BitField('memorySizeLen', 0, 4),
        BitField('memoryAddressLen', 0, 4),
        ConditionalField(XByteField('memoryAddress1', 0),
                         lambda pkt: pkt.memoryAddressLen == 1),
        ConditionalField(XShortField('memoryAddress2', 0),
                         lambda pkt: pkt.memoryAddressLen == 2),
        ConditionalField(X3BytesField('memoryAddress3', 0),
                         lambda pkt: pkt.memoryAddressLen == 3),
        ConditionalField(XIntField('memoryAddress4', 0),
                         lambda pkt: pkt.memoryAddressLen == 4),
        ConditionalField(XByteField('memorySize1', 0),
                         lambda pkt: pkt.memorySizeLen == 1),
        ConditionalField(XShortField('memorySize2', 0),
                         lambda pkt: pkt.memorySizeLen == 2),
        ConditionalField(X3BytesField('memorySize3', 0),
                         lambda pkt: pkt.memorySizeLen == 3),
        ConditionalField(XIntField('memorySize4', 0),
                         lambda pkt: pkt.memorySizeLen == 4),
        StrField('dataRecord', b'\x00', fmt="B"),
    ]

    @staticmethod
    def get_log(pkt):
        """Return (formatted service, (address, size, data record)).

        Address and size are read from the field selected by the length
        nibbles. Only lengths 1 to 4 have a matching field; for any other
        length (including the default 0) the entry is None instead of
        raising AttributeError.
        """
        addr = getattr(pkt, "memoryAddress%d" % pkt.memoryAddressLen, None)
        size = getattr(pkt, "memorySize%d" % pkt.memorySizeLen, None)
        return pkt.sprintf("%UDS.service%"), (addr, size, pkt.dataRecord)


bind_layers(UDS, UDS_WMBA, service=0x3D)
class UDS_WMBAPR(Packet):
    """WriteMemoryByAddress positive response layer (UDS service 0x7D)."""

    name = 'WriteMemoryByAddressPositiveResponse'
    fields_desc = [
        # Two 4-bit lengths select which address/size field is present.
        BitField('memorySizeLen', 0, 4),
        BitField('memoryAddressLen', 0, 4),
        ConditionalField(XByteField('memoryAddress1', 0),
                         lambda pkt: pkt.memoryAddressLen == 1),
        ConditionalField(XShortField('memoryAddress2', 0),
                         lambda pkt: pkt.memoryAddressLen == 2),
        ConditionalField(X3BytesField('memoryAddress3', 0),
                         lambda pkt: pkt.memoryAddressLen == 3),
        ConditionalField(XIntField('memoryAddress4', 0),
                         lambda pkt: pkt.memoryAddressLen == 4),
        ConditionalField(XByteField('memorySize1', 0),
                         lambda pkt: pkt.memorySizeLen == 1),
        ConditionalField(XShortField('memorySize2', 0),
                         lambda pkt: pkt.memorySizeLen == 2),
        ConditionalField(X3BytesField('memorySize3', 0),
                         lambda pkt: pkt.memorySizeLen == 3),
        ConditionalField(XIntField('memorySize4', 0),
                         lambda pkt: pkt.memorySizeLen == 4)
    ]

    def answers(self, other):
        """Match a UDS_WMBA request with the same size and address lengths."""
        return other.__class__ == UDS_WMBA \
            and other.memorySizeLen == self.memorySizeLen \
            and other.memoryAddressLen == self.memoryAddressLen

    @staticmethod
    def get_log(pkt):
        """Return (formatted service, (address, size)).

        Address and size are read from the field selected by the length
        nibbles. Only lengths 1 to 4 have a matching field; for any other
        length (including the default 0) the entry is None instead of
        raising AttributeError.
        """
        addr = getattr(pkt, "memoryAddress%d" % pkt.memoryAddressLen, None)
        size = getattr(pkt, "memorySize%d" % pkt.memorySizeLen, None)
        return pkt.sprintf("%UDS.service%"), (addr, size)


bind_layers(UDS, UDS_WMBAPR, service=0x7D)
# #########################CDTCI###################################
class UDS_CDTCI(Packet):
    """ClearDiagnosticInformation request layer (UDS service 0x14)."""

    name = 'ClearDiagnosticInformation'
    fields_desc = [
        # The DTC group is carried as three separate bytes.
        ByteField('groupOfDTCHighByte', 0),
        ByteField('groupOfDTCMiddleByte', 0),
        ByteField('groupOfDTCLowByte', 0),
    ]

    @staticmethod
    def get_log(pkt):
        """Return (formatted service, (high, middle, low) DTC group bytes)."""
        service = pkt.sprintf("%UDS.service%")
        group = (pkt.groupOfDTCHighByte,
                 pkt.groupOfDTCMiddleByte,
                 pkt.groupOfDTCLowByte)
        return service, group


bind_layers(UDS, UDS_CDTCI, service=0x14)
class UDS_CDTCIPR(Packet):
    """ClearDiagnosticInformation positive response layer (service 0x54).

    Declares no fields of its own.
    """

    name = 'ClearDiagnosticInformationPositiveResponse'

    def answers(self, other):
        """Match any UDS_CDTCI request."""
        return other.__class__ == UDS_CDTCI

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and None (no detail)."""
        service = pkt.sprintf("%UDS.service%")
        return service, None


bind_layers(UDS, UDS_CDTCIPR, service=0x54)
# #########################RDTCI###################################
class UDS_RDTCI(Packet):
    """ReadDTCInformation request layer (UDS service 0x19)."""

    # Report type value -> name; also used by the positive response layer.
    reportTypes = {
        0: 'ISOSAEReserved',
        1: 'reportNumberOfDTCByStatusMask',
        2: 'reportDTCByStatusMask',
        3: 'reportDTCSnapshotIdentification',
        4: 'reportDTCSnapshotRecordByDTCNumber',
        5: 'reportDTCSnapshotRecordByRecordNumber',
        6: 'reportDTCExtendedDataRecordByDTCNumber',
        7: 'reportNumberOfDTCBySeverityMaskRecord',
        8: 'reportDTCBySeverityMaskRecord',
        9: 'reportSeverityInformationOfDTC',
        10: 'reportSupportedDTC',
        11: 'reportFirstTestFailedDTC',
        12: 'reportFirstConfirmedDTC',
        13: 'reportMostRecentTestFailedDTC',
        14: 'reportMostRecentConfirmedDTC',
        15: 'reportMirrorMemoryDTCByStatusMask',
        16: 'reportMirrorMemoryDTCExtendedDataRecordByDTCNumber',
        17: 'reportNumberOfMirrorMemoryDTCByStatusMask',
        18: 'reportNumberOfEmissionsRelatedOBDDTCByStatusMask',
        19: 'reportEmissionsRelatedOBDDTCByStatusMask',
        20: 'reportDTCFaultDetectionCounter',
        21: 'reportDTCWithPermanentStatus'
    }
    name = 'ReadDTCInformation'
    # Every field after reportType is present only for the listed report
    # types. 'DTCStatusMask' is declared twice, with disjoint conditions.
    fields_desc = [
        ByteEnumField('reportType', 0, reportTypes),
        ConditionalField(
            XByteField('DTCStatusMask', 0),
            lambda pkt: pkt.reportType in (0x01, 0x02, 0x0f,
                                           0x11, 0x12, 0x13)),
        ConditionalField(
            ByteField('DTCHighByte', 0),
            lambda pkt: pkt.reportType in (0x3, 0x4, 0x6, 0x10, 0x09)),
        ConditionalField(
            ByteField('DTCMiddleByte', 0),
            lambda pkt: pkt.reportType in (0x3, 0x4, 0x6, 0x10, 0x09)),
        ConditionalField(
            ByteField('DTCLowByte', 0),
            lambda pkt: pkt.reportType in (0x3, 0x4, 0x6, 0x10, 0x09)),
        ConditionalField(
            ByteField('DTCSnapshotRecordNumber', 0),
            lambda pkt: pkt.reportType in (0x3, 0x4, 0x5)),
        ConditionalField(
            ByteField('DTCExtendedDataRecordNumber', 0),
            lambda pkt: pkt.reportType in (0x6, 0x10)),
        ConditionalField(
            ByteField('DTCSeverityMask', 0),
            lambda pkt: pkt.reportType in (0x07, 0x08)),
        ConditionalField(
            ByteField('DTCStatusMask', 0),
            lambda pkt: pkt.reportType in (0x07, 0x08)),
    ]

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and ``repr(pkt)``."""
        service = pkt.sprintf("%UDS.service%")
        return service, repr(pkt)


bind_layers(UDS, UDS_RDTCI, service=0x19)
class UDS_RDTCIPR(Packet):
    """ReadDTCInformation positive response layer (UDS service 0x59)."""

    name = 'ReadDTCInformationPositiveResponse'
    # Every field after reportType is present only for the listed report
    # types.
    fields_desc = [
        ByteEnumField('reportType', 0, UDS_RDTCI.reportTypes),
        ConditionalField(XByteField('DTCStatusAvailabilityMask', 0),
                         lambda pkt: pkt.reportType in [0x01, 0x07, 0x11,
                                                        0x12, 0x02, 0x0A,
                                                        0x0B, 0x0C, 0x0D,
                                                        0x0E, 0x0F, 0x13,
                                                        0x15]),
        ConditionalField(ByteEnumField('DTCFormatIdentifier', 0,
                                       {0: 'ISO15031-6DTCFormat',
                                        1: 'UDS-1DTCFormat',
                                        2: 'SAEJ1939-73DTCFormat',
                                        3: 'ISO11992-4DTCFormat'}),
                         lambda pkt: pkt.reportType in [0x01, 0x07,
                                                        0x11, 0x12]),
        ConditionalField(ShortField('DTCCount', 0),
                         lambda pkt: pkt.reportType in [0x01, 0x07,
                                                        0x11, 0x12]),
        # String fields default to an empty byte string; the previous
        # integer 0 is not a byte-string value for a string field.
        ConditionalField(StrField('DTCAndStatusRecord', b""),
                         lambda pkt: pkt.reportType in [0x02, 0x0A, 0x0B,
                                                        0x0C, 0x0D, 0x0E,
                                                        0x0F, 0x13, 0x15]),
        ConditionalField(StrField('dataRecord', b""),
                         lambda pkt: pkt.reportType in [0x03, 0x04, 0x05,
                                                        0x06, 0x08, 0x09,
                                                        0x10, 0x14])
    ]

    def answers(self, other):
        """Match a UDS_RDTCI request with the same report type."""
        return other.__class__ == UDS_RDTCI \
            and other.reportType == self.reportType

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and ``repr(pkt)``."""
        return pkt.sprintf("%UDS.service%"), repr(pkt)


bind_layers(UDS, UDS_RDTCIPR, service=0x59)
# #########################RC###################################
class UDS_RC(Packet):
    """RoutineControl request layer (UDS service 0x31)."""

    # Control type value -> name; also used by the positive response layer.
    routineControlTypes = {
        0: 'ISOSAEReserved',
        1: 'startRoutine',
        2: 'stopRoutine',
        3: 'requestRoutineResults'
    }
    # Routine identifier value -> name; starts empty, shared with the
    # positive response layer.
    routineControlIdentifiers = ObservableDict()
    name = 'RoutineControl'
    fields_desc = [
        ByteEnumField('routineControlType', 0, routineControlTypes),
        XShortEnumField('routineIdentifier', 0, routineControlIdentifiers),
        # Default is an empty byte string; the previous integer 0 is not a
        # byte-string value for a string field.
        StrField('routineControlOptionRecord', b"", fmt="B"),
    ]

    @staticmethod
    def get_log(pkt):
        """Return (formatted service, (control type, routine id, record)).

        The three detail values are the raw field values.
        """
        return pkt.sprintf("%UDS.service%"),\
            (pkt.routineControlType,
             pkt.routineIdentifier,
             pkt.routineControlOptionRecord)


bind_layers(UDS, UDS_RC, service=0x31)
class UDS_RCPR(Packet):
    """RoutineControl positive response layer (UDS service 0x71)."""

    name = 'RoutineControlPositiveResponse'
    fields_desc = [
        ByteEnumField('routineControlType', 0,
                      UDS_RC.routineControlTypes),
        XShortEnumField('routineIdentifier', 0,
                        UDS_RC.routineControlIdentifiers),
        # Default is an empty byte string; the previous integer 0 is not a
        # byte-string value for a string field.
        StrField('routineStatusRecord', b"", fmt="B"),
    ]

    def answers(self, other):
        """Match a UDS_RC request with the same routine control type."""
        return other.__class__ == UDS_RC \
            and other.routineControlType == self.routineControlType

    @staticmethod
    def get_log(pkt):
        """Return (formatted service, (control type, routine id, record)).

        The three detail values are the raw field values.
        """
        return pkt.sprintf("%UDS.service%"),\
            (pkt.routineControlType,
             pkt.routineIdentifier,
             pkt.routineStatusRecord)


bind_layers(UDS, UDS_RCPR, service=0x71)
# #########################RD###################################
class UDS_RD(Packet):
    """RequestDownload request layer (UDS service 0x34)."""

    # Data format identifier value -> name; also used by UDS_RU.
    dataFormatIdentifiers = ObservableDict({
        0: 'noCompressionNoEncryption'
    })
    name = 'RequestDownload'
    fields_desc = [
        ByteEnumField('dataFormatIdentifier', 0, dataFormatIdentifiers),
        # Two 4-bit lengths select which address/size field is present.
        BitField('memorySizeLen', 0, 4),
        BitField('memoryAddressLen', 0, 4),
        ConditionalField(XByteField('memoryAddress1', 0),
                         lambda pkt: pkt.memoryAddressLen == 1),
        ConditionalField(XShortField('memoryAddress2', 0),
                         lambda pkt: pkt.memoryAddressLen == 2),
        ConditionalField(X3BytesField('memoryAddress3', 0),
                         lambda pkt: pkt.memoryAddressLen == 3),
        ConditionalField(XIntField('memoryAddress4', 0),
                         lambda pkt: pkt.memoryAddressLen == 4),
        ConditionalField(XByteField('memorySize1', 0),
                         lambda pkt: pkt.memorySizeLen == 1),
        ConditionalField(XShortField('memorySize2', 0),
                         lambda pkt: pkt.memorySizeLen == 2),
        ConditionalField(X3BytesField('memorySize3', 0),
                         lambda pkt: pkt.memorySizeLen == 3),
        ConditionalField(XIntField('memorySize4', 0),
                         lambda pkt: pkt.memorySizeLen == 4)
    ]

    @staticmethod
    def get_log(pkt):
        """Return (formatted service, (address, size)).

        Address and size are read from the field selected by the length
        nibbles. Only lengths 1 to 4 have a matching field; for any other
        length (including the default 0) the entry is None instead of
        raising AttributeError.
        """
        addr = getattr(pkt, "memoryAddress%d" % pkt.memoryAddressLen, None)
        size = getattr(pkt, "memorySize%d" % pkt.memorySizeLen, None)
        return pkt.sprintf("%UDS.service%"), (addr, size)


bind_layers(UDS, UDS_RD, service=0x34)
class UDS_RDPR(Packet):
    """RequestDownload positive response layer (UDS service 0x74)."""

    name = 'RequestDownloadPositiveResponse'
    fields_desc = [
        BitField('memorySizeLen', 0, 4),
        BitField('reserved', 0, 4),
        # Default is an empty byte string; the previous integer 0 is not a
        # byte-string value for a string field.
        StrField('maxNumberOfBlockLength', b"", fmt="B"),
    ]

    def answers(self, other):
        """Match any UDS_RD request."""
        return other.__class__ == UDS_RD

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and raw memorySizeLen."""
        return pkt.sprintf("%UDS.service%"), pkt.memorySizeLen


bind_layers(UDS, UDS_RDPR, service=0x74)
# #########################RU###################################
class UDS_RU(Packet):
    """RequestUpload request layer (UDS service 0x35)."""

    name = 'RequestUpload'
    fields_desc = [
        ByteEnumField('dataFormatIdentifier', 0,
                      UDS_RD.dataFormatIdentifiers),
        # Two 4-bit lengths select which address/size field is present.
        BitField('memorySizeLen', 0, 4),
        BitField('memoryAddressLen', 0, 4),
        ConditionalField(XByteField('memoryAddress1', 0),
                         lambda pkt: pkt.memoryAddressLen == 1),
        ConditionalField(XShortField('memoryAddress2', 0),
                         lambda pkt: pkt.memoryAddressLen == 2),
        ConditionalField(X3BytesField('memoryAddress3', 0),
                         lambda pkt: pkt.memoryAddressLen == 3),
        ConditionalField(XIntField('memoryAddress4', 0),
                         lambda pkt: pkt.memoryAddressLen == 4),
        ConditionalField(XByteField('memorySize1', 0),
                         lambda pkt: pkt.memorySizeLen == 1),
        ConditionalField(XShortField('memorySize2', 0),
                         lambda pkt: pkt.memorySizeLen == 2),
        ConditionalField(X3BytesField('memorySize3', 0),
                         lambda pkt: pkt.memorySizeLen == 3),
        ConditionalField(XIntField('memorySize4', 0),
                         lambda pkt: pkt.memorySizeLen == 4)
    ]

    @staticmethod
    def get_log(pkt):
        """Return (formatted service, (address, size)).

        Address and size are read from the field selected by the length
        nibbles. Only lengths 1 to 4 have a matching field; for any other
        length (including the default 0) the entry is None instead of
        raising AttributeError.
        """
        addr = getattr(pkt, "memoryAddress%d" % pkt.memoryAddressLen, None)
        size = getattr(pkt, "memorySize%d" % pkt.memorySizeLen, None)
        return pkt.sprintf("%UDS.service%"), (addr, size)


bind_layers(UDS, UDS_RU, service=0x35)
class UDS_RUPR(Packet):
    """RequestUpload positive response layer (UDS service 0x75)."""

    name = 'RequestUploadPositiveResponse'
    fields_desc = [
        BitField('memorySizeLen', 0, 4),
        BitField('reserved', 0, 4),
        # Default is an empty byte string; the previous integer 0 is not a
        # byte-string value for a string field.
        StrField('maxNumberOfBlockLength', b"", fmt="B"),
    ]

    def answers(self, other):
        """Match any UDS_RU request."""
        return other.__class__ == UDS_RU

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and raw memorySizeLen."""
        return pkt.sprintf("%UDS.service%"), pkt.memorySizeLen


bind_layers(UDS, UDS_RUPR, service=0x75)
# #########################TD###################################
class UDS_TD(Packet):
    """TransferData request layer (UDS service 0x36)."""

    name = 'TransferData'
    fields_desc = [
        ByteField('blockSequenceCounter', 0),
        # Default is an empty byte string; the previous integer 0 is not a
        # byte-string value for a string field.
        StrField('transferRequestParameterRecord', b"", fmt="B")
    ]

    @staticmethod
    def get_log(pkt):
        """Return (formatted service, (block counter, parameter record))."""
        return pkt.sprintf("%UDS.service%"),\
            (pkt.blockSequenceCounter, pkt.transferRequestParameterRecord)


bind_layers(UDS, UDS_TD, service=0x36)
class UDS_TDPR(Packet):
    """TransferData positive response layer (UDS service 0x76)."""

    name = 'TransferDataPositiveResponse'
    fields_desc = [
        ByteField('blockSequenceCounter', 0),
        # Default is an empty byte string; the previous integer 0 is not a
        # byte-string value for a string field.
        StrField('transferResponseParameterRecord', b"", fmt="B")
    ]

    def answers(self, other):
        """Match a UDS_TD request with the same block sequence counter."""
        return other.__class__ == UDS_TD \
            and other.blockSequenceCounter == self.blockSequenceCounter

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and raw block counter."""
        return pkt.sprintf("%UDS.service%"), pkt.blockSequenceCounter


bind_layers(UDS, UDS_TDPR, service=0x76)
# #########################RTE###################################
class UDS_RTE(Packet):
    """RequestTransferExit request layer (UDS service 0x37)."""

    name = 'RequestTransferExit'
    fields_desc = [
        # Default is an empty byte string; the previous integer 0 is not a
        # byte-string value for a string field.
        StrField('transferRequestParameterRecord', b"", fmt="B")
    ]

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and raw parameter record."""
        return pkt.sprintf("%UDS.service%"),\
            pkt.transferRequestParameterRecord


bind_layers(UDS, UDS_RTE, service=0x37)
class UDS_RTEPR(Packet):
    """RequestTransferExit positive response layer (UDS service 0x77)."""

    name = 'RequestTransferExitPositiveResponse'
    fields_desc = [
        # Default is an empty byte string; the previous integer 0 is not a
        # byte-string value for a string field.
        StrField('transferResponseParameterRecord', b"", fmt="B")
    ]

    def answers(self, other):
        """Match any UDS_RTE request."""
        return other.__class__ == UDS_RTE

    @staticmethod
    def get_log(pkt):
        """Return a 2-tuple: formatted service and raw parameter record."""
        return pkt.sprintf("%UDS.service%"),\
            pkt.transferResponseParameterRecord


bind_layers(UDS, UDS_RTEPR, service=0x77)
# #########################IOCBI###################################
class UDS_IOCBI(Packet):
    """ Request layer for InputOutputControlByIdentifier
    (UDS service 0x2F).
    """
    name = 'InputOutputControlByIdentifier'
    # Enum table used by the dataIdentifier field; empty at definition.
    dataIdentifiers = ObservableDict()
    fields_desc = [
        XShortEnumField('dataIdentifier', 0, dataIdentifiers),
        ByteField('controlOptionRecord', 0),
        StrField('controlEnableMaskRecord', 0, fmt="B")
    ]

    @staticmethod
    def get_log(pkt):
        """ Summarise pkt for logging.

        Args:
            pkt: packet to summarise
        Returns:
            tuple of the formatted UDS service and dataIdentifier
        """
        service = pkt.sprintf("%UDS.service%")
        return service, pkt.dataIdentifier


# Bind UDS_IOCBI as the payload layer of UDS when UDS.service is 0x2F.
bind_layers(UDS, UDS_IOCBI, service=0x2F)
class UDS_IOCBIPR(Packet):
    """ Positive response layer for InputOutputControlByIdentifier
    (UDS service 0x6F).
    """
    name = 'InputOutputControlByIdentifierPositiveResponse'
    fields_desc = [
        XShortField('dataIdentifier', 0),
        StrField('controlStatusRecord', 0, fmt="B")
    ]

    def answers(self, other):
        """ Tell whether this layer answers other.

        Args:
            other: layer to compare against
        Returns:
            True only if other is exactly of class UDS_IOCBI and
            carries the same dataIdentifier as this layer
        """
        if other.__class__ != UDS_IOCBI:
            return False
        return other.dataIdentifier == self.dataIdentifier

    @staticmethod
    def get_log(pkt):
        """ Summarise pkt for logging.

        Args:
            pkt: packet to summarise
        Returns:
            tuple of the formatted UDS service and dataIdentifier
        """
        service = pkt.sprintf("%UDS.service%")
        return service, pkt.dataIdentifier


# Bind UDS_IOCBIPR as the payload layer of UDS when UDS.service is 0x6F.
bind_layers(UDS, UDS_IOCBIPR, service=0x6F)
# #########################NR###################################
class UDS_NR(Packet):
    """ Negative response layer (UDS service 0x7f).

    Carries the service id of the rejected request and a
    negative response code.
    """
    # Display names for the negativeResponseCode field.
    negativeResponseCodes = {
        0x00: 'positiveResponse',
        0x10: 'generalReject',
        0x11: 'serviceNotSupported',
        0x12: 'subFunctionNotSupported',
        0x13: 'incorrectMessageLengthOrInvalidFormat',
        0x14: 'responseTooLong',
        0x20: 'ISOSAEReserved',
        0x21: 'busyRepeatRequest',
        0x22: 'conditionsNotCorrect',
        0x23: 'ISOSAEReserved',
        0x24: 'requestSequenceError',
        0x25: 'noResponseFromSubnetComponent',
        0x26: 'failurePreventsExecutionOfRequestedAction',
        0x31: 'requestOutOfRange',
        0x33: 'securityAccessDenied',
        0x35: 'invalidKey',
        0x36: 'exceedNumberOfAttempts',
        0x37: 'requiredTimeDelayNotExpired',
        0x70: 'uploadDownloadNotAccepted',
        0x71: 'transferDataSuspended',
        0x72: 'generalProgrammingFailure',
        0x73: 'wrongBlockSequenceCounter',
        0x78: 'requestCorrectlyReceived-ResponsePending',
        0x7E: 'subFunctionNotSupportedInActiveSession',
        0x7F: 'serviceNotSupportedInActiveSession',
        0x80: 'ISOSAEReserved',
        0x81: 'rpmTooHigh',
        0x82: 'rpmTooLow',
        0x83: 'engineIsRunning',
        0x84: 'engineIsNotRunning',
        0x85: 'engineRunTimeTooLow',
        0x86: 'temperatureTooHigh',
        0x87: 'temperatureTooLow',
        0x88: 'vehicleSpeedTooHigh',
        0x89: 'vehicleSpeedTooLow',
        0x8a: 'throttle/PedalTooHigh',
        0x8b: 'throttle/PedalTooLow',
        0x8c: 'transmissionRangeNotInNeutral',
        0x8d: 'transmissionRangeNotInGear',
        0x8e: 'ISOSAEReserved',
        0x8f: 'brakeSwitch(es)NotClosed',
        0x90: 'shifterLeverNotInPark',
        0x91: 'torqueConverterClutchLocked',
        0x92: 'voltageTooHigh',
        0x93: 'voltageTooLow',
    }
    name = 'NegativeResponse'
    fields_desc = [
        XByteEnumField('requestServiceId', 0, UDS.services),
        ByteEnumField('negativeResponseCode', 0, negativeResponseCodes)
    ]

    def answers(self, other):
        """ Tell whether this negative response answers other.

        Args:
            other: layer to compare against; its service attribute is
                   matched against requestServiceId
        Returns:
            False if the service ids differ. Otherwise True for every
            code except 0x78 (requestCorrectlyReceived-ResponsePending),
            for which the configured
            conf.contribs['UDS']['treat-response-pending-as-answer']
            value is returned.
        """
        if self.requestServiceId != other.service:
            return False
        if self.negativeResponseCode != 0x78:
            return True
        return conf.contribs['UDS']['treat-response-pending-as-answer']

    @staticmethod
    def get_log(pkt):
        """ Summarise pkt for logging.

        Args:
            pkt: packet to summarise
        Returns:
            tuple of the formatted UDS service and a tuple of the
            formatted requestServiceId and negativeResponseCode
        """
        service = pkt.sprintf("%UDS.service%")
        rejected_service = pkt.sprintf("%UDS_NR.requestServiceId%")
        response_code = pkt.sprintf("%UDS_NR.negativeResponseCode%")
        return service, (rejected_service, response_code)


# Bind UDS_NR as the payload layer of UDS when UDS.service is 0x7f.
bind_layers(UDS, UDS_NR, service=0x7f)
# ##################################################################
# ######################## UTILS ###################################
# ##################################################################
class UDS_TesterPresentSender(PeriodicSenderThread):
    """ PeriodicSenderThread preconfigured with a TesterPresent packet. """

    def __init__(self, sock, pkt=UDS() / UDS_TP(), interval=2):
        """ Thread to send TesterPresent messages packets periodically

        All arguments are handed on unchanged to PeriodicSenderThread.

        Args:
            sock: socket where packet is sent periodically
            pkt: packet to send, UDS() / UDS_TP() by default
            interval: interval between two packets, 2 by default
        """
        super(UDS_TesterPresentSender, self).__init__(sock, pkt, interval)
def UDS_SessionEnumerator(sock, session_range=range(0x100), reset_wait=1.5):
    """ Enumerates session ID's in given range
        and returns list of UDS()/UDS_DSC() packets
        with valid session types

    Session requests are paired with a 'hardReset' ECUReset request.
    A session request is kept if its response has service 0x50, or
    a negativeResponseCode other than 0x10, 0x11 or 0x12.

    Args:
        sock: socket where packets are sent
        session_range: range for session ID's
        reset_wait: wait time in sec after every packet, also used to
                    derive the overall timeout
    """
    session_requests = UDS() / UDS_DSC(diagnosticSessionType=session_range)
    reset_request = UDS() / UDS_ER(resetType='hardReset')
    pkts = (req
            for pair in product(session_requests, reset_request)
            for req in pair)

    total_timeout = len(session_range) * reset_wait * 2 + 1
    answered, _ = sock.sr(pkts, timeout=total_timeout,
                          verbose=False, inter=reset_wait)

    rejecting_codes = [0x10, 0x11, 0x12]
    valid_sessions = []
    for req, res in answered:
        # Skip missing requests and the interleaved ECUReset requests.
        if req is None or req.service == 0x11:
            continue
        if res.service == 0x50 or \
                res.negativeResponseCode not in rejecting_codes:
            valid_sessions.append(req)
    return valid_sessions
def UDS_ServiceEnumerator(sock, session="DefaultSession",
                          filter_responses=True):
    """ Enumerates every service ID
        and returns list of tuples. Each tuple contains
        the session and the respective response

    One request is built for every value in range(0x100) with
    bit 0x40 cleared.

    Args:
        sock: socket where the requests are sent
        session: session in which the services are enumerated; only
                 used as the first element of each returned tuple
        filter_responses: if true, drop negative responses (service
                          0x7f) whose negativeResponseCode is 0x10
                          or 0x11
    """
    request_ids = {sid & ~0x40 for sid in range(0x100)}
    pkts = (UDS(service=sid) for sid in request_ids)
    answered = sock.sr(pkts, timeout=5, verbose=False)[0]

    hidden_codes = [0x10, 0x11]
    found = []
    for _, resp in answered:
        if resp.service == 0x7f \
                and resp.negativeResponseCode in hidden_codes \
                and filter_responses:
            continue
        found.append((session, resp))
    return found
def getTableEntry(tup):
    """ Helping function for make_lined_table.
        Returns the session, the service and the response code of tup.

    Args:
        tup: tuple with session and UDS response package

    Returns:
        tuple (session, "0x<id>: <service name>", response). For a
        negative response (service 0x7f) the service is the rejected
        requestServiceId and the response is the formatted
        negativeResponseCode. Otherwise the service is the response
        service with bit 0x40 cleared and the response is
        "PositiveResponse". A service id missing from the enum table
        is named "Unknown" instead of raising a KeyError.

    Example:
        make_lined_table([('DefaultSession', UDS()/UDS_SAPR()),
                          ('ExtendedDiagnosticSession',
                           UDS()/UDS_IOCBI())],
                         getTableEntry)
    """
    session, pkt = tup
    if pkt.service == 0x7f:
        return (session,
                "0x%02x: %s" % (pkt.requestServiceId,
                                pkt.sprintf("%UDS_NR.requestServiceId%")),
                pkt.sprintf("%UDS_NR.negativeResponseCode%"))

    # Clearing bit 0x40 maps a positive response id to its request id.
    service_id = pkt.service & ~0x40
    service_names = pkt.get_field('service').i2s
    # Responses with an id outside the enum table must not abort the table.
    service_name = service_names.get(service_id, "Unknown")
    return (session,
            "0x%02x: %s" % (service_id, service_name),
            "PositiveResponse")