espcoredump: simplify handling of temporary files

This commit is contained in:
Ivan Grokhotkov 2020-05-20 20:04:14 +02:00
parent a63f44cfda
commit cbd1a95fd0
3 changed files with 221 additions and 267 deletions

View file

@ -14,22 +14,20 @@ try:
from past.utils import old_div from past.utils import old_div
from builtins import object from builtins import object
except ImportError: except ImportError:
print('Import has failed probably because of the missing "future" package. Please install all the packages for ' sys.stderr.write('Import has failed probably because of the missing "future" package. Please install all the packages for '
'interpreter {} from the $IDF_PATH/requirements.txt file.'.format(sys.executable)) 'interpreter {} from the $IDF_PATH/requirements.txt file.\n'.format(sys.executable))
sys.exit(1) sys.exit(1)
import os import os
import argparse import argparse
import subprocess import subprocess
import tempfile import tempfile
import struct import struct
import errno
import base64 import base64
import binascii import binascii
import logging import logging
import re import re
import time import time
from pygdbmi.gdbcontroller import GdbController, DEFAULT_GDB_TIMEOUT_SEC, \ from pygdbmi.gdbcontroller import GdbController, DEFAULT_GDB_TIMEOUT_SEC
DEFAULT_TIME_TO_CHECK_FOR_ADDITIONAL_OUTPUT_SEC
idf_path = os.getenv('IDF_PATH') idf_path = os.getenv('IDF_PATH')
if idf_path: if idf_path:
@ -39,9 +37,14 @@ try:
import esptool import esptool
except ImportError: except ImportError:
print("esptool is not found! Set proper $IDF_PATH in environment.") sys.stderr.write("esptool is not found! Set proper $IDF_PATH in environment.\n")
sys.exit(2) sys.exit(2)
try:
import typing
except ImportError:
pass # only needed for type annotations, ignore if not found
__version__ = "0.4-dev" __version__ = "0.4-dev"
if os.name == 'nt': if os.name == 'nt':
@ -631,7 +634,10 @@ class ESPCoreDumpLoader(ESPCoreDumpVersion):
"""Base constructor for core dump loader """Base constructor for core dump loader
""" """
super(ESPCoreDumpLoader, self).__init__() super(ESPCoreDumpLoader, self).__init__()
self.fcore = None # Source core file, before converting it into ELF
self.core_src_file = None # type: typing.Optional[typing.BinaryIO]
# Temporary ELF core file, passed to the GDB
self.core_elf_file = None # type: typing.Optional[typing.BinaryIO]
self.hdr = {} self.hdr = {}
def _get_registers_from_stack(self, data, grows_down): def _get_registers_from_stack(self, data, grows_down):
@ -745,24 +751,7 @@ class ESPCoreDumpLoader(ESPCoreDumpVersion):
""" """
return ((addr < 0x3f3fffff and addr >= 0x20000000) or addr >= 0x80000000) return ((addr < 0x3f3fffff and addr >= 0x20000000) or addr >= 0x80000000)
def remove_tmp_file(self, fname): def _extract_elf_corefile(self, off=0, exe_name=None):
"""Silently removes temporary file
"""
try:
os.remove(fname)
except OSError as e:
if e.errno != errno.ENOENT:
logging.warning("Failed to remove temp file '%s' (%d)!" % (fname, e.errno))
def cleanup(self):
"""Cleans up loader resources
"""
if self.fcore:
self.fcore.close()
if self.fcore_name:
self.remove_tmp_file(self.fcore_name)
def _extract_elf_corefile(self, core_fname=None, off=0, exe_name=None):
""" Reads the ELF formatted core dump image and parse it """ Reads the ELF formatted core dump image and parse it
""" """
core_off = off core_off = off
@ -775,183 +764,180 @@ class ESPCoreDumpLoader(ESPCoreDumpVersion):
raise ESPCoreDumpLoaderError("Core dump version '%d' is not supported!" % self.dump_ver) raise ESPCoreDumpLoaderError("Core dump version '%d' is not supported!" % self.dump_ver)
core_elf = ESPCoreDumpElfFile() core_elf = ESPCoreDumpElfFile()
data = self.read_data(core_off, self.hdr['tot_len'] - checksum_len - self.ESP_COREDUMP_HDR_SZ) data = self.read_data(core_off, self.hdr['tot_len'] - checksum_len - self.ESP_COREDUMP_HDR_SZ)
with open(core_fname, 'w+b') as fce:
try:
fce.write(data)
fce.flush()
fce.seek(0)
core_elf._read_elf_file(fce)
if exe_name:
exe_elf = ESPCoreDumpElfFile(exe_name)
# Read note segments from core file which are belong to tasks (TCB or stack)
for ns in core_elf.aux_segments:
if ns.type != ESPCoreDumpElfFile.PT_NOTE:
continue
note_read = 0
while note_read < len(ns.data):
note = Elf32NoteDesc("", 0, None)
note_read += note.read(ns.data[note_read:])
# Check for version info note
if 'ESP_CORE_DUMP_INFO' == note.name and note.type == self.ESP_CORE_DUMP_INFO_TYPE and exe_name:
app_sha256 = binascii.hexlify(exe_elf.sha256())
n_ver_len = struct.calcsize("<L")
n_sha256_len = self.ESP_COREDUMP_SHA256_SZ * 2 # SHA256 as hex string
n_ver,coredump_sha256 = struct.unpack("<L%ds" % (n_sha256_len), note.desc[:n_ver_len + n_sha256_len])
if coredump_sha256 != app_sha256 or ESPCoreDumpVersion(n_ver).dump_ver != self.dump_ver:
raise ESPCoreDumpError("Invalid application image for coredump: app_SHA256(%s) != coredump_SHA256(%s)." %
(app_sha256, coredump_sha256))
except ESPCoreDumpError as e:
logging.warning("Failed to extract ELF core dump image into file %s. (Reason: %s)" % (core_fname, e))
return core_fname
def _extract_bin_corefile(self, core_fname=None, rom_elf=None, off=0): try:
self.core_elf_file.write(data)
self.core_elf_file.flush()
self.core_elf_file.seek(0)
core_elf._read_elf_file(self.core_elf_file)
if exe_name:
exe_elf = ESPCoreDumpElfFile(exe_name)
# Read note segments from core file which are belong to tasks (TCB or stack)
for ns in core_elf.aux_segments:
if ns.type != ESPCoreDumpElfFile.PT_NOTE:
continue
note_read = 0
while note_read < len(ns.data):
note = Elf32NoteDesc("", 0, None)
note_read += note.read(ns.data[note_read:])
# Check for version info note
if 'ESP_CORE_DUMP_INFO' == note.name and note.type == self.ESP_CORE_DUMP_INFO_TYPE and exe_name:
app_sha256 = binascii.hexlify(exe_elf.sha256())
n_ver_len = struct.calcsize("<L")
n_sha256_len = self.ESP_COREDUMP_SHA256_SZ * 2 # SHA256 as hex string
n_ver,coredump_sha256 = struct.unpack("<L%ds" % (n_sha256_len), note.desc[:n_ver_len + n_sha256_len])
if coredump_sha256 != app_sha256 or ESPCoreDumpVersion(n_ver).dump_ver != self.dump_ver:
raise ESPCoreDumpError("Invalid application image for coredump: app_SHA256(%s) != coredump_SHA256(%s)." %
(app_sha256, coredump_sha256))
except ESPCoreDumpError as e:
logging.warning("Failed to extract ELF core dump image into file %s. (Reason: %s)" % (self.core_elf_file.name, e))
def _extract_bin_corefile(self, off=0): # type: (int) -> None
"""Creates core dump ELF file """Creates core dump ELF file
""" """
core_off = off core_off = off
with open(core_fname, 'w+b') as fce: tcbsz_aligned = self.hdr['tcbsz']
tcbsz_aligned = self.hdr['tcbsz'] if tcbsz_aligned % 4:
if tcbsz_aligned % 4: tcbsz_aligned = 4 * (old_div(tcbsz_aligned,4) + 1)
tcbsz_aligned = 4 * (old_div(tcbsz_aligned,4) + 1) core_elf = ESPCoreDumpElfFile()
core_elf = ESPCoreDumpElfFile() notes = b''
notes = b'' core_dump_info_notes = b''
core_dump_info_notes = b'' task_info_notes = b''
task_info_notes = b'' task_status = EspCoreDumpTaskStatus()
task_status = EspCoreDumpTaskStatus() for i in range(self.hdr['task_num']):
for i in range(self.hdr['task_num']): task_status.task_index = i
task_status.task_index = i task_status.task_flags = EspCoreDumpTaskStatus.TASK_STATUS_CORRECT
task_status.task_flags = EspCoreDumpTaskStatus.TASK_STATUS_CORRECT data = self.read_data(core_off, self.ESP_COREDUMP_TSK_HDR_SZ)
data = self.read_data(core_off, self.ESP_COREDUMP_TSK_HDR_SZ) tcb_addr,stack_top,stack_end = struct.unpack_from(self.ESP_COREDUMP_TSK_HDR_FMT, data)
tcb_addr,stack_top,stack_end = struct.unpack_from(self.ESP_COREDUMP_TSK_HDR_FMT, data) if stack_end > stack_top:
if stack_end > stack_top: stack_len = stack_end - stack_top
stack_len = stack_end - stack_top stack_base = stack_top
stack_base = stack_top else:
else: stack_len = stack_top - stack_end
stack_len = stack_top - stack_end stack_base = stack_end
stack_base = stack_end stack_len_aligned = stack_len
stack_len_aligned = stack_len if stack_len_aligned % 4:
if stack_len_aligned % 4: stack_len_aligned = 4 * (old_div(stack_len_aligned,4) + 1)
stack_len_aligned = 4 * (old_div(stack_len_aligned,4) + 1)
core_off += self.ESP_COREDUMP_TSK_HDR_SZ core_off += self.ESP_COREDUMP_TSK_HDR_SZ
logging.debug("Read TCB %d bytes @ 0x%x" % (tcbsz_aligned, tcb_addr)) logging.debug("Read TCB %d bytes @ 0x%x" % (tcbsz_aligned, tcb_addr))
data = self.read_data(core_off, tcbsz_aligned) data = self.read_data(core_off, tcbsz_aligned)
task_status.task_tcb_addr = tcb_addr task_status.task_tcb_addr = tcb_addr
try: try:
if self.tcb_is_sane(tcb_addr, tcbsz_aligned): if self.tcb_is_sane(tcb_addr, tcbsz_aligned):
if self.hdr['tcbsz'] != tcbsz_aligned: if self.hdr['tcbsz'] != tcbsz_aligned:
core_elf.add_program_segment(tcb_addr, data[:self.hdr['tcbsz'] - tcbsz_aligned], core_elf.add_program_segment(tcb_addr, data[:self.hdr['tcbsz'] - tcbsz_aligned],
ESPCoreDumpElfFile.PT_LOAD, ESPCoreDumpSegment.PF_R | ESPCoreDumpSegment.PF_W) ESPCoreDumpElfFile.PT_LOAD, ESPCoreDumpSegment.PF_R | ESPCoreDumpSegment.PF_W)
else: else:
core_elf.add_program_segment(tcb_addr, data, ESPCoreDumpElfFile.PT_LOAD, ESPCoreDumpSegment.PF_R | ESPCoreDumpSegment.PF_W) core_elf.add_program_segment(tcb_addr, data, ESPCoreDumpElfFile.PT_LOAD, ESPCoreDumpSegment.PF_R | ESPCoreDumpSegment.PF_W)
# task_status.task_name = bytearray("%s\0" % task_name_str, encoding='ascii') # task_status.task_name = bytearray("%s\0" % task_name_str, encoding='ascii')
elif tcb_addr and self.addr_is_fake(tcb_addr): elif tcb_addr and self.addr_is_fake(tcb_addr):
task_status.task_flags |= EspCoreDumpTaskStatus.TASK_STATUS_TCB_CORRUPTED task_status.task_flags |= EspCoreDumpTaskStatus.TASK_STATUS_TCB_CORRUPTED
except ESPCoreDumpError as e: except ESPCoreDumpError as e:
logging.warning("Skip TCB %d bytes @ 0x%x. (Reason: %s)" % (tcbsz_aligned, tcb_addr, e)) logging.warning("Skip TCB %d bytes @ 0x%x. (Reason: %s)" % (tcbsz_aligned, tcb_addr, e))
core_off += tcbsz_aligned core_off += tcbsz_aligned
logging.debug("Read stack %d bytes @ 0x%x" % (stack_len_aligned, stack_base)) logging.debug("Read stack %d bytes @ 0x%x" % (stack_len_aligned, stack_base))
data = self.read_data(core_off, stack_len_aligned) data = self.read_data(core_off, stack_len_aligned)
if stack_len != stack_len_aligned: if stack_len != stack_len_aligned:
data = data[:stack_len - stack_len_aligned] data = data[:stack_len - stack_len_aligned]
task_status.task_stack_start = stack_base task_status.task_stack_start = stack_base
task_status.task_stack_len = stack_len_aligned task_status.task_stack_len = stack_len_aligned
try:
if self.stack_is_sane(stack_base):
core_elf.add_program_segment(stack_base, data, ESPCoreDumpElfFile.PT_LOAD, ESPCoreDumpSegment.PF_R | ESPCoreDumpSegment.PF_W)
elif stack_base and self.addr_is_fake(stack_base):
task_status.task_flags |= EspCoreDumpTaskStatus.TASK_STATUS_STACK_CORRUPTED
core_elf.add_program_segment(stack_base, data, ESPCoreDumpElfFile.PT_LOAD, ESPCoreDumpSegment.PF_R | ESPCoreDumpSegment.PF_W)
except ESPCoreDumpError as e:
logging.warning("Skip task's (%x) stack %d bytes @ 0x%x. (Reason: %s)" % (tcb_addr, stack_len_aligned, stack_base, e))
core_off += stack_len_aligned
try:
logging.debug("Stack start_end: 0x%x @ 0x%x" % (stack_top, stack_end))
task_regs,extra_regs = self._get_registers_from_stack(data, stack_end > stack_top)
except Exception as e:
logging.error(e)
return None
task_info_notes += Elf32NoteDesc("TASK_INFO", self.ESP_CORE_DUMP_TASK_INFO_TYPE, task_status.dump()).dump()
prstatus = XtensaPrStatus()
prstatus.pr_cursig = 0 # TODO: set sig only for current/failed task
prstatus.pr_pid = tcb_addr
note = Elf32NoteDesc("CORE", 1, prstatus.dump() + struct.pack("<%dL" % len(task_regs), *task_regs)).dump()
notes += note
if ESPCoreDumpElfFile.REG_EXCCAUSE_IDX in extra_regs and len(core_dump_info_notes) == 0:
# actually there will be only one such note - for crashed task
core_dump_info_notes += Elf32NoteDesc("ESP_CORE_DUMP_INFO", self.ESP_CORE_DUMP_INFO_TYPE, struct.pack("<L", self.hdr['ver'])).dump()
exc_regs = []
for reg_id in extra_regs:
exc_regs.extend([reg_id, extra_regs[reg_id]])
core_dump_info_notes += Elf32NoteDesc("EXTRA_INFO", self.ESP_CORE_DUMP_EXTRA_INFO_TYPE,
struct.pack("<%dL" % (1 + len(exc_regs)), tcb_addr, *exc_regs)).dump()
self.set_version(self.hdr['ver'])
if self.dump_ver == self.ESP_COREDUMP_VERSION_BIN_V2:
for i in range(self.hdr['segs_num']):
data = self.read_data(core_off, self.ESP_COREDUMP_MEM_SEG_HDR_SZ)
core_off += self.ESP_COREDUMP_MEM_SEG_HDR_SZ
mem_start,mem_sz = struct.unpack_from(self.ESP_COREDUMP_MEM_SEG_HDR_FMT, data)
logging.debug("Read memory segment %d bytes @ 0x%x" % (mem_sz, mem_start))
data = self.read_data(core_off, mem_sz)
core_elf.add_program_segment(mem_start, data, ESPCoreDumpElfFile.PT_LOAD, ESPCoreDumpSegment.PF_R | ESPCoreDumpSegment.PF_W)
core_off += mem_sz
# add notes
try: try:
core_elf.add_aux_segment(notes, ESPCoreDumpElfFile.PT_NOTE, 0) if self.stack_is_sane(stack_base):
core_elf.add_program_segment(stack_base, data, ESPCoreDumpElfFile.PT_LOAD, ESPCoreDumpSegment.PF_R | ESPCoreDumpSegment.PF_W)
elif stack_base and self.addr_is_fake(stack_base):
task_status.task_flags |= EspCoreDumpTaskStatus.TASK_STATUS_STACK_CORRUPTED
core_elf.add_program_segment(stack_base, data, ESPCoreDumpElfFile.PT_LOAD, ESPCoreDumpSegment.PF_R | ESPCoreDumpSegment.PF_W)
except ESPCoreDumpError as e: except ESPCoreDumpError as e:
logging.warning("Skip NOTES segment %d bytes @ 0x%x. (Reason: %s)" % (len(notes), 0, e)) logging.warning("Skip task's (%x) stack %d bytes @ 0x%x. (Reason: %s)" % (tcb_addr, stack_len_aligned, stack_base, e))
# add core dump info notes core_off += stack_len_aligned
try: try:
core_elf.add_aux_segment(core_dump_info_notes, ESPCoreDumpElfFile.PT_NOTE, 0) logging.debug("Stack start_end: 0x%x @ 0x%x" % (stack_top, stack_end))
except ESPCoreDumpError as e: task_regs,extra_regs = self._get_registers_from_stack(data, stack_end > stack_top)
logging.warning("Skip core dump info NOTES segment %d bytes @ 0x%x. (Reason: %s)" % (len(core_dump_info_notes), 0, e)) except Exception as e:
try: raise ESPCoreDumpError(str(e))
core_elf.add_aux_segment(task_info_notes, ESPCoreDumpElfFile.PT_NOTE, 0) task_info_notes += Elf32NoteDesc("TASK_INFO", self.ESP_CORE_DUMP_TASK_INFO_TYPE, task_status.dump()).dump()
except ESPCoreDumpError as e: prstatus = XtensaPrStatus()
logging.warning("Skip failed tasks info NOTES segment %d bytes @ 0x%x. (Reason: %s)" % (len(task_info_notes), 0, e)) prstatus.pr_cursig = 0 # TODO: set sig only for current/failed task
# add ROM text sections prstatus.pr_pid = tcb_addr
if rom_elf: note = Elf32NoteDesc("CORE", 1, prstatus.dump() + struct.pack("<%dL" % len(task_regs), *task_regs)).dump()
for ps in rom_elf.program_segments: notes += note
if (ps.flags & ESPCoreDumpSegment.PF_X) == 0: if ESPCoreDumpElfFile.REG_EXCCAUSE_IDX in extra_regs and len(core_dump_info_notes) == 0:
continue # actually there will be only one such note - for crashed task
try: core_dump_info_notes += Elf32NoteDesc("ESP_CORE_DUMP_INFO", self.ESP_CORE_DUMP_INFO_TYPE, struct.pack("<L", self.hdr['ver'])).dump()
core_elf.add_program_segment(ps.addr, ps.data, ESPCoreDumpElfFile.PT_LOAD, ps.flags) exc_regs = []
except ESPCoreDumpError as e: for reg_id in extra_regs:
logging.warning("Skip ROM segment %d bytes @ 0x%x. (Reason: %s)" % (len(ps.data), ps.addr, e)) exc_regs.extend([reg_id, extra_regs[reg_id]])
# dump core ELF core_dump_info_notes += Elf32NoteDesc("EXTRA_INFO", self.ESP_CORE_DUMP_EXTRA_INFO_TYPE,
core_elf.e_type = ESPCoreDumpElfFile.ET_CORE struct.pack("<%dL" % (1 + len(exc_regs)), tcb_addr, *exc_regs)).dump()
core_elf.e_machine = ESPCoreDumpElfFile.EM_XTENSA self.set_version(self.hdr['ver'])
core_elf.dump(fce) if self.dump_ver == self.ESP_COREDUMP_VERSION_BIN_V2:
return core_fname for i in range(self.hdr['segs_num']):
data = self.read_data(core_off, self.ESP_COREDUMP_MEM_SEG_HDR_SZ)
core_off += self.ESP_COREDUMP_MEM_SEG_HDR_SZ
mem_start,mem_sz = struct.unpack_from(self.ESP_COREDUMP_MEM_SEG_HDR_FMT, data)
logging.debug("Read memory segment %d bytes @ 0x%x" % (mem_sz, mem_start))
data = self.read_data(core_off, mem_sz)
core_elf.add_program_segment(mem_start, data, ESPCoreDumpElfFile.PT_LOAD, ESPCoreDumpSegment.PF_R | ESPCoreDumpSegment.PF_W)
core_off += mem_sz
# add notes
try:
core_elf.add_aux_segment(notes, ESPCoreDumpElfFile.PT_NOTE, 0)
except ESPCoreDumpError as e:
logging.warning("Skip NOTES segment %d bytes @ 0x%x. (Reason: %s)" % (len(notes), 0, e))
# add core dump info notes
try:
core_elf.add_aux_segment(core_dump_info_notes, ESPCoreDumpElfFile.PT_NOTE, 0)
except ESPCoreDumpError as e:
logging.warning("Skip core dump info NOTES segment %d bytes @ 0x%x. (Reason: %s)" % (len(core_dump_info_notes), 0, e))
try:
core_elf.add_aux_segment(task_info_notes, ESPCoreDumpElfFile.PT_NOTE, 0)
except ESPCoreDumpError as e:
logging.warning("Skip failed tasks info NOTES segment %d bytes @ 0x%x. (Reason: %s)" % (len(task_info_notes), 0, e))
# dump core ELF
core_elf.e_type = ESPCoreDumpElfFile.ET_CORE
core_elf.e_machine = ESPCoreDumpElfFile.EM_XTENSA
core_elf.dump(self.core_elf_file)
def create_corefile(self, core_fname=None, exe_name=None, rom_elf=None, off=0): def create_corefile(self, exe_name=None): # type: (str) -> None
"""Creates core dump ELF file """Creates core dump ELF file
""" """
off = 0
data = self.read_data(off, self.ESP_COREDUMP_HDR_SZ) data = self.read_data(off, self.ESP_COREDUMP_HDR_SZ)
vals = struct.unpack_from(self.ESP_COREDUMP_HDR_FMT, data) vals = struct.unpack_from(self.ESP_COREDUMP_HDR_FMT, data)
self.hdr = dict(zip(('tot_len', 'ver', 'task_num', 'tcbsz', 'segs_num'), vals)) self.hdr = dict(zip(('tot_len', 'ver', 'task_num', 'tcbsz', 'segs_num'), vals))
if not core_fname: self.core_elf_file = tempfile.NamedTemporaryFile()
fce = tempfile.NamedTemporaryFile(mode='w+b', delete=False)
core_fname = fce.name
self.set_version(self.hdr['ver']) self.set_version(self.hdr['ver'])
if self.chip_ver == ESPCoreDumpVersion.ESP_CORE_DUMP_CHIP_ESP32S2 or self.chip_ver == ESPCoreDumpVersion.ESP_CORE_DUMP_CHIP_ESP32: if self.chip_ver == ESPCoreDumpVersion.ESP_CORE_DUMP_CHIP_ESP32S2 or self.chip_ver == ESPCoreDumpVersion.ESP_CORE_DUMP_CHIP_ESP32:
if self.dump_ver == self.ESP_COREDUMP_VERSION_ELF_CRC32 or self.dump_ver == self.ESP_COREDUMP_VERSION_ELF_SHA256: if self.dump_ver == self.ESP_COREDUMP_VERSION_ELF_CRC32 or self.dump_ver == self.ESP_COREDUMP_VERSION_ELF_SHA256:
return self._extract_elf_corefile(core_fname, off + self.ESP_COREDUMP_HDR_SZ, exe_name) self._extract_elf_corefile(off + self.ESP_COREDUMP_HDR_SZ, exe_name)
elif self.dump_ver == self.ESP_COREDUMP_VERSION_BIN_V2: elif self.dump_ver == self.ESP_COREDUMP_VERSION_BIN_V2:
return self._extract_bin_corefile(core_fname, rom_elf, off + self.ESP_COREDUMP_HDR_SZ) self._extract_bin_corefile(off + self.ESP_COREDUMP_HDR_SZ)
elif self.dump_ver == self.ESP_COREDUMP_VERSION_BIN_V1: elif self.dump_ver == self.ESP_COREDUMP_VERSION_BIN_V1:
return self._extract_bin_corefile(core_fname, rom_elf, off + self.ESP_COREDUMP_BIN_V1_HDR_SZ) self._extract_bin_corefile(off + self.ESP_COREDUMP_BIN_V1_HDR_SZ)
raise ESPCoreDumpLoaderError("Core dump version '0x%x' is not supported!" % (self.dump_ver)) else:
raise ESPCoreDumpLoaderError("Core dump version '0x%x' is not supported!" % (self.dump_ver))
else: else:
raise ESPCoreDumpLoaderError("Core dump chip '0x%x' is not supported!" % (self.chip_ver)) raise ESPCoreDumpLoaderError("Core dump chip '0x%x' is not supported!" % (self.chip_ver))
self.core_elf_file.flush()
def read_data(self, off, sz): def read_data(self, off, sz):
"""Reads data from raw core dump got from flash or UART """Reads data from raw core dump got from flash or UART
""" """
self.fcore.seek(off) self.core_src_file.seek(off)
data = self.fcore.read(sz) data = self.core_src_file.read(sz)
return data return data
def cleanup(self):
if self.core_elf_file:
self.core_elf_file.close()
self.core_elf_file = None
if self.core_src_file:
self.core_src_file.close()
self.core_src_file = None
class ESPCoreDumpFileLoader(ESPCoreDumpLoader): class ESPCoreDumpFileLoader(ESPCoreDumpLoader):
"""Core dump file loader class """Core dump file loader class
@ -960,35 +946,25 @@ class ESPCoreDumpFileLoader(ESPCoreDumpLoader):
"""Constructor for core dump file loader """Constructor for core dump file loader
""" """
super(ESPCoreDumpFileLoader, self).__init__() super(ESPCoreDumpFileLoader, self).__init__()
self.fcore = self._load_coredump(path, b64) self._load_coredump(path, b64)
def _load_coredump(self, path, b64): def _load_coredump(self, path, b64):
"""Loads core dump from (raw binary or base64-encoded) file """Loads core dump from (raw binary or base64-encoded) file
""" """
logging.debug("Load core dump from '%s'", path) logging.debug("Load core dump from '%s', %s format" % (path, "b64" if b64 else "raw"))
self.fcore_name = None if not b64:
if b64: self.core_src_file = open(path, mode="rb")
fhnd,self.fcore_name = tempfile.mkstemp() else:
fcore = os.fdopen(fhnd, 'wb') self.core_src_file = tempfile.NamedTemporaryFile("w+b")
fb64 = open(path, 'rb') with open(path, 'rb') as fb64:
try:
while True: while True:
line = fb64.readline() line = fb64.readline()
if len(line) == 0: if len(line) == 0:
break break
data = base64.standard_b64decode(line.rstrip(b'\r\n')) data = base64.standard_b64decode(line.rstrip(b'\r\n'))
fcore.write(data) self.core_src_file.write(data)
fcore.close() self.core_src_file.flush()
fcore = open(self.fcore_name, 'rb') self.core_src_file.seek(0)
except Exception as e:
if self.fcore_name:
self.remove_tmp_file(self.fcore_name)
raise e
finally:
fb64.close()
else:
fcore = open(path, 'rb')
return fcore
class ESPCoreDumpFlashLoader(ESPCoreDumpLoader): class ESPCoreDumpFlashLoader(ESPCoreDumpLoader):
@ -1006,7 +982,7 @@ class ESPCoreDumpFlashLoader(ESPCoreDumpLoader):
self.baud = baud self.baud = baud
self.chip = chip self.chip = chip
self.dump_sz = 0 self.dump_sz = 0
self.fcore = self._load_coredump(off) self._load_coredump(off)
def get_tool_path(self, use_esptool=None): def get_tool_path(self, use_esptool=None):
"""Get tool path """Get tool path
@ -1052,32 +1028,26 @@ class ESPCoreDumpFlashLoader(ESPCoreDumpLoader):
if self.port: if self.port:
part_tool_args.extend(['--port', self.port]) part_tool_args.extend(['--port', self.port])
part_tool_args.extend(['read_partition', '--partition-type', 'data', '--partition-subtype', 'coredump', '--output']) part_tool_args.extend(['read_partition', '--partition-type', 'data', '--partition-subtype', 'coredump', '--output'])
self.fcore_name = None self.core_src_file = tempfile.NamedTemporaryFile()
f = tempfile.NamedTemporaryFile(mode='w+b', delete=False)
try: try:
part_tool_args.append(f.name) part_tool_args.append(self.core_src_file.name)
self.fcore_name = f.name self.fcore_name = self.core_src_file.name
# read core dump partition # read core dump partition
et_out = subprocess.check_output(part_tool_args) et_out = subprocess.check_output(part_tool_args)
if len(et_out): if len(et_out):
logging.info(et_out.decode('utf-8')) logging.info(et_out.decode('utf-8'))
self.dump_sz = self._read_core_dump_length(f) self.dump_sz = self._read_core_dump_length(self.core_src_file)
f.seek(self.dump_sz) self.core_src_file.seek(self.dump_sz)
# cut free space of the partition # cut free space of the partition
f.truncate() self.core_src_file.truncate()
f.seek(0) self.core_src_file.seek(0)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
logging.error("parttool script execution failed with err %d" % e.returncode) logging.error("parttool script execution failed with err %d" % e.returncode)
logging.debug("Command ran: '%s'" % e.cmd) logging.debug("Command ran: '%s'" % e.cmd)
logging.debug("Command out:") logging.debug("Command out:")
logging.debug(e.output) logging.debug(e.output)
if self.fcore_name:
f.close()
self.remove_tmp_file(self.fcore_name)
raise e raise e
return f
def invoke_esptool(self, tool_path=None, off=None): def invoke_esptool(self, tool_path=None, off=None):
"""Loads core dump from flash using elftool """Loads core dump from flash using elftool
""" """
@ -1086,8 +1056,7 @@ class ESPCoreDumpFlashLoader(ESPCoreDumpLoader):
tool_args.extend(['-p', self.port]) tool_args.extend(['-p', self.port])
if self.baud: if self.baud:
tool_args.extend(['-b', str(self.baud)]) tool_args.extend(['-b', str(self.baud)])
f = tempfile.NamedTemporaryFile(mode='w+b', delete=False) self.core_src_file = tempfile.NamedTemporaryFile()
self.fcore_name = None
try: try:
(part_offset, part_size) = self.get_core_dump_partition_info(tool_path='') (part_offset, part_size) = self.get_core_dump_partition_info(tool_path='')
if not off: if not off:
@ -1096,13 +1065,12 @@ class ESPCoreDumpFlashLoader(ESPCoreDumpLoader):
if part_offset != off: if part_offset != off:
logging.warning("Predefined image offset: %d does not match core dump partition offset: %d", off, part_offset) logging.warning("Predefined image offset: %d does not match core dump partition offset: %d", off, part_offset)
tool_args.extend(['read_flash', str(off), str(self.ESP_COREDUMP_FLASH_LEN_SZ)]) tool_args.extend(['read_flash', str(off), str(self.ESP_COREDUMP_FLASH_LEN_SZ)])
tool_args.append(f.name) tool_args.append(self.core_src_file.name)
self.fcore_name = f.name
# read core dump length # read core dump length
et_out = subprocess.check_output(tool_args) et_out = subprocess.check_output(tool_args)
if len(et_out): if len(et_out):
logging.info(et_out.decode('utf-8')) logging.info(et_out.decode('utf-8'))
self.dump_sz = self._read_core_dump_length(f) self.dump_sz = self._read_core_dump_length(self.core_src_file)
if self.dump_sz == 0 or self.dump_sz > part_size: if self.dump_sz == 0 or self.dump_sz > part_size:
logging.error("Incorrect size of core dump image: %d, use partition size instead: %d", self.dump_sz, part_size) logging.error("Incorrect size of core dump image: %d, use partition size instead: %d", self.dump_sz, part_size)
self.dump_sz = part_size self.dump_sz = part_size
@ -1116,11 +1084,7 @@ class ESPCoreDumpFlashLoader(ESPCoreDumpLoader):
logging.debug("Command ran: '%s'" % e.cmd) logging.debug("Command ran: '%s'" % e.cmd)
logging.debug("Command out:") logging.debug("Command out:")
logging.debug(e.output) logging.debug(e.output)
if self.fcore_name:
f.close()
self.remove_tmp_file(self.fcore_name)
raise e raise e
return f
def _load_coredump(self, off=None): def _load_coredump(self, off=None):
"""Loads core dump from flash using parttool or elftool (if offset is set) """Loads core dump from flash using parttool or elftool (if offset is set)
@ -1130,22 +1094,21 @@ class ESPCoreDumpFlashLoader(ESPCoreDumpLoader):
if off: if off:
tool_path = '' tool_path = ''
logging.info("Invoke esptool to read image.") logging.info("Invoke esptool to read image.")
f = self.invoke_esptool(tool_path=tool_path, off=off) self.invoke_esptool(tool_path=tool_path, off=off)
else: else:
tool_path = '' tool_path = ''
logging.info("Invoke parttool to read image.") logging.info("Invoke parttool to read image.")
f = self.invoke_parttool(tool_path=tool_path) self.invoke_parttool(tool_path=tool_path)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
if len(e.output): if len(e.output):
logging.info(e.output) logging.info(e.output)
logging.warning("System path is not set. Try to use predefined path.") logging.warning("System path is not set. Try to use predefined path.")
if off: if off:
tool_path = self.get_tool_path(use_esptool=True) tool_path = self.get_tool_path(use_esptool=True)
f = self.invoke_esptool(tool_path=tool_path, off=off) self.invoke_esptool(tool_path=tool_path, off=off)
else: else:
tool_path = self.get_tool_path(use_esptool=False) tool_path = self.get_tool_path(use_esptool=False)
f = self.invoke_parttool(tool_path=tool_path) self.invoke_parttool(tool_path=tool_path)
return f
def _read_core_dump_length(self, f): def _read_core_dump_length(self, f):
"""Reads core dump length """Reads core dump length
@ -1154,7 +1117,7 @@ class ESPCoreDumpFlashLoader(ESPCoreDumpLoader):
tot_len, = struct.unpack_from(self.ESP_COREDUMP_FLASH_LEN_FMT, data) tot_len, = struct.unpack_from(self.ESP_COREDUMP_FLASH_LEN_FMT, data)
return tot_len return tot_len
def create_corefile(self, core_fname=None, exe_name=None, rom_elf=None): def create_corefile(self, exe_name=None): # type: (str) -> None
"""Checks flash coredump data integrity and creates ELF file """Checks flash coredump data integrity and creates ELF file
""" """
data = self.read_data(0, self.ESP_COREDUMP_HDR_SZ) data = self.read_data(0, self.ESP_COREDUMP_HDR_SZ)
@ -1180,7 +1143,7 @@ class ESPCoreDumpFlashLoader(ESPCoreDumpLoader):
dump_sha256_str = binascii.hexlify(dump_sha256).decode('ascii') dump_sha256_str = binascii.hexlify(dump_sha256).decode('ascii')
if dump_sha256_str != data_sha256_str: if dump_sha256_str != data_sha256_str:
raise ESPCoreDumpLoaderError("Invalid core dump SHA256 '%s', should be '%s'" % (dump_sha256_str, data_sha256_str)) raise ESPCoreDumpLoaderError("Invalid core dump SHA256 '%s', should be '%s'" % (dump_sha256_str, data_sha256_str))
return super(ESPCoreDumpFlashLoader, self).create_corefile(core_fname, exe_name) super(ESPCoreDumpFlashLoader, self).create_corefile(exe_name)
def load_aux_elf(elf_path): # type: (str) -> (ESPCoreDumpElfFile, str) def load_aux_elf(elf_path): # type: (str) -> (ESPCoreDumpElfFile, str)
@ -1196,62 +1159,65 @@ def load_aux_elf(elf_path): # type: (str) -> (ESPCoreDumpElfFile, str)
return elf, sym_cmd return elf, sym_cmd
def core_prepare(args, rom_elf): def core_prepare(args):
loader = None loader = None
core_fname = None core_filename = None
if not args.core: if not args.core:
# Core file not specified, try to read core dump from flash. # Core file not specified, try to read core dump from flash.
loader = ESPCoreDumpFlashLoader(args.off, port=args.port, baud=args.baud) loader = ESPCoreDumpFlashLoader(args.off, port=args.port, baud=args.baud)
elif args.core_format and args.core_format != "elf": elif args.core_format != "elf":
# Core file specified, but not yet in ELF format. Convert it from raw or base64 into ELF. # Core file specified, but not yet in ELF format. Convert it from raw or base64 into ELF.
loader = ESPCoreDumpFileLoader(args.core, args.core_format == 'b64') loader = ESPCoreDumpFileLoader(args.core, args.core_format == 'b64')
else: else:
# Core file is already in the ELF format # Core file is already in the ELF format
core_fname = args.core core_filename = args.core
# Load/convert the core file # Load/convert the core file
if loader: if loader:
core_fname = loader.create_corefile(args.save_core, exe_name=args.prog, rom_elf=rom_elf) loader.create_corefile(exe_name=args.prog)
if not core_fname: core_filename = loader.core_elf_file.name
loader.cleanup() if args.save_core:
raise RuntimeError("Failed to create corefile!") # We got asked to save the core file, make a copy
with open(args.save_core, "w+b") as f_out:
loader.core_elf_file.seek(0)
f_out.write(loader.core_elf_file.read())
return core_fname, loader return core_filename, loader
def dbg_corefile(args): def dbg_corefile(args):
""" Command to load core dump from file or flash and run GDB debug session with it """ Command to load core dump from file or flash and run GDB debug session with it
""" """
rom_elf, rom_sym_cmd = load_aux_elf(args.rom_elf) rom_elf, rom_sym_cmd = load_aux_elf(args.rom_elf)
core_fname, loader = core_prepare(args, rom_elf) core_filename, loader = core_prepare(args)
p = subprocess.Popen(bufsize=0, p = subprocess.Popen(bufsize=0,
args=[args.gdb, args=[args.gdb,
'--nw', # ignore .gdbinit '--nw', # ignore .gdbinit
'--core=%s' % core_fname, # core file, '--core=%s' % core_filename, # core file,
'-ex', rom_sym_cmd, '-ex', rom_sym_cmd,
args.prog args.prog
], ],
stdin=None, stdout=None, stderr=None, stdin=None, stdout=None, stderr=None,
close_fds=CLOSE_FDS close_fds=CLOSE_FDS
) )
p.wait()
p.wait()
print('Done!')
if loader: if loader:
loader.cleanup() loader.cleanup()
print('Done!')
def gdbmi_filter_responses(responses, resp_message, resp_type): def gdbmi_filter_responses(responses, resp_message, resp_type):
return list(filter(lambda rsp: rsp["message"] == resp_message and rsp["type"] == resp_type, responses)) return list(filter(lambda rsp: rsp["message"] == resp_message and rsp["type"] == resp_type, responses))
def gdbmi_run_cmd_get_responses(p, cmd, resp_message, resp_type, multiple=True, done_message=None, done_type=None): # type: (GdbController, str, typing.Optional[str], str, bool, typing.Optional[str], typing.Optional[str]) -> list def gdbmi_run_cmd_get_responses(p, cmd, resp_message, resp_type, multiple=True, done_message=None, done_type=None): \
# type: (GdbController, str, typing.Optional[str], str, bool, typing.Optional[str], typing.Optional[str]) -> list
p.write(cmd, read_response=False) p.write(cmd, read_response=False)
t_end = time.time() + DEFAULT_GDB_TIMEOUT_SEC t_end = time.time() + DEFAULT_GDB_TIMEOUT_SEC
filtered_response_list = [] filtered_response_list = []
all_responses = [] all_responses = []
found = False
while time.time() < t_end: while time.time() < t_end:
more_responses = p.get_gdb_response(timeout_sec=0, raise_error_on_timeout=False) more_responses = p.get_gdb_response(timeout_sec=0, raise_error_on_timeout=False)
filtered_response_list += filter(lambda rsp: rsp["message"] == resp_message and rsp["type"] == resp_type, more_responses) filtered_response_list += filter(lambda rsp: rsp["message"] == resp_message and rsp["type"] == resp_type, more_responses)
@ -1261,7 +1227,7 @@ def gdbmi_run_cmd_get_responses(p, cmd, resp_message, resp_type, multiple=True,
if done_message and done_type and gdbmi_filter_responses(more_responses, done_message, done_type): if done_message and done_type and gdbmi_filter_responses(more_responses, done_message, done_type):
break break
if not filtered_response_list and not multiple: if not filtered_response_list and not multiple:
raise ValueError("Couldn't find response with message '{}', type '{}' in responses '{}'".format( raise ESPCoreDumpError("Couldn't find response with message '{}', type '{}' in responses '{}'".format(
resp_message, resp_type, str(all_responses) resp_message, resp_type, str(all_responses)
)) ))
return filtered_response_list return filtered_response_list
@ -1290,7 +1256,8 @@ def gdbmi_start(gdb_path, gdb_cmds, core_filename, prog_filename): # type: (str
def gdbmi_cmd_exec_console(p, gdb_cmd): # type: (GdbController, str) -> str def gdbmi_cmd_exec_console(p, gdb_cmd): # type: (GdbController, str) -> str
""" Execute a generic GDB console command via MI2 """ """ Execute a generic GDB console command via MI2 """
filtered_responses = gdbmi_run_cmd_get_responses(p, "-interpreter-exec console \"%s\"" % gdb_cmd, None, "console", multiple=True, done_message="done", done_type="result") filtered_responses = gdbmi_run_cmd_get_responses(p, "-interpreter-exec console \"%s\"" % gdb_cmd, None, "console",
multiple=True, done_message="done", done_type="result")
return "".join([x["payload"] for x in filtered_responses])\ return "".join([x["payload"] for x in filtered_responses])\
.replace('\\n', '\n').replace('\\t', '\t').rstrip("\n") .replace('\\n', '\n').replace('\\t', '\t').rstrip("\n")
@ -1341,10 +1308,10 @@ def info_corefile(args):
""" Command to load core dump from file or flash and print it's data in user friendly form """ Command to load core dump from file or flash and print it's data in user friendly form
""" """
rom_elf, rom_sym_cmd = load_aux_elf(args.rom_elf) rom_elf, rom_sym_cmd = load_aux_elf(args.rom_elf)
core_fname, loader = core_prepare(args, rom_elf) core_filename, loader = core_prepare(args)
exe_elf = ESPCoreDumpElfFile(args.prog) exe_elf = ESPCoreDumpElfFile(args.prog)
core_elf = ESPCoreDumpElfFile(core_fname) core_elf = ESPCoreDumpElfFile(core_filename)
merged_segs = [] merged_segs = []
core_segs = core_elf.program_segments core_segs = core_elf.program_segments
for s in exe_elf.sections: for s in exe_elf.sections:
@ -1388,7 +1355,7 @@ def info_corefile(args):
if not merged: if not merged:
merged_segs.append((s.name, s.addr, len(s.data), s.attr_str(), False)) merged_segs.append((s.name, s.addr, len(s.data), s.attr_str(), False))
p = gdbmi_start(args.gdb, [rom_sym_cmd], core_fname, args.prog) p = gdbmi_start(args.gdb, [rom_sym_cmd], core_filename, args.prog)
extra_note = None extra_note = None
task_info = [] task_info = []
@ -1493,10 +1460,9 @@ def info_corefile(args):
print("===============================================================") print("===============================================================")
p.exit() p.exit()
print('Done!')
if loader: if loader:
loader.cleanup() loader.cleanup()
print('Done!')
def main(): def main():
@ -1576,8 +1542,4 @@ def main():
if __name__ == '__main__': if __name__ == '__main__':
try: main()
main()
except ESPCoreDumpError as e:
print('\nA fatal error occurred: %s' % e)
sys.exit(2)

View file

@ -28,24 +28,14 @@ except ImportError:
class TestESPCoreDumpFileLoader(unittest.TestCase): class TestESPCoreDumpFileLoader(unittest.TestCase):
def setUp(self):
self.tmp_file = 'tmp'
self.dloader = espcoredump.ESPCoreDumpFileLoader(path='coredump.b64', b64=True)
self.assertIsInstance(self.dloader, espcoredump.ESPCoreDumpFileLoader)
def tearDown(self):
self.dloader.cleanup()
def testESPCoreDumpFileLoaderWithoutB64(self): def testESPCoreDumpFileLoaderWithoutB64(self):
t = espcoredump.ESPCoreDumpFileLoader(path='coredump.b64', b64=False) loader = espcoredump.ESPCoreDumpFileLoader(path='coredump.b64', b64=False)
self.assertIsInstance(t, espcoredump.ESPCoreDumpFileLoader) # invoke for coverage of open() loader.cleanup()
t.cleanup()
def test_cannot_remove_dir(self):
self.dloader.remove_tmp_file(fname='.') # silent failure (but covers exception inside)
def test_create_corefile(self): def test_create_corefile(self):
self.assertEqual(self.dloader.create_corefile(core_fname=self.tmp_file, off=0, rom_elf=None), self.tmp_file) loader = espcoredump.ESPCoreDumpFileLoader(path='coredump.b64', b64=True)
loader.create_corefile()
loader.cleanup()
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -2,8 +2,10 @@
{ coverage debug sys \ { coverage debug sys \
&& coverage erase \ && coverage erase \
&& coverage run -a --source=espcoredump ../espcoredump.py info_corefile -m -t b64 -c coredump.b64 test.elf &> output \ && coverage run -a --source=espcoredump ../espcoredump.py info_corefile -m -t b64 -c coredump.b64 -s core.elf test.elf &> output \
&& diff expected_output output \ && diff expected_output output \
&& coverage run -a --source=espcoredump ../espcoredump.py info_corefile -m -t elf -c core.elf test.elf &> output2 \
&& diff expected_output output2 \
&& coverage run -a --source=espcoredump ./test_espcoredump.py \ && coverage run -a --source=espcoredump ./test_espcoredump.py \
&& coverage report \ && coverage report \
; } || { echo 'The test for espcoredump has failed!'; exit 1; } ; } || { echo 'The test for espcoredump has failed!'; exit 1; }