Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: Apache-2.0
3# Copyright 2020 Contributors to OpenLEADR
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
9# http://www.apache.org/licenses/LICENSE-2.0
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
17import asyncio
18import inspect
19import logging
20import ssl
21from datetime import datetime, timedelta, timezone
22from functools import partial
23from http import HTTPStatus
25import aiohttp
26from lxml.etree import XMLSyntaxError
27from signxml.exceptions import InvalidSignature
28from apscheduler.schedulers.asyncio import AsyncIOScheduler
29from openleadr import enums, objects, errors
30from openleadr.messaging import create_message, parse_message, \
31 validate_xml_schema, validate_xml_signature
32from openleadr import utils
34logger = logging.getLogger('openleadr')
37class OpenADRClient:
38 """
39 Main client class. Most of these methods will be called automatically, but
40 you can always choose to call them manually.
41 """
def __init__(self, ven_name, vtn_url, debug=False, cert=None, key=None,
             passphrase=None, vtn_fingerprint=None, show_fingerprint=True, ca_file=None,
             allow_jitter=True, ven_id=None):
    """
    Set up a new OpenADR Client (Virtual End Node).

    :param str ven_name: The name for this VEN.
    :param str vtn_url: The URL of the VTN (Server) to connect to. A single
                        trailing slash is removed.
    :param bool debug: Whether or not to print debugging messages.
    :param str cert: The path to a PEM-formatted Certificate file to use
                     for signing messages.
    :param str key: The path to a PEM-formatted Private Key file to use
                    for signing messages.
    :param str passphrase: Passed on, together with cert and key, to the
                           message-creation function.
    :param str vtn_fingerprint: The fingerprint for the VTN's certificate to
                                verify incoming messages.
    :param bool show_fingerprint: Whether to print your own fingerprint
                                  on startup. Defaults to True.
    :param str ca_file: The path to the PEM-formatted CA file for validating
                        the VTN server's certificate.
    :param bool allow_jitter: Stored on the client and used as the
                              ``randomize_seconds`` option of the polling schedule.
    :param str ven_id: The ID for this VEN. If you leave this blank,
                       a VEN_ID will be assigned by the VTN.
    """
    # Identity and connection settings
    self.ven_name = ven_name
    self.vtn_url = vtn_url[:-1] if vtn_url.endswith("/") else vtn_url
    self.ven_id = ven_id
    self.registration_id = None
    self.poll_frequency = None
    self.vtn_fingerprint = vtn_fingerprint
    self.debug = debug

    # Reporting state
    self.reports = []
    self.report_callbacks = {}              # Callbacks, one per (report, r_id)
    self.report_requests = []               # Report requests received from the VTN
    self.incomplete_reports = {}            # Reports still being filled over time
    self.pending_reports = asyncio.Queue()  # Reports waiting to be sent
    self.scheduler = AsyncIOScheduler()
    self.client_session = None
    self.report_queue_task = None

    # Event state
    self.received_events = []               # Events that we received
    self.responded_events = {}              # Events that we already saw

    # Transport security settings
    self.cert_path = cert
    self.key_path = key
    self.passphrase = passphrase
    self.ca_file = ca_file
    self.allow_jitter = allow_jitter

    # The file contents are only loaded when both a certificate and a key are given.
    if cert and key:
        with open(cert, 'rb') as cert_file:
            cert = cert_file.read()
        with open(key, 'rb') as key_file:
            key = key_file.read()
        if show_fingerprint:
            width = 80
            print("")
            print("*" * width)
            print("Your VEN Certificate Fingerprint is ".center(width))
            print(utils.certificate_fingerprint(cert).center(width))
            print("Please deliver this fingerprint to the VTN.".center(width))
            print("You do not need to keep this a secret.".center(width))
            print("*" * width)
            print("")

    # Pre-bind the signing material so every outgoing message uses it.
    self._create_message = partial(create_message,
                                   cert=cert,
                                   key=key,
                                   passphrase=passphrase)
async def run(self):
    """
    Run the client in full-auto mode: register with the VTN, register the
    configured reports, poll once and then schedule periodic polling and
    event cleanup.
    """
    self.loop = asyncio.get_event_loop()
    await self.create_party_registration(ven_id=self.ven_id)

    # Without a VEN ID there is nothing useful we can do.
    if not self.ven_id:
        logger.error("No VEN ID received from the VTN, aborting.")
        await self.stop()
        return

    if self.reports:
        await self.register_reports(self.reports)
        worker = self._report_queue_worker()
        self.report_queue_task = self.loop.create_task(worker)

    await self._poll()

    # Set up automatic polling, capped at one poll per 24 hours.
    longest_interval = timedelta(hours=24)
    if self.poll_frequency > longest_interval:
        logger.warning("Polling with intervals of more than 24 hours is not supported. "
                       "Will use 24 hours as the polling interval.")
        self.poll_frequency = longest_interval
    poll_schedule = utils.cron_config(self.poll_frequency,
                                      randomize_seconds=self.allow_jitter)

    self.scheduler.add_job(self._poll, trigger='cron', **poll_schedule)
    self.scheduler.add_job(self._event_cleanup, trigger='interval', seconds=300)
    self.scheduler.start()
async def stop(self):
    """
    Cleanly stops the client. Run this coroutine before closing your event loop.

    Shuts down the scheduler if it is running, cancels the report queue
    worker if one was started, and closes the HTTP session if one was opened.
    """
    if self.scheduler.running:
        self.scheduler.shutdown()
    if self.report_queue_task:
        self.report_queue_task.cancel()
    # The session only exists once a request has been performed, so stop()
    # must not assume it is there.
    if self.client_session is not None:
        await self.client_session.close()
    # Yield once so cancelled tasks and closing transports get a chance to finish.
    await asyncio.sleep(0)
def add_handler(self, handler, callback):
    """
    Install ``callback`` as the handler for the given situation.

    :param str handler: Either 'on_event' or 'on_update_event'. Any other
                        value is logged as an error and ignored.
    :param callable callback: The object to store under that handler name.
    """
    supported_handlers = ('on_event', 'on_update_event')
    if handler in supported_handlers:
        setattr(self, handler, callback)
    else:
        logger.error("'handler' must be either on_event or on_update_event")
def add_report(self, callback, resource_id, measurement=None,
               data_collection_mode='incremental',
               report_specifier_id=None, r_id=None,
               report_name=enums.REPORT_NAME.TELEMETRY_USAGE,
               reading_type=enums.READING_TYPE.DIRECT_READ,
               report_type=enums.REPORT_TYPE.READING, sampling_rate=None, data_source=None,
               scale="none", unit=None, power_ac=True, power_hertz=50, power_voltage=230,
               market_context=None, end_device_asset_mrid=None, report_data_source=None):
    """
    Add a new reporting capability to the client.

    :param callable callback: A callback or coroutine that will fetch the value for a specific
                              report. This callback will be passed the report_id and the r_id
                              of the requested value.
    :param str resource_id: A specific name for this resource within this report.
    :param str measurement: The quantity that is being measured (openleadr.enums.MEASUREMENTS).
                            May also be an objects.Measurement or a dict describing one.
                            Optional for TELEMETRY_STATUS reports.
    :param str data_collection_mode: Whether you want the data to be collected incrementally
                                     or at once. If the VTN requests the sampling interval to be
                                     higher than the reporting interval, this setting determines
                                     if the callback should be called at the sampling rate (with
                                     no args, assuming it returns the current value), or at the
                                     reporting interval (with date_from and date_to as keyword
                                     arguments). Choose 'incremental' for the former case, or
                                     'full' for the latter case.
    :param str report_specifier_id: A unique identifier for this report. Leave this blank for a
                                    random generated id, or fill it in if your VTN depends on
                                    this being a known value, or if it needs to be constant
                                    between restarts of the client.
    :param str r_id: A unique identifier for a datapoint in a report. The same remarks apply as
                     for the report_specifier_id.
    :param str report_name: An OpenADR name for this report (one of openleadr.enums.REPORT_NAME)
    :param str reading_type: An OpenADR reading type (found in openleadr.enums.READING_TYPE)
    :param str report_type: An OpenADR report type (found in openleadr.enums.REPORT_TYPE)
    :param datetime.timedelta sampling_rate: The sampling rate for the measurement.
    :param str scale: The SI scale code (one of openleadr.enums.SI_SCALE_CODE).
    :param str unit: The unit for this measurement.
    :param boolean power_ac: Whether the power is AC (True) or DC (False).
                             Only required when supplying a power-related measurement.
    :param int power_hertz: Grid frequency of the power.
                            Only required when supplying a power-related measurement.
    :param int power_voltage: Voltage of the power.
                              Only required when supplying a power-related measurement.
    :param str market_context: The Market Context that this report belongs to.
    :param str end_device_asset_mrid: the Meter ID for the end device that is measured by this report.
    :param report_data_source: A (list of) target(s) that this report is related to.
    :returns: A ``(report_specifier_id, r_id)`` tuple identifying the new datapoint.
    :raises ValueError: If report_name, reading_type, report_type, scale or
                        data_collection_mode is invalid, or if no measurement is given
                        for a report that is not a TELEMETRY_STATUS report.
    :raises TypeError: If data_collection_mode is 'full' and the callback does not accept
                       date_from, date_to and sampling_interval.
    """

    # Verify input
    if report_name not in enums.REPORT_NAME.values and not report_name.startswith('x-'):
        raise ValueError(f"{report_name} is not a valid report_name. Valid options are "
                         f"{', '.join(enums.REPORT_NAME.values)}"
                         " or any name starting with 'x-'.")
    if reading_type not in enums.READING_TYPE.values and not reading_type.startswith('x-'):
        raise ValueError(f"{reading_type} is not a valid reading_type. Valid options are "
                         f"{', '.join(enums.READING_TYPE.values)}"
                         " or any name starting with 'x-'.")
    if report_type not in enums.REPORT_TYPE.values and not report_type.startswith('x-'):
        raise ValueError(f"{report_type} is not a valid report_type. Valid options are "
                         f"{', '.join(enums.REPORT_TYPE.values)}"
                         " or any name starting with 'x-'.")
    if scale not in enums.SI_SCALE_CODE.values:
        raise ValueError(f"{scale} is not a valid scale. Valid options are "
                         f"{', '.join(enums.SI_SCALE_CODE.values)}")

    if sampling_rate is None:
        sampling_rate = objects.SamplingRate(min_period=timedelta(seconds=10),
                                             max_period=timedelta(hours=24),
                                             on_change=False)
    elif isinstance(sampling_rate, timedelta):
        sampling_rate = objects.SamplingRate(min_period=sampling_rate,
                                             max_period=sampling_rate,
                                             on_change=False)

    if data_collection_mode not in ('incremental', 'full'):
        raise ValueError("The data_collection_mode should be 'incremental' or 'full'.")

    if data_collection_mode == 'full':
        args = inspect.signature(callback).parameters
        if not ('date_from' in args and 'date_to' in args and 'sampling_interval' in args):
            raise TypeError("Your callback function must accept the 'date_from', 'date_to' "
                            "and 'sampling_interval' arguments if used "
                            "with data_collection_mode 'full'.")

    # Determine the correct item name, item description and unit
    if report_name == 'TELEMETRY_STATUS':
        item_base = None
    elif measurement is None:
        # Only status reports can go without a measurement.
        raise ValueError("A measurement is required for reports other than TELEMETRY_STATUS.")
    elif isinstance(measurement, objects.Measurement):
        item_base = measurement
    elif isinstance(measurement, dict):
        utils.validate_report_measurement_dict(measurement)
        # power_attributes is optional in the dict; only build the object when present.
        power_attributes_dict = measurement.get('power_attributes')
        if power_attributes_dict:
            power_attributes = objects.PowerAttributes(**power_attributes_dict)
        else:
            power_attributes = None
        item_base = objects.Measurement(name=measurement['name'],
                                        description=measurement['description'],
                                        unit=measurement['unit'],
                                        scale=measurement.get('scale'),
                                        power_attributes=power_attributes)
    elif measurement.upper() in enums.MEASUREMENTS.members:
        item_base = enums.MEASUREMENTS[measurement.upper()]
    else:
        item_base = objects.Measurement(name='customUnit',
                                        description=measurement,
                                        unit=unit,
                                        scale=scale)

    # 'scale' was validated against SI_SCALE_CODE above, so it can be applied directly.
    # NOTE(review): for a measurement taken from enums.MEASUREMENTS this writes to the
    # object held by the enum - confirm whether a copy should be used instead.
    if item_base is not None and scale is not None and item_base.scale is not None:
        item_base.scale = scale

    # Check if unit is compatible
    if (item_base is not None and unit is not None
            and unit != item_base.unit and unit not in item_base.acceptable_units):
        logger.warning(f"The supplied unit {unit} for measurement {measurement} "
                       f"will be ignored, {item_base.unit} will be used instead. "
                       f"Allowed units for this measurement are: "
                       f"{', '.join(item_base.acceptable_units)}")

    # Get or create the relevant Report
    if report_specifier_id:
        report = utils.find_by(self.reports,
                               'report_name', report_name,
                               'report_specifier_id', report_specifier_id)
    else:
        report = utils.find_by(self.reports, 'report_name', report_name)

    if not report:
        report_specifier_id = report_specifier_id or utils.generate_id()
        report = objects.Report(created_date_time=datetime.now(),
                                report_name=report_name,
                                report_specifier_id=report_specifier_id,
                                data_collection_mode=data_collection_mode)
        self.reports.append(report)

    # Add the new report description to the report.
    # Honour a caller-supplied r_id; only generate one when none was given.
    target = objects.Target(resource_id=resource_id)
    r_id = r_id or utils.generate_id()
    report_description = objects.ReportDescription(r_id=r_id,
                                                   reading_type=reading_type,
                                                   report_data_source=target,
                                                   report_subject=target,
                                                   report_type=report_type,
                                                   sampling_rate=sampling_rate,
                                                   measurement=item_base,
                                                   market_context=market_context)
    self.report_callbacks[(report.report_specifier_id, r_id)] = callback
    report.report_descriptions.append(report_description)
    # Return the id of the report actually used, which may be an existing one
    # that was found by name.
    return report.report_specifier_id, r_id
318 ###########################################################################
319 # #
320 # POLLING METHODS #
321 # #
322 ###########################################################################
async def poll(self):
    """
    Ask the Server for the next available message by sending an oadrPoll
    to the OadrPoll service. This coroutine is called automatically.

    :returns: The ``(response_type, response_payload)`` pair of the reply.
    """
    poll_message = self._create_message('oadrPoll', ven_id=self.ven_id)
    reply_type, reply_payload = await self._perform_request('OadrPoll', poll_message)
    return reply_type, reply_payload
333 ###########################################################################
334 # #
335 # REGISTRATION METHODS #
336 # #
337 ###########################################################################
async def query_registration(self):
    """
    Send an oadrQueryRegistration to the VTN to request information about it.

    :returns: The ``(response_type, response_payload)`` pair of the reply.
    """
    query = self._create_message('oadrQueryRegistration',
                                 request_id=utils.generate_id())
    reply_type, reply_payload = await self._perform_request('EiRegisterParty', query)
    return reply_type, reply_payload
async def create_party_registration(self, http_pull_model=True, xml_signature=False,
                                    report_only=False, profile_name='2.0b',
                                    transport_name='simpleHttp', transport_address=None,
                                    ven_id=None):
    """
    Take the necessary steps to register this client with the server.

    On success the VEN ID, registration ID and polling frequency from the
    reply are stored on the client.

    :param bool http_pull_model: Whether to use the 'pull' model for HTTP.
    :param bool xml_signature: Whether to sign each XML message.
    :param bool report_only: Whether or not this is a reporting-only client
                             which does not deal with Events.
    :param str profile_name: Which OpenADR profile to use.
    :param str transport_name: The transport name to use. Either 'simpleHttp' or 'xmpp'.
    :param str transport_address: Which public-facing address the server should use
                                  to communicate.
    :param str ven_id: The ID for this VEN. If you leave this blank,
                       a VEN_ID will be assigned by the VTN.
    :returns: The ``(response_type, response_payload)`` pair on success, or
              None when the request failed or the VTN answered with an error.
    """
    request_id = utils.generate_id()
    registration = dict(ven_name=self.ven_name,
                        http_pull_model=http_pull_model,
                        xml_signature=xml_signature,
                        report_only=report_only,
                        profile_name=profile_name,
                        transport_name=transport_name,
                        transport_address=transport_address)
    # Only mention a VEN ID if we have one; otherwise the VTN assigns it.
    if ven_id:
        registration['ven_id'] = ven_id
    message = self._create_message('oadrCreatePartyRegistration',
                                   request_id=request_id,
                                   **registration)
    response_type, response_payload = await self._perform_request('EiRegisterParty', message)
    if response_type is None:
        return None

    response = response_payload['response']
    if response['response_code'] != 200:
        status_code = response['response_code']
        status_description = response['response_description']
        logger.error(f"Got error on Create Party Registration: "
                     f"{status_code} {status_description}")
        return None

    self.ven_id = response_payload['ven_id']
    self.registration_id = response_payload['registration_id']
    # Fall back to polling every ten seconds if the VTN did not request a frequency.
    self.poll_frequency = response_payload.get('requested_oadr_poll_freq',
                                               timedelta(seconds=10))
    logger.info(f"VEN is now registered with ID {self.ven_id}")
    logger.info(f"The polling frequency is {self.poll_frequency}")
    return response_type, response_payload
async def cancel_party_registration(self):
    """
    Placeholder for cancelling the registration with the VTN.

    :raises NotImplementedError: Always.
    """
    reason = "Cancel Registration is not yet implemented"
    raise NotImplementedError(reason)
401 ###########################################################################
402 # #
403 # EVENT METHODS #
404 # #
405 ###########################################################################
async def request_event(self, reply_limit=None):
    """
    Send an oadrRequestEvent to the VTN's EiEvent service to ask for the
    next Event, if it has any.

    :param reply_limit: Passed through as the 'reply_limit' of the request.
    :returns: The ``(response_type, response_payload)`` pair of the reply.
    """
    request_id = utils.generate_id()
    message = self._create_message('oadrRequestEvent',
                                   request_id=request_id,
                                   ven_id=self.ven_id,
                                   reply_limit=reply_limit)
    reply_type, reply_payload = await self._perform_request('EiEvent', message)
    return reply_type, reply_payload
async def created_event(self, request_id, event_id, opt_type, modification_number=0):
    """
    Send an oadrCreatedEvent to the VTN's EiEvent service to tell it that
    we created an event.

    :param request_id: The request ID to echo in the response sections.
    :param event_id: The ID of the event that is being acknowledged.
    :param opt_type: The opt type to report for this event.
    :param modification_number: The modification number of the event.
    """
    # Both the overall response and the per-event response report 200 / OK.
    overall_response = {'response_code': 200,
                        'response_description': 'OK',
                        'request_id': request_id}
    event_response = {'response_code': 200,
                      'response_description': 'OK',
                      'request_id': request_id,
                      'event_id': event_id,
                      'modification_number': modification_number,
                      'opt_type': opt_type}
    message = self._create_message('oadrCreatedEvent',
                                   ven_id=self.ven_id,
                                   response=overall_response,
                                   event_responses=[event_response])
    # The reply is received but not used.
    reply_type, reply_payload = await self._perform_request('EiEvent', message)
437 ###########################################################################
438 # #
439 # REPORTING METHODS #
440 # #
441 ###########################################################################
async def register_reports(self, reports):
    """
    Tell the VTN about our reports. The VTN might respond with an
    oadrCreateReport message that tells us which reports are to be sent.

    :param reports: The reports to announce to the VTN.
    :returns: The tuple ``('oadrCreatedReport', {})``.
    """
    message = self._create_message('oadrRegisterReport',
                                   request_id=utils.generate_id(),
                                   ven_id=self.ven_id,
                                   reports=reports,
                                   report_request_id=0)
    response_type, response_payload = await self._perform_request('EiReport', message)

    # Handle the subscriptions that the VTN is interested in.
    for report_request in response_payload.get('report_requests', ()):
        await self.create_report(report_request)

    return 'oadrCreatedReport', {}
async def create_report(self, report_request):
    """
    Add the requested reports to the reporting mechanism.
    This is called when the VTN requests reports from us.

    Every requested r_id is checked against the report we offered (existence,
    measurement description and unit, sampling rate). The accepted r_ids are
    then scheduled for periodic reporting through update_report.

    :param report_request dict: The oadrReportRequest dict from the VTN.
    :returns: False if the report does not exist or none of the requested
              r_ids could be accepted, None otherwise.
    """
    # Get the relevant variables from the report requests
    report_request_id = report_request['report_request_id']
    report_specifier = report_request['report_specifier']
    report_specifier_id = report_specifier['report_specifier_id']
    report_back_duration = report_specifier.get('report_back_duration')
    granularity = report_specifier['granularity']

    # Check if this report actually exists
    report = utils.find_by(self.reports, 'report_specifier_id', report_specifier_id)
    if not report:
        logger.error(f"A non-existant report with report_specifier_id "
                     f"{report_specifier_id} was requested.")
        return False

    # Check and collect the requested r_ids for this report
    requested_r_ids = []
    for specifier_payload in report_specifier['specifier_payloads']:
        r_id = specifier_payload['r_id']
        # Check if the requested r_id actually exists
        rd = utils.find_by(report.report_descriptions, 'r_id', r_id)
        if not rd:
            logger.error(f"A non-existant report with r_id {r_id} "
                         f"inside report with report_specifier_id {report_specifier_id} "
                         f"was requested.")
            continue

        # Check if the requested measurement exists and if the correct unit is requested
        if 'measurement' in specifier_payload:
            measurement = specifier_payload['measurement']
            if measurement['description'] != rd.measurement.description:
                logger.error(f"A non-matching measurement description for report with "
                             f"report_request_id {report_request_id} and r_id {r_id} was given "
                             f"by the VTN. Offered: {rd.measurement.description}, "
                             f"requested: {measurement['description']}")
                continue
            if measurement['unit'] != rd.measurement.unit:
                logger.error(f"A non-matching measurement unit for report with "
                             f"report_request_id {report_request_id} and r_id {r_id} was given "
                             f"by the VTN. Offered: {rd.measurement.unit}, "
                             f"requested: {measurement['unit']}")
                continue

        if granularity is not None:
            if not rd.sampling_rate.min_period <= granularity <= rd.sampling_rate.max_period:
                logger.error(f"An invalid sampling rate {granularity} was requested for report "
                             f"with report_specifier_id {report_specifier_id} and r_id {r_id}. "
                             f"The offered sampling rate was between "
                             f"{rd.sampling_rate.min_period} and "
                             f"{rd.sampling_rate.max_period}")
                continue
        else:
            # If no granularity is specified, set it to the lowest sampling rate.
            granularity = rd.sampling_rate.max_period

        requested_r_ids.append(r_id)

    # Nothing usable was requested: scheduling a job would only produce empty
    # reports (or fail outright when no granularity could be determined).
    if not requested_r_ids:
        logger.error(f"None of the requested r_ids for report with report_specifier_id "
                     f"{report_specifier_id} could be accepted; not scheduling this report.")
        return False

    callback = partial(self.update_report, report_request_id=report_request_id)

    # Report at the requested report-back interval, or at every sample if none was given.
    reporting_interval = report_back_duration or granularity
    job = self.scheduler.add_job(func=callback,
                                 trigger='cron',
                                 **utils.cron_config(reporting_interval))

    self.report_requests.append({'report_request_id': report_request_id,
                                 'report_specifier_id': report_specifier_id,
                                 'report_back_duration': report_back_duration,
                                 'r_ids': requested_r_ids,
                                 'granularity': granularity,
                                 'job': job})
async def create_single_report(self, report_request):
    """
    Placeholder for creating one single report in response to a request
    from the VTN. Currently does nothing.
    """
    return None
async def update_report(self, report_request_id):
    """
    Call the previously registered report callback and send the result as a message to the VTN.

    In 'full' data collection mode each callback is called with date_from,
    date_to and sampling_interval and must return (datetime, value) pairs. In
    'incremental' mode each callback is called without arguments and may return
    a single number or (datetime, value) pairs. Incremental reports whose
    report-back duration exceeds the granularity are held back until enough
    samples have been collected; all other reports are queued immediately.

    :param report_request_id: The ID of the report request to collect data for.
    """
    logger.debug(f"Running update_report for {report_request_id}")
    report_request = utils.find_by(self.report_requests, 'report_request_id', report_request_id)
    if not report_request:
        logger.error(f"Cannot update report: no report request with "
                     f"report_request_id {report_request_id} is known.")
        return
    granularity = report_request['granularity']
    report_back_duration = report_request['report_back_duration']
    report_specifier_id = report_request['report_specifier_id']
    report = utils.find_by(self.reports, 'report_specifier_id', report_specifier_id)
    data_collection_mode = report.data_collection_mode

    if report_request_id in self.incomplete_reports:
        logger.debug("We were already compiling this report")
        outgoing_report = self.incomplete_reports[report_request_id]
    else:
        logger.debug("There is no report in progress")
        outgoing_report = objects.Report(report_request_id=report_request_id,
                                         report_specifier_id=report.report_specifier_id,
                                         report_name=report.report_name,
                                         intervals=[])

    intervals = outgoing_report.intervals or []
    if data_collection_mode == 'full':
        if report_back_duration is None:
            report_back_duration = granularity
        date_to = datetime.now(timezone.utc)
        date_from = date_to - max(report_back_duration, granularity)
        for r_id in report_request['r_ids']:
            report_callback = self.report_callbacks[(report_specifier_id, r_id)]
            result = report_callback(date_from=date_from,
                                     date_to=date_to,
                                     sampling_interval=granularity)
            if asyncio.iscoroutine(result):
                result = await result
            for dt, value in result:
                report_payload = objects.ReportPayload(r_id=r_id, value=value)
                intervals.append(objects.ReportInterval(dtstart=dt,
                                                        report_payload=report_payload))

    else:
        for r_id in report_request['r_ids']:
            report_callback = self.report_callbacks[(report_specifier_id, r_id)]
            result = report_callback()
            if asyncio.iscoroutine(result):
                result = await result
            # A bare number is taken to be the value at this moment.
            if isinstance(result, (int, float)):
                result = [(datetime.now(timezone.utc), result)]
            for dt, value in result:
                logger.info(f"Adding {dt}, {value} to report")
                report_payload = objects.ReportPayload(r_id=r_id, value=value)
                intervals.append(objects.ReportInterval(dtstart=dt,
                                                        report_payload=report_payload))
    outgoing_report.intervals = intervals
    logger.info(f"The number of intervals in the report is now {len(outgoing_report.intervals)}")

    # Figure out if the report is complete after this sampling
    if data_collection_mode == 'incremental' and report_back_duration is not None\
            and report_back_duration > granularity:
        report_interval = report_back_duration.total_seconds()
        sampling_interval = granularity.total_seconds()
        expected_len = len(report_request['r_ids']) * int(report_interval / sampling_interval)
        # '>=' so that a callback returning several values at once cannot make
        # the report overshoot the expected length and never be sent.
        if len(outgoing_report.intervals) >= expected_len:
            logger.info("The report is now complete with all the values. Will queue for sending.")
            # The report may complete on its very first sample, in which case it
            # was never stored as incomplete; hence the default on pop().
            self.incomplete_reports.pop(report_request_id, None)
            await self.pending_reports.put(outgoing_report)
        else:
            logger.debug("The report is not yet complete, will hold until it is.")
            self.incomplete_reports[report_request_id] = outgoing_report
    else:
        logger.info("Report will be sent now.")
        await self.pending_reports.put(outgoing_report)
async def cancel_report(self, payload):
    """
    Placeholder for cancelling a report. Currently does nothing with the
    given payload.
    """
    return None
async def _report_queue_worker(self):
    """
    A Queue worker that pushes out the pending reports.

    Waits for reports on ``self.pending_reports`` and sends each one to the
    VTN's EiReport service as an oadrUpdateReport. Failures to send are logged
    and the worker carries on with the next report. The worker runs until it
    is cancelled.
    """
    try:
        while True:
            report = await self.pending_reports.get()
            service = 'EiReport'
            message = self._create_message('oadrUpdateReport',
                                           ven_id=self.ven_id,
                                           request_id=utils.generate_id(),
                                           reports=[report])
            try:
                response_type, response_payload = await self._perform_request(service, message)
            except asyncio.CancelledError:
                # Before Python 3.8 CancelledError derives from Exception; let it
                # through so the worker can still be cancelled during a request.
                raise
            except Exception as err:
                logger.error(f"Unable to send the report to the VTN. Error: {err}")
            else:
                if 'cancel_report' in response_payload:
                    await self.cancel_report(response_payload['cancel_report'])
    except asyncio.CancelledError:
        return
648 ###########################################################################
649 # #
650 # PLACEHOLDER #
651 # #
652 ###########################################################################
async def on_event(self, event):
    """
    Default on_event handler, used until you install your own.
    Logs a warning and opts out of the event.

    :param event: The Event dict that was received.
    :returns: Always 'optOut'.
    """
    default_choice = 'optOut'
    logger.warning("You should implement your own on_event handler. This handler receives "
                   "an Event dict and should return either 'optIn' or 'optOut' based on your "
                   "choice. Will opt out of the event for now.")
    return default_choice
async def on_update_event(self, event):
    """
    Placeholder for the on_update_event handler.

    Logs a warning and re-uses the opt status that was previously given for
    this event.

    :param event: The updated Event dict.
    :returns: The opt type stored in ``self.responded_events`` for this
              event's ID, or None if no response was stored for it.
    """
    logger.warning("An Event was updated, but you don't have an on_updated_event handler configured. "
                   "You should implement your own on_update_event handler. This handler receives "
                   "an Event dict and should return either 'optIn' or 'optOut' based on your "
                   "choice. Will re-use the previous opt status for this event_id for now")
    # Look the previous answer up under the actual event ID.
    event_id = event['event_descriptor']['event_id']
    return self.responded_events.get(event_id)
async def on_register_report(self, report):
    """
    Default on_register_report handler. Currently does nothing with the
    given report.
    """
    return None
679 ###########################################################################
680 # #
681 # EMPTY RESPONSES #
682 # #
683 ###########################################################################
async def send_response(self, service, response_code=200, response_description="OK", request_id=None):
    """
    Send an empty oadrResponse to the given service, for instance after
    receiving oadrRequestReregistration.

    :param str service: The service endpoint to send the response to.
    :param int response_code: The response code to report.
    :param str response_description: The description that goes with the code.
    :param request_id: The request ID to echo in the response.
    """
    response = {'response_code': response_code,
                'response_description': response_description,
                'request_id': request_id}
    outgoing = self._create_message('oadrResponse', response=response)
    await self._perform_request(service, outgoing)
695 ###########################################################################
696 # #
697 # LOW LEVEL #
698 # #
699 ###########################################################################
async def _perform_request(self, service, message):
    """
    POST a message to a VTN service and parse the reply.

    The reply is validated against the XML schema and, when a VTN fingerprint
    is configured, has its signature validated before it is parsed.

    :param str service: The service name; appended to the VTN URL.
    :param message: The message to send as the request body.
    :returns: A ``(message_type, message_payload)`` tuple. On any failure
              (connection error, non-OK HTTP status, empty body, invalid or
              unparseable message) this is ``(None, {})``.
    """
    await self._ensure_client_session()
    logger.debug(f"Client is sending {message}")
    url = f"{self.vtn_url}/{service}"
    try:
        async with self.client_session.post(url, data=message) as req:
            content = await req.read()
            if req.status != HTTPStatus.OK:
                logger.warning(f"Non-OK status {req.status} when performing a request to {url} "
                               f"with data {message}: {req.status} {content.decode('utf-8')}")
                return None, {}
            logger.debug(content.decode('utf-8'))
    except aiohttp.client_exceptions.ClientConnectorError as err:
        # Could not connect to server
        logger.error(f"Could not connect to server with URL {self.vtn_url}:")
        logger.error(f"{err.__class__.__name__}: {str(err)}")
        return None, {}
    except Exception as err:
        logger.error(f"Request error {err.__class__.__name__}:{err}")
        return None, {}
    if len(content) == 0:
        # Callers unpack two values, so an empty reply must still be a pair.
        return None, {}
    try:
        tree = validate_xml_schema(content)
        if self.vtn_fingerprint:
            validate_xml_signature(tree)
        message_type, message_payload = parse_message(content)
    except XMLSyntaxError as err:
        logger.warning(f"Incoming message did not pass XML schema validation: {err}")
        return None, {}
    except errors.FingerprintMismatch as err:
        logger.warning(err)
        return None, {}
    except InvalidSignature:
        logger.warning("Incoming message had invalid signature, ignoring.")
        return None, {}
    except Exception as err:
        logger.error(f"The incoming message could not be parsed or validated: {err}")
        return None, {}
    # A non-OK OpenADR response is only logged; the message is still returned.
    if 'response' in message_payload and 'response_code' in message_payload['response']:
        if message_payload['response']['response_code'] != 200:
            logger.warning("We got a non-OK OpenADR response from the server: "
                           f"{message_payload['response']['response_code']}: "
                           f"{message_payload['response']['response_description']}")
    return message_type, message_payload
async def _on_event(self, message):
    """
    Process an incoming message that contains events and answer the VTN.

    New events are passed to on_event, updated events (changed modification
    number) to on_update_event, and unchanged events re-use the opt type that
    was given before. An oadrCreatedEvent is then sent for every event that
    requires a response and is not yet completed.

    :param dict message: The parsed message; reads its 'events' and 'request_id'.
    """
    logger.debug("The VEN received an event")
    events = message['events']
    try:
        results = []
        for event in events:
            event_id = event['event_descriptor']['event_id']
            event_status = event['event_descriptor']['event_status']
            modification_number = event['event_descriptor']['modification_number']
            received_event = utils.find_by(self.received_events, 'event_descriptor.event_id', event_id)
            if received_event:
                if received_event['event_descriptor']['modification_number'] == modification_number:
                    # Re-submit the same opt type as we already had previously
                    result = self.responded_events[event_id]
                else:
                    # Replace the event with the fresh copy
                    utils.pop_by(self.received_events, 'event_descriptor.event_id', event_id)
                    self.received_events.append(event)
                    # Wait for the result of the on_update_event handler
                    result = await utils.await_if_required(self.on_update_event(event))
            else:
                # Wait for the result of the on_event
                self.received_events.append(event)
                result = self.on_event(event)
                if asyncio.iscoroutine(result):
                    result = await result
            results.append(result)
            if event_status in (enums.EVENT_STATUS.COMPLETED, enums.EVENT_STATUS.CANCELLED):
                # The event may never have been answered before (e.g. it was
                # already completed when first seen), so tolerate a missing key.
                self.responded_events.pop(event_id, None)
            else:
                self.responded_events[event_id] = result
        for i, result in enumerate(results):
            if result not in ('optIn', 'optOut') and events[i]['response_required'] == 'always':
                logger.error("Your on_event or on_update_event handler must return 'optIn' or 'optOut'; "
                             f"you supplied {result}. Please fix your on_event handler.")
                results[i] = 'optOut'
    except Exception as err:
        logger.error("Your on_event handler encountered an error. Will Opt Out of the event. "
                     f"The error was {err.__class__.__name__}: {str(err)}")
        results = ['optOut'] * len(events)

    # Each response carries the modification number of its own event.
    event_responses = [{'response_code': 200,
                        'response_description': 'OK',
                        'opt_type': opt_type,
                        'request_id': message['request_id'],
                        'modification_number': event['event_descriptor']['modification_number'],
                        'event_id': event['event_descriptor']['event_id']}
                       for event, opt_type in zip(events, results)
                       if event['response_required'] == 'always'
                       and not utils.determine_event_status(event['active_period']) == 'completed']

    if len(event_responses) > 0:
        response = {'response_code': 200,
                    'response_description': 'OK',
                    'request_id': message['request_id']}
        message = self._create_message('oadrCreatedEvent',
                                       response=response,
                                       event_responses=event_responses,
                                       ven_id=self.ven_id)
        service = 'EiEvent'
        response_type, response_payload = await self._perform_request(service, message)
        logger.info(f"Response to oadrCreatedEvent: {response_type} {response_payload}")
    else:
        logger.info("Not sending any event responses, because a response was not required/allowed by the VTN.")
async def _event_cleanup(self):
    """
    Periodic task that will clean up completed and cancelled events in our memory.

    An event is dropped from ``self.received_events`` when its descriptor
    status is ``'cancelled'`` or when ``utils.determine_event_status`` reports
    its active period as ``'completed'``.
    """
    still_relevant = []
    for event in self.received_events:
        if event['event_descriptor']['event_status'] == 'cancelled' or \
                utils.determine_event_status(event['active_period']) == 'completed':
            logger.info(f"Removing event {event} because it is no longer relevant.")
        else:
            still_relevant.append(event)
    # Removing items from the list while iterating over it would skip the
    # element following each removal, so collect the survivors first and
    # then update the existing list object in place.
    self.received_events[:] = still_relevant
async def _poll(self):
    """
    Poll the VTN and dispatch whatever message it returns to its handler.

    Polling repeats immediately after a message has been handled, because
    there might be more messages waiting. It stops as soon as the poll
    yields no message type (``None``) or an ``oadrResponse``.

    This is a loop rather than a method that awaits itself, so a long run
    of queued messages cannot build up an ever-deeper chain of nested calls.
    """
    while True:
        logger.debug("Now polling for new messages")
        response_type, response_payload = await self.poll()
        if response_type is None:
            return

        if response_type == 'oadrResponse':
            logger.debug("Received empty response from the VTN.")
            return

        if response_type == 'oadrRequestReregistration':
            logger.info("The VTN required us to re-register. Calling the registration procedure.")
            await self.send_response(service='EiRegisterParty')
            await self.create_party_registration()
            if self.reports:
                await self.register_reports(self.reports)

        elif response_type == 'oadrDistributeEvent':
            if 'events' in response_payload and len(response_payload['events']) > 0:
                await self._on_event(response_payload)

        elif response_type == 'oadrUpdateReport':
            await self._on_report(response_payload)

        elif response_type == 'oadrCreateReport':
            if 'report_requests' in response_payload:
                for report_request in response_payload['report_requests']:
                    await self.create_report(report_request)

        elif response_type == 'oadrRegisterReport':
            if 'reports' in response_payload and len(response_payload['reports']) > 0:
                for report in response_payload['reports']:
                    await self.register_report(report)

        else:
            logger.warning(f"No handler implemented for incoming message "
                           f"of type {response_type}, ignoring.")

        # Fall through to the next iteration: immediately poll again,
        # because there might be more messages
async def _ensure_client_session(self):
    """
    Create ``self.client_session`` if it does not exist yet.

    Every request made through the session carries an XML content-type
    header. When ``self.cert_path`` is set, the session uses a TLS context
    loaded with our client certificate, key and passphrase, and with
    ``self.ca_file`` as the CA bundle.
    """
    if self.client_session:
        return

    headers = {'content-type': 'application/xml'}
    if self.cert_path:
        # We are the connecting side, so the context must be created for
        # the SERVER_AUTH purpose (client-side sockets that authenticate the
        # server). CLIENT_AUTH is meant for server-side sockets.
        # NOTE(review): with SERVER_AUTH the VTN certificate is verified
        # against ca_file (or the system trust store when ca_file is None);
        # confirm deployments with self-signed VTN certificates supply ca_file.
        ssl_context = ssl.create_default_context(cafile=self.ca_file,
                                                 purpose=ssl.Purpose.SERVER_AUTH)
        ssl_context.load_cert_chain(self.cert_path, self.key_path, self.passphrase)
        # Host name matching stays disabled, as before
        ssl_context.check_hostname = False
        connector = aiohttp.TCPConnector(ssl=ssl_context)
        self.client_session = aiohttp.ClientSession(connector=connector, headers=headers)
    else:
        self.client_session = aiohttp.ClientSession(headers=headers)