URI: 
       tverify payment requests with tlslite and pyasn1 (pure python) instead of m2Crypto - electrum - Electrum Bitcoin wallet
  HTML git clone https://git.parazyd.org/electrum
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
       ---
   DIR commit 958f764daeee14327bcc3474aaf6e03b2db05cdf
   DIR parent 440f972fd3eab0cac7f486e1de8e028a34df3fb9
  HTML Author: ThomasV <thomasv@gitorious>
       Date:   Tue, 10 Jun 2014 14:32:17 +0200
       
       verify payment requests with tlslite and pyasn1 (pure python) instead of m2Crypto
       
       Diffstat:
         M lib/paymentrequest.py               |     163 +++++++++++++++++--------------
         A lib/x509.py                         |     214 +++++++++++++++++++++++++++++++
       
       2 files changed, 302 insertions(+), 75 deletions(-)
       ---
   DIR diff --git a/lib/paymentrequest.py b/lib/paymentrequest.py
       t@@ -1,3 +1,22 @@
       +#!/usr/bin/env python
       +#
       +# Electrum - lightweight Bitcoin client
       +# Copyright (C) 2014 Thomas Voegtlin
       +#
       +# This program is free software: you can redistribute it and/or modify
       +# it under the terms of the GNU General Public License as published by
       +# the Free Software Foundation, either version 3 of the License, or
       +# (at your option) any later version.
       +#
       +# This program is distributed in the hope that it will be useful,
       +# but WITHOUT ANY WARRANTY; without even the implied warranty of
       +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
       +# GNU General Public License for more details.
       +#
       +# You should have received a copy of the GNU General Public License
       +# along with this program. If not, see <http://www.gnu.org/licenses/>.
       +
       +
        import hashlib
        import httplib
        import os.path
       t@@ -24,6 +43,7 @@ except ImportError:
        import bitcoin
        import util
        import transaction
       +import x509
        
        
        REQUEST_HEADERS = {'Accept': 'application/bitcoin-paymentrequest', 'User-Agent': 'Electrum'}
       t@@ -39,12 +59,10 @@ PR_ERROR   = 4     # could not parse
        
        ca_list = {}
        
       +
       +
       +
        def load_certificates():
       -    try:
       -        from M2Crypto import X509
       -    except:
       -        print_error("ERROR: Could not import M2Crypto")
       -        return False
        
            ca_path = os.path.expanduser("~/.electrum/ca/ca-bundle.crt")
            try:
       t@@ -62,14 +80,20 @@ def load_certificates():
                else:
                    c += line
                if line == "-----END CERTIFICATE-----\n":
       -            x = X509.load_cert_string(c)
       -            ca_list[x.get_fingerprint()] = x
       +            x = x509.X509()
       +            try:
       +                x.parse(c)
       +            except Exception as e:
       +                print "cannot parse cert:", e
       +            ca_list[x.getFingerprint()] = x
            ca_f.close()
       +    util.print_error("%d certificates"%len(ca_list))
            return True
        
        load_certificates()
        
        
       +
        class PaymentRequest:
            def __init__(self, config):
                self.config = config
       t@@ -130,19 +154,13 @@ class PaymentRequest:
        
        
            def verify(self):
       -        try:
       -            from M2Crypto import X509
       -        except:
       -            self.error = "cannot import M2Crypto"
       -            return False
        
                if not ca_list:
                    self.error = "Trusted certificate authorities list not found"
                    return False
        
                paymntreq = self.data
       -        sig = paymntreq.signature
       -        if not sig:
       +        if not paymntreq.signature:
                    self.error = "No signature"
                    return 
        
       t@@ -150,86 +168,79 @@ class PaymentRequest:
                cert.ParseFromString(paymntreq.pki_data)
                cert_num = len(cert.certificate)
        
       -        x509_1 = X509.load_cert_der_string(cert.certificate[0])
       -        if self.domain != x509_1.get_subject().CN:
       -            validcert = False
       -            try:
       -                SANs = x509_1.get_ext("subjectAltName").get_value().split(",")
       -                for s in SANs:
       -                    s = s.strip()
       -                    if s.startswith("DNS:") and s[4:] == self.domain:
       -                        validcert = True
       -                        print "Match SAN DNS"
       -                    elif s.startswith("IP:") and s[3:] == self.domain:
       -                        validcert = True
       -                        print "Match SAN IP"
       -                    elif s.startswith("email:") and s[6:] == self.domain:
       -                        validcert = True
       -                        print "Match SAN email"
       -            except Exception, e:
       -                print "ERROR: No SAN data"
       -            if not validcert:
       -                ###TODO: check for wildcards
       -                self.error = "ERROR: Certificate Subject Domain Mismatch and SAN Mismatch"
       -                return
       -
       -        x509 = []
       -        CA_OU = ''
       -
       -        if cert_num > 1:
       -            for i in range(cert_num - 1):
       -                x509.append(X509.load_cert_der_string(cert.certificate[i+1]))
       -                if x509[i].check_ca() == 0:
       +        x509_chain = []
       +        for i in range(cert_num):
       +            x = x509.X509()
       +            x.parseBinary(bytearray(cert.certificate[i]))
       +            x.slow_parse()
       +            x509_chain.append(x)
       +            if i == 0:
       +                if not x.check_name(self.domain):
       +                    self.error = "Certificate Domain Mismatch"
       +                    return
       +            else:
       +                if not x.check_ca():
                            self.error = "ERROR: Supplied CA Certificate Error"
                            return
       -            for i in range(cert_num - 1):
       -                if i == 0:
       -                    if x509_1.verify(x509[i].get_pubkey()) != 1:
       -                        self.error = "ERROR: Certificate not Signed by Provided CA Certificate Chain"
       -                        return
       -                else:
       -                    if x509[i-1].verify(x509[i].get_pubkey()) != 1:
       -                        self.error = "ERROR: CA Certificate not Signed by Provided CA Certificate Chain"
       -                        return
       -
       -            supplied_CA_fingerprint = x509[cert_num-2].get_fingerprint()
       -            supplied_CA_CN = x509[cert_num-2].get_subject().CN
       -            CA_match = False
        
       -            x = ca_list.get(supplied_CA_fingerprint)
       -            if x:
       -                CA_OU = x.get_subject().OU
       -                CA_match = True
       -                if x.get_subject().CN != supplied_CA_CN:
       -                    print "ERROR: Trusted CA CN Mismatch; however CA has trusted fingerprint"
       -                    print "Payment will continue with manual verification."
       -            else:
       -                print "ERROR: Supplied CA Not Found in Trusted CA Store."
       -                print "Payment will continue with manual verification."
       -        else:
       +        if not cert_num > 1:
                    self.error = "ERROR: CA Certificate Chain Not Provided by Payment Processor"
                    return False
        
       +        for i in range(1, cert_num):
       +            x = x509_chain[i]
       +            prev_x = x509_chain[i-1]
       +
       +            algo, sig, data = prev_x.extract_sig()
       +            if algo.getComponentByName('algorithm') != x509.ALGO_RSA_SHA1:
       +                self.error = "Algorithm not suported"
       +                return
       +
       +            sig = bytearray(sig[5:])
       +            pubkey = x.publicKey
       +            verify = pubkey.hashAndVerify(sig, data)
       +            if not verify:
       +                self.error = "Certificate not Signed by Provided CA Certificate Chain"
       +                return
       +
       +        ca = x509_chain[cert_num-1]
       +        supplied_CA_fingerprint = ca.getFingerprint()
       +        supplied_CA_names = ca.extract_names()
       +        CA_OU = supplied_CA_names['OU']
       +
       +        x = ca_list.get(supplied_CA_fingerprint)
       +        if x:
       +            x.slow_parse()
       +            names = x.extract_names()
       +            CA_match = True
       +            if names['CN'] != supplied_CA_names['CN']:
       +                print "ERROR: Trusted CA CN Mismatch; however CA has trusted fingerprint"
       +                print "Payment will continue with manual verification."
       +        else:
       +            CA_match = False
       +
       +        pubkey0 = x509_chain[0].publicKey
       +        sig = paymntreq.signature
                paymntreq.signature = ''
                s = paymntreq.SerializeToString()
       -        pubkey_1 = x509_1.get_pubkey()
       +        sigBytes = bytearray(sig)
       +        msgBytes = bytearray(s)
        
                if paymntreq.pki_type == "x509+sha256":
       -            pubkey_1.reset_context(md="sha256")
       +            hashBytes = bytearray(hashlib.sha256(msgBytes).digest())
       +            prefixBytes = bytearray([0x30,0x31,0x30,0x0d,0x06,0x09,0x60,0x86,0x48,0x01,0x65,0x03,0x04,0x02,0x01,0x05,0x00,0x04,0x20])
       +            verify = pubkey0.verify(sigBytes, prefixBytes + hashBytes)
                elif paymntreq.pki_type == "x509+sha1":
       -            pubkey_1.reset_context(md="sha1")
       +            verify = pubkey0.hashAndVerify(sigBytes, msgBytes)
                else:
                    self.error = "ERROR: Unsupported PKI Type for Message Signature"
                    return False
        
       -        pubkey_1.verify_init()
       -        pubkey_1.verify_update(s)
       -        if pubkey_1.verify_final(sig) != 1:
       +        if not verify:
                    self.error = "ERROR: Invalid Signature for Payment Request Data"
                    return False
        
                ### SIG Verified
       -
                self.details = pay_det = paymentrequest_pb2.PaymentDetails()
                self.details.ParseFromString(paymntreq.serialized_payment_details)
        
       t@@ -241,6 +252,8 @@ class PaymentRequest:
        
                if CA_match:
                    self.status = 'Signed by Trusted CA:\n' + CA_OU
       +        else:
       +            self.status = "Supplied CA Not Found in Trusted CA Store."
        
                self.payment_url = self.details.payment_url
        
   DIR diff --git a/lib/x509.py b/lib/x509.py
       t@@ -0,0 +1,214 @@
       +#!/usr/bin/env python
       +#
       +# Electrum - lightweight Bitcoin client
       +# Copyright (C) 2014 Thomas Voegtlin
       +#
       +# This program is free software: you can redistribute it and/or modify
       +# it under the terms of the GNU General Public License as published by
       +# the Free Software Foundation, either version 3 of the License, or
       +# (at your option) any later version.
       +#
       +# This program is distributed in the hope that it will be useful,
       +# but WITHOUT ANY WARRANTY; without even the implied warranty of
       +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
       +# GNU General Public License for more details.
       +#
       +# You should have received a copy of the GNU General Public License
       +# along with this program. If not, see <http://www.gnu.org/licenses/>.
       +
       +
       +from datetime import datetime, timedelta
       +
       +try:
       +    import pyasn1
       +except ImportError:
       +    sys.exit("Error: pyasn1 does not seem to be installed. Try 'sudo pip install pyasn1'")
       +
       +try:
       +    import tlslite
       +except ImportError:
       +    sys.exit("Error: tlslite does not seem to be installed. Try 'sudo pip install tlslite'")
       +
       +
       +
       +from pyasn1.codec.der import decoder, encoder
       +from pyasn1.type.univ import Any, ObjectIdentifier, OctetString
       +from pyasn1.type.char import BMPString, IA5String, UTF8String
       +from pyasn1.type.useful import GeneralizedTime
       +from pyasn1_modules.rfc2459 import (Certificate, DirectoryString,
       +                                    SubjectAltName, GeneralNames,
       +                                    GeneralName)
       +from pyasn1_modules.rfc2459 import id_ce_subjectAltName as SUBJECT_ALT_NAME
       +from pyasn1_modules.rfc2459 import id_at_commonName as COMMON_NAME
       +from pyasn1_modules.rfc2459 import id_at_organizationalUnitName as OU_NAME
       +from pyasn1_modules.rfc2459 import id_ce_basicConstraints, BasicConstraints
       +XMPP_ADDR = ObjectIdentifier('1.3.6.1.5.5.7.8.5')
       +SRV_NAME = ObjectIdentifier('1.3.6.1.5.5.7.8.7')
       +ALGO_RSA_SHA1 = ObjectIdentifier('1.2.840.113549.1.1.5')
       +
       +
       +class CertificateError(Exception):
       +    pass
       +
       +def decode_str(data):
       +    encoding = 'utf-16-be' if isinstance(data, BMPString) else 'utf-8'
       +    return bytes(data).decode(encoding)
       +
       +
       +class X509(tlslite.X509):
       +    """ Child class of tlslite.X509 that uses pyasn1 """
       +
       +    def slow_parse(self):
       +        self.cert = decoder.decode(str(self.bytes), asn1Spec=Certificate())[0]
       +        self.tbs = self.cert.getComponentByName('tbsCertificate')
       +        self.subject = self.tbs.getComponentByName('subject')
       +        self.extensions = self.tbs.getComponentByName('extensions') or []
       +
       +    def extract_names(self):
       +        results = {'CN': None,
       +                   'DNS': set(),
       +                   'SRV': set(),
       +                   'URI': set(),
       +                   'XMPPAddr': set(), 
       +                   'OU': None,}
       +  
       +        # Extract the CommonName(s) from the cert.
       +        for rdnss in self.subject:
       +            for rdns in rdnss:
       +                for name in rdns:
       +                    oid = name.getComponentByName('type')
       +                    value = name.getComponentByName('value')
       +  
       +                    if oid == COMMON_NAME:
       +                        value = decoder.decode(value, asn1Spec=DirectoryString())[0]
       +                        value = decode_str(value.getComponent())
       +                        results['CN'] = value
       +
       +                    elif oid == OU_NAME:
       +                        value = decoder.decode(value, asn1Spec=DirectoryString())[0]
       +                        value = decode_str(value.getComponent())
       +                        results['OU'] = value
       +
       +        # Extract the Subject Alternate Names (DNS, SRV, URI, XMPPAddr)
       +        for extension in self.extensions:
       +            oid = extension.getComponentByName('extnID')
       +            if oid != SUBJECT_ALT_NAME:
       +                continue
       +  
       +            value = decoder.decode(extension.getComponentByName('extnValue'),
       +                               asn1Spec=OctetString())[0]
       +            sa_names = decoder.decode(value, asn1Spec=SubjectAltName())[0]
       +            for name in sa_names:
       +                name_type = name.getName()
       +                if name_type == 'dNSName':
       +                    results['DNS'].add(decode_str(name.getComponent()))
       +                if name_type == 'uniformResourceIdentifier':
       +                    value = decode_str(name.getComponent())
       +                    if value.startswith('xmpp:'):
       +                        results['URI'].add(value[5:])
       +                elif name_type == 'otherName':
       +                    name = name.getComponent()
       +  
       +                    oid = name.getComponentByName('type-id')
       +                    value = name.getComponentByName('value')
       +  
       +                    if oid == XMPP_ADDR:
       +                        value = decoder.decode(value, asn1Spec=UTF8String())[0]
       +                        results['XMPPAddr'].add(decode_str(value))
       +                    elif oid == SRV_NAME:
       +                        value = decoder.decode(value, asn1Spec=IA5String())[0]
       +                        results['SRV'].add(decode_str(value))
       +        return results
       +
       +
       +    def check_ca(self):
       +        for extension in self.extensions:
       +            oid = extension.getComponentByName('extnID')
       +            if oid != id_ce_basicConstraints:
       +                continue
       +            value = decoder.decode(extension.getComponentByName('extnValue'),
       +                               asn1Spec=OctetString())[0]
       +            constraints = decoder.decode(value, asn1Spec=BasicConstraints())[0]
       +            return bool(constraints[0])
       +
       +    def extract_sig(self):
       +        signature = self.cert.getComponentByName('signatureValue')
       +        algorithm = self.cert.getComponentByName('signatureAlgorithm')
       +        data = encoder.encode(self.tbs)
       +        s = encoder.encode(signature)
       +        return algorithm, s, data
       +
       +
       +    def extract_pubkey(self):
       +        pki = self.tbs.getComponentByName('subjectPublicKeyInfo')
       +        algo = pki.getComponentByName('algorithm')
       +        algorithm = algo.getComponentByName('algorithm')
       +        parameters = algo.getComponentByName('parameters')
       +        subjectPublicKey = pki.getComponentByName('subjectPublicKey')
       +        return algorithm, parameters, encoder.encode(subjectPublicKey)
       +
       +
       +    def extract_dates(self):
       +        validity = self.tbs.getComponentByName('validity')
       +        not_before = validity.getComponentByName('notBefore')
       +        not_before = str(not_before.getComponent())
       +        not_after = validity.getComponentByName('notAfter')
       +        not_after = str(not_after.getComponent())
       +        if isinstance(not_before, GeneralizedTime):
       +            not_before = datetime.strptime(not_before, '%Y%m%d%H%M%SZ')
       +        else:
       +            not_before = datetime.strptime(not_before, '%y%m%d%H%M%SZ')
       +        if isinstance(not_after, GeneralizedTime):
       +            not_after = datetime.strptime(not_after, '%Y%m%d%H%M%SZ')
       +        else:
       +            not_after = datetime.strptime(not_after, '%y%m%d%H%M%SZ')
       +        return not_before, not_after
       +
       +    def get_ttl(self):
       +        not_before, not_after = self.extract_dates()
       +        if not_after is None:
       +            return None
       +        return not_after - datetime.utcnow()
       +
       +    def check_name(self, expected):
       +        not_before, not_after = self.extract_dates()
       +        cert_names = self.extract_names()
       +        now = datetime.utcnow()
       +        if not_before > now:
       +            raise CertificateError(
       +                'Certificate has not entered its valid date range.')
       +        if not_after <= now:
       +            raise CertificateError(
       +                'Certificate has expired.')
       +        if '.' in expected:
       +            expected_wild = expected[expected.index('.'):]
       +        else:
       +            expected_wild = expected
       +        expected_srv = '_xmpp-client.%s' % expected
       +        for name in cert_names['XMPPAddr']:
       +            if name == expected:
       +                return True
       +        for name in cert_names['SRV']:
       +            if name == expected_srv or name == expected:
       +                return True
       +        for name in cert_names['DNS']:
       +            if name == expected:
       +                return True
       +            if name.startswith('*'):
       +                if '.' in name:
       +                    name_wild = name[name.index('.'):]
       +                else:
       +                    name_wild = name
       +                if expected_wild == name_wild:
       +                    return True
       +        for name in cert_names['URI']:
       +            if name == expected:
       +                return True
       +        if cert_names['CN'] == expected:
       +            return True
       +        raise CertificateError(
       +            'Could not match certficate against hostname: %s' % expected)
       +
       +
       +class X509CertChain(tlslite.X509CertChain):
       +    pass