import re import struct import copy import json import espytrace.apptrace as apptrace SYSVIEW_EVTID_NOP = 0 # Dummy packet. SYSVIEW_EVTID_OVERFLOW = 1 SYSVIEW_EVTID_ISR_ENTER = 2 SYSVIEW_EVTID_ISR_EXIT = 3 SYSVIEW_EVTID_TASK_START_EXEC = 4 SYSVIEW_EVTID_TASK_STOP_EXEC = 5 SYSVIEW_EVTID_TASK_START_READY = 6 SYSVIEW_EVTID_TASK_STOP_READY = 7 SYSVIEW_EVTID_TASK_CREATE = 8 SYSVIEW_EVTID_TASK_INFO = 9 SYSVIEW_EVTID_TRACE_START = 10 SYSVIEW_EVTID_TRACE_STOP = 11 SYSVIEW_EVTID_SYSTIME_CYCLES = 12 SYSVIEW_EVTID_SYSTIME_US = 13 SYSVIEW_EVTID_SYSDESC = 14 SYSVIEW_EVTID_USER_START = 15 SYSVIEW_EVTID_USER_STOP = 16 SYSVIEW_EVTID_IDLE = 17 SYSVIEW_EVTID_ISR_TO_SCHEDULER = 18 SYSVIEW_EVTID_TIMER_ENTER = 19 SYSVIEW_EVTID_TIMER_EXIT = 20 SYSVIEW_EVTID_STACK_INFO = 21 SYSVIEW_EVTID_MODULEDESC = 22 SYSVIEW_EVTID_INIT = 24 SYSVIEW_EVENT_ID_PREDEF_LEN_MAX = SYSVIEW_EVTID_INIT SYSVIEW_EVTID_NAME_RESOURCE = 25 SYSVIEW_EVTID_PRINT_FORMATTED = 26 SYSVIEW_EVTID_NUMMODULES = 27 SYSVIEW_EVENT_ID_PREDEF_MAX = SYSVIEW_EVTID_NUMMODULES SYSVIEW_EVENT_ID_MAX = 200 SYSVIEW_MODULE_EVENT_OFFSET = 512 SYSVIEW_SYNC_LEN = 10 _sysview_events_map = { "SYS_NOP": SYSVIEW_EVTID_NOP, "SYS_OVERFLOW": SYSVIEW_EVTID_OVERFLOW, "SYS_ISR_ENTER": SYSVIEW_EVTID_ISR_ENTER, "SYS_ISR_EXIT": SYSVIEW_EVTID_ISR_EXIT, "SYS_TASK_START_EXEC": SYSVIEW_EVTID_TASK_START_EXEC, "SYS_TASK_STOP_EXEC": SYSVIEW_EVTID_TASK_STOP_EXEC, "SYS_TASK_START_READY": SYSVIEW_EVTID_TASK_START_READY, "SYS_TASK_STOP_READY": SYSVIEW_EVTID_TASK_STOP_READY, "SYS_TASK_CREATE": SYSVIEW_EVTID_TASK_CREATE, "SYS_TASK_INFO": SYSVIEW_EVTID_TASK_INFO, "SYS_TRACE_START": SYSVIEW_EVTID_TRACE_START, "SYS_TRACE_STOP": SYSVIEW_EVTID_TRACE_STOP, "SYS_SYSTIME_CYCLES": SYSVIEW_EVTID_SYSTIME_CYCLES, "SYS_SYSTIME_US": SYSVIEW_EVTID_SYSTIME_US, "SYS_SYSDESC": SYSVIEW_EVTID_SYSDESC, "SYS_USER_START": SYSVIEW_EVTID_USER_START, "SYS_USER_STOP": SYSVIEW_EVTID_USER_STOP, "SYS_IDLE": SYSVIEW_EVTID_IDLE, "SYS_ISR_TO_SCHEDULER": SYSVIEW_EVTID_ISR_TO_SCHEDULER, "SYS_TIMER_ENTER": SYSVIEW_EVTID_TIMER_ENTER, "SYS_TIMER_EXIT": SYSVIEW_EVTID_TIMER_EXIT, "SYS_STACK_INFO": SYSVIEW_EVTID_STACK_INFO, "SYS_MODULEDESC": SYSVIEW_EVTID_INIT, "SYS_INIT": SYSVIEW_EVTID_INIT, "SYS_NAME_RESOURCE": SYSVIEW_EVTID_NAME_RESOURCE, "SYS_PRINT_FORMATTED": SYSVIEW_EVTID_PRINT_FORMATTED, "SYS_NUMMODULES": SYSVIEW_EVTID_NUMMODULES } _os_events_map = {} def parse_trace(reader, parser, os_evt_map_file=''): """ Parses trace. Parameters ---------- reader : apptrace.Reader Trace reader object. parser : SysViewTraceDataParser Top level parser object. os_evt_map_file : string Path to file containg events format description. """ global _os_events_map # parse OS events formats file _os_events_map = _read_events_map(os_evt_map_file) _read_file_header(reader) _read_init_seq(reader) while True: event = parser.read_event(reader, _os_events_map) parser.on_new_event(event) def _read_events_map(os_evt_map_file): """ Reads SystemView events format description from file. Parameters ---------- os_evt_map_file : string Path to file containg events format description. Returns ------- dict a dict with event IDs as keys and values as tuples containg event name and a list of parameters. """ os_evt_map = {} with open(os_evt_map_file) as f: for line in f: comps = line.split() if len(comps) < 2: continue params = [] if len(comps) > 2: for p in comps[2:]: sp = p.split('=') if sp[1].startswith('%'): sp[1] = sp[1][1:] if sp[1] == 'u': params.append(SysViewEventParamSimple(sp[0], _decode_u32)) elif sp[1] == 's': params.append(SysViewEventParamSimple(sp[0], _decode_str)) elif sp[1] == 't' or sp[1] == 'T' or sp[1] == 'I' or sp[1] == 'p': # TODO: handle shrinked task/queue ID and addresses params.append(SysViewEventParamSimple(sp[0], _decode_u32)) os_evt_map[int(comps[0])] = (comps[1], params) return os_evt_map def _read_file_header(reader): """ Reads SystemView trace file header. Parameters ---------- reader : apptrace.Reader Trace reader object. Returns ------- list a list of header lines. """ empty_count = 0 lines = [] while empty_count < 2: lines.append(reader.readline(linesep='\n')) if lines[-1] == ';\n': empty_count += 1 return lines def _read_init_seq(reader): """ Reads SystemView trace initial synchronisation sequence of bytes. Parameters ---------- reader : apptrace.Reader Trace reader object. Raises ------- SysViewTraceParseError If sync sequence is broken. """ SYNC_SEQ_FMT = '<%dB' % SYSVIEW_SYNC_LEN sync_bytes = struct.unpack(SYNC_SEQ_FMT, reader.read(struct.calcsize(SYNC_SEQ_FMT))) for b in sync_bytes: if b != 0: raise SysViewTraceParseError("Invalid sync sequense!") def _decode_u32(reader): """ Reads and decodes unsigned 32-bit integer. Parameters ---------- reader : apptrace.Reader Trace reader object. Returns ------- tuple a tuple containg number of read bytes and decoded value. """ sz = 0 val = 0 while True: b, = struct.unpack('= SYSVIEW_EVENT_ID_PREDEF_LEN_MAX: self.plen = _decode_plen(reader) if events_fmt_map: self._read_payload(reader, events_fmt_map) else: reader.forward(self.plen) _,self.ts = _decode_u32(reader) def _read_payload(self, reader, events_fmt_map): """ Reads event's payload and populates its parameters dictionary. Parameters ---------- reader : apptrace.Reader Trace reader object. events_fmt_map : dict see return value of _read_events_map() Raises ------- SysViewTraceParseError if event has unknown or invalid format. """ if self.id not in events_fmt_map: raise SysViewTraceParseError("Unknown event ID %d!" % self.id) self.name = events_fmt_map[self.id][0] evt_params_templates = events_fmt_map[self.id][1] params_len = 0 for i in range(len(evt_params_templates)): event_param = copy.deepcopy(evt_params_templates[i]) try: cur_pos = reader.get_pos() sz,param_val = event_param.decode(reader, self.plen - params_len) except Exception as e: raise SysViewTraceParseError("Failed to decode event {}({:d}) {:d} param @ 0x{:x}! {}".format(self.name, self.id, self.plen, cur_pos, e)) event_param.idx = i event_param.value = param_val self.params[event_param.name] = event_param params_len += sz if self.id >= SYSVIEW_EVENT_ID_PREDEF_LEN_MAX and self.plen != params_len: raise SysViewTraceParseError("Invalid event {}({:d}) payload len {:d}! Must be {:d}.".format(self.name, self.id, self.plen, params_len)) def __str__(self): params = '' for param in sorted(self.params.values(), key=lambda x: x.idx): params += '{}, '.format(param) if len(params): params = params[:-2] # remove trailing ', ' return '{:.9f} - core[{:d}].{}({:d}), plen {:d}: [{}]'.format(self.ts, self.core_id, self.name, self.id, self.plen, params) class SysViewEventParam: """ Abstract base SystemView event's parameter class. This is a base class for all event's parameters. """ def __init__(self, name, decode_func): """ Constructor. Parameters ---------- name : string Event parameter name. decode_func : callable object Parameter decoding function. """ self.name = name self.decode_func = decode_func self.value = None # positional index, used for sorting parameters before printing to make them looking as they appear in the event self.idx = 0 def decode(self, reader, max_sz): """ Reads and decodes events parameter. Parameters ---------- reader : apptrace.Reader Trace reader object. max_sz : int Maximum number of bytes to read. Returns ------- tuple a tuple containg number of read bytes and decoded value. """ pass def __str__(self): return '{}: {}'.format(self.name, self.value) def to_jsonable(self): return {self.name: self.value} class SysViewEventParamSimple(SysViewEventParam): """ Simple SystemView event's parameter class. """ def decode(self, reader, max_sz): """ see SysViewEventParam.decode() """ return self.decode_func(reader) class SysViewEventParamArray(SysViewEventParamSimple): """ Array SystemView event's parameter class. """ def __init__(self, name, decode_func, size=-1): """ Constructor. Parameters ---------- name : string see SysViewEventParam.__init__() decode_func : callable object see SysViewEventParam.__init__() size : int Array's size. If -1 decode() will try to read all bytes from reader. """ SysViewEventParamSimple.__init__(self, name, decode_func) self.arr_size = size def decode(self, reader, max_sz): """ see SysViewEventParam.decode() """ tottal_sz = 0 vals = [] i = 0 while tottal_sz < max_sz: sz,val = self.decode_func(reader) vals.append(val) tottal_sz += sz i += 1 if self.arr_size != -1 and i == self.arr_size: break return tottal_sz,vals class SysViewPredefinedEvent(SysViewEvent): """ Pre-defined SystemView events class. """ _predef_events_fmt = { SYSVIEW_EVTID_NOP: ('svNop', []), SYSVIEW_EVTID_OVERFLOW: ('svOverflow', [SysViewEventParamSimple('drop_cnt', _decode_u32)]), SYSVIEW_EVTID_ISR_ENTER: ('svIsrEnter', [SysViewEventParamSimple('irq_num', _decode_u32)]), SYSVIEW_EVTID_ISR_EXIT: ('svIsrExit', []), SYSVIEW_EVTID_TASK_START_EXEC: ('svTaskStartExec', [SysViewEventParamSimple('tid', _decode_id)]), SYSVIEW_EVTID_TASK_STOP_EXEC: ('svTaskStopExec', []), SYSVIEW_EVTID_TASK_START_READY: ('svTaskStartReady', [SysViewEventParamSimple('tid', _decode_id)]), SYSVIEW_EVTID_TASK_STOP_READY: ('svTaskStopReady', [SysViewEventParamSimple('tid', _decode_id), SysViewEventParamSimple('cause', _decode_u32)]), SYSVIEW_EVTID_TASK_CREATE: ('svTaskCreate', [SysViewEventParamSimple('tid', _decode_id)]), SYSVIEW_EVTID_TASK_INFO: ('svTaskInfo', [SysViewEventParamSimple('tid', _decode_id), SysViewEventParamSimple('prio', _decode_u32), SysViewEventParamSimple('name', _decode_str)]), SYSVIEW_EVTID_TRACE_START: ('svTraceStart', []), SYSVIEW_EVTID_TRACE_STOP: ('svTraceStop', []), SYSVIEW_EVTID_SYSTIME_CYCLES: ('svSysTimeCycles', [SysViewEventParamSimple('cycles', _decode_u32)]), SYSVIEW_EVTID_SYSTIME_US: ('svSysTimeUs', [SysViewEventParamSimple('time', _decode_u64)]), SYSVIEW_EVTID_SYSDESC: ('svSysDesc', [SysViewEventParamSimple('desc', _decode_str)]), SYSVIEW_EVTID_USER_START: ('svUserStart', [SysViewEventParamSimple('user_id', _decode_u32)]), SYSVIEW_EVTID_USER_STOP: ('svUserStart', [SysViewEventParamSimple('user_id', _decode_u32)]), SYSVIEW_EVTID_IDLE: ('svIdle', []), SYSVIEW_EVTID_ISR_TO_SCHEDULER: ('svExitIsrToScheduler', []), SYSVIEW_EVTID_TIMER_ENTER: ('svTimerEnter', [SysViewEventParamSimple('tim_id', _decode_u32)]), SYSVIEW_EVTID_TIMER_EXIT: ('svTimerExit', []), SYSVIEW_EVTID_STACK_INFO: ('svStackInfo', [SysViewEventParamSimple('tid', _decode_id), SysViewEventParamSimple('base', _decode_u32), SysViewEventParamSimple('sz', _decode_u32), SysViewEventParamSimple('unused', _decode_u32)]), SYSVIEW_EVTID_MODULEDESC: ('svModuleDesc', [SysViewEventParamSimple('mod_id', _decode_u32), SysViewEventParamSimple('evt_off', _decode_u32), SysViewEventParamSimple('desc', _decode_str)]), SYSVIEW_EVTID_INIT: ('svInit', [SysViewEventParamSimple('sys_freq', _decode_u32), SysViewEventParamSimple('cpu_freq', _decode_u32), SysViewEventParamSimple('ram_base', _decode_u32), SysViewEventParamSimple('id_shift', _decode_u32)]), SYSVIEW_EVTID_NAME_RESOURCE: ('svNameResource', [SysViewEventParamSimple('res_id', _decode_u32), SysViewEventParamSimple('name', _decode_str)]), SYSVIEW_EVTID_PRINT_FORMATTED: ('svPrint', [SysViewEventParamSimple('msg', _decode_str), SysViewEventParamSimple('lvl', _decode_u32), SysViewEventParamSimple('unused', _decode_u32)]), SYSVIEW_EVTID_NUMMODULES: ('svNumModules', [SysViewEventParamSimple('mod_cnt', _decode_u32)]), } def __init__(self, evt_id, reader, core_id): """ see SysViewEvent.__init__() """ SysViewEvent.__init__(self, evt_id, reader, core_id, self._predef_events_fmt) # self.name = 'SysViewPredefinedEvent' class SysViewOSEvent(SysViewEvent): """ OS related SystemView events class. """ def __init__(self, evt_id, reader, core_id, events_fmt_map): """ see SysViewEvent.__init__() """ SysViewEvent.__init__(self, evt_id, reader, core_id, events_fmt_map) # self.name = 'SysViewOSEvent' class SysViewHeapEvent(SysViewEvent): """ Heap related SystemView events class. Attributes ---------- events_fmt : dict see return value of _read_events_map() """ events_fmt = { 0: ('esp_sysview_heap_trace_alloc', [SysViewEventParamSimple('addr', _decode_u32), SysViewEventParamSimple('size', _decode_u32), SysViewEventParamArray('callers', _decode_u32)]), 1: ('esp_sysview_heap_trace_free', [SysViewEventParamSimple('addr', _decode_u32), SysViewEventParamArray('callers', _decode_u32)]), } def __init__(self, evt_id, events_off, reader, core_id): """ Constructor. Reads and optionally decodes event. Parameters ---------- evt_id : int see SysViewEvent.__init__() events_off : int Offset for heap events IDs. Greater or equal to SYSVIEW_MODULE_EVENT_OFFSET. reader : apptrace.Reader see SysViewEvent.__init__() core_id : int see SysViewEvent.__init__() """ cur_events_map = {} for id in self.events_fmt: cur_events_map[events_off + id] = self.events_fmt[id] SysViewEvent.__init__(self, evt_id, reader, core_id, cur_events_map) # self.name = 'SysViewHeapEvent' class SysViewTraceDataParser(apptrace.TraceDataProcessor): """ Base SystemView trace data parser class. Attributes ---------- STREAMID_SYS : int system events stream ID. Reserved for internal uses. STREAMID_LOG : int log events stream ID. STREAMID_HEAP : int heap events stream ID. """ STREAMID_SYS = -1 STREAMID_LOG = 0 STREAMID_HEAP = 1 def __init__(self, print_events=False, core_id=0): """ Constructor. Parameters ---------- print_events : bool see apptrace.TraceDataProcessor.__init__() core_id : int id of the core this parser object relates to. """ apptrace.TraceDataProcessor.__init__(self, print_events=print_events, keep_all_events=True) self.sys_info = None self._last_ts = 0 self.irqs_info = {} self.tasks_info = {} self.core_id = core_id def _parse_irq_desc(self, desc): """ Parses IRQ description. Parameters ---------- desc : string IRQ description string. Returns ------- tuple a tuple with IRQ number and name or None on error. """ m = re.match('I#([0-9]+)=(.+)', desc) if m: return m.group(2),m.group(1) return None def _update_ts(self, ts): """ Calculates real event timestamp. Parameters ---------- ts : int Event timestamp offset. Returns ------- float real event timestamp. """ self._last_ts += ts return float(self._last_ts) / self.sys_info.params['sys_freq'].value def read_extension_event(self, evt_id, reader): """ Reads extension event. Default implementation which just reads out event. Parameters ---------- evt_id : int Event ID. reader : apptrace.Reader Trace reader object. Returns ------- SysViewEvent if this is top level parser returns object for generic event, otherwise returns None indicating to the calling top level parser that extension event are not supported. """ if self.root_proc == self: # by default just read out and skip unknown event return SysViewEvent(evt_id, reader, self.core_id) return None # let decide to root parser def read_event(self, reader, os_evt_map): """ Reads pre-defined or OS-related event. Parameters ---------- reader : apptrace.Reader Trace reader object. os_evt_map : dict see return value of _read_events_map() Returns ------- SysViewEvent pre-defined, OS-related or extension event object. """ evt_hdr, = struct.unpack('= (self.events_off + self.events_num)) else True class SysViewMultiTraceDataParser(SysViewTraceDataParser): """ SystemView trace data parser supporting multiple event streams. """ def __init__(self, print_events=False, core_id=0): """ see SysViewTraceDataParser.__init__() """ SysViewTraceDataParser.__init__(self, print_events, core_id) self.stream_parsers = {} def add_stream_parser(self, stream_id, parser): """ Assigns parser for events stream. Parameters ---------- stream_id : int stream ID. See SysViewTraceDataParser.STREAMID_xxx. Parsers for SysViewTraceDataParser.STREAMID_SYS are ignored. Top level parser is the default for SysViewTraceDataParser.STREAMID_SYS. parser : SysViewTraceDataParser parser object. """ if stream_id == SysViewTraceDataParser.STREAMID_SYS: return parser.root_proc = self self.stream_parsers[stream_id] = parser def read_extension_event(self, evt_id, reader): """ Reads extension event. Iterates over registered stream parsers trying to find one which supports that type of event. Parameters ---------- evt_id : int see SysViewTraceDataParser.read_extension_event() reader : apptrace.Reader see SysViewTraceDataParser.read_extension_event() Returns ------- SysViewEvent object for extension event, if extension event is not supported return SysViewEvent instance. """ for stream_id in self.stream_parsers: evt = self.stream_parsers[stream_id].read_extension_event(evt_id, reader) if evt: return evt return SysViewTraceDataParser.read_extension_event(self, evt_id, reader) def on_new_event(self, event): """ Iterates over registered stream parsers allowing them to do essential processing of event. Must be called for every read event. Parameters ---------- event : SysViewEvent Event object. """ SysViewTraceDataParser.on_new_event(self, event) for stream_id in self.stream_parsers: self.stream_parsers[stream_id].on_new_event(event) class SysViewEventContext(): """ SystemView event context. """ def __init__(self, handle, irq, name=''): """ Constructor. Parameters ---------- handle : int handle of the context: task ID or IRQ number. irq : bool flag indicating whether this is IRQ or task context. name : string name of the context: task or IRQ name. Empty if context is unknown. """ self.handle = handle self.irq = irq self.name = name class SysViewTraceDataProcessor(apptrace.TraceDataProcessor): """ Base SystemView trace data processor class. """ def __init__(self, traces, root_proc=None, print_events=False, keep_all_events=False): """ Constructor. Parameters ---------- traces : list list of parsers to process data from. print_events : bool see apptrace.TraceDataProcessor.__init__() keep_all_events : bool see apptrace.TraceDataProcessor.__init__() """ apptrace.TraceDataProcessor.__init__(self, print_events, keep_all_events) self.event_ids = {} self.name = "" self.root_proc = root_proc if root_proc else self self.traces = {} self.ctx_stack = {} self.prev_ctx = {} self.no_ctx_events = [] for t in traces: self.traces[t.core_id] = t # current context item is a tuple of task ID or IRQ num and 'in_irq' flag # empty list means IDLE context or self.start_ctx self.ctx_stack[t.core_id] = [] # context is undefined, we do not know have we started the tracing in task/IDLE or IRQ context # in general there are three scenarious when we can start tracing: when core is in task, IDLE task or IRQ context self.prev_ctx[t.core_id] = None def _get_curr_context(self, core_id): """ Returns current context. Parameters ---------- core_id : int core ID for requested context. Returns ------- SysViewEventContext context object None if there current is undefined """ if len(self.root_proc.ctx_stack[core_id]): return self.root_proc.ctx_stack[core_id][-1] if self._get_prev_context(core_id): return SysViewEventContext(None, False, 'IDLE%d' % core_id) return None def _get_prev_context(self, core_id): """ Returns current context. Parameters ---------- core_id : int core ID for requested context. Returns ------- SysViewEventContext context object """ return self.root_proc.prev_ctx[core_id] def get_trace_stream(self, core_id, stream_id): """ Retrieves parser for specified stream and core. Parameters ---------- core_id : int Parser's core ID. stream_id : int Parser's stream ID. Returns ------- SysViewTraceDataParser parser object for specified stream and core """ if self.root_proc == self: return self.traces[core_id] return self.root_proc.get_trace_stream(core_id, stream_id) def event_supported(self, e): """ Should be overriden in child class. """ return False def handle_event(self, e): """ Should be overriden in child class. """ pass def print_report(self): """ see apptrace.TraceDataProcessor.print_report() """ apptrace.TraceDataProcessor.print_report(self) def _process_event(self, event): """ Processes event. Keeps track of execution context on every core. Parameters ---------- event : SysViewEvent Event object. Raises ---------- SysViewTraceParseError if there is no parser for event's core or if SYSVIEW_EVTID_ISR_ENTER is received for unknown IRQ or if SYSVIEW_EVTID_TASK_START_EXEC or SYSVIEW_EVTID_TASK_STOP_READY is received for unknown task. """ if event.core_id not in self.traces: raise SysViewTraceParseError("Event for unknown core %d" % event.core_id) trace = self.traces[event.core_id] if event.id == SYSVIEW_EVTID_ISR_ENTER: if event.params['irq_num'].value not in trace.irqs_info: raise SysViewTraceParseError("Enter unknown ISR %d" % event.params['irq_num'].value) if len(self.ctx_stack[event.core_id]): self.prev_ctx[event.core_id] = self.ctx_stack[event.core_id][-1] else: # the 1st context switching event after trace start is SYSVIEW_EVTID_ISR_ENTER, so we have been in IDLE context self.prev_ctx[event.core_id] = SysViewEventContext(None, False, 'IDLE%d' % event.core_id) # put new ISR context on top of the stack (the last in the list) self.ctx_stack[event.core_id].append(SysViewEventContext(event.params['irq_num'].value, True, trace.irqs_info[event.params['irq_num'].value])) elif event.id == SYSVIEW_EVTID_ISR_EXIT or event.id == SYSVIEW_EVTID_ISR_TO_SCHEDULER: if len(self.ctx_stack[event.core_id]): # return to the previous context (the last in the list) self.prev_ctx[event.core_id] = self.ctx_stack[event.core_id].pop() else: # the 1st context switching event after trace start is SYSVIEW_EVTID_ISR_EXIT, so we have been in ISR context, # but we do not know which one because SYSVIEW_EVTID_ISR_EXIT do not include the IRQ number self.prev_ctx[event.core_id] = SysViewEventContext(None, True, 'IRQ_oncore%d' % event.core_id) elif event.id == SYSVIEW_EVTID_TASK_START_EXEC: if event.params['tid'].value not in trace.tasks_info: raise SysViewTraceParseError("Start exec unknown task 0x%x" % event.params['tid'].value) if len(self.ctx_stack[event.core_id]): # return to the previous context (the last in the list) self.prev_ctx[event.core_id] = self.ctx_stack[event.core_id][-1] else: # the 1st context switching event after trace start is SYSVIEW_EVTID_TASK_START_EXEC, so we have been in IDLE context self.prev_ctx[event.core_id] = SysViewEventContext(None, False, 'IDLE%d' % event.core_id) # only one task at a time in context stack (can be interrupted by a bunch of ISRs) self.ctx_stack[event.core_id] = [SysViewEventContext(event.params['tid'].value, False, trace.tasks_info[event.params['tid'].value])] elif event.id == SYSVIEW_EVTID_TASK_STOP_EXEC: # delete task from context stack for ctx in self.ctx_stack[event.core_id]: if not ctx.irq: if len(self.ctx_stack[event.core_id]) == 1: # if this is the only ctx in context stack self.prev_ctx[event.core_id] = ctx del ctx break elif event.id == SYSVIEW_EVTID_TASK_STOP_READY: if event.params['tid'].value not in trace.tasks_info: raise SysViewTraceParseError("Stop ready unknown task 0x%x" % event.params['tid'].value) if len(self.ctx_stack[event.core_id]): if (not self.ctx_stack[event.core_id][-1].irq and event.params['tid'].value == self.ctx_stack[event.core_id][-1].handle): # return to the previous context (the last in the list) self.prev_ctx[event.core_id] = self.ctx_stack[event.core_id].pop() else: # the 1st context switching event after trace start is SYSVIEW_EVTID_TASK_STOP_READY, so we have been in task context self.prev_ctx[event.core_id] = SysViewEventContext(event.params['tid'].value, False, trace.tasks_info[event.params['tid'].value]) def on_new_event(self, event): """ Processes heap events. """ if self.root_proc == self: SysViewTraceDataProcessor._process_event(self, event) curr_ctx = self._get_curr_context(event.core_id) if not curr_ctx: # postpone events handling till their context is known self.no_ctx_events.append(event) return event.in_irq = curr_ctx.irq event.ctx_name = curr_ctx.name # here we know the previous context: we switched from it or implied upon the 1st context switching event prev_ctx = self._get_prev_context(event.core_id) if len(self.no_ctx_events): for cached_evt in self.no_ctx_events: cached_evt.ctx_name = prev_ctx.name cached_evt.in_irq = prev_ctx.irq # count and save the event apptrace.TraceDataProcessor.on_new_event(self, cached_evt) if self.event_supported(event): self.handle_event(event) del self.no_ctx_events[:] # count and save the event apptrace.TraceDataProcessor.on_new_event(self, event) if self.event_supported(event): self.handle_event(event) def merge_and_process(self): """ Merges events from all registered parsers, sorts them by timestamp and processes them. """ all_events = [] for t in self.traces.values(): all_events.extend(t.events) all_events.sort(key=lambda x: x.ts) for event in all_events: self.on_new_event(event) class SysViewMultiStreamTraceDataProcessor(SysViewTraceDataProcessor): """ SystemView trace data processor supporting multiple event streams. """ def __init__(self, traces, print_events=False, keep_all_events=False): """ see SysViewTraceDataProcessor.__init__() """ SysViewTraceDataProcessor.__init__(self, traces, print_events=print_events, keep_all_events=keep_all_events) self.stream_procs = {} def add_stream_processor(self, stream_id, proc): """ Assigns processor for events stream. Parameters ---------- stream_id : int stream ID. See SysViewTraceDataParser.STREAMID_xxx. Parsers for SysViewTraceDataParser.STREAMID_SYS are ignored. Top level parser is the default for SysViewTraceDataParser.STREAMID_SYS. proc : SysViewTraceDataProcessor processor object. """ if stream_id == SysViewTraceDataParser.STREAMID_SYS: return proc.root_proc = self # common processing data is kept by this processor self.stream_procs[stream_id] = proc def get_trace_stream(self, core_id, stream_id): """ Retrieves parser for specified stream and core. Parameters ---------- core_id : int Parser's core ID. stream_id : int Parser's stream ID. Returns ------- SysViewTraceDataParser parser object for specified stream and core """ trace = self.traces[core_id] if stream_id == SysViewTraceDataParser.STREAMID_SYS: return trace if isinstance(trace, SysViewMultiTraceDataParser): return trace.stream_parsers[stream_id] return trace def print_report(self): """ Iterates over registered stream processors and prints their reports. """ SysViewTraceDataProcessor.print_report(self) # need to sort stream procs by keys to print reports in the same order regardless of Python version stream_ids = list(self.stream_procs.keys()) stream_ids.sort() for stream_id in stream_ids: self.stream_procs[stream_id].print_report() def cleanup(self): """ Iterates over registered stream processors and cleans them up. """ for stream_id in self.stream_procs: self.stream_procs[stream_id].cleanup() SysViewTraceDataProcessor.cleanup(self) def on_new_event(self, event): """ Iterates over registered stream processors allowing them to do the processing of event. Parameters ---------- event : SysViewEvent Event object. """ SysViewTraceDataProcessor.on_new_event(self, event) for stream_id in self.stream_procs: self.stream_procs[stream_id].on_new_event(event) class SysViewTraceDataJsonEncoder(json.JSONEncoder): JSON_TRACE_VER = "1.0" def default(self, obj): global _sysview_events_map global _os_events_map if isinstance(obj, SysViewMultiStreamTraceDataProcessor): json_event_ids = {"system": _sysview_events_map, "os": {}} for eid in _os_events_map: ename = _os_events_map[eid][0] json_event_ids['os'][ename] = eid for stream in obj.stream_procs.values(): json_event_ids[stream.name] = stream.event_ids json_events = [] for e in obj.events: for stream in obj.stream_procs.values(): if stream.event_supported(e): json_events.append(e) break # include also OS and pre-defined events if isinstance(e, SysViewPredefinedEvent) or isinstance(e, SysViewOSEvent): json_events.append(e) return {"version": self.JSON_TRACE_VER, "streams": json_event_ids, "events": json_events} if isinstance(obj, SysViewHeapEvent): blk_size = 0 if "size" in obj.params: blk_size = obj.params["size"].value blk_addr = "0x{:x}".format(obj.params["addr"].value) callers = [] for addr in obj.params['callers'].value: callers.append('0x{:x}'.format(addr)) return {"ctx_name": obj.ctx_name, "in_irq": obj.in_irq, "id": obj.id, "core_id": obj.core_id, "ts": obj.ts, "addr": blk_addr, "size": blk_size, "callers": callers} if isinstance(obj, SysViewPredefinedEvent) and obj.id == SYSVIEW_EVTID_PRINT_FORMATTED: return {"ctx_name": obj.ctx_name, "in_irq": obj.in_irq, "id": obj.id, "core_id": obj.core_id, "ts": obj.ts, "msg": obj.params["msg"].value, "lvl": obj.params["lvl"].value} if isinstance(obj, SysViewEvent): jobj = obj.to_jsonable() # remove unused fields if 'name' in jobj: del jobj['name'] if 'plen' in jobj: del jobj['plen'] return jobj # Let the base class default method raise the TypeError return json.JSONEncoder.default(self, obj) class SysViewHeapTraceDataParser(SysViewTraceDataExtEventParser): """ SystemView trace data parser supporting heap events. """ def __init__(self, print_events=False, core_id=0): """ SystemView trace data parser supporting multiple event streams. see SysViewTraceDataExtEventParser.__init__() """ SysViewTraceDataExtEventParser.__init__(self, events_num=len(SysViewHeapEvent.events_fmt.keys()), core_id=core_id, print_events=print_events) def read_extension_event(self, evt_id, reader): """ Reads heap event. see SysViewTraceDataParser.read_extension_event() """ if (self.events_off >= SYSVIEW_MODULE_EVENT_OFFSET and evt_id >= self.events_off and evt_id < self.events_off + self.events_num): return SysViewHeapEvent(evt_id, self.events_off, reader, self.core_id) return SysViewTraceDataParser.read_extension_event(self, evt_id, reader) def on_new_event(self, event): """ Keeps track of heap module descriptions. """ if self.root_proc == self: SysViewTraceDataParser.on_new_event(self, event) if event.id == SYSVIEW_EVTID_MODULEDESC and event.params['desc'].value == 'ESP32 SystemView Heap Tracing Module': self.events_off = event.params['evt_off'].value class SysViewHeapTraceDataProcessor(SysViewTraceDataProcessor, apptrace.BaseHeapTraceDataProcessorImpl): """ SystemView trace data processor supporting heap events. """ def __init__(self, toolchain_pref, elf_path, root_proc=None, traces=[], print_events=False, print_heap_events=False): """ Constructor. see SysViewTraceDataProcessor.__init__() see apptrace.BaseHeapTraceDataProcessorImpl.__init__() """ SysViewTraceDataProcessor.__init__(self, traces, root_proc=root_proc, print_events=print_events) apptrace.BaseHeapTraceDataProcessorImpl.__init__(self, print_heap_events) self.toolchain = toolchain_pref self.elf_path = elf_path # self.no_ctx_events = [] self.name = "heap" stream = self.root_proc.get_trace_stream(0, SysViewTraceDataParser.STREAMID_HEAP) self.event_ids = {"alloc": stream.events_off, "free": stream.events_off + 1} def event_supported(self, event): heap_stream = self.root_proc.get_trace_stream(event.core_id, SysViewTraceDataParser.STREAMID_HEAP) return heap_stream.event_supported(event) def handle_event(self, event): heap_stream = self.root_proc.get_trace_stream(event.core_id, SysViewTraceDataParser.STREAMID_HEAP) if (event.id - heap_stream.events_off) == 0: heap_event = apptrace.HeapTraceEvent(event, True, toolchain=self.toolchain, elf_path=self.elf_path) else: heap_event = apptrace.HeapTraceEvent(event, False, toolchain=self.toolchain, elf_path=self.elf_path) apptrace.BaseHeapTraceDataProcessorImpl.on_new_event(self, heap_event) def print_report(self): """ see apptrace.TraceDataProcessor.print_report() """ if self.root_proc == self: SysViewTraceDataProcessor.print_report(self) apptrace.BaseHeapTraceDataProcessorImpl.print_report(self) class SysViewLogTraceEvent(apptrace.LogTraceEvent): """ SystemView log event. """ def __init__(self, ts, msg): """ Constructor. Parameters ---------- msg : string Log message string. """ self.msg = msg self.ts = ts def get_message(self, unused): """ Retrieves log message. Returns ------- string formatted log message """ return '[{:.9f}] LOG: {}'.format(self.ts, self.msg) class SysViewLogTraceDataParser(SysViewTraceDataParser): """ SystemView trace data parser supporting log events. """ def event_supported(self, event): return event.id == SYSVIEW_EVTID_PRINT_FORMATTED def on_new_event(self, event): """ see SysViewTraceDataParser.on_new_event() """ if self.root_proc == self: SysViewTraceDataParser.on_new_event(self, event) class SysViewLogTraceDataProcessor(SysViewTraceDataProcessor, apptrace.BaseLogTraceDataProcessorImpl): """ SystemView trace data processor supporting heap events. """ def __init__(self, traces=[], root_proc=None, print_events=False, print_log_events=False): """ Constructor. see SysViewTraceDataProcessor.__init__() see apptrace.BaseLogTraceDataProcessorImpl.__init__() """ SysViewTraceDataProcessor.__init__(self, traces, root_proc=root_proc, print_events=print_events) apptrace.BaseLogTraceDataProcessorImpl.__init__(self, print_log_events) self.name = "log" self.event_ids = {"print": SYSVIEW_EVTID_PRINT_FORMATTED} def event_supported(self, event): return event.id == SYSVIEW_EVTID_PRINT_FORMATTED def on_new_event(self, event): """ Processes log events. """ if self.root_proc == self: SysViewTraceDataProcessor.on_new_event(self, event) if event.id == SYSVIEW_EVTID_PRINT_FORMATTED: log_evt = SysViewLogTraceEvent(event.ts, event.params['msg'].value) apptrace.BaseLogTraceDataProcessorImpl.on_new_event(self, log_evt) def print_report(self): """ see apptrace.TraceDataProcessor.print_report() """ if self.root_proc == self: SysViewTraceDataProcessor.print_report(self) apptrace.BaseLogTraceDataProcessorImpl.print_report(self)