86890704fd
todo: add documentation & wireshark dissector
565 lines
17 KiB
Python
Executable file
565 lines
17 KiB
Python
Executable file
# This file is part of Scapy
|
|
# See http://www.secdev.org/projects/scapy for more information
|
|
# Copyright (C) Philippe Biondi <phil@secdev.org>
|
|
# Modified by Maxence Tury <maxence.tury@ssi.gouv.fr>
|
|
# Acknowledgment: Ralph Broenink
|
|
# This program is published under a GPLv2 license
|
|
|
|
"""
|
|
Basic Encoding Rules (BER) for ASN.1
|
|
"""
|
|
|
|
from __future__ import absolute_import
|
|
from scapy.error import warning
|
|
from scapy.compat import chb, orb, bytes_encode
|
|
from scapy.utils import binrepr, inet_aton, inet_ntoa
|
|
from scapy.asn1.asn1 import ASN1_Decoding_Error, ASN1_Encoding_Error, \
|
|
ASN1_BadTag_Decoding_Error, ASN1_Codecs, ASN1_Class_UNIVERSAL, \
|
|
ASN1_Error, ASN1_DECODING_ERROR, ASN1_BADTAG
|
|
from scapy.modules import six
|
|
|
|
##################
|
|
# BER encoding #
|
|
##################
|
|
|
|
|
|
# [ BER tools ] #
|
|
|
|
|
|
class BER_Exception(Exception):
|
|
pass
|
|
|
|
|
|
class BER_Encoding_Error(ASN1_Encoding_Error):
|
|
def __init__(self, msg, encoded=None, remaining=None):
|
|
Exception.__init__(self, msg)
|
|
self.remaining = remaining
|
|
self.encoded = encoded
|
|
|
|
def __str__(self):
|
|
s = Exception.__str__(self)
|
|
if isinstance(self.encoded, BERcodec_Object):
|
|
s += "\n### Already encoded ###\n%s" % self.encoded.strshow()
|
|
else:
|
|
s += "\n### Already encoded ###\n%r" % self.encoded
|
|
s += "\n### Remaining ###\n%r" % self.remaining
|
|
return s
|
|
|
|
|
|
class BER_Decoding_Error(ASN1_Decoding_Error):
|
|
def __init__(self, msg, decoded=None, remaining=None):
|
|
Exception.__init__(self, msg)
|
|
self.remaining = remaining
|
|
self.decoded = decoded
|
|
|
|
def __str__(self):
|
|
s = Exception.__str__(self)
|
|
if isinstance(self.decoded, BERcodec_Object):
|
|
s += "\n### Already decoded ###\n%s" % self.decoded.strshow()
|
|
else:
|
|
s += "\n### Already decoded ###\n%r" % self.decoded
|
|
s += "\n### Remaining ###\n%r" % self.remaining
|
|
return s
|
|
|
|
|
|
class BER_BadTag_Decoding_Error(BER_Decoding_Error,
|
|
ASN1_BadTag_Decoding_Error):
|
|
pass
|
|
|
|
|
|
def BER_len_enc(ll, size=0):
|
|
if ll <= 127 and size == 0:
|
|
return chb(ll)
|
|
s = b""
|
|
while ll or size > 0:
|
|
s = chb(ll & 0xff) + s
|
|
ll >>= 8
|
|
size -= 1
|
|
if len(s) > 127:
|
|
raise BER_Exception(
|
|
"BER_len_enc: Length too long (%i) to be encoded [%r]" %
|
|
(len(s), s)
|
|
)
|
|
return chb(len(s) | 0x80) + s
|
|
|
|
|
|
def BER_len_dec(s):
|
|
tmp_len = orb(s[0])
|
|
if not tmp_len & 0x80:
|
|
return tmp_len, s[1:]
|
|
tmp_len &= 0x7f
|
|
if len(s) <= tmp_len:
|
|
raise BER_Decoding_Error(
|
|
"BER_len_dec: Got %i bytes while expecting %i" %
|
|
(len(s) - 1, tmp_len),
|
|
remaining=s
|
|
)
|
|
ll = 0
|
|
for c in s[1:tmp_len + 1]:
|
|
ll <<= 8
|
|
ll |= orb(c)
|
|
return ll, s[tmp_len + 1:]
|
|
|
|
|
|
def BER_num_enc(ll, size=1):
|
|
x = []
|
|
while ll or size > 0:
|
|
x.insert(0, ll & 0x7f)
|
|
if len(x) > 1:
|
|
x[0] |= 0x80
|
|
ll >>= 7
|
|
size -= 1
|
|
return b"".join(chb(k) for k in x)
|
|
|
|
|
|
def BER_num_dec(s, cls_id=0):
|
|
if len(s) == 0:
|
|
raise BER_Decoding_Error("BER_num_dec: got empty string", remaining=s)
|
|
x = cls_id
|
|
for i, c in enumerate(s):
|
|
c = orb(c)
|
|
x <<= 7
|
|
x |= c & 0x7f
|
|
if not c & 0x80:
|
|
break
|
|
if c & 0x80:
|
|
raise BER_Decoding_Error("BER_num_dec: unfinished number description",
|
|
remaining=s)
|
|
return x, s[i + 1:]
|
|
|
|
|
|
def BER_id_dec(s):
|
|
# This returns the tag ALONG WITH THE PADDED CLASS+CONSTRUCTIVE INFO.
|
|
# Let's recall that bits 8-7 from the first byte of the tag encode
|
|
# the class information, while bit 6 means primitive or constructive.
|
|
#
|
|
# For instance, with low-tag-number b'\x81', class would be 0b10
|
|
# ('context-specific') and tag 0x01, but we return 0x81 as a whole.
|
|
# For b'\xff\x22', class would be 0b11 ('private'), constructed, then
|
|
# padding, then tag 0x22, but we return (0xff>>5)*128^1 + 0x22*128^0.
|
|
# Why the 5-bit-shifting? Because it provides an unequivocal encoding
|
|
# on base 128 (note that 0xff would equal 1*128^1 + 127*128^0...),
|
|
# as we know that bits 5 to 1 are fixed to 1 anyway.
|
|
#
|
|
# As long as there is no class differentiation, we have to keep this info
|
|
# encoded in scapy's tag in order to reuse it for packet building.
|
|
# Note that tags thus may have to be hard-coded with their extended
|
|
# information, e.g. a SEQUENCE from asn1.py has a direct tag 0x20|16.
|
|
x = orb(s[0])
|
|
if x & 0x1f != 0x1f:
|
|
# low-tag-number
|
|
return x, s[1:]
|
|
else:
|
|
# high-tag-number
|
|
return BER_num_dec(s[1:], cls_id=x >> 5)
|
|
|
|
|
|
def BER_id_enc(n):
|
|
if n < 256:
|
|
# low-tag-number
|
|
return chb(n)
|
|
else:
|
|
# high-tag-number
|
|
s = BER_num_enc(n)
|
|
tag = orb(s[0]) # first byte, as an int
|
|
tag &= 0x07 # reset every bit from 8 to 4
|
|
tag <<= 5 # move back the info bits on top
|
|
tag |= 0x1f # pad with 1s every bit from 5 to 1
|
|
return chb(tag) + s[1:]
|
|
|
|
# The functions below provide implicit and explicit tagging support.
|
|
|
|
|
|
def BER_tagging_dec(s, hidden_tag=None, implicit_tag=None,
|
|
explicit_tag=None, safe=False):
|
|
# We output the 'real_tag' if it is different from the (im|ex)plicit_tag.
|
|
real_tag = None
|
|
if len(s) > 0:
|
|
err_msg = "BER_tagging_dec: observed tag does not match expected tag"
|
|
if implicit_tag is not None:
|
|
ber_id, s = BER_id_dec(s)
|
|
if ber_id != implicit_tag:
|
|
if not safe:
|
|
raise BER_Decoding_Error(err_msg, remaining=s)
|
|
else:
|
|
real_tag = ber_id
|
|
s = chb(hash(hidden_tag)) + s
|
|
elif explicit_tag is not None:
|
|
ber_id, s = BER_id_dec(s)
|
|
if ber_id != explicit_tag:
|
|
if not safe:
|
|
raise BER_Decoding_Error(err_msg, remaining=s)
|
|
else:
|
|
real_tag = ber_id
|
|
l, s = BER_len_dec(s)
|
|
return real_tag, s
|
|
|
|
|
|
def BER_tagging_enc(s, implicit_tag=None, explicit_tag=None):
|
|
if len(s) > 0:
|
|
if implicit_tag is not None:
|
|
s = BER_id_enc(implicit_tag) + s[1:]
|
|
elif explicit_tag is not None:
|
|
s = BER_id_enc(explicit_tag) + BER_len_enc(len(s)) + s
|
|
return s
|
|
|
|
# [ BER classes ] #
|
|
|
|
|
|
class BERcodec_metaclass(type):
|
|
def __new__(cls, name, bases, dct):
|
|
c = super(BERcodec_metaclass, cls).__new__(cls, name, bases, dct)
|
|
try:
|
|
c.tag.register(c.codec, c)
|
|
except Exception:
|
|
warning("Error registering %r for %r" % (c.tag, c.codec))
|
|
return c
|
|
|
|
|
|
class BERcodec_Object(six.with_metaclass(BERcodec_metaclass)):
|
|
codec = ASN1_Codecs.BER
|
|
tag = ASN1_Class_UNIVERSAL.ANY
|
|
|
|
@classmethod
|
|
def asn1_object(cls, val):
|
|
return cls.tag.asn1_object(val)
|
|
|
|
@classmethod
|
|
def check_string(cls, s):
|
|
if not s:
|
|
raise BER_Decoding_Error(
|
|
"%s: Got empty object while expecting tag %r" %
|
|
(cls.__name__, cls.tag), remaining=s
|
|
)
|
|
|
|
@classmethod
|
|
def check_type(cls, s):
|
|
cls.check_string(s)
|
|
tag, remainder = BER_id_dec(s)
|
|
if not isinstance(tag, int) or cls.tag != tag:
|
|
raise BER_BadTag_Decoding_Error(
|
|
"%s: Got tag [%i/%#x] while expecting %r" %
|
|
(cls.__name__, tag, tag, cls.tag), remaining=s
|
|
)
|
|
return remainder
|
|
|
|
@classmethod
|
|
def check_type_get_len(cls, s):
|
|
s2 = cls.check_type(s)
|
|
if not s2:
|
|
raise BER_Decoding_Error("%s: No bytes while expecting a length" %
|
|
cls.__name__, remaining=s)
|
|
return BER_len_dec(s2)
|
|
|
|
@classmethod
|
|
def check_type_check_len(cls, s):
|
|
l, s3 = cls.check_type_get_len(s)
|
|
if len(s3) < l:
|
|
raise BER_Decoding_Error("%s: Got %i bytes while expecting %i" %
|
|
(cls.__name__, len(s3), l), remaining=s)
|
|
return l, s3[:l], s3[l:]
|
|
|
|
@classmethod
|
|
def do_dec(cls, s, context=None, safe=False):
|
|
if context is None:
|
|
context = cls.tag.context
|
|
cls.check_string(s)
|
|
p, remainder = BER_id_dec(s)
|
|
if p not in context:
|
|
t = s
|
|
if len(t) > 18:
|
|
t = t[:15] + b"..."
|
|
raise BER_Decoding_Error("Unknown prefix [%02x] for [%r]" %
|
|
(p, t), remaining=s)
|
|
codec = context[p].get_codec(ASN1_Codecs.BER)
|
|
if codec == BERcodec_Object:
|
|
# Value type defined as Unknown
|
|
l, s = BER_num_dec(remainder)
|
|
return ASN1_BADTAG(s[:l]), s[l:]
|
|
return codec.dec(s, context, safe)
|
|
|
|
@classmethod
|
|
def dec(cls, s, context=None, safe=False):
|
|
if not safe:
|
|
return cls.do_dec(s, context, safe)
|
|
try:
|
|
return cls.do_dec(s, context, safe)
|
|
except BER_BadTag_Decoding_Error as e:
|
|
o, remain = BERcodec_Object.dec(e.remaining, context, safe)
|
|
return ASN1_BADTAG(o), remain
|
|
except BER_Decoding_Error as e:
|
|
return ASN1_DECODING_ERROR(s, exc=e), ""
|
|
except ASN1_Error as e:
|
|
return ASN1_DECODING_ERROR(s, exc=e), ""
|
|
|
|
@classmethod
|
|
def safedec(cls, s, context=None):
|
|
return cls.dec(s, context, safe=True)
|
|
|
|
@classmethod
|
|
def enc(cls, s):
|
|
if isinstance(s, six.string_types + (bytes,)):
|
|
return BERcodec_STRING.enc(s)
|
|
else:
|
|
return BERcodec_INTEGER.enc(int(s))
|
|
|
|
|
|
ASN1_Codecs.BER.register_stem(BERcodec_Object)
|
|
|
|
|
|
##########################
|
|
# BERcodec objects #
|
|
##########################
|
|
|
|
class BERcodec_INTEGER(BERcodec_Object):
|
|
tag = ASN1_Class_UNIVERSAL.INTEGER
|
|
|
|
@classmethod
|
|
def enc(cls, i):
|
|
s = []
|
|
while True:
|
|
s.append(i & 0xff)
|
|
if -127 <= i < 0:
|
|
break
|
|
if 128 <= i <= 255:
|
|
s.append(0)
|
|
i >>= 8
|
|
if not i:
|
|
break
|
|
s = [chb(hash(c)) for c in s]
|
|
s.append(BER_len_enc(len(s)))
|
|
s.append(chb(hash(cls.tag)))
|
|
s.reverse()
|
|
return b"".join(s)
|
|
|
|
@classmethod
|
|
def do_dec(cls, s, context=None, safe=False):
|
|
l, s, t = cls.check_type_check_len(s)
|
|
x = 0
|
|
if s:
|
|
if orb(s[0]) & 0x80: # negative int
|
|
x = -1
|
|
for c in s:
|
|
x <<= 8
|
|
x |= orb(c)
|
|
return cls.asn1_object(x), t
|
|
|
|
|
|
class BERcodec_BOOLEAN(BERcodec_INTEGER):
|
|
tag = ASN1_Class_UNIVERSAL.BOOLEAN
|
|
|
|
|
|
class BERcodec_BIT_STRING(BERcodec_Object):
|
|
tag = ASN1_Class_UNIVERSAL.BIT_STRING
|
|
|
|
@classmethod
|
|
def do_dec(cls, s, context=None, safe=False):
|
|
# /!\ the unused_bits information is lost after this decoding
|
|
l, s, t = cls.check_type_check_len(s)
|
|
if len(s) > 0:
|
|
unused_bits = orb(s[0])
|
|
if safe and unused_bits > 7:
|
|
raise BER_Decoding_Error(
|
|
"BERcodec_BIT_STRING: too many unused_bits advertised",
|
|
remaining=s
|
|
)
|
|
s = "".join(binrepr(orb(x)).zfill(8) for x in s[1:])
|
|
if unused_bits > 0:
|
|
s = s[:-unused_bits]
|
|
return cls.tag.asn1_object(s), t
|
|
else:
|
|
raise BER_Decoding_Error(
|
|
"BERcodec_BIT_STRING found no content "
|
|
"(not even unused_bits byte)",
|
|
remaining=s
|
|
)
|
|
|
|
@classmethod
|
|
def enc(cls, s):
|
|
# /!\ this is DER encoding (bit strings are only zero-bit padded)
|
|
s = bytes_encode(s)
|
|
if len(s) % 8 == 0:
|
|
unused_bits = 0
|
|
else:
|
|
unused_bits = 8 - len(s) % 8
|
|
s += b"0" * unused_bits
|
|
s = b"".join(chb(int(b"".join(chb(y) for y in x), 2))
|
|
for x in zip(*[iter(s)] * 8))
|
|
s = chb(unused_bits) + s
|
|
return chb(hash(cls.tag)) + BER_len_enc(len(s)) + s
|
|
|
|
|
|
class BERcodec_STRING(BERcodec_Object):
|
|
tag = ASN1_Class_UNIVERSAL.STRING
|
|
|
|
@classmethod
|
|
def enc(cls, s):
|
|
s = bytes_encode(s)
|
|
# Be sure we are encoding bytes
|
|
return chb(hash(cls.tag)) + BER_len_enc(len(s)) + s
|
|
|
|
@classmethod
|
|
def do_dec(cls, s, context=None, safe=False):
|
|
l, s, t = cls.check_type_check_len(s)
|
|
return cls.tag.asn1_object(s), t
|
|
|
|
|
|
class BERcodec_NULL(BERcodec_INTEGER):
|
|
tag = ASN1_Class_UNIVERSAL.NULL
|
|
|
|
@classmethod
|
|
def enc(cls, i):
|
|
if i == 0:
|
|
return chb(hash(cls.tag)) + b"\0"
|
|
else:
|
|
return super(cls, cls).enc(i)
|
|
|
|
|
|
class BERcodec_OID(BERcodec_Object):
|
|
tag = ASN1_Class_UNIVERSAL.OID
|
|
|
|
@classmethod
|
|
def enc(cls, oid):
|
|
oid = bytes_encode(oid)
|
|
if oid:
|
|
lst = [int(x) for x in oid.strip(b".").split(b".")]
|
|
else:
|
|
lst = list()
|
|
if len(lst) >= 2:
|
|
lst[1] += 40 * lst[0]
|
|
del(lst[0])
|
|
s = b"".join(BER_num_enc(k) for k in lst)
|
|
return chb(hash(cls.tag)) + BER_len_enc(len(s)) + s
|
|
|
|
@classmethod
|
|
def do_dec(cls, s, context=None, safe=False):
|
|
l, s, t = cls.check_type_check_len(s)
|
|
lst = []
|
|
while s:
|
|
l, s = BER_num_dec(s)
|
|
lst.append(l)
|
|
if (len(lst) > 0):
|
|
lst.insert(0, lst[0] // 40)
|
|
lst[1] %= 40
|
|
return (
|
|
cls.asn1_object(b".".join(str(k).encode('ascii') for k in lst)),
|
|
t,
|
|
)
|
|
|
|
|
|
class BERcodec_ENUMERATED(BERcodec_INTEGER):
|
|
tag = ASN1_Class_UNIVERSAL.ENUMERATED
|
|
|
|
|
|
class BERcodec_UTF8_STRING(BERcodec_STRING):
|
|
tag = ASN1_Class_UNIVERSAL.UTF8_STRING
|
|
|
|
|
|
class BERcodec_NUMERIC_STRING(BERcodec_STRING):
|
|
tag = ASN1_Class_UNIVERSAL.NUMERIC_STRING
|
|
|
|
|
|
class BERcodec_PRINTABLE_STRING(BERcodec_STRING):
|
|
tag = ASN1_Class_UNIVERSAL.PRINTABLE_STRING
|
|
|
|
|
|
class BERcodec_T61_STRING(BERcodec_STRING):
|
|
tag = ASN1_Class_UNIVERSAL.T61_STRING
|
|
|
|
|
|
class BERcodec_VIDEOTEX_STRING(BERcodec_STRING):
|
|
tag = ASN1_Class_UNIVERSAL.VIDEOTEX_STRING
|
|
|
|
|
|
class BERcodec_IA5_STRING(BERcodec_STRING):
|
|
tag = ASN1_Class_UNIVERSAL.IA5_STRING
|
|
|
|
|
|
class BERcodec_UTC_TIME(BERcodec_STRING):
|
|
tag = ASN1_Class_UNIVERSAL.UTC_TIME
|
|
|
|
|
|
class BERcodec_GENERALIZED_TIME(BERcodec_STRING):
|
|
tag = ASN1_Class_UNIVERSAL.GENERALIZED_TIME
|
|
|
|
|
|
class BERcodec_ISO646_STRING(BERcodec_STRING):
|
|
tag = ASN1_Class_UNIVERSAL.ISO646_STRING
|
|
|
|
|
|
class BERcodec_UNIVERSAL_STRING(BERcodec_STRING):
|
|
tag = ASN1_Class_UNIVERSAL.UNIVERSAL_STRING
|
|
|
|
|
|
class BERcodec_BMP_STRING(BERcodec_STRING):
|
|
tag = ASN1_Class_UNIVERSAL.BMP_STRING
|
|
|
|
|
|
class BERcodec_SEQUENCE(BERcodec_Object):
|
|
tag = ASN1_Class_UNIVERSAL.SEQUENCE
|
|
|
|
@classmethod
|
|
def enc(cls, ll):
|
|
if not isinstance(ll, bytes):
|
|
ll = b"".join(x.enc(cls.codec) for x in ll)
|
|
return chb(hash(cls.tag)) + BER_len_enc(len(ll)) + ll
|
|
|
|
@classmethod
|
|
def do_dec(cls, s, context=None, safe=False):
|
|
if context is None:
|
|
context = cls.tag.context
|
|
ll, st = cls.check_type_get_len(s) # we may have len(s) < ll
|
|
s, t = st[:ll], st[ll:]
|
|
obj = []
|
|
while s:
|
|
try:
|
|
o, s = BERcodec_Object.dec(s, context, safe)
|
|
except BER_Decoding_Error as err:
|
|
err.remaining += t
|
|
if err.decoded is not None:
|
|
obj.append(err.decoded)
|
|
err.decoded = obj
|
|
raise
|
|
obj.append(o)
|
|
if len(st) < ll:
|
|
raise BER_Decoding_Error("Not enough bytes to decode sequence",
|
|
decoded=obj)
|
|
return cls.asn1_object(obj), t
|
|
|
|
|
|
class BERcodec_SET(BERcodec_SEQUENCE):
|
|
tag = ASN1_Class_UNIVERSAL.SET
|
|
|
|
|
|
class BERcodec_IPADDRESS(BERcodec_STRING):
|
|
tag = ASN1_Class_UNIVERSAL.IPADDRESS
|
|
|
|
@classmethod
|
|
def enc(cls, ipaddr_ascii):
|
|
try:
|
|
s = inet_aton(ipaddr_ascii)
|
|
except Exception:
|
|
raise BER_Encoding_Error("IPv4 address could not be encoded")
|
|
return chb(hash(cls.tag)) + BER_len_enc(len(s)) + s
|
|
|
|
@classmethod
|
|
def do_dec(cls, s, context=None, safe=False):
|
|
l, s, t = cls.check_type_check_len(s)
|
|
try:
|
|
ipaddr_ascii = inet_ntoa(s)
|
|
except Exception:
|
|
raise BER_Decoding_Error("IP address could not be decoded",
|
|
remaining=s)
|
|
return cls.asn1_object(ipaddr_ascii), t
|
|
|
|
|
|
class BERcodec_COUNTER32(BERcodec_INTEGER):
|
|
tag = ASN1_Class_UNIVERSAL.COUNTER32
|
|
|
|
|
|
class BERcodec_GAUGE32(BERcodec_INTEGER):
|
|
tag = ASN1_Class_UNIVERSAL.GAUGE32
|
|
|
|
|
|
class BERcodec_TIME_TICKS(BERcodec_INTEGER):
|
|
tag = ASN1_Class_UNIVERSAL.TIME_TICKS
|