URI: 
       tparse certificates without pyasn1 - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit ed5258e4063edcfc1085d0f4e4d80e2483ae65e8
   DIR parent 6bf1dc6f33a8a3949e4d5b6d46c677c30fc7a4e6
  HTML Author: ThomasV <thomasv@gitorious>
       Date:   Tue, 14 Apr 2015 15:04:04 +0200
       
       parse certificates without pyasn1
       
       Diffstat:
         A lib/asn1tinydecoder.py              |     123 +++++++++++++++++++++++++++++++
         M lib/paymentrequest.py               |      34 +++++++++----------------------
         M lib/x509.py                         |     318 +++++++++++++------------------
       
       3 files changed, 265 insertions(+), 210 deletions(-)
       ---
   DIR diff --git a/lib/asn1tinydecoder.py b/lib/asn1tinydecoder.py
       t@@ -0,0 +1,123 @@
       +#  This program is free software; you can redistribute it and/or modify
       +#  it under the terms of the GNU General Public License as published by
       +#  the Free Software Foundation; either version 2 of the License, or
       +#  (at your option) any later version.
       +#  
       +#  This program is distributed in the hope that it will be useful,
       +#  but WITHOUT ANY WARRANTY; without even the implied warranty of
       +#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
       +#  GNU General Public License for more details.
       +#  
       +#  You should have received a copy of the GNU General Public License
       +#  along with this program; if not, write to the Free Software
       +#  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
       +#  MA 02110-1301, USA.
       +  
       +
       +# This is a simple and fast ASN1 decoder without external libraries.
       +#
       +# In order to browse through the ASN1 structure you need only 3 
       +# functions allowing you to navigate:
       +#    asn1_node_root(...), asn1_node_next(...) and asn1_node_first_child(...) 
       +#
       +####################### BEGIN ASN1 DECODER ############################
       +
       +# Author: Jens Getreu, 8.11.2014
       +
       +##### NAVIGATE
       +
       +# The following 4 functions are all you need to parse an ASN1 structure
       +
       +# gets the first ASN1 structure in der
       +def asn1_node_root(der):
       +        return asn1_read_length(der,0)
       +
       +# gets the next ASN1 structure following (ixs,ixf,ixl) 
       +def asn1_node_next(der, (ixs,ixf,ixl)):
       +        return asn1_read_length(der,ixl+1)
       +
       +# opens the container (ixs,ixf,ixl) and returns the first ASN1 inside 
       +def asn1_node_first_child(der, (ixs,ixf,ixl)):
       +        if ord(der[ixs]) & 0x20 != 0x20:
       +                raise ValueError('Error: can only open constructed types. '
       +                                +'Found type: 0x'+der[ixs].encode("hex"))
       +        return asn1_read_length(der,ixf)
       +
       +# is true if one ASN1 chunk is inside another chunk. 
       +def asn1_node_is_child_of((ixs,ixf,ixl), (jxs,jxf,jxl)):
       +        return ( (ixf <= jxs ) and (jxl <= ixl) ) or \
       +           ( (jxf <= ixs ) and (ixl <= jxl) )  
       +
       +##### END NAVIGATE
       +
       +
       +
       +##### ACCESS PRIMITIVES
       +
       +# get content and verify type byte
       +def asn1_get_value_of_type(der,(ixs,ixf,ixl),asn1_type):
       +        asn1_type_table = {
       +        'BOOLEAN':           0x01,        'INTEGER':           0x02,
       +        'BIT STRING':        0x03,        'OCTET STRING':      0x04,
       +        'NULL':              0x05,        'OBJECT IDENTIFIER': 0x06,
       +        'SEQUENCE':          0x70,        'SET':               0x71,
       +        'PrintableString':   0x13,        'IA5String':         0x16,
       +        'UTCTime':           0x17,        'ENUMERATED':        0x0A,
       +        'UTF8String':        0x0C,        'PrintableString':   0x13,
       +        }
       +        if asn1_type_table[asn1_type] != ord(der[ixs]):
       +                raise ValueError('Error: Expected type was: '+
       +                        hex(asn1_type_table[asn1_type])+ 
       +                        ' Found: 0x'+der[ixs].encode('hex')) 
       +        return der[ixf:ixl+1]
       +
       +# get value
       +def asn1_get_value(der,(ixs,ixf,ixl)):
       +        return der[ixf:ixl+1]
       +
       +# get type+length+value
       +def asn1_get_all(der,(ixs,ixf,ixl)):
       +        return der[ixs:ixl+1]
       +
       +##### END ACCESS PRIMITIVES
       +
       +
       +
       +##### HELPER FUNCTIONS
       +
       +# converter
       +def bitstr_to_bytestr(bitstr):
       +        if bitstr[0] != '\x00':
       +                raise ValueError('Error: only 00 padded bitstr can be converted to bytestr!')
       +        return bitstr[1:]
       +
       +# converter
       +def bytestr_to_int(s):
       +        # converts bytestring to integer
       +        i = 0
       +        for char in s:
       +                i <<= 8
       +                i |= ord(char)
       +        return i
       +
       +# ix points to the first byte of the asn1 structure
       +# Returns first byte pointer, first content byte pointer and last.
       +def asn1_read_length(der,ix):
       +        first= ord(der[ix+1])
       +        if  (ord(der[ix+1]) & 0x80) == 0:
       +                length = first
       +                ix_first_content_byte = ix+2
       +                ix_last_content_byte = ix_first_content_byte + length -1
       +        else:
       +                lengthbytes = first & 0x7F
       +                length = bytestr_to_int(der[ix+2:ix+2+lengthbytes])
       +                ix_first_content_byte = ix+2+lengthbytes
       +                ix_last_content_byte = ix_first_content_byte + length -1
       +        return (ix,ix_first_content_byte,ix_last_content_byte)
       +
       +##### END HELPER FUNCTIONS
       +
       +
       +####################### END ASN1 DECODER ############################
       +
       +
   DIR diff --git a/lib/paymentrequest.py b/lib/paymentrequest.py
       t@@ -123,7 +123,6 @@ class PaymentRequest:
                for i in range(cert_num):
                    x = x509.X509()
                    x.parseBinary(bytearray(cert.certificate[i]))
       -            x.slow_parse()
                    x509_chain.append(x)
                    if i == 0:
                        try:
       t@@ -143,23 +142,10 @@ class PaymentRequest:
                    return False
                # if the root CA is not supplied, add it to the chain
                ca = x509_chain[cert_num-1]
       -        supplied_CA_fingerprint = ca.getFingerprint()
       -        x = ca_list.get(supplied_CA_fingerprint)
       -        if x:
       -            x.slow_parse()
       -            assert x.get_common_name() == ca.get_common_name()
       -        else:
       -            issuer = ca.get_issuer()
       -            for x in ca_list.values():
       -                try:
       -                    x.slow_parse()
       -                    names = x.extract_names()
       -                except Exception as e:
       -                    util.print_error("cannot parse cert:", e)
       -                    continue
       -                if names.get('CN') == issuer.get('CN'):
       -                    x509_chain.append(x)
       -                    break
       +        if ca.get_common_name() not in ca_list:
       +            x = ca_list.get(ca.get_issuer())
       +            if x:
       +                x509_chain.append(x)
                    else:
                        self.error = "Supplied CA Not Found in Trusted CA Store."
                        return False
       t@@ -168,18 +154,18 @@ class PaymentRequest:
                for i in range(1, cert_num):
                    x = x509_chain[i]
                    prev_x = x509_chain[i-1]
       -            algo, sig, data = prev_x.extract_sig()
       -            sig = bytearray(sig[5:])
       +            algo, sig, data = prev_x.get_signature()
       +            sig = bytearray(sig)
                    pubkey = x.publicKey
       -            if algo.getComponentByName('algorithm') == x509.ALGO_RSA_SHA1:
       +            if algo == x509.ALGO_RSA_SHA1:
                        verify = pubkey.hashAndVerify(sig, data)
       -            elif algo.getComponentByName('algorithm') == x509.ALGO_RSA_SHA256:
       +            elif algo == x509.ALGO_RSA_SHA256:
                        hashBytes = bytearray(hashlib.sha256(data).digest())
                        verify = pubkey.verify(sig, x509.PREFIX_RSA_SHA256 + hashBytes)
       -            elif algo.getComponentByName('algorithm') == x509.ALGO_RSA_SHA384:
       +            elif algo == x509.ALGO_RSA_SHA384:
                        hashBytes = bytearray(hashlib.sha384(data).digest())
                        verify = pubkey.verify(sig, x509.PREFIX_RSA_SHA384 + hashBytes)
       -            elif algo.getComponentByName('algorithm') == x509.ALGO_RSA_SHA512:
       +            elif algo == x509.ALGO_RSA_SHA512:
                        hashBytes = bytearray(hashlib.sha512(data).digest())
                        verify = pubkey.verify(sig, x509.PREFIX_RSA_SHA512 + hashBytes)
                    else:
   DIR diff --git a/lib/x509.py b/lib/x509.py
       t@@ -20,34 +20,26 @@
        from datetime import datetime
        import sys
        
       -import pyasn1
       -import pyasn1_modules
        import tlslite
        import util
       +from util import profiler, print_error
       +
       +from asn1tinydecoder import asn1_node_root, asn1_get_all, asn1_get_value, \
       +                        asn1_get_value_of_type, asn1_node_next, asn1_node_first_child, \
       +                        asn1_read_length, asn1_node_is_child_of, \
       +                        bytestr_to_int, bitstr_to_bytestr
        
        # workaround https://github.com/trevp/tlslite/issues/15
        tlslite.utils.cryptomath.pycryptoLoaded = False
        
        
       -from pyasn1.codec.der import decoder, encoder
       -from pyasn1.type.univ import Any, ObjectIdentifier, OctetString
       -from pyasn1.type.char import BMPString, IA5String, UTF8String
       -from pyasn1.type.useful import GeneralizedTime
       -from pyasn1_modules.rfc2459 import (Certificate, DirectoryString,
       -                                    SubjectAltName, GeneralNames,
       -                                    GeneralName)
       -from pyasn1_modules.rfc2459 import id_ce_subjectAltName as SUBJECT_ALT_NAME
       -from pyasn1_modules.rfc2459 import id_at_commonName as COMMON_NAME
       -from pyasn1_modules.rfc2459 import id_at_organizationalUnitName as OU_NAME
       -from pyasn1_modules.rfc2459 import id_ce_basicConstraints, BasicConstraints
       -XMPP_ADDR = ObjectIdentifier('1.3.6.1.5.5.7.8.5')
       -SRV_NAME = ObjectIdentifier('1.3.6.1.5.5.7.8.7')
       -
        # algo OIDs
       -ALGO_RSA_SHA1 = ObjectIdentifier('1.2.840.113549.1.1.5')
       -ALGO_RSA_SHA256 = ObjectIdentifier('1.2.840.113549.1.1.11')
       -ALGO_RSA_SHA384 = ObjectIdentifier('1.2.840.113549.1.1.12')
       -ALGO_RSA_SHA512 = ObjectIdentifier('1.2.840.113549.1.1.13')
       +ALGO_RSA_SHA1   = '1.2.840.113549.1.1.5'
       +ALGO_RSA_SHA256 = '1.2.840.113549.1.1.11'
       +ALGO_RSA_SHA384 = '1.2.840.113549.1.1.12'
       +ALGO_RSA_SHA512 = '1.2.840.113549.1.1.13'
       +
       +
        
        # prefixes, see http://stackoverflow.com/questions/3713774/c-sharp-how-to-calculate-asn-1-der-encoding-of-a-particular-hash-algorithm
        PREFIX_RSA_SHA256 = bytearray([0x30,0x31,0x30,0x0d,0x06,0x09,0x60,0x86,0x48,0x01,0x65,0x03,0x04,0x02,0x01,0x05,0x00,0x04,0x20])
       t@@ -62,6 +54,21 @@ def decode_str(data):
            encoding = 'utf-16-be' if isinstance(data, BMPString) else 'utf-8'
            return bytes(data).decode(encoding)
        
       +def decode_OID(s):
       +    s = map(ord, s)
       +    r = []
       +    r.append(s[0] / 40)
       +    r.append(s[0] % 40)
       +    k = 0 
       +    for i in s[1:]:
       +        if i<128:
       +            r.append(i + 128*k)
       +            k = 0
       +        else:
       +            k = (i - 128) + 128*k
       +    return '.'.join(map(str,r))
       +
       +
        
        class X509(tlslite.X509):
            """Child class of tlslite.X509 that uses pyasn1 to parse cert
       t@@ -69,182 +76,116 @@ class X509(tlslite.X509):
            should try to do everything in tlslite.
            """
        
       -    def slow_parse(self):
       -        self.cert = decoder.decode(str(self.bytes), asn1Spec=Certificate())[0]
       -        self.tbs = self.cert.getComponentByName('tbsCertificate')
       -        self.subject = self.tbs.getComponentByName('subject')
       -        self.extensions = self.tbs.getComponentByName('extensions') or []
       +    def get_children(self, der, i):
       +        nodes = []
       +        ii = asn1_node_first_child(der,i)
       +        while True:
       +            nodes.append(ii)
       +            ii = asn1_node_next(der,ii)
       +            if ii[0] > i[2]:
       +                break
       +        return nodes
       +    
       +    def get_dict(self, der, i):
       +        p = {}
       +        for ii in self.get_children(der, i):
       +            for iii in self.get_children(der, ii):
       +                iiii = asn1_node_first_child(der, iii)
       +                oid = decode_OID(asn1_get_value_of_type(der, iiii, 'OBJECT IDENTIFIER'))
       +                iiii = asn1_node_next(der, iiii)
       +                value = asn1_get_value(der, iiii)
       +                p[oid] = value
       +        return p
       +
       +    def parseBinary(self, b):
       +
       +        # call tlslite method first
       +        tlslite.X509.parseBinary(self, b)
       +
       +        der = str(b)
       +        root = asn1_node_root(der)
       +        cert = asn1_node_first_child(der, root)
       +        # data for signature
       +        self.data = asn1_get_all(der, cert)
       +
       +        # optional version field
       +        if asn1_get_value(der, cert)[0] == chr(0xa0):
       +            version = asn1_node_first_child(der, cert)
       +            serial_number = asn1_node_next(der, version)
       +        else:
       +            serial_number = asn1_node_first_child(der, cert)
       +        self.serial_number = bytestr_to_int(asn1_get_value_of_type(der, serial_number, 'INTEGER'))
       +
       +        # signature algorithm
       +        sig_algo = asn1_node_next(der, serial_number)
       +        ii = asn1_node_first_child(der, sig_algo)
       +        self.sig_algo = decode_OID(asn1_get_value_of_type(der, ii, 'OBJECT IDENTIFIER'))
       +
       +        # issuer
       +        issuer = asn1_node_next(der, sig_algo)
       +        self.issuer = self.get_dict(der, issuer)
       +
       +        # validity
       +        validity = asn1_node_next(der, issuer)
       +        ii = asn1_node_first_child(der, validity)
       +        self.notBefore = asn1_get_value_of_type(der, ii, 'UTCTime')
       +        ii = asn1_node_next(der,ii)
       +        self.notAfter = asn1_get_value_of_type(der, ii, 'UTCTime')
       +
       +        # subject
       +        subject = asn1_node_next(der, validity)
       +        self.subject = self.get_dict(der, subject)
       +        subject_pki = asn1_node_next(der, subject)
       +
       +        # optional fields: issuer_uid, subject_uid, extensions
       +        i = subject_pki
       +        self.CA = False
       +        while True:
       +            i = asn1_node_next(der, i)
       +            if i[0] > cert[2]:
       +                break
       +            for ii in self.get_children(der, i):
       +                for iii in self.get_children(der, ii):
       +                    iiii = asn1_node_first_child(der, iii)
       +                    oid = decode_OID(asn1_get_value_of_type(der, iiii, 'OBJECT IDENTIFIER'))
       +                    iiii = asn1_node_next(der, iiii)
       +                    value = asn1_get_value(der, iiii)
       +                    if oid == '2.5.29.19':   # basic constraints
       +                        self.CA = value
       +                    else:
       +                        pass
       +
       +        # cert signature
       +        cert_sig_algo = asn1_node_next(der, cert)
       +        ii = asn1_node_first_child(der, cert_sig_algo)
       +        self.cert_sig_algo = decode_OID(asn1_get_value_of_type(der, ii, 'OBJECT IDENTIFIER'))
       +        cert_sig = asn1_node_next(der, cert_sig_algo)
       +        self.signature = asn1_get_value(der, cert_sig)[1:]
       +        
        
            def get_common_name(self):
       -        return self.extract_names()['CN']
       +        return self.subject.get('2.5.4.3')
        
            def get_issuer(self):
       -        results = {'CN': None, 'OU': None,}
       -        issuer = self.tbs.getComponentByName('issuer')
       -        # Extract the CommonName(s) from the cert.
       -        for rdnss in issuer:
       -            for rdns in rdnss:
       -                for name in rdns:
       -                    oid = name.getComponentByName('type')
       -                    value = name.getComponentByName('value')
       -
       -                    if oid == COMMON_NAME:
       -                        value = decoder.decode(value, asn1Spec=DirectoryString())[0]
       -                        value = decode_str(value.getComponent())
       -                        results['CN'] = value
       -
       -                    elif oid == OU_NAME:
       -                        value = decoder.decode(value, asn1Spec=DirectoryString())[0]
       -                        value = decode_str(value.getComponent())
       -                        results['OU'] = value
       -        return results
       -
       -    def extract_names(self):
       -        results = {'CN': None,
       -                   'DNS': set(),
       -                   'SRV': set(),
       -                   'URI': set(),
       -                   'XMPPAddr': set(),
       -                   'OU': None,}
       -
       -        # Extract the CommonName(s) from the cert.
       -        for rdnss in self.subject:
       -            for rdns in rdnss:
       -                for name in rdns:
       -                    oid = name.getComponentByName('type')
       -                    value = name.getComponentByName('value')
       -
       -                    if oid == COMMON_NAME:
       -                        value = decoder.decode(value, asn1Spec=DirectoryString())[0]
       -                        value = decode_str(value.getComponent())
       -                        results['CN'] = value
       -
       -                    elif oid == OU_NAME:
       -                        value = decoder.decode(value, asn1Spec=DirectoryString())[0]
       -                        value = decode_str(value.getComponent())
       -                        results['OU'] = value
       -
       -        # Extract the Subject Alternate Names (DNS, SRV, URI, XMPPAddr)
       -        for extension in self.extensions:
       -            oid = extension.getComponentByName('extnID')
       -            if oid != SUBJECT_ALT_NAME:
       -                continue
       -
       -            value = decoder.decode(extension.getComponentByName('extnValue'),
       -                               asn1Spec=OctetString())[0]
       -            sa_names = decoder.decode(value, asn1Spec=SubjectAltName())[0]
       -            for name in sa_names:
       -                name_type = name.getName()
       -                if name_type == 'dNSName':
       -                    results['DNS'].add(decode_str(name.getComponent()))
       -                if name_type == 'uniformResourceIdentifier':
       -                    value = decode_str(name.getComponent())
       -                    if value.startswith('xmpp:'):
       -                        results['URI'].add(value[5:])
       -                elif name_type == 'otherName':
       -                    name = name.getComponent()
       -
       -                    oid = name.getComponentByName('type-id')
       -                    value = name.getComponentByName('value')
       -
       -                    if oid == XMPP_ADDR:
       -                        value = decoder.decode(value, asn1Spec=UTF8String())[0]
       -                        results['XMPPAddr'].add(decode_str(value))
       -                    elif oid == SRV_NAME:
       -                        value = decoder.decode(value, asn1Spec=IA5String())[0]
       -                        results['SRV'].add(decode_str(value))
       -        return results
       +        return self.issuer.get('2.5.4.3')
        
       +    def get_signature(self):
       +        return self.cert_sig_algo, self.signature, self.data
        
            def check_ca(self):
       -        for extension in self.extensions:
       -            oid = extension.getComponentByName('extnID')
       -            if oid != id_ce_basicConstraints:
       -                continue
       -            value = decoder.decode(extension.getComponentByName('extnValue'),
       -                               asn1Spec=OctetString())[0]
       -            constraints = decoder.decode(value, asn1Spec=BasicConstraints())[0]
       -            return bool(constraints[0])
       -
       -    def extract_sig(self):
       -        signature = self.cert.getComponentByName('signatureValue')
       -        algorithm = self.cert.getComponentByName('signatureAlgorithm')
       -        data = encoder.encode(self.tbs)
       -        s = encoder.encode(signature)
       -        return algorithm, s, data
       -
       -
       -    def extract_pubkey(self):
       -        pki = self.tbs.getComponentByName('subjectPublicKeyInfo')
       -        algo = pki.getComponentByName('algorithm')
       -        algorithm = algo.getComponentByName('algorithm')
       -        parameters = algo.getComponentByName('parameters')
       -        subjectPublicKey = pki.getComponentByName('subjectPublicKey')
       -        return algorithm, parameters, encoder.encode(subjectPublicKey)
       -
       -
       -    def extract_dates(self):
       -        validity = self.tbs.getComponentByName('validity')
       -        not_before = validity.getComponentByName('notBefore')
       -        not_before = str(not_before.getComponent())
       -        not_after = validity.getComponentByName('notAfter')
       -        not_after = str(not_after.getComponent())
       -        if isinstance(not_before, GeneralizedTime):
       -            not_before = datetime.strptime(not_before, '%Y%m%d%H%M%SZ')
       -        else:
       -            not_before = datetime.strptime(not_before, '%y%m%d%H%M%SZ')
       -        if isinstance(not_after, GeneralizedTime):
       -            not_after = datetime.strptime(not_after, '%Y%m%d%H%M%SZ')
       -        else:
       -            not_after = datetime.strptime(not_after, '%y%m%d%H%M%SZ')
       -        return not_before, not_after
       -
       -    def get_ttl(self):
       -        not_before, not_after = self.extract_dates()
       -        if not_after is None:
       -            return None
       -        return not_after - datetime.utcnow()
       +        return self.CA
        
            def check_date(self):
       -        not_before, not_after = self.extract_dates()
       -        now = datetime.utcnow()
       +        import time
       +        now = time.time()
       +        TIMESTAMP_FMT = '%y%m%d%H%M%SZ'
       +        not_before = time.mktime(time.strptime(self.notBefore, TIMESTAMP_FMT))
       +        not_after = time.mktime(time.strptime(self.notAfter, TIMESTAMP_FMT))
                if not_before > now:
       -            raise CertificateError(
       -                'Certificate has not entered its valid date range.')
       +            raise CertificateError('Certificate has not entered its valid date range.')
                if not_after <= now:
       -            raise CertificateError(
       -                'Certificate has expired.')
       +            raise CertificateError('Certificate has expired.')
        
       -    def check_name(self, expected):
       -        cert_names = self.extract_names()
       -        if '.' in expected:
       -            expected_wild = expected[expected.index('.'):]
       -        else:
       -            expected_wild = expected
       -        expected_srv = '_xmpp-client.%s' % expected
       -        for name in cert_names['XMPPAddr']:
       -            if name == expected:
       -                return True
       -        for name in cert_names['SRV']:
       -            if name == expected_srv or name == expected:
       -                return True
       -        for name in cert_names['DNS']:
       -            if name == expected:
       -                return True
       -            if name.startswith('*'):
       -                if '.' in name:
       -                    name_wild = name[name.index('.'):]
       -                else:
       -                    name_wild = name
       -                if expected_wild == name_wild:
       -                    return True
       -        for name in cert_names['URI']:
       -            if name == expected:
       -                return True
       -        if cert_names['CN'] == expected:
       -            return True
       -        raise CertificateError(
       -            'Could not match certficate against hostname: %s' % expected)
        
        
        class X509CertChain(tlslite.X509CertChain):
       t@@ -253,6 +194,8 @@ class X509CertChain(tlslite.X509CertChain):
        
        
        
       +
       +@profiler
        def load_certificates(ca_path):
            ca_list = {}
            with open(ca_path, 'r') as f:
       t@@ -262,8 +205,11 @@ def load_certificates(ca_path):
                x = X509()
                try:
                    x.parseBinary(b)
       +            x.check_date()
                except Exception as e:
       -            util.print_error("cannot parse cert:", e)
       +            util.print_error("cert error:", e)
                    continue
       -        ca_list[x.getFingerprint()] = x
       +        ca_list[x.get_common_name()] = x
       +
       +
            return ca_list