-
Notifications
You must be signed in to change notification settings - Fork 0
/
verifyjwt.py
47 lines (38 loc) · 1.45 KB
/
verifyjwt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from ssl import PEM_cert_to_DER_cert
import jwt
from tornado.httpclient import AsyncHTTPClient
from tornado.escape import to_unicode
from Crypto.Util.asn1 import DerSequence
from Crypto.PublicKey import RSA
class GoogleIdToken:
    """Validate an encoded JWT against Google's published certificates.

    The encoded token is kept in ``_jwt``. After a successful validation the
    decoded claims are stored in ``token``; until then ``token`` is None.
    """

    _GOOGLE_CERTS_URI = 'https://www.googleapis.com/oauth2/v1/certs'
    # NOTE(review): Google's documentation also lists
    # 'https://accounts.google.com' as a valid issuer; callers that need to
    # accept it must pass ``iss`` explicitly -- confirm whether the default
    # should cover both forms.
    _GOOGLE_ISS_URI = 'accounts.google.com'

    def __init__(self, jwt):
        """Remember the encoded token *jwt*; nothing is verified here."""
        self._jwt = jwt
        self.token = None

    def is_valid(self, aud, iss=_GOOGLE_ISS_URI):
        """Start validation of the stored token.

        Fetches the certificates via ``_get_certs`` and, in the fetch
        callback, tries each certificate's public key until one verifies the
        token and the ``aud`` / ``iss`` claims equal the given values. On
        success the decoded claims are stored in ``self.token``.

        Returns whatever ``_get_certs`` returns (the result of
        ``AsyncHTTPClient.fetch``), NOT a boolean: the True/False produced by
        the callback is not propagated by this method. Presumably callers
        inspect ``self.token`` once the fetch has completed -- confirm
        against callers.
        """
        def validate(res):
            # A failed fetch carries an error and possibly no body; treat it
            # as "not valid" instead of crashing inside json.loads.
            if res.error is not None or not res.body:
                return False
            certs = json.loads(to_unicode(res.body))
            for pem in certs.values():
                try:
                    # NOTE(review): no explicit algorithm allow-list is
                    # passed; newer PyJWT releases require ``algorithms=``.
                    # Confirm the PyJWT version in use before changing this.
                    token = jwt.decode(self._jwt, key=self._get_pubkey(pem))
                except (jwt.DecodeError, jwt.ExpiredSignature):
                    # Wrong key for this token (or token expired): try the
                    # next certificate.
                    continue
                if self._claims_match(token, aud, iss):
                    self.token = token
                    return True
            return False

        return self._get_certs(validate)

    @staticmethod
    def _claims_match(token, aud, iss):
        """Return True only if *token* has 'aud' and 'iss' equal to the args.

        A missing claim (or a payload that is not a mapping) yields False
        rather than raising inside the fetch callback.
        """
        try:
            return token['aud'] == aud and token['iss'] == iss
        except (KeyError, TypeError):
            return False

    def _get_certs(self, callback):
        """Fetch the certificate document and hand the response to *callback*."""
        return AsyncHTTPClient().fetch(GoogleIdToken._GOOGLE_CERTS_URI, callback)

    def _get_pubkey(self, pem):
        """Return the RSA public key embedded in the PEM certificate *pem*."""
        der = PEM_cert_to_DER_cert(pem)
        cert = DerSequence()
        cert.decode(der)
        tbs_cert = DerSequence()
        tbs_cert.decode(cert[0])  # TBSCertificate
        # Index 6 is SubjectPublicKeyInfo -- assumes the optional version
        # field is present at index 0 (TODO confirm for all served certs).
        pubkey_info = tbs_cert[6]
        return RSA.importKey(pubkey_info)