import re import struct import copy import espytrace.apptrace as apptrace SYSVIEW_EVTID_NOP = 0 # Dummy packet. SYSVIEW_EVTID_OVERFLOW = 1 SYSVIEW_EVTID_ISR_ENTER = 2 SYSVIEW_EVTID_ISR_EXIT = 3 SYSVIEW_EVTID_TASK_START_EXEC = 4 SYSVIEW_EVTID_TASK_STOP_EXEC = 5 SYSVIEW_EVTID_TASK_START_READY = 6 SYSVIEW_EVTID_TASK_STOP_READY = 7 SYSVIEW_EVTID_TASK_CREATE = 8 SYSVIEW_EVTID_TASK_INFO = 9 SYSVIEW_EVTID_TRACE_START = 10 SYSVIEW_EVTID_TRACE_STOP = 11 SYSVIEW_EVTID_SYSTIME_CYCLES = 12 SYSVIEW_EVTID_SYSTIME_US = 13 SYSVIEW_EVTID_SYSDESC = 14 SYSVIEW_EVTID_USER_START = 15 SYSVIEW_EVTID_USER_STOP = 16 SYSVIEW_EVTID_IDLE = 17 SYSVIEW_EVTID_ISR_TO_SCHEDULER = 18 SYSVIEW_EVTID_TIMER_ENTER = 19 SYSVIEW_EVTID_TIMER_EXIT = 20 SYSVIEW_EVTID_STACK_INFO = 21 SYSVIEW_EVTID_MODULEDESC = 22 SYSVIEW_EVTID_INIT = 24 SYSVIEW_EVENT_ID_PREDEF_LEN_MAX = SYSVIEW_EVTID_INIT SYSVIEW_EVTID_NAME_RESOURCE = 25 SYSVIEW_EVTID_PRINT_FORMATTED = 26 SYSVIEW_EVTID_NUMMODULES = 27 SYSVIEW_EVENT_ID_PREDEF_MAX = SYSVIEW_EVTID_NUMMODULES SYSVIEW_EVENT_ID_MAX = 200 SYSVIEW_MODULE_EVENT_OFFSET = 512 SYSVIEW_SYNC_LEN = 10 def parse_trace(reader, parser, os_evt_map_file=''): """ Parses trace. Parameters ---------- reader : apptrace.Reader Trace reader object. parser : SysViewTraceDataParser Top level parser object. os_evt_map_file : string Path to file containg events format description. """ # parse OS events formats file os_evt_map = _read_events_map(os_evt_map_file) _read_file_header(reader) _read_init_seq(reader) while True: event = parser.read_event(reader, os_evt_map) parser.on_new_event(event) def _read_events_map(os_evt_map_file): """ Reads SystemView events format description from file. Parameters ---------- os_evt_map_file : string Path to file containg events format description. Returns ------- dict a dict with event IDs as keys and values as tuples containg event name and a list of parameters. """ os_evt_map = {} with open(os_evt_map_file) as f: for line in f: comps = line.split() if len(comps) < 2: continue params = [] if len(comps) > 2: for p in comps[2:]: sp = p.split('=') if sp[1].startswith('%'): sp[1] = sp[1][1:] if sp[1] == 'u': params.append(SysViewEventParamSimple(sp[0], _decode_u32)) elif sp[1] == 's': params.append(SysViewEventParamSimple(sp[0], _decode_str)) elif sp[1] == 't' or sp[1] == 'T' or sp[1] == 'I' or sp[1] == 'p': # TODO: handle shrinked task/queue ID and addresses params.append(SysViewEventParamSimple(sp[0], _decode_u32)) os_evt_map[int(comps[0])] = (comps[1], params) return os_evt_map def _read_file_header(reader): """ Reads SystemView trace file header. Parameters ---------- reader : apptrace.Reader Trace reader object. Returns ------- list a list of header lines. """ empty_count = 0 lines = [] while empty_count < 2: lines.append(reader.readline(linesep='\n')) if lines[-1] == ';\n': empty_count += 1 return lines def _read_init_seq(reader): """ Reads SystemView trace initial synchronisation sequence of bytes. Parameters ---------- reader : apptrace.Reader Trace reader object. Raises ------- SysViewTraceParseError If sync sequence is broken. """ SYNC_SEQ_FMT = '<%dB' % SYSVIEW_SYNC_LEN sync_bytes = struct.unpack(SYNC_SEQ_FMT, reader.read(struct.calcsize(SYNC_SEQ_FMT))) for b in sync_bytes: if b != 0: raise SysViewTraceParseError("Invalid sync sequense!") def _decode_u32(reader): """ Reads and decodes unsigned 32-bit integer. Parameters ---------- reader : apptrace.Reader Trace reader object. Returns ------- tuple a tuple containg number of read bytes and decoded value. """ sz = 0 val = 0 while True: b, = struct.unpack('= SYSVIEW_EVENT_ID_PREDEF_LEN_MAX: self.plen = _decode_plen(reader) if events_fmt_map: self._read_payload(reader, events_fmt_map) else: reader.forward(self.plen) _,self.ts = _decode_u32(reader) def _read_payload(self, reader, events_fmt_map): """ Reads event's payload and populates its parameters dictionary. Parameters ---------- reader : apptrace.Reader Trace reader object. events_fmt_map : dict see return value of _read_events_map() Raises ------- SysViewTraceParseError if event has unknown or invalid format. """ if self.id not in events_fmt_map: raise SysViewTraceParseError("Unknown event ID %d!" % self.id) self.name = events_fmt_map[self.id][0] evt_params_templates = events_fmt_map[self.id][1] params_len = 0 for i in range(len(evt_params_templates)): event_param = copy.deepcopy(evt_params_templates[i]) try: cur_pos = reader.get_pos() sz,param_val = event_param.decode(reader, self.plen - params_len) except Exception as e: raise SysViewTraceParseError("Failed to decode event {}({:d}) {:d} param @ 0x{:x}! {}".format(self.name, self.id, self.plen, cur_pos, e)) event_param.idx = i event_param.value = param_val self.params[event_param.name] = event_param params_len += sz if self.id >= SYSVIEW_EVENT_ID_PREDEF_LEN_MAX and self.plen != params_len: raise SysViewTraceParseError("Invalid event {}({:d}) payload len {:d}! Must be {:d}.".format(self.name, self.id, self.plen, params_len)) def __str__(self): params = '' for param in sorted(self.params.values(), key=lambda x: x.idx): params += '{}, '.format(param) if len(params): params = params[:-2] # remove trailing ', ' return '{:.9f} - core[{:d}].{}({:d}), plen {:d}: [{}]'.format(self.ts, self.core_id, self.name, self.id, self.plen, params) class SysViewEventParam: """ Abstract base SystemView event's parameter class. This is a base class for all event's parameters. """ def __init__(self, name, decode_func): """ Constructor. Parameters ---------- name : string Event parameter name. decode_func : callable object Parameter decoding function. """ self.name = name self.decode_func = decode_func self.value = None # positional index, used for sorting parameters before printing to make them looking as they appear in the event self.idx = 0 def decode(self, reader, max_sz): """ Reads and decodes events parameter. Parameters ---------- reader : apptrace.Reader Trace reader object. max_sz : int Maximum number of bytes to read. Returns ------- tuple a tuple containg number of read bytes and decoded value. """ pass def __str__(self): return '{}: {}'.format(self.name, self.value) class SysViewEventParamSimple(SysViewEventParam): """ Simple SystemView event's parameter class. """ def decode(self, reader, max_sz): """ see SysViewEventParam.decode() """ return self.decode_func(reader) class SysViewEventParamArray(SysViewEventParamSimple): """ Array SystemView event's parameter class. """ def __init__(self, name, decode_func, size=-1): """ Constructor. Parameters ---------- name : string see SysViewEventParam.__init__() decode_func : callable object see SysViewEventParam.__init__() size : int Array's size. If -1 decode() will try to read all bytes from reader. """ SysViewEventParamSimple.__init__(self, name, decode_func) self.arr_size = size def decode(self, reader, max_sz): """ see SysViewEventParam.decode() """ tottal_sz = 0 vals = [] i = 0 while tottal_sz < max_sz: sz,val = self.decode_func(reader) vals.append(val) tottal_sz += sz i += 1 if self.arr_size != -1 and i == self.arr_size: break return tottal_sz,vals class SysViewPredefinedEvent(SysViewEvent): """ Pre-defined SystemView events class. """ _predef_events_fmt = { SYSVIEW_EVTID_NOP: ('svNop', []), SYSVIEW_EVTID_OVERFLOW: ('svOverflow', [SysViewEventParamSimple('drop_cnt', _decode_u32)]), SYSVIEW_EVTID_ISR_ENTER: ('svIsrEnter', [SysViewEventParamSimple('irq_num', _decode_u32)]), SYSVIEW_EVTID_ISR_EXIT: ('svIsrExit', []), SYSVIEW_EVTID_TASK_START_EXEC: ('svTaskStartExec', [SysViewEventParamSimple('tid', _decode_id)]), SYSVIEW_EVTID_TASK_STOP_EXEC: ('svTaskStopExec', []), SYSVIEW_EVTID_TASK_START_READY: ('svTaskStartReady', [SysViewEventParamSimple('tid', _decode_id)]), SYSVIEW_EVTID_TASK_STOP_READY: ('svTaskStopReady', [SysViewEventParamSimple('tid', _decode_id), SysViewEventParamSimple('cause', _decode_u32)]), SYSVIEW_EVTID_TASK_CREATE: ('svTaskCreate', [SysViewEventParamSimple('tid', _decode_id)]), SYSVIEW_EVTID_TASK_INFO: ('svTaskInfo', [SysViewEventParamSimple('tid', _decode_id), SysViewEventParamSimple('prio', _decode_u32), SysViewEventParamSimple('name', _decode_str)]), SYSVIEW_EVTID_TRACE_START: ('svTraceStart', []), SYSVIEW_EVTID_TRACE_STOP: ('svTraceStop', []), SYSVIEW_EVTID_SYSTIME_CYCLES: ('svSysTimeCycles', [SysViewEventParamSimple('cycles', _decode_u32)]), SYSVIEW_EVTID_SYSTIME_US: ('svSysTimeUs', [SysViewEventParamSimple('time', _decode_u64)]), SYSVIEW_EVTID_SYSDESC: ('svSysDesc', [SysViewEventParamSimple('desc', _decode_str)]), SYSVIEW_EVTID_USER_START: ('svUserStart', [SysViewEventParamSimple('user_id', _decode_u32)]), SYSVIEW_EVTID_USER_STOP: ('svUserStart', [SysViewEventParamSimple('user_id', _decode_u32)]), SYSVIEW_EVTID_IDLE: ('svIdle', []), SYSVIEW_EVTID_ISR_TO_SCHEDULER: ('svExitIsrToScheduler', []), SYSVIEW_EVTID_TIMER_ENTER: ('svTimerEnter', [SysViewEventParamSimple('tim_id', _decode_u32)]), SYSVIEW_EVTID_TIMER_EXIT: ('svTimerExit', []), SYSVIEW_EVTID_STACK_INFO: ('svStackInfo', [SysViewEventParamSimple('tid', _decode_id), SysViewEventParamSimple('base', _decode_u32), SysViewEventParamSimple('sz', _decode_u32), SysViewEventParamSimple('unused', _decode_u32)]), SYSVIEW_EVTID_MODULEDESC: ('svModuleDesc', [SysViewEventParamSimple('mod_id', _decode_u32), SysViewEventParamSimple('evt_off', _decode_u32), SysViewEventParamSimple('desc', _decode_str)]), SYSVIEW_EVTID_INIT: ('svInit', [SysViewEventParamSimple('sys_freq', _decode_u32), SysViewEventParamSimple('cpu_freq', _decode_u32), SysViewEventParamSimple('ram_base', _decode_u32), SysViewEventParamSimple('id_shift', _decode_u32)]), SYSVIEW_EVTID_NAME_RESOURCE: ('svNameResource', [SysViewEventParamSimple('res_id', _decode_u32), SysViewEventParamSimple('name', _decode_str)]), SYSVIEW_EVTID_PRINT_FORMATTED: ('svPrint', [SysViewEventParamSimple('msg', _decode_str), SysViewEventParamSimple('id', _decode_u32), SysViewEventParamSimple('unused', _decode_u32)]), SYSVIEW_EVTID_NUMMODULES: ('svNumModules', [SysViewEventParamSimple('mod_cnt', _decode_u32)]), } def __init__(self, evt_id, reader, core_id): """ see SysViewEvent.__init__() """ self.name = 'SysViewPredefinedEvent' SysViewEvent.__init__(self, evt_id, reader, core_id, self._predef_events_fmt) class SysViewOSEvent(SysViewEvent): """ OS related SystemView events class. """ def __init__(self, evt_id, reader, core_id, events_fmt_map): """ see SysViewEvent.__init__() """ self.name = 'SysViewOSEvent' SysViewEvent.__init__(self, evt_id, reader, core_id, events_fmt_map) class SysViewHeapEvent(SysViewEvent): """ Heap related SystemView events class. Attributes ---------- events_fmt : dict see return value of _read_events_map() """ events_fmt = { 0: ('esp_sysview_heap_trace_alloc', [SysViewEventParamSimple('addr', _decode_u32), SysViewEventParamSimple('size', _decode_u32), SysViewEventParamArray('callers', _decode_u32)]), 1: ('esp_sysview_heap_trace_free', [SysViewEventParamSimple('addr', _decode_u32), SysViewEventParamArray('callers', _decode_u32)]), } def __init__(self, evt_id, events_off, reader, core_id): """ Constructor. Reads and optionally decodes event. Parameters ---------- evt_id : int see SysViewEvent.__init__() events_off : int Offset for heap events IDs. Greater or equal to SYSVIEW_MODULE_EVENT_OFFSET. reader : apptrace.Reader see SysViewEvent.__init__() core_id : int see SysViewEvent.__init__() """ self.name = 'SysViewHeapEvent' cur_events_map = {} for id in self.events_fmt: cur_events_map[events_off + id] = self.events_fmt[id] SysViewEvent.__init__(self, evt_id, reader, core_id, cur_events_map) class SysViewTraceDataParser(apptrace.TraceDataProcessor): """ Base SystemView trace data parser class. Attributes ---------- STREAMID_SYS : int system events stream ID. Reserved for internal uses. STREAMID_LOG : int log events stream ID. STREAMID_HEAP : int heap events stream ID. """ STREAMID_SYS = -1 STREAMID_LOG = 0 STREAMID_HEAP = 1 def __init__(self, print_events=False, core_id=0): """ Constructor. Parameters ---------- print_events : bool see apptrace.TraceDataProcessor.__init__() core_id : int id of the core this parser object relates to. """ apptrace.TraceDataProcessor.__init__(self, print_events, keep_all_events=True) self.sys_info = None self._last_ts = 0 self.irqs_info = {} self.tasks_info = {} self.core_id = core_id def _parse_irq_desc(self, desc): """ Parses IRQ description. Parameters ---------- desc : string IRQ description string. Returns ------- tuple a tuple with IRQ number and name or None on error. """ m = re.match('I#([0-9]+)=(.+)', desc) if m: return m.group(2),m.group(1) return None def _update_ts(self, ts): """ Calculates real event timestamp. Parameters ---------- ts : int Event timestamp offset. Returns ------- float real event timestamp. """ self._last_ts += ts return float(self._last_ts) / self.sys_info.params['sys_freq'].value def read_extension_event(self, evt_id, reader): """ Reads extension event. Default implementation which just reads out event. Parameters ---------- evt_id : int Event ID. reader : apptrace.Reader Trace reader object. Returns ------- SysViewEvent if this is top level parser returns object for generic event, otherwise returns None indicating to the calling top level parser that extension event are not supported. """ if self.root_proc == self: # by default just read out and skip unknown event return SysViewEvent(evt_id, reader, self.core_id) return None # let decide to root parser def read_event(self, reader, os_evt_map): """ Reads pre-defined or OS-related event. Parameters ---------- reader : apptrace.Reader Trace reader object. os_evt_map : dict see return value of _read_events_map() Returns ------- SysViewEvent pre-defined, OS-related or extension event object. """ evt_hdr, = struct.unpack('= SYSVIEW_MODULE_EVENT_OFFSET and evt_id >= self.events_off and evt_id < self.events_off + self.events_num): return SysViewHeapEvent(evt_id, self.events_off, reader, self.core_id) return SysViewTraceDataParser.read_extension_event(self, evt_id, reader) def on_new_event(self, event): """ Keeps track of heap module descriptions. """ if self.root_proc == self: SysViewTraceDataParser.on_new_event(self, event) if event.id == SYSVIEW_EVTID_MODULEDESC and event.params['desc'].value == 'ESP32 SystemView Heap Tracing Module': self.events_off = event.params['evt_off'].value class SysViewHeapTraceDataProcessor(SysViewTraceDataProcessor, apptrace.BaseHeapTraceDataProcessorImpl): """ SystemView trace data processor supporting heap events. """ def __init__(self, toolchain_pref, elf_path, traces=[], print_events=False, print_heap_events=False): """ Constructor. see SysViewTraceDataProcessor.__init__() see apptrace.BaseHeapTraceDataProcessorImpl.__init__() """ SysViewTraceDataProcessor.__init__(self, traces, print_events) apptrace.BaseHeapTraceDataProcessorImpl.__init__(self, print_heap_events) self.toolchain = toolchain_pref self.elf_path = elf_path self.no_ctx_events = [] def on_new_event(self, event): """ Processes heap events. """ if self.root_proc == self: SysViewTraceDataProcessor.on_new_event(self, event) heap_stream = self.root_proc.get_trace_stream(event.core_id, SysViewTraceDataParser.STREAMID_HEAP) if (heap_stream.events_off < SYSVIEW_MODULE_EVENT_OFFSET or event.id < heap_stream.events_off or event.id >= (heap_stream.events_off + heap_stream.events_num)): return curr_ctx = self._get_curr_context(event.core_id) if curr_ctx: in_irq = curr_ctx.irq ctx_name = curr_ctx.name else: in_irq = False ctx_name = 'None' if (event.id - heap_stream.events_off) == 0: heap_event = apptrace.HeapTraceEvent(ctx_name, in_irq, event.core_id, event.ts, True, event.params['size'].value, event.params['addr'].value, event.params['callers'].value, toolchain=self.toolchain, elf_path=self.elf_path) else: heap_event = apptrace.HeapTraceEvent(ctx_name, in_irq, event.core_id, event.ts, False, 0, event.params['addr'].value, event.params['callers'].value, toolchain=self.toolchain, elf_path=self.elf_path) if not curr_ctx: # postpone events handling till their context is known self.no_ctx_events.append(heap_event) else: # here we know the previous context: we switched from it or implied upon the 1st context switching event prev_ctx = self._get_prev_context(event.core_id) for cached_evt in self.no_ctx_events: cached_evt.ctx_name = prev_ctx.name cached_evt.in_irq = prev_ctx.irq apptrace.BaseHeapTraceDataProcessorImpl.on_new_event(self, cached_evt) del self.no_ctx_events[:] apptrace.BaseHeapTraceDataProcessorImpl.on_new_event(self, heap_event) def print_report(self): """ see apptrace.TraceDataProcessor.print_report() """ if self.root_proc == self: SysViewTraceDataProcessor.print_report(self) apptrace.BaseHeapTraceDataProcessorImpl.print_report(self) class SysViewLogTraceEvent(apptrace.LogTraceEvent): """ SystemView log event. """ def __init__(self, ts, msg): """ Constructor. Parameters ---------- msg : string Log message string. """ self.msg = msg self.ts = ts def get_message(self, unused): """ Retrieves log message. Returns ------- string formatted log message """ return '[{:.9f}] LOG: {}'.format(self.ts, self.msg) class SysViewLogTraceDataParser(SysViewTraceDataParser): """ SystemView trace data parser supporting log events. """ def on_new_event(self, event): """ see SysViewTraceDataParser.on_new_event() """ if self.root_proc == self: SysViewTraceDataParser.on_new_event(self, event) class SysViewLogTraceDataProcessor(SysViewTraceDataProcessor, apptrace.BaseLogTraceDataProcessorImpl): """ SystemView trace data processor supporting heap events. """ def __init__(self, traces=[], print_events=False, print_log_events=False): """ Constructor. see SysViewTraceDataProcessor.__init__() see apptrace.BaseLogTraceDataProcessorImpl.__init__() """ SysViewTraceDataProcessor.__init__(self, traces, print_events) apptrace.BaseLogTraceDataProcessorImpl.__init__(self, print_log_events) def on_new_event(self, event): """ Processes log events. """ if self.root_proc == self: SysViewTraceDataProcessor.on_new_event(self, event) if event.id == SYSVIEW_EVTID_PRINT_FORMATTED: log_evt = SysViewLogTraceEvent(event.ts, event.params['msg'].value) apptrace.BaseLogTraceDataProcessorImpl.on_new_event(self, log_evt) def print_report(self): """ see apptrace.TraceDataProcessor.print_report() """ if self.root_proc == self: SysViewTraceDataProcessor.print_report(self) apptrace.BaseLogTraceDataProcessorImpl.print_report(self)