#!/usr/bin/env python # # ESP32 x509 certificate bundle generation utility # # Converts PEM and DER certificates to a custom bundle format which stores just the # subject name and public key to reduce space # # The bundle will have the format: number of certificates; crt 1 subject name length; crt 1 public key length; # crt 1 subject name; crt 1 public key; crt 2... # # Copyright 2018-2019 Espressif Systems (Shanghai) PTE LTD # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http:#www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import with_statement import os import sys import struct import argparse import csv import re from io import open try: from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization except ImportError: print('The cryptography package is not installed.' 'Please refer to the Get Started section of the ESP-IDF Programming Guide for ' 'setting up the required packages.') raise ca_bundle_bin_file = 'x509_crt_bundle' quiet = False def status(msg): """ Print status message to stderr """ if not quiet: critical(msg) def critical(msg): """ Print critical message to stderr """ sys.stderr.write('gen_crt_bundle.py: ') sys.stderr.write(msg) sys.stderr.write('\n') class CertificateBundle: def __init__(self): self.certificates = [] self.compressed_crts = [] if os.path.isfile(ca_bundle_bin_file): os.remove(ca_bundle_bin_file) def add_from_path(self, crts_path): found = False for file_path in os.listdir(crts_path): found |= self.add_from_file(os.path.join(crts_path, file_path)) if found is False: raise InputError('No valid x509 certificates found in %s' % crts_path) def add_from_file(self, file_path): try: if file_path.endswith('.pem'): status("Parsing certificates from %s" % file_path) with open(file_path, 'r', encoding='utf-8') as f: crt_str = f.read() self.add_from_pem(crt_str) return True elif file_path.endswith('.der'): status("Parsing certificates from %s" % file_path) with open(file_path, 'rb') as f: crt_str = f.read() self.add_from_der(crt_str) return True except ValueError: critical("Invalid certificate in %s" % file_path) raise InputError("Invalid certificate") return False def add_from_pem(self, crt_str): """ A single PEM file may have multiple certificates """ crt = '' count = 0 start = False for strg in crt_str.splitlines(True): if strg == '-----BEGIN CERTIFICATE-----\n' and start is False: crt = '' start = True elif strg == '-----END CERTIFICATE-----\n' and start is True: crt += strg + '\n' start = False self.certificates.append(x509.load_pem_x509_certificate(crt.encode(), default_backend())) count += 1 if start is True: crt += strg if(count == 0): raise InputError("No certificate found") status("Successfully added %d certificates" % count) def add_from_der(self, crt_str): self.certificates.append(x509.load_der_x509_certificate(crt_str, default_backend())) status("Successfully added 1 certificate") def create_bundle(self): # Sort certificates in order to do binary search when looking up certificates self.certificates = sorted(self.certificates, key=lambda cert: cert.subject.public_bytes(default_backend())) bundle = struct.pack('>H', len(self.certificates)) for crt in self.certificates: """ Read the public key as DER format """ pub_key = crt.public_key() pub_key_der = pub_key.public_bytes(serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo) """ Read the subject name as DER format """ sub_name_der = crt.subject.public_bytes(default_backend()) name_len = len(sub_name_der) key_len = len(pub_key_der) len_data = struct.pack('>HH', name_len, key_len) bundle += len_data bundle += sub_name_der bundle += pub_key_der return bundle def add_with_filter(self, crts_path, filter_path): filter_set = set() with open(filter_path, 'r', encoding='utf-8') as f: csv_reader = csv.reader(f, delimiter=',') # Skip header next(csv_reader) for row in csv_reader: filter_set.add(row[1]) status("Parsing certificates from %s" % crts_path) crt_str = [] with open(crts_path, 'r', encoding='utf-8') as f: crt_str = f.read() # Split all certs into a list of (name, certificate string) tuples pem_crts = re.findall(r'(^.+?)\n(=+\n[\s\S]+?END CERTIFICATE-----\n)', crt_str, re.MULTILINE) filtered_crts = '' for name, crt in pem_crts: if name in filter_set: filtered_crts += crt self.add_from_pem(filtered_crts) class InputError(RuntimeError): def __init__(self, e): super(InputError, self).__init__(e) def main(): global quiet parser = argparse.ArgumentParser(description='ESP-IDF x509 certificate bundle utility') parser.add_argument('--quiet', '-q', help="Don't print non-critical status messages to stderr", action='store_true') parser.add_argument('--input', '-i', nargs='+', required=True, help='Paths to the custom certificate folders or files to parse, parses all .pem or .der files') parser.add_argument('--filter', '-f', help='Path to CSV-file where the second columns contains the name of the certificates \ that should be included from cacrt_all.pem') args = parser.parse_args() quiet = args.quiet bundle = CertificateBundle() for path in args.input: if os.path.isfile(path): if os.path.basename(path) == "cacrt_all.pem" and args.filter: bundle.add_with_filter(path, args.filter) else: bundle.add_from_file(path) elif os.path.isdir(path): bundle.add_from_path(path) else: raise InputError("Invalid --input=%s, is neither file nor folder" % args.input) status('Successfully added %d certificates in total' % len(bundle.certificates)) crt_bundle = bundle.create_bundle() with open(ca_bundle_bin_file, 'wb') as f: f.write(crt_bundle) if __name__ == '__main__': try: main() except InputError as e: print(e) sys.exit(2)