Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# SPDX-License-Identifier: Apache-2.0 

2 

3# Copyright 2020 Contributors to OpenLEADR 

4 

5# Licensed under the Apache License, Version 2.0 (the "License"); 

6# you may not use this file except in compliance with the License. 

7# You may obtain a copy of the License at 

8 

9# http://www.apache.org/licenses/LICENSE-2.0 

10 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16 

17from lxml import etree 

18import xmltodict 

19from jinja2 import Environment, PackageLoader 

20from signxml import XMLSigner, XMLVerifier, methods 

21from uuid import uuid4 

22from lxml.etree import Element 

23from openleadr import errors 

24from datetime import datetime, timezone, timedelta 

25import os 

26 

27from openleadr import utils 

28from .preflight import preflight_message 

29 

30import logging 

logger = logging.getLogger('openleadr')

# Shared XMLDSig signer: produces detached signatures and canonicalizes with the
# algorithm identified by the W3C REC-xml-c14n-20010315 URI.
SIGNER = XMLSigner(
    method=methods.detached,
    c14n_algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315",
)
SIGNER.namespaces['oadr'] = "http://openadr.org/oadr-2.0b/2012/07"

# Shared verifier used for incoming signed messages.
VERIFIER = XMLVerifier()

# The XSD lives in the 'schema' directory next to this module.
XML_SCHEMA_LOCATION = os.path.join(os.path.dirname(__file__), 'schema', 'oadr_20b.xsd')

# Load the schema once at import time and build a parser that validates against it.
with open(XML_SCHEMA_LOCATION) as file:
    XML_SCHEMA = etree.XMLSchema(etree.parse(file))
XML_PARSER = etree.XMLParser(schema=XML_SCHEMA)

43 

44 

def parse_message(data):
    """
    Split an incoming OpenADR XML document into its message type and payload.

    :param data str: The XML string that is received

    The XML is converted to a dict by xmltodict (using the NAMESPACES mapping),
    the last entry below oadrPayload/oadrSignedObject is taken as the message,
    and its contents are passed through utils.normalize_dict.

    Returns a message type (str) and a message payload (dict)
    """
    parsed = xmltodict.parse(data, process_namespaces=True, namespaces=NAMESPACES)
    signed_object = parsed['oadrPayload']['oadrSignedObject']
    # popitem() removes and returns the last key/value pair of the mapping.
    message_type, raw_payload = signed_object.popitem()
    return message_type, utils.normalize_dict(raw_payload)

56 

57 

def create_message(message_type, cert=None, key=None, passphrase=None, **message_payload):
    """
    Render an OpenADR message to an XML string, signing it when cert and key are given.

    :param message_type str: Name of the message; selects the '<message_type>.xml' template
    :param cert: Certificate handed to the XML signer
    :param key: Private key handed to the XML signer
    :param passphrase: Passphrase for the key; converted with utils.ensure_bytes
    :param message_payload: Template values; passed through preflight_message first

    Returns the rendered 'oadrPayload.xml' envelope. The signature is None in the
    envelope unless both cert and key are truthy.
    """
    payload = preflight_message(message_type, message_payload)
    body_template = TEMPLATES.get_template(f'{message_type}.xml')
    signed_object = utils.flatten_xml(body_template.render(**payload))
    envelope = TEMPLATES.get_template('oadrPayload.xml')

    signature = None
    if cert and key:
        # Sign the rendered object and attach fresh replay-protection properties.
        signature_tree = SIGNER.sign(etree.fromstring(signed_object),
                                     key=key,
                                     cert=cert,
                                     passphrase=utils.ensure_bytes(passphrase),
                                     reference_uri="#oadrSignedObject",
                                     signature_properties=_create_replay_protect())
        signature = etree.tostring(signature_tree).decode('utf-8')

    return envelope.render(template=f'{message_type}',
                           signature=signature,
                           signed_object=signed_object)

82 

83 

def validate_xml_schema(content):
    """
    Parse XML content with the schema-validating parser and return the XML tree.

    A str is encoded to UTF-8 bytes first; any other input is handed to the
    parser unchanged.
    """
    raw = content.encode('utf-8') if isinstance(content, str) else content
    return etree.fromstring(raw, XML_PARSER)

92 

93 

def validate_xml_signature(xml_tree, cert_fingerprint=None):
    """
    Check the XMLDSIG signature and the ReplayProtect element of a message.

    :param xml_tree: The parsed XML message that carries the signature
    :param cert_fingerprint: Optional fingerprint that the certificate extracted
                             from the message has to match

    Raises errors.FingerprintMismatch when cert_fingerprint is given and differs
    from the fingerprint of the certificate in the message.
    """
    pem_cert = utils.extract_pem_cert(xml_tree)

    if cert_fingerprint:
        received = utils.certificate_fingerprint(pem_cert)
        if received != cert_fingerprint:
            message = ("The certificate fingerprint was incorrect. "
                       f"Expected: {cert_fingerprint};"
                       f"Received: {received}")
            raise errors.FingerprintMismatch(message)

    # Verify the signature with the embedded certificate, then the replay protection.
    VERIFIER.verify(xml_tree, x509_cert=utils.ensure_bytes(pem_cert), expect_references=2)
    _verify_replay_protect(xml_tree)

107 

108 

async def authenticate_message(request, message_tree, message_payload, fingerprint_lookup=None, ven_lookup=None):
    """
    Authenticate an incoming message against the fingerprint registered for its VEN.

    :param request: The incoming request; only checked when request.secure is truthy
    :param message_tree: The parsed XML message, used to extract the signing certificate
    :param message_payload dict: The parsed payload; only checked when it contains 'ven_id'
    :param fingerprint_lookup: Optional callable (sync or async) that takes a ven_id and
                               returns the expected fingerprint
    :param ven_lookup: Optional callable (sync or async) that takes a ven_id and returns
                       a dict with a 'fingerprint' key; only used if fingerprint_lookup
                       is not provided

    Returns None. Nothing is checked for requests that are not secure or payloads
    without a 'ven_id'.

    Raises errors.NotRegisteredOrAuthorizedError when the client certificate is missing,
    the VEN is unknown, no fingerprint is known for the VEN, the connection or signing
    certificate fingerprint differs from the expected one, or signature validation
    raises a ValueError.
    """
    if not (request.secure and 'ven_id' in message_payload):
        return

    connection_fingerprint = utils.get_cert_fingerprint_from_request(request)
    if connection_fingerprint is None:
        msg = ("Your request must use a client side SSL certificate, of which the "
               "fingerprint must match the fingerprint that you have given to this VTN.")
        raise errors.NotRegisteredOrAuthorizedError(msg)

    # Start from None so that a call without any lookup callable is reported as
    # "fingerprint unknown" below instead of failing with an UnboundLocalError.
    expected_fingerprint = None
    ven_id = message_payload.get('ven_id')
    try:
        if fingerprint_lookup:
            expected_fingerprint = await utils.await_if_required(fingerprint_lookup(ven_id))
            if not expected_fingerprint:
                raise ValueError
        elif ven_lookup:
            ven_info = await utils.await_if_required(ven_lookup(ven_id))
            if not ven_info:
                raise ValueError
            expected_fingerprint = ven_info.get('fingerprint')
    except ValueError:
        # Also reached when a lookup callable itself raises ValueError.
        msg = (f"Your venID {ven_id} is not known to this VTN. Make sure you use the venID "
               "that you receive from this VTN during the registration step")
        raise errors.NotRegisteredOrAuthorizedError(msg)

    if expected_fingerprint is None:
        # The message announces the fingerprint that was used, so include it.
        msg = ("This VTN server does not know what your certificate fingerprint is. Please "
               "deliver your fingerprint to the VTN (outside of OpenADR). You used the "
               f"following fingerprint to make this request: '{connection_fingerprint}'")
        raise errors.NotRegisteredOrAuthorizedError(msg)

    if connection_fingerprint != expected_fingerprint:
        msg = (f"The fingerprint of your HTTPS certificate '{connection_fingerprint}' "
               f"does not match the expected fingerprint '{expected_fingerprint}'")
        raise errors.NotRegisteredOrAuthorizedError(msg)

    # The certificate that signed the message must be the registered one as well.
    message_cert = utils.extract_pem_cert(message_tree)
    message_fingerprint = utils.certificate_fingerprint(message_cert)
    if message_fingerprint != expected_fingerprint:
        msg = (f"The fingerprint of the certificate used to sign the message "
               f"{message_fingerprint} did not match the fingerprint that this "
               f"VTN has for you {expected_fingerprint}. Make sure you use the correct "
               "certificate to sign your messages.")
        raise errors.NotRegisteredOrAuthorizedError(msg)

    try:
        validate_xml_signature(message_tree)
    except ValueError:
        # NOTE(review): only ValueError is translated here; any other exception type
        # raised during signature verification propagates to the caller unchanged.
        msg = ("The message signature did not match the message contents. Please make sure "
               "you are using the correct XMLDSig algorithm and C14n canonicalization.")
        raise errors.NotRegisteredOrAuthorizedError(msg)

159 

160 

def _create_replay_protect():
    """
    Build the ReplayProtect element that is passed to the signer as signature properties.

    The element holds a 'timestamp' child with the current UTC time (formatted by
    utils.datetimeformat) and a 'nonce' child with a random UUID4 hex string.
    """
    namespace = "http://openadr.org/oadr-2.0b/2012/07/xmldsig-properties"

    timestamp_el = Element(f"{{{namespace}}}timestamp")
    timestamp_el.text = utils.datetimeformat(datetime.now(timezone.utc))

    nonce_el = Element(f"{{{namespace}}}nonce")
    nonce_el.text = uuid4().hex

    replay_protect = Element(f"{{{namespace}}}ReplayProtect",
                             nsmap={'dsp': namespace},
                             attrib={'Id': 'myid', 'Target': '#mytarget'})
    # Children are attached in document order: timestamp first, then nonce.
    for child in (timestamp_el, nonce_el):
        replay_protect.append(child)
    return replay_protect

174 

175 

def _verify_replay_protect(xml_tree):
    """
    Check the ReplayProtect timestamp and nonce of a signed message.

    Raises ValueError when the element cannot be read, the nonce is missing, the
    timestamp is older than REPLAY_PROTECT_MAX_TIME_DELTA, or the (timestamp, nonce)
    pair is already in NONCE_CACHE. An accepted pair is added to the cache.
    """
    ns = "{http://openadr.org/oadr-2.0b/2012/07/xmldsig-properties}"
    try:
        timestamp = utils.parse_datetime(xml_tree.findtext(f".//{ns}timestamp"))
        nonce = xml_tree.findtext(f".//{ns}nonce")
    except Exception:
        raise ValueError("Missing or malformed ReplayProtect element in the message signature.")

    if nonce is None:
        raise ValueError("Missing 'nonce' element in ReplayProtect in incoming message.")

    oldest_accepted = datetime.now(timezone.utc) - REPLAY_PROTECT_MAX_TIME_DELTA
    if timestamp < oldest_accepted:
        raise ValueError("The message was signed too long ago.")
    if (timestamp, nonce) in NONCE_CACHE:
        raise ValueError("This combination of timestamp and nonce was already used.")

    _update_nonce_cache(timestamp, nonce)

191 

192 

def _update_nonce_cache(timestamp, nonce):
    """
    Record a (timestamp, nonce) pair in NONCE_CACHE and evict expired pairs.

    :param timestamp: Timestamp from the ReplayProtect element of the message
    :param nonce: Nonce from the ReplayProtect element of the message

    A pair is expired when its timestamp is older than REPLAY_PROTECT_MAX_TIME_DELTA.
    """
    NONCE_CACHE.add((timestamp, nonce))

    # Compute the cutoff once so every cached pair is judged against the same instant.
    cutoff = datetime.now(timezone.utc) - REPLAY_PROTECT_MAX_TIME_DELTA
    expired = {entry for entry in NONCE_CACHE if entry[0] < cutoff}

    # Prune the module-level set in place rather than rebinding the name.
    NONCE_CACHE.difference_update(expired)

198 

199 

# Replay protect settings
# Maximum age of a signed message's timestamp before it is rejected.
REPLAY_PROTECT_MAX_TIME_DELTA = timedelta(seconds=5)
# (timestamp, nonce) pairs of recently accepted messages.
NONCE_CACHE = set()

203 

# Settings for jinja2
# Templates are loaded from the 'templates' directory of the openleadr package.
TEMPLATES = Environment(loader=PackageLoader('openleadr', 'templates'),
                        trim_blocks=True,
                        lstrip_blocks=True)
# Custom filters that the message templates can use for formatting values.
TEMPLATES.filters.update({
    'datetimeformat': utils.datetimeformat,
    'timedeltaformat': utils.timedeltaformat,
    'booleanformat': utils.booleanformat,
})

211 

# Settings for xmltodict
# Every namespace maps to None; this mapping is passed as the `namespaces`
# argument of xmltodict.parse when parsing incoming messages.
NAMESPACES = dict.fromkeys((
    'http://docs.oasis-open.org/ns/energyinterop/201110',
    'http://openadr.org/oadr-2.0b/2012/07',
    'urn:ietf:params:xml:ns:icalendar-2.0',
    'http://docs.oasis-open.org/ns/energyinterop/201110/payloads',
    'http://docs.oasis-open.org/ns/emix/2011/06',
    'urn:ietf:params:xml:ns:icalendar-2.0:stream',
    'http://docs.oasis-open.org/ns/emix/2011/06/power',
    'http://docs.oasis-open.org/ns/emix/2011/06/siscale',
    'http://www.w3.org/2000/09/xmldsig#',
    'http://openadr.org/oadr-2.0b/2012/07/xmldsig-properties',
))