# This file is part of Scapy # See http://www.secdev.org/projects/scapy for more information # Copyright (C) Philippe Biondi # This program is published under a GPLv2 license """ Unit testing infrastructure for Scapy """ from __future__ import print_function import bz2 import copy import code import getopt import glob import hashlib import importlib import json import logging import os.path import sys import time import traceback import warnings import zlib from scapy.consts import WINDOWS import scapy.modules.six as six from scapy.modules.six.moves import range from scapy.config import conf from scapy.compat import base64_bytes, bytes_hex, plain_str # Util class # class Bunch: __init__ = lambda self, **kw: setattr(self, '__dict__', kw) def retry_test(func): """Retries the passed function 3 times before failing""" success = False ex = Exception("Unknown") for _ in six.moves.range(3): try: result = func() except Exception as e: time.sleep(1) ex = e else: success = True break if not success: raise ex assert success return result # Import tool # def import_module(name): if name.endswith(".py"): name = name[:-3] try: return importlib.import_module(name, package="scapy") except Exception: return importlib.import_module(name) # INTERNAL/EXTERNAL FILE EMBEDDING # class File: def __init__(self, name, URL, local): self.name = name self.local = local.encode("utf8") self.URL = URL def get_local(self): return bz2.decompress(base64_bytes(self.local)) def get_URL(self): return self.URL def write(self, dir): if dir: dir += "/" with open(dir + self.name, "wb") as fdesc: fdesc.write(self.get_local()) # Embed a base64 encoded bziped version of js and css files # to work if you can't reach Internet. class External_Files: UTscapy_js = File("UTscapy.js", "https://scapy.net/files/UTscapy/UTscapy.js", # noqa: E501 """QlpoOTFBWSZTWWVijKQAAXxfgERUYOvAChIhBAC /79+qQAH8AFA0poANAMjQAAAGABo0NGEZNBo0\n0BhgAaNDRhGTQaNNAYFURJinp lGaKbRkJiekzSenqmpA0Gm1LFMpRUklVQlK9WUTZYpNFI1IiEWE\nFT09Sfj5uO+ qO6S5DQwKIxM92+Zku94wL6V/1KTKan2c66Ug6SmVKy1ZIrgauxMVLF5xLH0lJRQ u\nKlqLF10iatlTzqvw7S9eS3+h4lu3GZyMgoOude3NJ1pQy8eo+X96IYZw+yneh siPj73m0rnvQ3QX\nZ9BJQiZQYQ5/uNcl2WOlC5vyQqV/BWsnr2NZYLYXQLDs/Bf fk4ZfR4/SH6GfA5Xlek4xHNHqbSsR\nbREOgueXo3kcYi94K6hSO3ldD2O/qJXOF qJ8o3TE2aQahxtQpCVUKQMvODHwu2YkaORYZC6gihEa\nllcHDIAtRPScBACAJnU ggYhLDX6DEko7nC9GvAw5OcEkiyDUbLdiGCzDaXWMC2DuQ2Y6sGf6NcRu\nON7QS bhHsPc4KKmZ/xdyRThQkGVijKQ=\n""") UTscapy_css = File("UTscapy.css", "https://scapy.net/files/UTscapy/UTscapy.css", # noqa: E501 """QlpoOTFBWSZTWbpATIwAAFpfgHwQSB//+Cpj2Q C//9/6UAS5t7qcLut3NNDp0gxKMmpqaep6n6iP\n1J+pPU0yAAaeoaDI0BJCTJqa j1BoaGhoAAPSAAAJNSRqmmk8TQmj1DT1Hom1HkQABoNDmmJgATAB\nMAAJgACYJI hDQUzCR5Q0niRoaAGgGmZS+faw7LNbkliDG1Q52WJCd85cxRVVKegld8qCRISoto GD\nEGREFEYRW0CxAgTb13lodjuN7E1aCFgRFVhiEmZAZ/ek+XR0c8DWiAKpBgY2 LNpQ1rOvlnoUI1Al\n0ySaP1w2MyFxoQqRicScCm6WnQOxDnufxk8s2deLLKlN+r fvxyTTCGRAWZONkVGIxVQRZGZLeAwH\nbpQXZcYj467i85knEOYWmLcokaqEGYGS xMCpD+cOIaL7GCxEU/aNSlWFNCvQBvzb915huAgdIdD2\nya9ZQGoqrmtommfAxu 7FGTDBNBfir9UkAMmT1KRzxasJ0n2OE+mlgTZzJnhydbJaMtAk8DJzUuvv\nZpc3 CJLVyr8F3NmIQO5E3SJSY3SQnk1CQwlELqFutXjeWWzmiywo7xJk5rUcVOV9+Ro4 96WmXsUr\nkKhNocbnFztqPhesccW5kja+KuNFmzdw4DVOBJ2JPhGOYSwCUiwUe2 kOshYBdULUmwYwToAGdgA9\n5n3bSpG85LUFIE0Cw78EYVgY0ESnYW5UdfgBhj1w PiiXDEG2vAtr38O9kdwg3tFU/0okilEjDYDa\nEfkomkLUSokmE8g1fMYBqQyyaP RWmySO3EtAuMVhQqIuMldOzLqWubl7k1MnhuBaELOgtB2TChcS\n0k7jvgdBKIef UkdAf3t2GO/LVSrDvkcb4l4TrwrI7JeCo8pBvXqZBqZJSqbsAziG7QDQVNqdtFGz \nEvMKOvKvUQ6mJFigLxBnziGQGQDEMQPSGhlV2BwAN6rZEmLwgED0OrEiSxXDcB MDskp36AV7IbKa\nCila/Wm1BKhBF+ZIqtiFyYpUhI1Q5+JK0zK7aVyLS9y7GaSr NCRpr7uaa1UgapVKs6wKKQzYCWsV\n8iCGrAkgWZEnDMJWCGUZOIpcmMle1UXSAl d5OoUYXNo0L7WSOcxEkSGjCcRhjvMRP1pAUuBPRCRA\n2lhC0ZgLYDAf5V2agMUa ki1ZgOQDXQ7aIDTdjGRTgnzPML0V1X+tIoSSZmZhrxZbluMWGEkwwky6\n0ObWIM cEbX4cawPPBVc6m5UUPbEmBANyjtNvTKE2ri7oOmBVKIMLqQKm+4rlmisu2uGSxW zTov5w\nqQDp61FkHk40wzQUKk4YcBlbQT1l8VXeZJYAVFjSJIcC8JykBYZJ1yka I4LDm5WP7s2NaRkhhV7A\nFVSD5zA8V/DJzfTk0QHmCT2wRgwPKjP60EqqlDUaST /i7kinChIXSAmRgA==\n""") def get_local_dict(cls): return {x: y.name for (x, y) in six.iteritems(cls.__dict__) if isinstance(y, File)} get_local_dict = classmethod(get_local_dict) def get_URL_dict(cls): return {x: y.URL for (x, y) in six.iteritems(cls.__dict__) if isinstance(y, File)} get_URL_dict = classmethod(get_URL_dict) # HELPER CLASSES FOR PARAMETRING OUTPUT FORMAT # class EnumClass: def from_string(cls, x): return cls.__dict__[x.upper()] from_string = classmethod(from_string) class Format(EnumClass): TEXT = 1 ANSI = 2 HTML = 3 LATEX = 4 XUNIT = 5 LIVE = 6 # TEST CLASSES # class TestClass: def __getitem__(self, item): return getattr(self, item) def add_keywords(self, kws): if isinstance(kws, six.string_types): kws = [kws.lower()] for kwd in kws: kwd = kwd.lower() if kwd.startswith('-'): try: self.keywords.remove(kwd[1:]) except KeyError: pass else: self.keywords.add(kwd) class TestCampaign(TestClass): def __init__(self, title): self.title = title self.filename = None self.headcomments = "" self.campaign = [] self.keywords = set() self.crc = None self.sha = None self.preexec = None self.preexec_output = None self.end_pos = 0 self.interrupted = False self.duration = 0.0 def add_testset(self, testset): self.campaign.append(testset) testset.keywords.update(self.keywords) def trunc(self, index): self.campaign = self.campaign[:index] def startNum(self, beginpos): for ts in self: for t in ts: t.num = beginpos beginpos += 1 self.end_pos = beginpos def __iter__(self): return self.campaign.__iter__() def all_tests(self): for ts in self: for t in ts: yield t class TestSet(TestClass): def __init__(self, name): self.name = name self.tests = [] self.comments = "" self.keywords = set() self.crc = None self.expand = 1 def add_test(self, test): self.tests.append(test) test.keywords.update(self.keywords) def trunc(self, index): self.tests = self.tests[:index] def __iter__(self): return self.tests.__iter__() class UnitTest(TestClass): def __init__(self, name): self.name = name self.test = "" self.comments = "" self.result = "passed" # make instance True at init to have a different truth value than None self.duration = 0 self.output = "" self.num = -1 self.keywords = set() self.crc = None self.expand = 1 def decode(self): if six.PY2: self.test = self.test.decode("utf8", "ignore") self.output = self.output.decode("utf8", "ignore") self.comments = self.comments.decode("utf8", "ignore") self.result = self.result.decode("utf8", "ignore") def __nonzero__(self): return self.result == "passed" __bool__ = __nonzero__ # Careful note: all data not included will be set by default. # Use -c as first argument !! def parse_config_file(config_path, verb=3): """Parse provided json to get configuration Empty default json: { "testfiles": [], "breakfailed": true, "onlyfailed": false, "verb": 3, "dump": 0, "docs": 0, "crc": true, "preexec": {}, "global_preexec": "", "outputfile": null, "local": true, "format": "ansi", "num": null, "modules": [], "kw_ok": [], "kw_ko": [] } """ with open(config_path) as config_file: data = json.load(config_file) if verb > 2: print("### Loaded config file", config_path, file=sys.stderr) def get_if_exist(key, default): return data[key] if key in data else default return Bunch(testfiles=get_if_exist("testfiles", []), breakfailed=get_if_exist("breakfailed", True), remove_testfiles=get_if_exist("remove_testfiles", []), onlyfailed=get_if_exist("onlyfailed", False), verb=get_if_exist("verb", 3), dump=get_if_exist("dump", 0), crc=get_if_exist("crc", 1), docs=get_if_exist("docs", 0), preexec=get_if_exist("preexec", {}), global_preexec=get_if_exist("global_preexec", ""), outfile=get_if_exist("outputfile", sys.stdout), local=get_if_exist("local", False), num=get_if_exist("num", None), modules=get_if_exist("modules", []), kw_ok=get_if_exist("kw_ok", []), kw_ko=get_if_exist("kw_ko", []), format=get_if_exist("format", "ansi")) # PARSE CAMPAIGN # def parse_campaign_file(campaign_file): test_campaign = TestCampaign("Test campaign") test_campaign.filename = campaign_file.name testset = None test = None testnb = 0 for l in campaign_file.readlines(): if l[0] == '#': continue if l[0] == "~": (test or testset or test_campaign).add_keywords(l[1:].split()) elif l[0] == "%": test_campaign.title = l[1:].strip() elif l[0] == "+": testset = TestSet(l[1:].strip()) test_campaign.add_testset(testset) test = None elif l[0] == "=": test = UnitTest(l[1:].strip()) test.num = testnb testnb += 1 if testset is None: error_m = "Please create a test set (i.e. '+' section)." raise getopt.GetoptError(error_m) testset.add_test(test) elif l[0] == "*": if test is not None: test.comments += l[1:] elif testset is not None: testset.comments += l[1:] else: test_campaign.headcomments += l[1:] else: if test is None: if l.strip(): print("Unknown content [%s]" % l.strip(), file=sys.stderr) else: test.test += l return test_campaign def dump_campaign(test_campaign): print("#" * (len(test_campaign.title) + 6)) print("## %(title)s ##" % test_campaign) print("#" * (len(test_campaign.title) + 6)) if test_campaign.sha and test_campaign.crc: print("CRC=[%(crc)s] SHA=[%(sha)s]" % test_campaign) print("from file %(filename)s" % test_campaign) print() for ts in test_campaign: if ts.crc: print("+--[%s]%s(%s)--" % (ts.name, "-" * max(2, 80 - len(ts.name) - 18), ts.crc)) # noqa: E501 else: print("+--[%s]%s" % (ts.name, "-" * max(2, 80 - len(ts.name) - 6))) if ts.keywords: print(" kw=%s" % ",".join(ts.keywords)) for t in ts: print("%(num)03i %(name)s" % t) c = k = "" if t.keywords: k = "kw=%s" % ",".join(t.keywords) if t.crc: c = "[%(crc)s] " % t if c or k: print(" %s%s" % (c, k)) def docs_campaign(test_campaign): print("%(title)s" % test_campaign) print("=" * (len(test_campaign.title))) print() if len(test_campaign.headcomments): print("%s" % test_campaign.headcomments.strip().replace("\n", "")) print() for ts in test_campaign: print("%s" % ts.name) print("-" * len(ts.name)) print() if len(ts.comments): print("%s" % ts.comments.strip().replace("\n", "")) print() for t in ts: print("%s" % t.name) print("^" * len(t.name)) print() if len(t.comments): print("%s" % t.comments.strip().replace("\n", "")) print() print("Usage example::") for l in t.test.split('\n'): if not l.rstrip().endswith('# no_docs'): print("\t%s" % l) # COMPUTE CAMPAIGN DIGESTS # if six.PY2: def crc32(x): return "%08X" % (0xffffffff & zlib.crc32(x)) def sha1(x): return hashlib.sha1(x).hexdigest().upper() else: def crc32(x): return "%08X" % (0xffffffff & zlib.crc32(bytearray(x, "utf8"))) def sha1(x): return hashlib.sha1(x.encode("utf8")).hexdigest().upper() def compute_campaign_digests(test_campaign): dc = "" for ts in test_campaign: dts = "" for t in ts: dt = t.test.strip() t.crc = crc32(dt) dts += "\0" + dt ts.crc = crc32(dts) dc += "\0\x01" + dts test_campaign.crc = crc32(dc) with open(test_campaign.filename) as fdesc: test_campaign.sha = sha1(fdesc.read()) # FILTER CAMPAIGN # def filter_tests_on_numbers(test_campaign, num): if num: for ts in test_campaign: ts.tests = [t for t in ts.tests if t.num in num] test_campaign.campaign = [ts for ts in test_campaign.campaign if ts.tests] def _filter_tests_kw(test_campaign, kw, keep): def kw_match(lst, kw): return any(k for k in lst if kw == k) if kw: kw = kw.lower() if keep: cond = lambda x: x else: cond = lambda x: not x for ts in test_campaign: ts.tests = [t for t in ts.tests if cond(kw_match(t.keywords, kw))] def filter_tests_keep_on_keywords(test_campaign, kw): return _filter_tests_kw(test_campaign, kw, True) def filter_tests_remove_on_keywords(test_campaign, kw): return _filter_tests_kw(test_campaign, kw, False) def remove_empty_testsets(test_campaign): test_campaign.campaign = [ts for ts in test_campaign.campaign if ts.tests] # RUN TEST # def run_test(test, get_interactive_session, verb=3, ignore_globals=None, my_globals=None): """An internal UTScapy function to run a single test""" start_time = time.time() test.output, res = get_interactive_session(test.test.strip(), ignore_globals=ignore_globals, verb=verb, my_globals=my_globals) test.result = "failed" try: if res is None or res: test.result = "passed" if test.output.endswith('KeyboardInterrupt\n'): test.result = "interrupted" raise KeyboardInterrupt except Exception: test.output += "UTscapy: Error during result interpretation:\n" test.output += "".join(traceback.format_exception(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2],)) finally: test.duration = time.time() - start_time if test.result == "failed": from scapy.sendrecv import debug # Add optional debugging data to log if debug.crashed_on: cls, val = debug.crashed_on test.output += "\n\nPACKET DISSECTION FAILED ON:\n %s(hex_bytes('%s'))" % (cls.__name__, plain_str(bytes_hex(val))) debug.crashed_on = None test.decode() if verb > 2: print("%(result)6s %(crc)s %(duration)06.2fs %(name)s" % test, file=sys.stderr) elif verb > 1: print("%(result)6s %(crc)s %(name)s" % test, file=sys.stderr) return bool(test) # RUN CAMPAIGN # def import_UTscapy_tools(ses): """Adds UTScapy tools directly to a session""" ses["retry_test"] = retry_test ses["Bunch"] = Bunch if WINDOWS: from scapy.arch.windows import _route_add_loopback, IFACES _route_add_loopback() ses["IFACES"] = IFACES ses["conf"].route.routes = conf.route.routes ses["conf"].route6.routes = conf.route6.routes def run_campaign(test_campaign, get_interactive_session, drop_to_interpreter=False, verb=3, ignore_globals=None): # noqa: E501 passed = failed = 0 scapy_ses = importlib.import_module(".all", "scapy").__dict__ import_UTscapy_tools(scapy_ses) if test_campaign.preexec: test_campaign.preexec_output = get_interactive_session(test_campaign.preexec.strip(), ignore_globals=ignore_globals, my_globals=scapy_ses)[0] # Drop def drop(scapy_ses): code.interact(banner="Test '%s' failed. " "exit() to stop, Ctrl-D to leave " "this interpreter and continue " "with the current test campaign" % t.name, local=scapy_ses) try: for i, testset in enumerate(test_campaign): for j, t in enumerate(testset): if run_test(t, get_interactive_session, verb, my_globals=scapy_ses): passed += 1 else: failed += 1 if drop_to_interpreter: drop(scapy_ses) test_campaign.duration += t.duration except KeyboardInterrupt: failed += 1 testset.trunc(j + 1) test_campaign.trunc(i + 1) test_campaign.interrupted = True if verb: print("Campaign interrupted!", file=sys.stderr) if drop_to_interpreter: drop(scapy_ses) test_campaign.passed = passed test_campaign.failed = failed if verb > 2: print("Campaign CRC=%(crc)s in %(duration)06.2fs SHA=%(sha)s" % test_campaign, file=sys.stderr) # noqa: E501 print("PASSED=%i FAILED=%i" % (passed, failed), file=sys.stderr) elif verb: print("Campaign CRC=%(crc)s SHA=%(sha)s" % test_campaign, file=sys.stderr) # noqa: E501 print("PASSED=%i FAILED=%i" % (passed, failed), file=sys.stderr) return failed # INFO LINES # def info_line(test_campaign): filename = test_campaign.filename if filename is None: return "Run %s by UTscapy" % time.ctime() else: return "Run %s from [%s] by UTscapy" % (time.ctime(), filename) def html_info_line(test_campaign): filename = test_campaign.filename if filename is None: return """Run %s by UTscapy
""" % time.ctime() # noqa: E501 else: return """Run %s from [%s] by UTscapy
""" % (time.ctime(), filename) # noqa: E501 # CAMPAIGN TO something # def campaign_to_TEXT(test_campaign): output = "%(title)s\n" % test_campaign output += "-- " + info_line(test_campaign) + "\n\n" output += "Passed=%(passed)i\nFailed=%(failed)i\n\n%(headcomments)s\n" % test_campaign for testset in test_campaign: if any(t.expand for t in testset): output += "######\n## %(name)s\n######\n%(comments)s\n\n" % testset for t in testset: if t.expand: output += "###(%(num)03i)=[%(result)s] %(name)s\n%(comments)s\n%(output)s\n\n" % t # noqa: E501 return output def campaign_to_ANSI(test_campaign): return campaign_to_TEXT(test_campaign) def campaign_to_xUNIT(test_campaign): output = '\n\n' for testset in test_campaign: for t in testset: output += ' %(title)s

""" % test_campaign if test_campaign.crc is not None and test_campaign.sha is not None: output += "CRC=%(crc)s SHA=%(sha)s
" % test_campaign output += "" + html_info_line(test_campaign) + "" output += "".join([ test_campaign.headcomments, "\n

", "PASSED=%(passed)i FAILED=%(failed)i" % test_campaign, " INTERRUPTED!" if test_campaign.interrupted else "", "

\n\n", ]) for testset in test_campaign: output += "

" % testset if testset.crc is not None: output += "%(crc)s " % testset output += "%(name)s

\n%(comments)s\n
    \n" % testset for t in testset: output += """
  • \n""" % t if t.expand == 2: output += """ -%(num)03i- """ % t else: output += """ +%(num)03i+ """ % t if t.crc is not None: output += "%(crc)s\n" % t output += """%(name)s\n """ % t output += "\n
\n\n" return output def pack_html_campaigns(runned_campaigns, data, local=False, title=None): output = """ %(title)s

UTScapy tests

Shrink All Expand All Expand Passed Expand Failed

""" for test_campaign in runned_campaigns: for ts in test_campaign: for t in ts: output += """%(num)03i\n""" % t output += """

\n\n %(data)s """ out_dict = {'data': data, 'title': title if title else "UTScapy tests"} if local: dirname = os.path.dirname(test_campaign.output_file) External_Files.UTscapy_js.write(dirname) External_Files.UTscapy_css.write(dirname) out_dict.update(External_Files.get_local_dict()) else: out_dict.update(External_Files.get_URL_dict()) output %= out_dict return output def campaign_to_LATEX(test_campaign): output = r"""\documentclass{report} \usepackage{alltt} \usepackage{xcolor} \usepackage{a4wide} \usepackage{hyperref} \title{%(title)s} \date{%%s} \begin{document} \maketitle \tableofcontents \begin{description} \item[Passed:] %(passed)i \item[Failed:] %(failed)i \end{description} %(headcomments)s """ % test_campaign output %= info_line(test_campaign) for testset in test_campaign: output += "\\chapter{%(name)s}\n\n%(comments)s\n\n" % testset for t in testset: if t.expand: output += r"""\section{%(name)s} [%(num)03i] [%(result)s] %(comments)s \begin{alltt} %(output)s \end{alltt} """ % t output += "\\end{document}\n" return output # USAGE # def usage(): print("""Usage: UTscapy [-m module] [-f {text|ansi|HTML|LaTeX|live}] [-o output_file] [-t testfile] [-T testfile] [-k keywords [-k ...]] [-K keywords [-K ...]] [-l] [-b] [-d|-D] [-F] [-q[q]] [-i] [-P preexecute_python_code] [-c configfile] -t\t\t: provide test files (can be used many times) -T\t\t: if -t is used with *, remove a specific file (can be used many times) -l\t\t: generate local .js and .css files -F\t\t: expand only failed tests -b\t\t: don't stop at the first failed campaign -d\t\t: dump campaign -D\t\t: dump campaign and stop -R\t\t: dump campaign as reStructuredText -C\t\t: don't calculate CRC and SHA -c\t\t: load a .utsc config file -i\t\t: drop into Python interpreter if test failed -q\t\t: quiet mode -qq\t\t: [silent mode] -x\t\t: use pyannotate -n \t: only tests whose numbers are given (eg. 1,3-7,12) -N\t\t: force non root -m \t: additional module to put in the namespace -k ,,...\t: include only tests with one of those keywords (can be used many times) -K ,,...\t: remove tests with one of those keywords (can be used many times) -P """, file=sys.stderr) raise SystemExit # MAIN # def execute_campaign(TESTFILE, OUTPUTFILE, PREEXEC, NUM, KW_OK, KW_KO, DUMP, DOCS, FORMAT, VERB, ONLYFAILED, CRC, INTERPRETER, autorun_func, pos_begin=0, ignore_globals=None): # noqa: E501 # Parse test file test_campaign = parse_campaign_file(TESTFILE) # Report parameters if PREEXEC: test_campaign.preexec = PREEXEC # Compute campaign CRC and SHA if CRC: compute_campaign_digests(test_campaign) # Filter out unwanted tests filter_tests_on_numbers(test_campaign, NUM) for k in KW_OK: filter_tests_keep_on_keywords(test_campaign, k) for k in KW_KO: filter_tests_remove_on_keywords(test_campaign, k) remove_empty_testsets(test_campaign) # Dump campaign if DUMP: dump_campaign(test_campaign) if DUMP > 1: sys.exit() # Dump campaign as reStructuredText if DOCS: docs_campaign(test_campaign) sys.exit() # Run tests test_campaign.output_file = OUTPUTFILE result = run_campaign(test_campaign, autorun_func[FORMAT], drop_to_interpreter=INTERPRETER, verb=VERB, ignore_globals=None) # noqa: E501 # Shrink passed if ONLYFAILED: for t in test_campaign.all_tests(): if t: t.expand = 0 else: t.expand = 2 # Generate report if FORMAT == Format.TEXT: output = campaign_to_TEXT(test_campaign) elif FORMAT == Format.ANSI: output = campaign_to_ANSI(test_campaign) elif FORMAT == Format.HTML: test_campaign.startNum(pos_begin) output = campaign_to_HTML(test_campaign) elif FORMAT == Format.LATEX: output = campaign_to_LATEX(test_campaign) elif FORMAT == Format.XUNIT: output = campaign_to_xUNIT(test_campaign) elif FORMAT == Format.LIVE: output = "" return output, (result == 0), test_campaign def resolve_testfiles(TESTFILES): for tfile in TESTFILES[:]: if "*" in tfile: TESTFILES.remove(tfile) TESTFILES.extend(glob.glob(tfile)) return TESTFILES def main(): argv = sys.argv[1:] logger = logging.getLogger("scapy") logger.addHandler(logging.StreamHandler()) ignore_globals = list(six.moves.builtins.__dict__) # Parse arguments FORMAT = Format.ANSI OUTPUTFILE = sys.stdout LOCAL = 0 NUM = None NON_ROOT = False KW_OK = [] KW_KO = [] DUMP = 0 DOCS = 0 CRC = True BREAKFAILED = True ONLYFAILED = False VERB = 3 GLOB_PREEXEC = "" PREEXEC_DICT = {} MODULES = [] TESTFILES = [] ANNOTATIONS_MODE = False INTERPRETER = False try: opts = getopt.getopt(argv, "o:t:T:c:f:hbln:m:k:K:DRdCiFqNP:s:x") for opt, optarg in opts[0]: if opt == "-h": usage() elif opt == "-b": BREAKFAILED = False elif opt == "-F": ONLYFAILED = True elif opt == "-q": VERB -= 1 elif opt == "-D": DUMP = 2 elif opt == "-R": DOCS = 1 elif opt == "-d": DUMP = 1 elif opt == "-C": CRC = False elif opt == "-i": INTERPRETER = True elif opt == "-x": ANNOTATIONS_MODE = True elif opt == "-P": GLOB_PREEXEC += "\n" + optarg elif opt == "-f": try: FORMAT = Format.from_string(optarg) except KeyError as msg: raise getopt.GetoptError("Unknown output format %s" % msg) elif opt == "-t": TESTFILES.append(optarg) TESTFILES = resolve_testfiles(TESTFILES) elif opt == "-T": TESTFILES.remove(optarg) elif opt == "-c": data = parse_config_file(optarg, VERB) BREAKFAILED = data.breakfailed ONLYFAILED = data.onlyfailed VERB = data.verb DUMP = data.dump CRC = data.crc PREEXEC_DICT = data.preexec GLOB_PREEXEC = data.global_preexec OUTPUTFILE = data.outfile TESTFILES = data.testfiles LOCAL = 1 if data.local else 0 NUM = data.num MODULES = data.modules KW_OK.extend(data.kw_ok) KW_KO.extend(data.kw_ko) try: FORMAT = Format.from_string(data.format) except KeyError as msg: raise getopt.GetoptError("Unknown output format %s" % msg) TESTFILES = resolve_testfiles(TESTFILES) for testfile in resolve_testfiles(data.remove_testfiles): try: TESTFILES.remove(testfile) except ValueError: error_m = "Cannot remove %s from test files" % testfile raise getopt.GetoptError(error_m) elif opt == "-o": OUTPUTFILE = optarg if not os.access(os.path.dirname(os.path.abspath(OUTPUTFILE)), os.W_OK): raise getopt.GetoptError("Cannot write to file %s" % OUTPUTFILE) elif opt == "-l": LOCAL = 1 elif opt == "-n": NUM = [] for v in (x.strip() for x in optarg.split(",")): try: NUM.append(int(v)) except ValueError: v1, v2 = [int(e) for e in v.split('-', 1)] NUM.extend(range(v1, v2 + 1)) elif opt == "-N": NON_ROOT = True elif opt == "-m": MODULES.append(optarg) elif opt == "-k": KW_OK.extend(optarg.split(",")) elif opt == "-K": KW_KO.extend(optarg.split(",")) # Disable tests if needed # Discard Python3 tests when using Python2 if six.PY2: KW_KO.append("python3_only") if VERB > 2: print("### Python 2 mode ###") try: if NON_ROOT or os.getuid() != 0: # Non root # Discard root tests KW_KO.append("netaccess") KW_KO.append("needs_root") if VERB > 2: print("### Non-root mode ###") except AttributeError: pass if conf.use_pcap: KW_KO.append("not_pcapdnet") if VERB > 2: print("### libpcap mode ###") # Process extras if six.PY3: KW_KO.append("FIXME_py3") if ANNOTATIONS_MODE: try: from pyannotate_runtime import collect_types except ImportError: raise ImportError("Please install pyannotate !") collect_types.init_types_collection() collect_types.start() if VERB > 2: print("### Booting scapy...", file=sys.stderr) try: from scapy import all as scapy except Exception as e: print("[CRITICAL]: Cannot import Scapy: %s" % e, file=sys.stderr) traceback.print_exc() sys.exit(1) # Abort the tests for m in MODULES: try: mod = import_module(m) six.moves.builtins.__dict__.update(mod.__dict__) except ImportError as e: raise getopt.GetoptError("cannot import [%s]: %s" % (m, e)) # Add SCAPY_ROOT_DIR environment variable, used for tests os.environ['SCAPY_ROOT_DIR'] = os.environ.get("PWD", os.getcwd()) except getopt.GetoptError as msg: print("ERROR:", msg, file=sys.stderr) raise SystemExit autorun_func = { Format.TEXT: scapy.autorun_get_text_interactive_session, Format.ANSI: scapy.autorun_get_ansi_interactive_session, Format.HTML: scapy.autorun_get_html_interactive_session, Format.LATEX: scapy.autorun_get_latex_interactive_session, Format.XUNIT: scapy.autorun_get_text_interactive_session, Format.LIVE: scapy.autorun_get_live_interactive_session, } if VERB > 2: print("### Starting tests...", file=sys.stderr) glob_output = "" glob_result = 0 glob_title = None UNIQUE = len(TESTFILES) == 1 # Resolve tags and asterix for prex in six.iterkeys(copy.copy(PREEXEC_DICT)): if "*" in prex: pycode = PREEXEC_DICT[prex] del PREEXEC_DICT[prex] for gl in glob.iglob(prex): _pycode = pycode.replace("%name%", os.path.splitext(os.path.split(gl)[1])[0]) # noqa: E501 PREEXEC_DICT[gl] = _pycode pos_begin = 0 runned_campaigns = [] # Execute all files for TESTFILE in TESTFILES: if VERB > 2: print("### Loading:", TESTFILE, file=sys.stderr) PREEXEC = PREEXEC_DICT[TESTFILE] if TESTFILE in PREEXEC_DICT else GLOB_PREEXEC with open(TESTFILE) as testfile: output, result, campaign = execute_campaign(testfile, OUTPUTFILE, PREEXEC, NUM, KW_OK, KW_KO, DUMP, DOCS, FORMAT, VERB, ONLYFAILED, CRC, INTERPRETER, autorun_func, pos_begin, ignore_globals) runned_campaigns.append(campaign) pos_begin = campaign.end_pos if UNIQUE: glob_title = campaign.title glob_output += output if not result: glob_result = 1 if BREAKFAILED: break if VERB > 2: print("### Writing output...", file=sys.stderr) if ANNOTATIONS_MODE: collect_types.stop() collect_types.dump_stats("pyannotate_results") # Concenate outputs if FORMAT == Format.HTML: glob_output = pack_html_campaigns(runned_campaigns, glob_output, LOCAL, glob_title) # Write the final output # Note: on Python 2, we force-encode to ignore ascii errors # on Python 3, we need to detect the type of stream if OUTPUTFILE == sys.stdout: OUTPUTFILE.write(glob_output.encode("utf8", "ignore") if 'b' in OUTPUTFILE.mode or six.PY2 else glob_output) else: with open(OUTPUTFILE, "wb") as f: f.write(glob_output.encode("utf8", "ignore") if 'b' in f.mode or six.PY2 else glob_output) # Delete scapy's test environment vars del os.environ['SCAPY_ROOT_DIR'] # Return state return glob_result if __name__ == "__main__": if sys.warnoptions: with warnings.catch_warnings(record=True) as cw: warnings.resetwarnings() # Let's discover the garbage waste warnings.simplefilter('error') print("### Warning mode enabled ###") res = main() if cw: res = 1 sys.exit(res) else: sys.exit(main())