This repository has been archived by the owner on Jun 4, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
/
helpers.py
204 lines (178 loc) · 8.29 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import base64
import decimal
import os
from xml.etree.ElementTree import Element, SubElement, ElementTree
import uuid
import cryptography
import signxml as sx
import simplejson as json
from Cryptodome.Hash import SHA256
from Cryptodome.PublicKey import RSA
from Cryptodome.Signature import pkcs1_15
from lxml import etree
# Directory containing this module (symlinks resolved); every bundled
# resource path below is built relative to it.
_MODULE_DIR = os.path.dirname(os.path.realpath(__file__))

# Default signing key and certificate used when callers do not supply their own.
MOCK_DGCA_PRIVATE_KEY = os.path.join(_MODULE_DIR, "Resources", "dgca_private.pem")
MOCK_DGCA_CERTIFICATE = os.path.join(_MODULE_DIR, "Resources", "dgca.cert")

# Default JSON schema file used by check_log_schema.
LOG_SCHEMA = os.path.join(_MODULE_DIR, "LogSchema.json")
def createArtifact(drone_uin, purpose, payloadWeight, payloadDetails,
                   startTime, endTime, coords, drone_pub_key_obj):
    """
    Create a permission artefact for the required details.

    The artefact XML is built in memory and returned wrapped in an
    ElementTree. Signing it or saving it to a file is left to the caller.

    Notes -
    * Recurring Time permission artefacts are not handled as the spec needs
      to finalize on that
    * The lastUpdated, ttl, operatorId, uaplNo, validTo, frequency and
      frequenciesUsed attributes are emitted as empty strings.
    * Caller-supplied attribute values are converted with str(), because
      ElementTree cannot serialise non-string attribute values.

    :param str drone_uin: UIN of the RPAS (Not checked in some cases)
    :param str purpose: Flight purpose text
    :param str payloadWeight: payload wt in agreed upon units.
    :param str payloadDetails: details of payload
    :param datetime.datetime startTime: start time for the permission request
    :param datetime.datetime endTime: END time for the permission request
    :param List coords: List of coords in Long, Lat
    :param drone_pub_key_obj: The public key file object of the RPAS for PIN.
        Accepted for interface compatibility; not used by this function.
    :return: ElementTree whose root is the UAPermission element. Ready to be
        signed or stored
    """
    uap = Element('UAPermission',
                  {'lastUpdated': '',
                   'ttl': '',
                   'txnId': str(uuid.uuid4()),
                   'permissionArtifactId': str(uuid.uuid1())})
    permission = SubElement(uap, 'Permission')

    owner = SubElement(permission, 'Owner', {'operatorId': ''})
    SubElement(owner, 'Pilot', {'uaplNo': '', 'validTo': ''})

    flight_details = SubElement(permission, 'FlightDetails')
    SubElement(flight_details, 'UADetails', {'uinNo': str(drone_uin)})
    SubElement(flight_details, 'FlightPurpose',
               {'shortDesc': str(purpose),
                'frequency': ''})
    SubElement(flight_details, 'PayloadDetails',
               {'payloadWeight': str(payloadWeight),
                'payloadDetails': str(payloadDetails)})
    params = SubElement(flight_details, 'FlightParameters',
                        {'flightStartTime': startTime.isoformat(),
                         'flightEndTime': endTime.isoformat(),
                         'frequenciesUsed': ''})

    coordinates = SubElement(params, 'Coordinates')
    for ordinate in coords:
        # Input order is (longitude, latitude); any extra items are ignored.
        SubElement(coordinates, 'Coordinate',
                   {'latitude': str(ordinate[1]),
                    'longitude': str(ordinate[0])})
    return ElementTree(uap)
def sign_permission_artefact(xml_root, private_key=MOCK_DGCA_PRIVATE_KEY, certificate=MOCK_DGCA_CERTIFICATE):
    """
    Sign a permission artefact XML tree.

    The key and certificate are read from disk; pass different paths to sign
    with something other than the bundled mock DGCA credentials.

    :param xml_root: the XML tree object to sign (its getroot() is signed)
    :param private_key: path to the private key used for signing
    :param certificate: path to the certificate passed to the signer
    :return: the signed XML root object. Note - this needs to be saved as a file
    """
    unsigned_root = xml_root.getroot()
    with open(private_key, 'rb') as key_file, open(certificate) as cert_file:
        key_bytes = key_file.read()
        cert_text = cert_file.read()

    signer = sx.XMLSigner()
    # Replace the signer's namespace map with a single default (prefix-less)
    # entry pointing at its 'ds' namespace.
    signer.namespaces = {None: signer.namespaces['ds']}
    return signer.sign(unsigned_root, key=key_bytes, cert=cert_text)
def verify_xml_signature(xml_file, certificate_path):
    """
    Check the signature of an XML file against a certificate.

    :param path xml_file: path to the xml file for verification
    :param certificate_path: path to the certificate to be used for verification
    :return: bool: True when verification succeeds, False when it raises
        cryptography.exceptions.InvalidSignature
    """
    # TODO - refactor such that this verifies for generic stuff
    document_root = etree.parse(xml_file).getroot()
    with open(certificate_path) as cert_file:
        cert_text = cert_file.read()

    try:
        # The verified payload itself is not needed; only whether
        # verification raises matters here.
        _signed_xml = sx.XMLVerifier().verify(
            data=document_root,
            require_x509=True,
            x509_cert=cert_text,
        ).signed_xml
    except cryptography.exceptions.InvalidSignature:
        # The file signature is not authentic.
        return False
    # The file signature is authentic.
    return True
def verify_flight_log_signature_objs(log_object, public_key_obj):
    """
    Check the signature embedded in a flight log against a public key.

    The log's 'FlightLog' member is re-serialised in compact form (no spaces
    after separators), hashed with SHA256 and checked against the
    base64-encoded 'Signature' member using PKCS#1 v1.5.

    :param log_object: The flight log JSON content for verification
    :param public_key_obj: The public key content to be verified against
    :return: bool: True or False on success of verification
    """
    parsed_log = json.loads(log_object, parse_float=decimal.Decimal)
    canonical_payload = json.dumps(parsed_log['FlightLog'], separators=(',', ':')).encode()
    raw_signature = base64.b64decode(parsed_log['Signature'])
    rsa_public_key = RSA.import_key(public_key_obj)
    payload_digest = SHA256.new(canonical_payload)
    try:
        pkcs1_15.new(rsa_public_key).verify(payload_digest, raw_signature)
    except ValueError:
        # The file signature is not authentic.
        return False
    # The file signature is authentic.
    return True
def verify_flight_log_signature(flight_log, public_key):
    """
    Path-based wrapper around verify_flight_log_signature_objs.

    Reads both files and hands their contents to the in-memory verifier.

    :param path flight_log: path JSON encoded NPNT compliant flight log
    :param path public_key: path to RSA 2048 public key pem file
    :return: bool: True or False on success of verification
    """
    with open(flight_log) as log_file, open(public_key) as key_file:
        log_text = log_file.read()
        key_text = key_file.read()
    return verify_flight_log_signature_objs(log_text, key_text)
def sign_log(log_path, private_key_path, out_path=None):
    """
    Sample function to demonstrate how to sign a flight log.

    This complements the verify_flight_log_signature method: the log's
    'FlightLog' member is serialised in compact form, hashed with SHA256 and
    signed with PKCS#1 v1.5. The base64 signature is stored under the
    'Signature' key and the whole document is written out.

    :param log_path: path to log file to be signed
    :param private_key_path: path to the private key to be used for signing.
    :param out_path: path to save the signed log file -
        defaults to the input path with its extension (if any) replaced by
        '-signed.json' if no path is specified
    :return: the path the signed log was written to
    """
    with open(log_path, "rb") as log_obj, open(private_key_path) as key_ob:
        jd = json.loads(log_obj.read(), parse_float=decimal.Decimal)
        rsa_key = RSA.import_key(key_ob.read())

    # Compact separators must match the serialisation used when verifying,
    # otherwise the hashes will not agree.
    hashed_logdata = SHA256.new(json.dumps(jd['FlightLog'], separators=(',', ':')).encode())
    log_signature = pkcs1_15.new(rsa_key).sign(hashed_logdata)
    # The signature is base64 encoded for transport and stored as text,
    # since JSON cannot hold raw bytes.
    jd['Signature'] = base64.b64encode(log_signature).decode('ascii')

    if out_path:
        save_path = out_path
    else:
        # Strip whatever extension the input has instead of assuming a
        # 5-character ".json" suffix, which mangled other file names.
        save_path = os.path.splitext(log_path)[0] + "-signed.json"
    with open(save_path, 'w') as outfile:
        json.dump(jd, outfile, indent=4)
    return save_path
def create_keys(folder, keyname):
    """
    Generate an RSA 2048 keypair and write both halves to disk.

    :param folder: path to save the keypair
    :param str keyname: name of the key pair to save as keyname_private.pem,and keyname_public.pem
    :return: None
    """
    keypair = RSA.generate(2048)
    exported = (
        ("_private.pem", keypair.export_key()),
        ("_public.pem", keypair.publickey().export_key()),
    )
    for suffix, key_bytes in exported:
        target = os.path.join(folder, keyname + suffix)
        with open(target, "wb") as file_out:
            file_out.write(key_bytes)
def check_log_schema(logfile, schemafile=LOG_SCHEMA):
    """
    Validate flight log JSON content against a JSON schema file.

    :param logfile: the flight log JSON content (not a path)
    :param schemafile: path to the JSON schema file; defaults to LOG_SCHEMA
    :return: bool: True when the log validates, False when validation
        raises jsonschema's ValidationError
    """
    from jsonschema import validate, ValidationError

    parsed_log = json.loads(logfile)
    with open(schemafile) as schema_fp:
        schema = json.loads(schema_fp.read())

    try:
        validate(instance=parsed_log, schema=schema)
    except ValidationError:
        return False
    return True