tx509.py - electrum - Electrum Bitcoin wallet
HTML git clone https://git.parazyd.org/electrum
DIR Log
DIR Files
DIR Refs
DIR Submodules
---
tx509.py (11489B)
---
1 #!/usr/bin/env python
2 #
3 # Electrum - lightweight Bitcoin client
4 # Copyright (C) 2014 Thomas Voegtlin
5 #
6 # Permission is hereby granted, free of charge, to any person
7 # obtaining a copy of this software and associated documentation files
8 # (the "Software"), to deal in the Software without restriction,
9 # including without limitation the rights to use, copy, modify, merge,
10 # publish, distribute, sublicense, and/or sell copies of the Software,
11 # and to permit persons to whom the Software is furnished to do so,
12 # subject to the following conditions:
13 #
14 # The above copyright notice and this permission notice shall be
15 # included in all copies or substantial portions of the Software.
16 #
17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24 # SOFTWARE.
25
26 import hashlib
27 import time
28 from datetime import datetime
29
30 from . import util
31 from .util import profiler, bh2u
32 from .logging import get_logger
33
34
35 _logger = get_logger(__name__)
36
37
# Signature-algorithm OIDs in dotted-decimal form, as produced by decode_OID().
ALGO_RSA_SHA1 = '1.2.840.113549.1.1.5'
ALGO_RSA_SHA256 = '1.2.840.113549.1.1.11'
ALGO_RSA_SHA384 = '1.2.840.113549.1.1.12'
ALGO_RSA_SHA512 = '1.2.840.113549.1.1.13'
ALGO_ECDSA_SHA256 = '1.2.840.10045.4.3.2'

# prefixes, see http://stackoverflow.com/questions/3713774/c-sharp-how-to-calculate-asn-1-der-encoding-of-a-particular-hash-algorithm
# Each prefix is the DER header
#   SEQUENCE { SEQUENCE { OBJECT IDENTIFIER, NULL }, OCTET STRING <digest> }
# up to and including the OCTET STRING length octet (0x20 / 0x30 / 0x40),
# so the raw digest bytes are appended directly after it.
PREFIX_RSA_SHA256 = bytearray.fromhex(
    '30 31 30 0d 06 09 60 86 48 01 65 03 04 02 01 05 00 04 20')
PREFIX_RSA_SHA384 = bytearray.fromhex(
    '30 41 30 0d 06 09 60 86 48 01 65 03 04 02 02 05 00 04 30')
PREFIX_RSA_SHA512 = bytearray.fromhex(
    '30 51 30 0d 06 09 60 86 48 01 65 03 04 02 03 05 00 04 40')
52
# types used in ASN1 structured data
# Maps a type name to the identifier (tag) octet that
# ASN1_Node.get_value_of_type() compares against the first byte of a node.
ASN1_TYPES = {
    'BOOLEAN'          : 0x01,
    'INTEGER'          : 0x02,
    'BIT STRING'       : 0x03,
    'OCTET STRING'     : 0x04,
    'NULL'             : 0x05,
    'OBJECT IDENTIFIER': 0x06,
    # SEQUENCE and SET are always constructed: universal tag 16 / 17 with the
    # constructed bit (0x20) set.  The former values 0x70 / 0x71 never occur
    # as identifier octets of a universal SEQUENCE / SET.
    'SEQUENCE'         : 0x30,
    'SET'              : 0x31,
    'PrintableString'  : 0x13,
    'IA5String'        : 0x16,
    'UTCTime'          : 0x17,
    'GeneralizedTime'  : 0x18,
    'ENUMERATED'       : 0x0A,
    'UTF8String'       : 0x0C,
}
70
71
class CertificateError(Exception):
    """Signals that a certificate failed a validity check (see X509.check_date)."""
74
75
76 # helper functions
def bitstr_to_bytestr(s):
    """Return the content of a BIT STRING without its leading octet.

    The first octet of *s* must be 0x00; everything after it is returned
    unchanged.

    Raises:
        TypeError: if the first octet is not 0x00.
        IndexError: if *s* is empty.
    """
    if s[0] == 0x00:
        return s[1:]
    raise TypeError('no padding')
81
82
def bytestr_to_int(s):
    """Interpret *s* as an unsigned big-endian integer.

    Args:
        s: a bytes-like object (or iterable of ints in range(256)).

    Returns:
        The integer value; 0 for empty input.
    """
    # Standard-library equivalent of shifting in one octet at a time.
    return int.from_bytes(s, byteorder='big', signed=False)
89
90
def decode_OID(s):
    """Decode the content octets of an OBJECT IDENTIFIER to dotted-decimal.

    Args:
        s: bytes-like content of the OID (without tag and length octets).

    Returns:
        The OID as a string such as ``'1.2.840.113549.1.1.11'``.

    Raises:
        IndexError: if *s* contains no complete sub-identifier (e.g. is empty).
    """
    # Every sub-identifier is a base-128 number whose octets all have the
    # high bit set except the last one.
    subids = []
    acc = 0
    for octet in s:
        acc = (acc << 7) | (octet & 0x7F)
        if octet < 0x80:
            subids.append(acc)
            acc = 0

    # The first sub-identifier packs the first two arcs as 40 * arc1 + arc2.
    # arc1 is at most 2, so any value >= 80 belongs to arc1 == 2 (this also
    # covers first sub-identifiers that need more than one octet).
    first = subids[0]
    if first < 80:
        arcs = [first // 40, first % 40]
    else:
        arcs = [2, first - 80]
    arcs.extend(subids[1:])
    return '.'.join(map(str, arcs))
103
104
def encode_OID(oid):
    """Encode a dotted-decimal OID into its content octets.

    Args:
        oid: string such as ``'1.2.840.113549.1.1.11'`` with at least two arcs.

    Returns:
        A ``str`` holding one character per encoded octet (code points
        0..255), as this function has always returned; use
        ``.encode('latin-1')`` to obtain ``bytes``.

    Raises:
        ValueError: if an arc is not an integer.
        IndexError: if fewer than two arcs are given.
    """
    arcs = [int(part) for part in oid.split('.')]
    # The first two arcs share one sub-identifier: 40 * arc1 + arc2.
    subids = [arcs[0] * 40 + arcs[1]] + arcs[2:]

    encoded = []
    for value in subids:
        # Base-128, most significant group first; every octet except the
        # last carries the continuation bit 0x80.
        chunk = chr(value % 128)
        while value >= 128:  # '>=': the value 128 itself needs two octets
            value //= 128
            chunk = chr(128 + value % 128) + chunk
        encoded.append(chunk)
    return ''.join(encoded)
115
116
class ASN1_Node(bytes):
    """A DER-encoded byte string with helpers to walk its TLV structure.

    A *node* is a tuple ``(ixs, ixf, ixl)`` of indexes into this byte string:
    the identifier (tag) octet, the first content octet and the last content
    octet.  An empty value therefore has ``ixl == ixf - 1``.
    """

    def get_node(self, ix):
        """Return the node whose identifier octet is at index *ix*.

        Only single-octet identifiers are handled.  Raises IndexError if the
        length octet lies beyond the end of the data.
        """
        # return index of first byte, first content byte and last byte.
        length_octet = self[ix + 1]
        if (length_octet & 0x80) == 0:
            # short form: the octet is the length itself
            length = length_octet
            ixf = ix + 2
        else:
            # long form: low 7 bits give the number of following length octets
            lengthbytes = length_octet & 0x7F
            length = bytestr_to_int(self[ix + 2:ix + 2 + lengthbytes])
            ixf = ix + 2 + lengthbytes
        ixl = ixf + length - 1
        return ix, ixf, ixl

    def root(self):
        """Return the node that starts at offset 0."""
        return self.get_node(0)

    def next_node(self, node):
        """Return the node that starts right after the last byte of *node*."""
        ixs, ixf, ixl = node
        return self.get_node(ixl + 1)

    def _require_constructed(self, node):
        """Raise TypeError unless the identifier octet has the constructed bit."""
        ixs = node[0]
        if self[ixs] & 0x20 != 0x20:
            raise TypeError('Can only open constructed types.', hex(self[ixs]))

    def first_child(self, node):
        """Return the first node inside the constructed *node*.

        Raises TypeError if *node* is a primitive type.  The caller must make
        sure that *node* is not empty.
        """
        ixs, ixf, ixl = node
        self._require_constructed(node)
        return self.get_node(ixf)

    @staticmethod
    def is_child_of(node1, node2):
        """Return True if the content of either node encloses the other node.

        Declared static so that it can be called on the class (as before) as
        well as on an instance.
        """
        ixs, ixf, ixl = node1
        jxs, jxf, jxl = node2
        return ((ixf <= jxs) and (jxl <= ixl)) or ((jxf <= ixs) and (ixl <= jxl))

    def get_all(self, node):
        """Return the complete encoding of *node*: type + length + value."""
        ixs, ixf, ixl = node
        return self[ixs:ixl + 1]

    def get_value_of_type(self, node, asn1_type):
        """Return the content of *node* after checking its identifier octet.

        *asn1_type* is a key of ASN1_TYPES.  Raises TypeError on a mismatch.
        """
        ixs, ixf, ixl = node
        if ASN1_TYPES[asn1_type] != self[ixs]:
            raise TypeError('Wrong type:', hex(self[ixs]), hex(ASN1_TYPES[asn1_type]))
        return self[ixf:ixl + 1]

    def get_value(self, node):
        """Return the content of *node* without checking its type."""
        ixs, ixf, ixl = node
        return self[ixf:ixl + 1]

    def get_children(self, node):
        """Return the list of nodes directly contained in the constructed *node*.

        Raises TypeError if *node* is a primitive type.
        """
        ixs, ixf, ixl = node
        if ixl < ixf:
            # Empty value: there is nothing to descend into.  Without this
            # guard the bytes following *node* would be parsed as its child.
            self._require_constructed(node)
            return []
        child = self.first_child(node)
        nodes = [child]
        while child[2] < ixl:
            child = self.next_node(child)
            nodes.append(child)
        return nodes

    def get_sequence(self):
        """Return the contents of all children of the root node."""
        return [self.get_value(child) for child in self.get_children(self.root())]

    def get_dict(self, node):
        """Collect ``{oid: value}`` from a two-level structure below *node*.

        For every grandchild of *node*, its first child must be an OBJECT
        IDENTIFIER; the content of the element following it becomes the value
        stored under the dotted-decimal OID.  Later duplicates overwrite
        earlier ones.
        """
        p = {}
        for outer in self.get_children(node):
            for entry in self.get_children(outer):
                field = self.first_child(entry)
                oid = decode_OID(self.get_value_of_type(field, 'OBJECT IDENTIFIER'))
                field = self.next_node(field)
                p[oid] = self.get_value(field)
        return p

    def decode_time(self, ii):
        """Parse node *ii* as UTCTime or GeneralizedTime.

        Returns a ``time.struct_time``.  Raises TypeError if the node is of
        neither type, ValueError if the text does not match the format.
        """
        GENERALIZED_TIMESTAMP_FMT = '%Y%m%d%H%M%SZ'
        UTCTIME_TIMESTAMP_FMT = '%y%m%d%H%M%SZ'

        try:
            return time.strptime(self.get_value_of_type(ii, 'UTCTime').decode('ascii'), UTCTIME_TIMESTAMP_FMT)
        except TypeError:
            # not a UTCTime node; try the four-digit-year form
            return time.strptime(self.get_value_of_type(ii, 'GeneralizedTime').decode('ascii'), GENERALIZED_TIMESTAMP_FMT)
197
class X509(object):
    """Parsed view of a single DER-encoded X.509 certificate.

    Attributes set by the constructor:
        bytes: the raw certificate as a bytearray.
        data: encoding of the first child of the outer SEQUENCE (the signed part).
        serial_number: int.
        sig_algo, cert_sig_algo, public_key_algo: dotted-decimal OIDs.
        issuer, subject: dicts mapping attribute OID -> raw value bytes.
        notBefore, notAfter: time.struct_time.
        modulus, exponent: ints (only when the key algorithm is not EC).
        ec_public_key: BIT STRING content (only for EC keys).
        CA: True only if Basic Constraints is present with cA = TRUE.
        SKI, AKI: key identifiers as hex strings, or None.
        signature: signature bytes without the BIT STRING's leading octet.
    """

    _OID_EC_PUBLIC_KEY = '1.2.840.10045.2.1'
    _OID_BASIC_CONSTRAINTS = '2.5.29.19'
    _OID_SUBJECT_KEY_ID = '2.5.29.14'
    _OID_AUTHORITY_KEY_ID = '2.5.29.35'

    def __init__(self, b):
        """Parse the DER bytes *b*.

        Malformed input surfaces as TypeError, IndexError or ValueError from
        the ASN.1 helpers.
        """
        self.bytes = bytearray(b)

        der = ASN1_Node(b)
        root = der.root()
        cert = der.first_child(root)
        # data for signature
        self.data = der.get_all(cert)

        # optional version field (context tag [0])
        if der[cert[1]] == 0xa0:
            version = der.first_child(cert)
            serial_number = der.next_node(version)
        else:
            serial_number = der.first_child(cert)
        self.serial_number = bytestr_to_int(der.get_value_of_type(serial_number, 'INTEGER'))

        # signature algorithm
        sig_algo = der.next_node(serial_number)
        ii = der.first_child(sig_algo)
        self.sig_algo = decode_OID(der.get_value_of_type(ii, 'OBJECT IDENTIFIER'))

        # issuer
        issuer = der.next_node(sig_algo)
        self.issuer = der.get_dict(issuer)

        # validity
        validity = der.next_node(issuer)
        ii = der.first_child(validity)
        self.notBefore = der.decode_time(ii)
        ii = der.next_node(ii)
        self.notAfter = der.decode_time(ii)

        # subject
        subject = der.next_node(validity)
        self.subject = der.get_dict(subject)
        subject_pki = der.next_node(subject)
        public_key_algo = der.first_child(subject_pki)
        ii = der.first_child(public_key_algo)
        self.public_key_algo = decode_OID(der.get_value_of_type(ii, 'OBJECT IDENTIFIER'))

        subject_public_key = der.next_node(public_key_algo)
        spk = der.get_value_of_type(subject_public_key, 'BIT STRING')
        if self.public_key_algo != self._OID_EC_PUBLIC_KEY:  # for non EC public key
            # pubkey modulus and exponent
            spk = ASN1_Node(bitstr_to_bytestr(spk))
            r = spk.root()
            modulus = spk.first_child(r)
            exponent = spk.next_node(modulus)
            rsa_n = spk.get_value_of_type(modulus, 'INTEGER')
            rsa_e = spk.get_value_of_type(exponent, 'INTEGER')
            self.modulus = int.from_bytes(rsa_n, byteorder='big', signed=False)
            self.exponent = int.from_bytes(rsa_e, byteorder='big', signed=False)
        else:
            self.ec_public_key = spk

        # extensions
        self.CA = False
        self.AKI = None
        self.SKI = None
        i = subject_pki
        while i[2] < cert[2]:
            i = der.next_node(i)
            # Only the [3] wrapper holds extensions; the optional unique-ID
            # fields ([1], [2]) that may precede it are skipped.
            if der[i[0]] == 0xa3:
                self._parse_extensions(der, i)

        # cert signature
        cert_sig_algo = der.next_node(cert)
        ii = der.first_child(cert_sig_algo)
        self.cert_sig_algo = decode_OID(der.get_value_of_type(ii, 'OBJECT IDENTIFIER'))
        cert_sig = der.next_node(cert_sig_algo)
        # drop the leading octet of the BIT STRING content
        self.signature = der.get_value(cert_sig)[1:]

    def _parse_extensions(self, der, wrapper):
        """Read Basic Constraints, SKI and AKI from the extensions wrapper node.

        Each extension is ``SEQUENCE { extnID, [critical BOOLEAN], extnValue
        OCTET STRING }``.  The optional ``critical`` flag is skipped so that
        the decoded value is always the OCTET STRING content.
        """
        ext_list = der.first_child(wrapper)
        if ext_list[2] < ext_list[1]:
            return  # empty list, nothing to read
        wanted = (self._OID_BASIC_CONSTRAINTS,
                  self._OID_SUBJECT_KEY_ID,
                  self._OID_AUTHORITY_KEY_ID)
        for ext in der.get_children(ext_list):
            field = der.first_child(ext)
            oid = decode_OID(der.get_value_of_type(field, 'OBJECT IDENTIFIER'))
            if oid not in wanted:
                continue
            field = der.next_node(field)
            if der[field[0]] == ASN1_TYPES['BOOLEAN']:
                # 'critical' flag, not the extension value
                field = der.next_node(field)
            value = ASN1_Node(der.get_value_of_type(field, 'OCTET STRING'))
            inner = value.root()
            if oid == self._OID_BASIC_CONSTRAINTS:
                self.CA = self._basic_constraints_is_ca(value)
            elif oid == self._OID_SUBJECT_KEY_ID:
                self.SKI = bh2u(value.get_value_of_type(inner, 'OCTET STRING'))
            elif inner[2] >= inner[1]:
                # Authority Key Identifier: use the first element only when it
                # is the keyIdentifier ([0]); other members are not key IDs.
                first = value.first_child(inner)
                if value[first[0]] == 0x80:
                    self.AKI = bh2u(value.get_value(first))

    @staticmethod
    def _basic_constraints_is_ca(value):
        """Return True if the BasicConstraints SEQUENCE in *value* has cA = TRUE.

        cA defaults to FALSE, so an empty SEQUENCE or one that starts with
        anything but a BOOLEAN (e.g. only a path length) is not a CA.
        """
        seq = value.root()
        if seq[2] < seq[1]:
            return False
        first = value.first_child(seq)
        if value[first[0]] != 0x01:
            return False
        return any(value.get_value(first))

    def get_keyID(self):
        """Return the Subject Key Identifier, or ``repr(subject)`` as fallback."""
        # http://security.stackexchange.com/questions/72077/validating-an-ssl-certificate-chain-according-to-rfc-5280-am-i-understanding-th
        return self.SKI if self.SKI else repr(self.subject)

    def get_issuer_keyID(self):
        """Return the Authority Key Identifier, or ``repr(issuer)`` as fallback."""
        return self.AKI if self.AKI else repr(self.issuer)

    def get_common_name(self):
        """Return the subject's common name (OID 2.5.4.3), or 'unknown'.

        Bytes that are not valid UTF-8 are replaced instead of raising, since
        this is only used to build human-readable messages.
        """
        return self.subject.get('2.5.4.3', b'unknown').decode('utf-8', errors='replace')

    def get_signature(self):
        """Return ``(cert_sig_algo, signature, data)`` for signature verification."""
        return self.cert_sig_algo, self.signature, self.data

    def check_ca(self):
        """Return True if the certificate declares itself a CA."""
        return self.CA

    def check_date(self):
        """Raise CertificateError if the current UTC time is outside the validity period."""
        now = time.gmtime()
        if self.notBefore > now:
            raise CertificateError('Certificate has not entered its valid date range. (%s)' % self.get_common_name())
        if self.notAfter <= now:
            # notAfter already holds UTC fields; format them directly rather
            # than going through time.mktime(), which assumes local time.
            dt = time.strftime('%Y-%m-%d %H:%M:%S', self.notAfter)
            raise CertificateError(f'Certificate ({self.get_common_name()}) has expired (at {dt} UTC).')

    def getFingerprint(self):
        """Return the SHA-1 digest (bytes) of the raw certificate."""
        return hashlib.sha1(self.bytes).digest()
315
316
@profiler
def load_certificates(ca_path):
    """Load every usable certificate from the PEM bundle at *ca_path*.

    Certificates that cannot be parsed or that fail the date check are
    logged and skipped.

    Returns:
        A tuple ``(ca_list, ca_keyID)`` where ``ca_list`` maps the SHA-1
        fingerprint to its X509 object and ``ca_keyID`` maps the key
        identifier (see X509.get_keyID) to that fingerprint.
    """
    from . import pem
    ca_list = {}
    ca_keyID = {}
    with open(ca_path, 'r', encoding='utf-8') as f:
        s = f.read()
    bList = pem.dePemList(s, "CERTIFICATE")
    for b in bList:
        try:
            x = X509(b)
            x.check_date()
        except Exception as e:
            # Exception rather than BaseException: a bad certificate is
            # skipped, but KeyboardInterrupt / SystemExit must still propagate.
            _logger.info(f"cert error: {e}")
            continue

        fp = x.getFingerprint()
        ca_list[fp] = x
        ca_keyID[x.get_keyID()] = fp

    return ca_list, ca_keyID
341
342
# Manual check when run as a script: load the CA bundle whose path certifi reports.
if __name__ == "__main__":
    import certifi  # imported here, so only script mode needs it

    ca_path = certifi.where()
    ca_list, ca_keyID = load_certificates(ca_path)