Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: Apache-2.0
3# Copyright 2020 Contributors to OpenLEADR
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
9# http://www.apache.org/licenses/LICENSE-2.0
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
17from lxml import etree
18import xmltodict
19from jinja2 import Environment, PackageLoader
20from signxml import XMLSigner, XMLVerifier, methods
21from uuid import uuid4
22from lxml.etree import Element
23from openleadr import errors
24from datetime import datetime, timezone, timedelta
25import os
27from openleadr import utils
28from .preflight import preflight_message
30import logging
logger = logging.getLogger('openleadr')

# Module-wide signer configured for detached XMLDSig signatures with the
# canonicalization algorithm given below. The 'oadr' prefix is registered on
# the signer for the OpenADR 2.0b namespace.
SIGNER = XMLSigner(method=methods.detached,
                   c14n_algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315")
SIGNER.namespaces['oadr'] = "http://openadr.org/oadr-2.0b/2012/07"
VERIFIER = XMLVerifier()

# The XSD is looked up in the 'schema' directory next to this module.
XML_SCHEMA_LOCATION = os.path.join(os.path.dirname(__file__), 'schema', 'oadr_20b.xsd')

# The schema is loaded once, at import time. XML_PARSER is constructed with that
# schema and is the parser handed to etree.fromstring in validate_xml_schema.
with open(XML_SCHEMA_LOCATION) as file:
    XML_SCHEMA = etree.XMLSchema(etree.parse(file))
XML_PARSER = etree.XMLParser(schema=XML_SCHEMA)
def parse_message(data):
    """
    Split an incoming OpenADR XML message into its message type and payload.

    :param data str: The XML string that is received

    Returns a message type (str) and a message payload (dict). The payload is
    passed through utils.normalize_dict before it is returned.
    """
    parsed = xmltodict.parse(data, process_namespaces=True, namespaces=NAMESPACES)
    signed_object = parsed['oadrPayload']['oadrSignedObject']
    # popitem() removes and returns the last (key, value) pair of the signed object.
    message_type, raw_payload = signed_object.popitem()
    return message_type, utils.normalize_dict(raw_payload)
def create_message(message_type, cert=None, key=None, passphrase=None, **message_payload):
    """
    Render an OpenADR message and wrap it in the oadrPayload envelope.

    The payload first goes through preflight_message, then the template named
    '<message_type>.xml' is rendered with it. When both cert and key are given,
    the rendered object is signed and the signature is placed in the envelope;
    otherwise the envelope is rendered with signature=None.

    Returns the resulting XML as a string.
    """
    message_payload = preflight_message(message_type, message_payload)
    body_template = TEMPLATES.get_template(f'{message_type}.xml')
    signed_object = utils.flatten_xml(body_template.render(**message_payload))
    envelope = TEMPLATES.get_template('oadrPayload.xml')

    signature = None
    if cert and key:
        # Sign the rendered object; a fresh ReplayProtect element is attached
        # as a signature property.
        signature_tree = SIGNER.sign(etree.fromstring(signed_object),
                                     key=key,
                                     cert=cert,
                                     passphrase=utils.ensure_bytes(passphrase),
                                     reference_uri="#oadrSignedObject",
                                     signature_properties=_create_replay_protect())
        signature = etree.tostring(signature_tree).decode('utf-8')

    return envelope.render(template=f'{message_type}',
                           signature=signature,
                           signed_object=signed_object)
def validate_xml_schema(content):
    """
    Parse the content with the schema-bound XML_PARSER and return the XML tree.

    str input is encoded to UTF-8 before parsing; any other input is handed to
    the parser unchanged.
    """
    raw = content.encode('utf-8') if isinstance(content, str) else content
    return etree.fromstring(raw, XML_PARSER)
def validate_xml_signature(xml_tree, cert_fingerprint=None):
    """
    Check the XMLDSIG signature of the message and its ReplayProtect element.

    If cert_fingerprint is given, the fingerprint of the certificate extracted
    from the message must equal it, otherwise errors.FingerprintMismatch is raised.
    """
    cert = utils.extract_pem_cert(xml_tree)
    if cert_fingerprint:
        received = utils.certificate_fingerprint(cert)
        if received != cert_fingerprint:
            message = ("The certificate fingerprint was incorrect. "
                       f"Expected: {cert_fingerprint};"
                       f"Received: {received}")
            raise errors.FingerprintMismatch(message)
    VERIFIER.verify(xml_tree, x509_cert=utils.ensure_bytes(cert), expect_references=2)
    _verify_replay_protect(xml_tree)
async def authenticate_message(request, message_tree, message_payload, fingerprint_lookup=None, ven_lookup=None):
    """
    Authenticate an incoming message against the fingerprint registered for its VEN.

    The checks only run when request.secure is truthy and the payload contains a
    'ven_id'; in every other case the function returns without doing anything.

    Checks performed, in order:
      1. the request carries a client certificate fingerprint;
      2. the ven_id resolves to an expected fingerprint via one of the lookups;
      3. the connection fingerprint equals the expected fingerprint;
      4. the fingerprint of the certificate embedded in the message equals the
         expected fingerprint;
      5. the XML signature (including ReplayProtect) validates.

    :param request: The incoming request; its `secure` attribute is read.
    :param message_tree: The XML tree of the incoming message.
    :param message_payload dict: The parsed message payload.
    :param fingerprint_lookup: Optional callable that receives the ven_id and yields the
                               expected fingerprint. Its result is passed through
                               utils.await_if_required.
    :param ven_lookup: Optional callable that receives the ven_id and yields an object
                       with a .get('fingerprint'). Only used when fingerprint_lookup
                       is not given.

    Raises errors.NotRegisteredOrAuthorizedError when any of the checks fails.
    """
    if not (request.secure and 'ven_id' in message_payload):
        return

    connection_fingerprint = utils.get_cert_fingerprint_from_request(request)
    if connection_fingerprint is None:
        msg = ("Your request must use a client side SSL certificate, of which the "
               "fingerprint must match the fingerprint that you have given to this VTN.")
        raise errors.NotRegisteredOrAuthorizedError(msg)

    ven_id = message_payload.get('ven_id')
    # Start from None so that a call without any lookup callable is rejected below
    # with an authorization error instead of failing on an unbound local variable.
    expected_fingerprint = None
    try:
        if fingerprint_lookup:
            expected_fingerprint = await utils.await_if_required(fingerprint_lookup(ven_id))
            if not expected_fingerprint:
                raise ValueError
        elif ven_lookup:
            ven_info = await utils.await_if_required(ven_lookup(ven_id))
            if not ven_info:
                raise ValueError
            expected_fingerprint = ven_info.get('fingerprint')
    except ValueError:
        # Also covers a ValueError raised by the lookup callable itself.
        msg = (f"Your venID {ven_id} is not known to this VTN. Make sure you use the venID "
               "that you receive from this VTN during the registration step")
        raise errors.NotRegisteredOrAuthorizedError(msg)

    if expected_fingerprint is None:
        # The message announces the fingerprint that was used, so include it.
        msg = ("This VTN server does not know what your certificate fingerprint is. Please "
               "deliver your fingerprint to the VTN (outside of OpenADR). You used the "
               f"following fingerprint to make this request: {connection_fingerprint}")
        raise errors.NotRegisteredOrAuthorizedError(msg)

    if connection_fingerprint != expected_fingerprint:
        msg = (f"The fingerprint of your HTTPS certificate '{connection_fingerprint}' "
               f"does not match the expected fingerprint '{expected_fingerprint}'")
        raise errors.NotRegisteredOrAuthorizedError(msg)

    message_cert = utils.extract_pem_cert(message_tree)
    message_fingerprint = utils.certificate_fingerprint(message_cert)
    if message_fingerprint != expected_fingerprint:
        msg = (f"The fingerprint of the certificate used to sign the message "
               f"{message_fingerprint} did not match the fingerprint that this "
               f"VTN has for you {expected_fingerprint}. Make sure you use the correct "
               "certificate to sign your messages.")
        raise errors.NotRegisteredOrAuthorizedError(msg)

    try:
        validate_xml_signature(message_tree)
    except ValueError:
        msg = ("The message signature did not match the message contents. Please make sure "
               "you are using the correct XMLDSig algorithm and C14n canonicalization.")
        raise errors.NotRegisteredOrAuthorizedError(msg)
def _create_replay_protect():
    """
    Build the ReplayProtect element that is passed to the signer as a signature property.

    The element contains a timestamp child holding the current UTC time
    (formatted by utils.datetimeformat) and a nonce child holding a random
    UUID4 hex string.
    """
    replay_protect = Element("{http://openadr.org/oadr-2.0b/2012/07/xmldsig-properties}ReplayProtect",
                             nsmap={'dsp': 'http://openadr.org/oadr-2.0b/2012/07/xmldsig-properties'},
                             attrib={'Id': 'myid', 'Target': '#mytarget'})

    timestamp_child = Element("{http://openadr.org/oadr-2.0b/2012/07/xmldsig-properties}timestamp")
    timestamp_child.text = utils.datetimeformat(datetime.now(timezone.utc))
    replay_protect.append(timestamp_child)

    nonce_child = Element("{http://openadr.org/oadr-2.0b/2012/07/xmldsig-properties}nonce")
    nonce_child.text = uuid4().hex
    replay_protect.append(nonce_child)

    return replay_protect
def _verify_replay_protect(xml_tree):
    """
    Check the ReplayProtect timestamp and nonce of an incoming message.

    Raises ValueError when the element cannot be read, when the nonce is
    missing, when the timestamp is older than REPLAY_PROTECT_MAX_TIME_DELTA, or
    when the (timestamp, nonce) pair is already in NONCE_CACHE. On success the
    pair is recorded through _update_nonce_cache.
    """
    ns = "{http://openadr.org/oadr-2.0b/2012/07/xmldsig-properties}"
    try:
        timestamp = utils.parse_datetime(xml_tree.findtext(f".//{ns}timestamp"))
        nonce = xml_tree.findtext(f".//{ns}nonce")
    except Exception:
        raise ValueError("Missing or malformed ReplayProtect element in the message signature.")

    # The handler above always raises, so from here on both values were read.
    if nonce is None:
        raise ValueError("Missing 'nonce' element in ReplayProtect in incoming message.")
    if timestamp < datetime.now(timezone.utc) - REPLAY_PROTECT_MAX_TIME_DELTA:
        raise ValueError("The message was signed too long ago.")
    if (timestamp, nonce) in NONCE_CACHE:
        raise ValueError("This combination of timestamp and nonce was already used.")
    _update_nonce_cache(timestamp, nonce)
193def _update_nonce_cache(timestamp, nonce):
194 NONCE_CACHE.add((timestamp, nonce))
195 for timestamp, nonce in list(NONCE_CACHE):
196 if timestamp < datetime.now(timezone.utc) - REPLAY_PROTECT_MAX_TIME_DELTA:
197 NONCE_CACHE.remove((timestamp, nonce))
200# Replay protect settings
201REPLAY_PROTECT_MAX_TIME_DELTA = timedelta(seconds=5)
202NONCE_CACHE = set()
# Settings for jinja2
# TEMPLATES loads the message templates from the 'templates' directory of the
# 'openleadr' package; create_message fetches '<message_type>.xml' and
# 'oadrPayload.xml' from it. The three filters expose the formatting helpers
# from utils to the templates.
TEMPLATES = Environment(loader=PackageLoader('openleadr', 'templates'))
TEMPLATES.filters['datetimeformat'] = utils.datetimeformat
TEMPLATES.filters['timedeltaformat'] = utils.timedeltaformat
TEMPLATES.filters['booleanformat'] = utils.booleanformat
# Whitespace control for block tags (see the Jinja2 Environment documentation).
TEMPLATES.trim_blocks = True
TEMPLATES.lstrip_blocks = True

# Settings for xmltodict
# Passed as `namespaces=` to xmltodict.parse (with process_namespaces=True) in
# parse_message. Every listed namespace maps to None.
# NOTE(review): per the xmltodict docs, mapping to None should drop the
# namespace from the resulting keys - confirm.
NAMESPACES = {
    'http://docs.oasis-open.org/ns/energyinterop/201110': None,
    'http://openadr.org/oadr-2.0b/2012/07': None,
    'urn:ietf:params:xml:ns:icalendar-2.0': None,
    'http://docs.oasis-open.org/ns/energyinterop/201110/payloads': None,
    'http://docs.oasis-open.org/ns/emix/2011/06': None,
    'urn:ietf:params:xml:ns:icalendar-2.0:stream': None,
    'http://docs.oasis-open.org/ns/emix/2011/06/power': None,
    'http://docs.oasis-open.org/ns/emix/2011/06/siscale': None,
    'http://www.w3.org/2000/09/xmldsig#': None,
    'http://openadr.org/oadr-2.0b/2012/07/xmldsig-properties': None
}