tx509: use SKI and AKI to find parent certificate if it is missing - electrum - Electrum Bitcoin wallet HTML git clone https://git.parazyd.org/electrum DIR Log DIR Files DIR Refs DIR Submodules --- DIR commit f82de35bd203e35316f6f245bf2f0cc0f9e0821d DIR parent fbf56c9cfa56b17ba1aa6af7c25117257fe06462 HTML Author: ThomasV <thomasv@gitorious> Date: Fri, 17 Apr 2015 12:30:52 +0200 x509: use SKI and AKI to find parent certificate if it is missing Diffstat: M lib/paymentrequest.py | 14 ++++++++------ M lib/x509.py | 108 ++++++++++++++++--------------- 2 files changed, 65 insertions(+), 57 deletions(-) --- DIR diff --git a/lib/paymentrequest.py b/lib/paymentrequest.py t@@ -44,7 +44,7 @@ REQUEST_HEADERS = {'Accept': 'application/bitcoin-paymentrequest', 'User-Agent': ACK_HEADERS = {'Content-Type':'application/bitcoin-payment','Accept':'application/bitcoin-paymentack','User-Agent':'Electrum'} ca_path = requests.certs.where() -ca_list = x509.load_certificates(ca_path) +ca_list, ca_keyID = x509.load_certificates(ca_path) # status of payment requests t@@ -142,15 +142,17 @@ class PaymentRequest: return False # if the root CA is not supplied, add it to the chain ca = x509_chain[cert_num-1] - if ca.get_common_name() not in ca_list: - x = ca_list.get(ca.get_issuer()) - if x: - x509_chain.append(x) + if ca.getFingerprint() not in ca_list: + keyID = ca.get_issuer_keyID() + f = ca_keyID.get(keyID) + if f: + root = ca_list[f] + x509_chain.append(root) else: self.error = "Supplied CA Not Found in Trusted CA Store." return False # verify the chain of signatures - cert_num = len(cert.certificate) + cert_num = len(x509_chain) for i in range(1, cert_num): x = x509_chain[i] prev_x = x509_chain[i-1] DIR diff --git a/lib/x509.py b/lib/x509.py t@@ -50,9 +50,6 @@ PREFIX_RSA_SHA512 = bytearray([0x30,0x51,0x30,0x0d,0x06,0x09,0x60,0x86,0x48,0x01 class CertificateError(Exception): pass -def decode_str(data): - encoding = 'utf-16-be' if isinstance(data, BMPString) else 'utf-8' - return bytes(data).decode(encoding) def decode_OID(s): s = map(ord, s) t@@ -69,33 +66,31 @@ def decode_OID(s): return '.'.join(map(str,r)) +def asn1_get_children(der, i): + nodes = [] + ii = asn1_node_first_child(der,i) + nodes.append(ii) + while ii[2]<i[2]: + ii = asn1_node_next(der,ii) + nodes.append(ii) + return nodes + +def asn1_get_sequence(s): + return map(lambda j: asn1_get_value(s, j), asn1_get_children(s, asn1_node_root(s))) + +def asn1_get_dict(der, i): + p = {} + for ii in asn1_get_children(der, i): + for iii in asn1_get_children(der, ii): + iiii = asn1_node_first_child(der, iii) + oid = decode_OID(asn1_get_value_of_type(der, iiii, 'OBJECT IDENTIFIER')) + iiii = asn1_node_next(der, iiii) + value = asn1_get_value(der, iiii) + p[oid] = value + return p + class X509(tlslite.X509): - """Child class of tlslite.X509 that uses pyasn1 to parse cert - information. Note: pyasn1 is a lot slower than tlslite, so we - should try to do everything in tlslite. - """ - - def get_children(self, der, i): - nodes = [] - ii = asn1_node_first_child(der,i) - while True: - nodes.append(ii) - ii = asn1_node_next(der,ii) - if ii[0] > i[2]: - break - return nodes - - def get_dict(self, der, i): - p = {} - for ii in self.get_children(der, i): - for iii in self.get_children(der, ii): - iiii = asn1_node_first_child(der, iii) - oid = decode_OID(asn1_get_value_of_type(der, iiii, 'OBJECT IDENTIFIER')) - iiii = asn1_node_next(der, iiii) - value = asn1_get_value(der, iiii) - p[oid] = value - return p def parseBinary(self, b): t@@ -123,7 +118,7 @@ class X509(tlslite.X509): # issuer issuer = asn1_node_next(der, sig_algo) - self.issuer = self.get_dict(der, issuer) + self.issuer = asn1_get_dict(der, issuer) # validity validity = asn1_node_next(der, issuer) t@@ -134,26 +129,31 @@ class X509(tlslite.X509): # subject subject = asn1_node_next(der, validity) - self.subject = self.get_dict(der, subject) + self.subject = asn1_get_dict(der, subject) subject_pki = asn1_node_next(der, subject) - # optional fields: issuer_uid, subject_uid, extensions - i = subject_pki + # extensions self.CA = False - while True: + self.AKI = None + self.SKI = None + i = subject_pki + while i[2] < cert[2]: i = asn1_node_next(der, i) - if i[0] > cert[2]: - break - for ii in self.get_children(der, i): - for iii in self.get_children(der, ii): - iiii = asn1_node_first_child(der, iii) - oid = decode_OID(asn1_get_value_of_type(der, iiii, 'OBJECT IDENTIFIER')) - iiii = asn1_node_next(der, iiii) - value = asn1_get_value(der, iiii) - if oid == '2.5.29.19': # basic constraints - self.CA = value - else: - pass + d = asn1_get_dict(der, i) + for oid, value in d.items(): + if oid == '2.5.29.19': + # Basic Constraints + self.CA = bool(value) + elif oid == '2.5.29.14': + # Subject Key Identifier + r = asn1_node_root(value) + value = asn1_get_value_of_type(value, r, 'OCTET STRING') + self.SKI = value.encode('hex') + elif oid == '2.5.29.35': + # Authority Key Identifier + self.AKI = asn1_get_sequence(value)[0].encode('hex') + else: + pass # cert signature cert_sig_algo = asn1_node_next(der, cert) t@@ -162,13 +162,16 @@ class X509(tlslite.X509): cert_sig = asn1_node_next(der, cert_sig_algo) self.signature = asn1_get_value(der, cert_sig)[1:] + def get_keyID(self): + # http://security.stackexchange.com/questions/72077/validating-an-ssl-certificate-chain-according-to-rfc-5280-am-i-understanding-th + return self.SKI if self.SKI else repr(self.subject) + + def get_issuer_keyID(self): + return self.AKI if self.AKI else repr(self.issuer) def get_common_name(self): return self.subject.get('2.5.4.3', 'unknown') - def get_issuer(self): - return self.issuer.get('2.5.4.3', 'unknown') - def get_signature(self): return self.cert_sig_algo, self.signature, self.data t@@ -198,6 +201,7 @@ class X509CertChain(tlslite.X509CertChain): @profiler def load_certificates(ca_path): ca_list = {} + ca_keyID = {} with open(ca_path, 'r') as f: s = f.read() bList = tlslite.utils.pem.dePemList(s, "CERTIFICATE") t@@ -206,10 +210,12 @@ def load_certificates(ca_path): try: x.parseBinary(b) x.check_date() - except Exception as e: + except BaseException as e: util.print_error("cert error:", e) continue - ca_list[x.get_common_name()] = x + fp = x.getFingerprint() + ca_list[fp] = x + ca_keyID[x.get_keyID()] = fp - return ca_list + return ca_list, ca_keyID