Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: Apache-2.0
3# Copyright 2020 Contributors to OpenLEADR
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
9# http://www.apache.org/licenses/LICENSE-2.0
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
17from datetime import datetime, timedelta, timezone
18from dataclasses import is_dataclass, asdict
19from collections import OrderedDict
20from openleadr import enums, objects
21import asyncio
22import re
23import ssl
24import hashlib
25import uuid
26import logging
28logger = logging.getLogger('openleadr')
30DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
31DATETIME_FORMAT_NO_MICROSECONDS = "%Y-%m-%dT%H:%M:%SZ"
34def generate_id(*args, **kwargs):
35 """
36 Generate a string that can be used as an identifier in OpenADR messages.
37 """
38 return str(uuid.uuid4())
41def flatten_xml(message):
42 """
43 Flatten the entire XML structure.
44 """
45 lines = [line.strip() for line in message.split("\n") if line.strip() != ""]
46 for line in lines:
47 line = re.sub(r'\n', '', line)
48 line = re.sub(r'\s\s+', ' ', line)
49 return "".join(lines)
52def normalize_dict(ordered_dict):
53 """
54 Main conversion function for the output of xmltodict to the OpenLEADR
55 representation of OpenADR contents.
57 :param ordered_dict dict: The OrderedDict, dict or dataclass that you wish to convert.
58 """
59 if is_dataclass(ordered_dict):
60 ordered_dict = asdict(ordered_dict)
62 def normalize_key(key):
63 if key.startswith('oadr'):
64 key = key[4:]
65 elif key.startswith('ei'):
66 key = key[2:]
67 # Don't normalize the measurement descriptions
68 if key in enums._MEASUREMENT_NAMESPACES:
69 return key
70 key = re.sub(r'([a-z])([A-Z])', r'\1_\2', key)
71 if '-' in key:
72 key = key.replace('-', '_')
73 return key.lower()
75 d = {}
76 for key, value in ordered_dict.items():
77 # Interpret values from the dict
78 if key.startswith("@"):
79 continue
80 key = normalize_key(key)
82 if isinstance(value, (OrderedDict, dict)):
83 d[key] = normalize_dict(value)
85 elif isinstance(value, list):
86 d[key] = []
87 for item in value:
88 if isinstance(item, (OrderedDict, dict)):
89 dict_item = normalize_dict(item)
90 d[key].append(normalize_dict(dict_item))
91 else:
92 d[key].append(item)
93 elif key in ("duration", "startafter", "max_period", "min_period"):
94 d[key] = parse_duration(value)
95 elif ("date_time" in key or key == "dtstart") and isinstance(value, str):
96 d[key] = parse_datetime(value)
97 elif value in ('true', 'false'):
98 d[key] = parse_boolean(value)
99 elif isinstance(value, str):
100 if re.match(r'^-?\d+$', value):
101 d[key] = int(value)
102 elif re.match(r'^-?[\d.]+$', value):
103 d[key] = float(value)
104 else:
105 d[key] = value
106 else:
107 d[key] = value
109 # Do our best to make the dictionary structure as pythonic as possible
110 if key.startswith("x_ei_"):
111 d[key[5:]] = d.pop(key)
112 key = key[5:]
114 # Group all targets as a list of dicts under the key "target"
115 if key == 'target':
116 targets = d.pop(key)
117 new_targets = []
118 if targets:
119 for ikey in targets:
120 if isinstance(targets[ikey], list):
121 new_targets.extend([{ikey: value} for value in targets[ikey]])
122 else:
123 new_targets.append({ikey: targets[ikey]})
124 d[key + "s"] = new_targets
125 key = key + "s"
127 # Also add a targets_by_type element to this dict
128 # to access the targets in a more convenient way.
129 d['targets_by_type'] = group_targets_by_type(new_targets)
131 # Group all reports as a list of dicts under the key "pending_reports"
132 if key == "pending_reports":
133 if isinstance(d[key], dict) and 'report_request_id' in d[key] \
134 and isinstance(d[key]['report_request_id'], list):
135 d['pending_reports'] = [{'request_id': rrid}
136 for rrid in d['pending_reports']['report_request_id']]
138 # Group all events al a list of dicts under the key "events"
139 elif key == "event" and isinstance(d[key], list):
140 events = d.pop("event")
141 new_events = []
142 for event in events:
143 new_event = event['event']
144 new_event['response_required'] = event['response_required']
145 new_events.append(new_event)
146 d["events"] = new_events
148 # If there's only one event, also put it into a list
149 elif key == "event" and isinstance(d[key], dict) and "event" in d[key]:
150 oadr_event = d.pop('event')
151 ei_event = oadr_event['event']
152 ei_event['response_required'] = oadr_event['response_required']
153 d['events'] = [ei_event]
155 elif key in ("request_event", "created_event") and isinstance(d[key], dict):
156 d = d[key]
158 # Plurarize some lists
159 elif key in ('report_request', 'report', 'specifier_payload'):
160 if isinstance(d[key], list):
161 d[key + 's'] = d.pop(key)
162 else:
163 d[key + 's'] = [d.pop(key)]
165 elif key in ('report_description', 'event_signal'):
166 descriptions = d.pop(key)
167 if not isinstance(descriptions, list):
168 descriptions = [descriptions]
169 for description in descriptions:
170 # We want to make the identification of the measurement universal
171 for measurement in enums._MEASUREMENT_NAMESPACES:
172 if measurement in description:
173 name, item = measurement, description.pop(measurement)
174 break
175 else:
176 break
177 item['description'] = item.pop('item_description', None)
178 item['unit'] = item.pop('item_units', None)
179 if 'si_scale_code' in item:
180 item['scale'] = item.pop('si_scale_code')
181 if 'pulse_factor' in item:
182 item['pulse_factor'] = item.pop('pulse_factor')
183 description['measurement'] = {'name': name,
184 **item}
185 d[key + 's'] = descriptions
187 # Promote the contents of the Qualified Event ID
188 elif key == "qualified_event_id" and isinstance(d['qualified_event_id'], dict):
189 qeid = d.pop('qualified_event_id')
190 d['event_id'] = qeid['event_id']
191 d['modification_number'] = qeid['modification_number']
193 # Durations are encapsulated in their own object, remove this nesting
194 elif isinstance(d[key], dict) and "duration" in d[key] and len(d[key]) == 1:
195 d[key] = d[key]["duration"]
197 # In general, remove all double nesting
198 elif isinstance(d[key], dict) and key in d[key] and len(d[key]) == 1:
199 d[key] = d[key][key]
201 # In general, remove the double nesting of lists of items
202 elif isinstance(d[key], dict) and key[:-1] in d[key] and len(d[key]) == 1:
203 if isinstance(d[key][key[:-1]], list):
204 d[key] = d[key][key[:-1]]
205 else:
206 d[key] = [d[key][key[:-1]]]
208 # Payload values are wrapped in an object according to their type. We don't need that.
209 elif key in ("signal_payload", "current_value"):
210 value = d[key]
211 if isinstance(d[key], dict):
212 if 'payload_float' in d[key] and 'value' in d[key]['payload_float'] \
213 and d[key]['payload_float']['value'] is not None:
214 d[key] = float(d[key]['payload_float']['value'])
215 elif 'payload_int' in d[key] and 'value' in d[key]['payload_int'] \
216 and d[key]['payload_int'] is not None:
217 d[key] = int(d[key]['payload_int']['value'])
219 # Report payloads contain an r_id and a type-wrapped payload_float
220 elif key == 'report_payload':
221 if 'payload_float' in d[key] and 'value' in d[key]['payload_float']:
222 v = d[key].pop('payload_float')
223 d[key]['value'] = float(v['value'])
224 elif 'payload_int' in d[key] and 'value' in d[key]['payload_int']:
225 v = d[key].pop('payload_float')
226 d[key]['value'] = int(v['value'])
228 # All values other than 'false' must be interpreted as True for testEvent (rule 006)
229 elif key == 'test_event' and not isinstance(d[key], bool):
230 d[key] = True
232 # Promote the 'text' item
233 elif isinstance(d[key], dict) and "text" in d[key] and len(d[key]) == 1:
234 if key == 'uid':
235 d[key] = int(d[key]["text"])
236 else:
237 d[key] = d[key]["text"]
239 # Promote a 'date-time' item
240 elif isinstance(d[key], dict) and "date_time" in d[key] and len(d[key]) == 1:
241 d[key] = d[key]["date_time"]
243 # Promote 'properties' item, discard the unused? 'components' item
244 elif isinstance(d[key], dict) and "properties" in d[key] and len(d[key]) <= 2:
245 d[key] = d[key]["properties"]
247 # Remove all empty dicts
248 elif isinstance(d[key], dict) and len(d[key]) == 0:
249 d.pop(key)
250 return d
253def parse_datetime(value):
254 """
255 Parse an ISO8601 datetime into a datetime.datetime object.
256 """
257 matches = re.match(r'(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})\.?(\d{1,6})?\d*Z', value)
258 if matches:
259 year, month, day, hour, minute, second = (int(value)for value in matches.groups()[:-1])
260 micro = matches.groups()[-1]
261 if micro is None:
262 micro = 0
263 else:
264 micro = int(micro + "0" * (6 - len(micro)))
265 return datetime(year, month, day, hour, minute, second, micro, tzinfo=timezone.utc)
266 else:
267 logger.warning(f"parse_datetime: {value} did not match format")
268 return value
271def parse_duration(value):
272 """
273 Parse a RFC5545 duration.
274 """
275 if isinstance(value, timedelta):
276 return value
277 regex = r'(\+|\-)?P(?:(?:(\d+)Y)?(?:(\d+)M)?(?:(\d+)D)?T?(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)|(?:(\d+)W)'
278 matches = re.match(regex, value)
279 if not matches:
280 raise ValueError(f"The duration '{value}' did not match the requested format")
281 years, months, days, hours, minutes, seconds, weeks = (int(g) if g else 0 for g in matches.groups()[1:])
282 if years != 0:
283 logger.warning("Received a duration that specifies years, which is not a determinate duration. "
284 "It will be interpreted as 1 year = 365 days.")
285 days = days + 365 * years
286 if months != 0:
287 logger.warning("Received a duration that specifies months, which is not a determinate duration "
288 "It will be interpreted as 1 month = 30 days.")
289 days = days + 30 * months
290 duration = timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds)
291 if matches.groups()[0] == "-":
292 duration = -1 * duration
293 return duration
296def parse_boolean(value):
297 if value == 'true':
298 return True
299 else:
300 return False
303def datetimeformat(value, format=DATETIME_FORMAT):
304 """
305 Format a given datetime as a UTC ISO3339 string.
306 """
307 if not isinstance(value, datetime):
308 return value
309 return value.astimezone(timezone.utc).strftime(format)
312def timedeltaformat(value):
313 """
314 Format a timedelta to a RFC5545 Duration.
315 """
316 if not isinstance(value, timedelta):
317 return value
318 days = value.days
319 hours, seconds = divmod(value.seconds, 3600)
320 minutes, seconds = divmod(seconds, 60)
321 formatted = "P"
322 if days:
323 formatted += f"{days}D"
324 if hours or minutes or seconds:
325 formatted += "T"
326 if hours:
327 formatted += f"{hours}H"
328 if minutes:
329 formatted += f"{minutes}M"
330 if seconds:
331 formatted += f"{seconds}S"
332 return formatted
335def booleanformat(value):
336 """
337 Format a boolean value
338 """
339 if isinstance(value, bool):
340 if value is True:
341 return "true"
342 elif value is False:
343 return "false"
344 elif value in ("true", "false"):
345 return value
346 else:
347 raise ValueError(f"A boolean value must be provided, not {value}.")
350def ensure_bytes(obj):
351 """
352 Converts a utf-8 str object to bytes.
353 """
354 if obj is None:
355 return obj
356 if isinstance(obj, bytes):
357 return obj
358 if isinstance(obj, str):
359 return bytes(obj, 'utf-8')
360 else:
361 raise TypeError("Must be bytes or str")
364def ensure_str(obj):
365 """
366 Converts bytes to a utf-8 string.
367 """
368 if obj is None:
369 return None
370 if isinstance(obj, str):
371 return obj
372 if isinstance(obj, bytes):
373 return obj.decode('utf-8')
374 else:
375 raise TypeError("Must be bytes or str")
378def certificate_fingerprint_from_der(der_bytes):
379 hash = hashlib.sha256(der_bytes).digest().hex()
380 return ":".join([hash[i-2:i].upper() for i in range(-20, 0, 2)])
383def certificate_fingerprint(certificate_str):
384 """
385 Calculate the fingerprint for the given certificate, as defined by OpenADR.
386 """
387 der_bytes = ssl.PEM_cert_to_DER_cert(ensure_str(certificate_str))
388 return certificate_fingerprint_from_der(der_bytes)
391def extract_pem_cert(tree):
392 """
393 Extract a given X509 certificate inside an XML tree and return the standard
394 form of a PEM-encoded certificate.
396 :param tree lxml.etree: The tree that contains the X509 element. This is
397 usually the KeyInfo element from the XMLDsig Signature
398 part of the message.
399 """
400 cert = tree.find('.//{http://www.w3.org/2000/09/xmldsig#}X509Certificate').text
401 return "-----BEGIN CERTIFICATE-----\n" + cert + "-----END CERTIFICATE-----\n"
404def find_by(dict_or_list, key, value, *args):
405 """
406 Find a dict inside a dict or list by key, value properties.
407 You can search for a nesting by separating the levels with a period (.).
408 """
409 search_params = [(key, value)]
410 if args:
411 search_params += [(args[i], args[i+1]) for i in range(0, len(args), 2)]
412 if isinstance(dict_or_list, dict):
413 dict_or_list = dict_or_list.values()
414 for item in dict_or_list:
415 for key, value in search_params:
416 _item = item
417 keys = key.split(".")
418 for key in keys[:-1]:
419 if not hasmember(_item, key):
420 break
421 _item = getmember(_item, key)
422 key = keys[-1]
423 if isinstance(value, tuple):
424 if not hasmember(_item, key) or getmember(_item, key) not in value:
425 break
426 else:
427 if not hasmember(_item, key) or getmember(_item, key) != value:
428 break
429 else:
430 return item
431 else:
432 return None
435def group_by(list_, key, pop_key=False):
436 """
437 Return a dict that groups values
438 """
439 grouped = {}
440 key_path = key.split(".")
441 for item in list_:
442 value = item
443 for key in key_path:
444 value = value.get(key)
445 if value not in grouped:
446 grouped[value] = []
447 grouped[value].append(item)
448 return grouped
451def pop_by(list_, key, value, *args):
452 """
453 Pop the first item that satisfies the search params from the given list.
454 """
455 item = find_by(list_, key, value, *args)
456 if item:
457 index = list_.index(item)
458 list_.pop(index)
459 return item
462def cron_config(interval, randomize_seconds=False):
463 """
464 Returns a dict with cron settings for the given interval
465 """
466 if interval < timedelta(minutes=1):
467 second = f"*/{interval.seconds}"
468 minute = "*"
469 hour = "*"
470 elif interval < timedelta(hours=1):
471 second = "0"
472 minute = f"*/{int(interval.total_seconds()/60)}"
473 hour = "*"
474 elif interval < timedelta(hours=24):
475 second = "0"
476 minute = "0"
477 hour = f"*/{int(interval.total_seconds()/3600)}"
478 else:
479 second = "0"
480 minute = "0"
481 hour = "0"
482 cron_config = {"second": second, "minute": minute, "hour": hour}
483 if randomize_seconds:
484 jitter = min(int(interval.total_seconds() / 10), 300)
485 cron_config['jitter'] = jitter
486 return cron_config
489def get_cert_fingerprint_from_request(request):
490 ssl_object = request.transport.get_extra_info('ssl_object')
491 if ssl_object:
492 der_bytes = ssl_object.getpeercert(binary_form=True)
493 if der_bytes:
494 return certificate_fingerprint_from_der(der_bytes)
497def group_targets_by_type(list_of_targets):
498 targets_by_type = {}
499 for target in list_of_targets:
500 for key, value in target.items():
501 if value is None:
502 continue
503 if key not in targets_by_type:
504 targets_by_type[key] = []
505 targets_by_type[key].append(value)
506 return targets_by_type
509def ungroup_targets_by_type(targets_by_type):
510 ungrouped_targets = []
511 for target_type, targets in targets_by_type.items():
512 if isinstance(targets, list):
513 for target in targets:
514 ungrouped_targets.append({target_type: target})
515 elif isinstance(targets, str):
516 ungrouped_targets.append({target_type: targets})
517 return ungrouped_targets
520def validate_report_measurement_dict(measurement):
521 from openleadr.enums import _ACCEPTABLE_UNITS, _MEASUREMENT_DESCRIPTIONS
523 if 'name' not in measurement \
524 or 'description' not in measurement \
525 or 'unit' not in measurement:
526 raise ValueError("The measurement dict must contain the following keys: "
527 "'name', 'description', 'unit'. Please correct this.")
529 name = measurement['name']
530 description = measurement['description']
531 unit = measurement['unit']
533 # Validate the item name and description match
534 if name in _MEASUREMENT_DESCRIPTIONS:
535 required_description = _MEASUREMENT_DESCRIPTIONS[name]
536 if description != required_description:
537 if description.lower() == required_description.lower():
538 logger.warning(f"The description for the measurement with name '{name}' "
539 f"was not in the correct case; you provided '{description}' but "
540 f"it should be '{required_description}'. "
541 "This was automatically corrected.")
542 measurement['description'] = required_description
543 else:
544 raise ValueError(f"The measurement's description '{description}' "
545 f"did not match the expected description for this type "
546 f" ('{required_description}'). Please correct this, or use "
547 "'customUnit' as the name.")
548 if unit not in _ACCEPTABLE_UNITS[name]:
549 raise ValueError(f"The unit '{unit}' is not acceptable for measurement '{name}'. Allowed "
550 f"units are: '" + "', '".join(_ACCEPTABLE_UNITS[name]) + "'.")
551 else:
552 if name != 'customUnit':
553 logger.warning(f"You provided a measurement with an unknown name {name}. "
554 "This was corrected to 'customUnit'. Please correct this in your "
555 "report definition.")
556 measurement['name'] = 'customUnit'
558 if 'power' in name:
559 if 'power_attributes' in measurement:
560 power_attributes = measurement['power_attributes']
561 if 'voltage' not in power_attributes \
562 or 'ac' not in power_attributes \
563 or 'hertz' not in power_attributes:
564 raise ValueError("The power_attributes of the measurement must contain the "
565 "following keys: 'voltage' (int), 'ac' (bool), 'hertz' (int).")
566 else:
567 raise ValueError("A 'power' related measurement must contain a "
568 "'power_attributes' section that contains the following "
569 "keys: 'voltage' (int), 'ac' (boolean), 'hertz' (int)")
572def get_active_period_from_intervals(intervals, as_dict=True):
573 if is_dataclass(intervals[0]):
574 intervals = [asdict(i) for i in intervals]
575 period_start = min([i['dtstart'] for i in intervals])
576 period_duration = max([i['dtstart'] + i['duration'] - period_start for i in intervals])
577 if as_dict:
578 return {'dtstart': period_start,
579 'duration': period_duration}
580 else:
581 from openleadr.objects import ActivePeriod
582 return ActivePeriod(dtstart=period_start, duration=period_duration)
585def determine_event_status(active_period):
586 now = datetime.now(timezone.utc)
587 active_period_start = getmember(active_period, 'dtstart')
588 if active_period_start.tzinfo is None:
589 active_period_start = active_period_start.astimezone(timezone.utc)
590 setmember(active_period, 'dtstart', active_period_start)
591 active_period_end = active_period_start + getmember(active_period, 'duration')
592 if now >= active_period_end:
593 return 'completed'
594 if now >= active_period_start:
595 return 'active'
596 if getmember(active_period, 'ramp_up_period', missing=None) is not None:
597 ramp_up_start = active_period_start - getmember(active_period, 'ramp_up_period')
598 if now >= ramp_up_start:
599 return 'near'
600 return 'far'
603def hasmember(obj, member):
604 """
605 Check if a dict or dataclass has the given member
606 """
607 if is_dataclass(obj):
608 if hasattr(obj, member):
609 return True
610 else:
611 if member in obj:
612 return True
613 return False
616def getmember(obj, member, missing='_RAISE_'):
617 """
618 Get a member from a dict or dataclass. Nesting is possible.
619 """
620 def getmember_inner(obj, member, missing='_RAISE_'):
621 if is_dataclass(obj):
622 if not missing == '_RAISE_' and not hasattr(obj, member):
623 return missing
624 else:
625 return getattr(obj, member)
626 else:
627 if missing == '_RAISE_':
628 return obj[member]
629 else:
630 return obj.get(member, missing)
632 for m in member.split("."):
633 obj = getmember_inner(obj, m, missing=missing)
634 return obj
637def setmember(obj, member, value):
638 """
639 Set a member of a dict of dataclass
640 """
641 if '.' in member:
642 members = member.split('.')
643 obj = getmember(obj, ".".join(members[:-1]))
644 member = members[-1]
646 if is_dataclass(obj):
647 setattr(obj, member, value)
648 else:
649 obj[member] = value
652def validate_report_request_tuples(list_of_report_requests, mode='full'):
653 if len(list_of_report_requests) == 0:
654 return
655 for report_requests in list_of_report_requests:
656 if report_requests is None:
657 continue
658 for i, rrq in enumerate(report_requests):
659 if rrq is None:
660 continue
662 # Check if it is a tuple
663 elif not isinstance(rrq, tuple):
664 report_requests[i] = None
665 if mode == 'full':
666 logger.error("Your on_register_report handler did not return a list of tuples. "
667 f"The first item from the list was '{rrq}' ({rrq.__class__.__name__}).")
668 else:
669 logger.error("Your on_register_report handler did not return a tuple. "
670 f"It returned '{rrq}'. Please see the documentation for the correct format.")
672 # Check if it has the correct length
673 elif not len(rrq) in (3, 4):
674 report_requests[i] = None
675 if mode == 'full':
676 logger.error("Your on_register_report handler returned tuples of the wrong length. "
677 f"It should be 3 or 4. It returned: '{rrq}'.")
678 else:
679 logger.error("Your on_register_report handler returned a tuple of the wrong length. "
680 f"It should be 2 or 3. It returned: '{rrq[1:]}'.")
682 # Check if the first element is callable
683 elif not callable(rrq[1]):
684 report_requests[i] = None
685 if mode == 'full':
686 logger.error(f"Your on_register_report handler did not return the correct tuple. "
687 "It should return a list of (r_id, callback, sampling_interval) or "
688 "(r_id, callback, sampling_interval, reporting_interval) tuples, where "
689 "the r_id is a string, callback is a callable function or coroutine, and "
690 "sampling_interval and reporting_interval are of type datetime.timedelta. "
691 f"It returned: '{rrq}'. The second element was not callable.")
692 else:
693 logger.error(f"Your on_register_report handler did not return the correct tuple. "
694 "It should return a (callback, sampling_interval) or "
695 "(callback, sampling_interval, reporting_interval) tuple, where "
696 "the callback is a callable function or coroutine, and "
697 "sampling_interval and reporting_interval are of type datetime.timedelta. "
698 f"It returned: '{rrq[1:]}'. The first element was not callable.")
700 # Check if the second element is a timedelta
701 elif not isinstance(rrq[2], timedelta):
702 report_requests[i] = None
703 if mode == 'full':
704 logger.error(f"Your on_register_report handler did not return the correct tuple. "
705 "It should return a list of (r_id, callback, sampling_interval) or "
706 "(r_id, callback, sampling_interval, reporting_interval) tuples, where "
707 "sampling_interval and reporting_interval are of type datetime.timedelta. "
708 f"It returned: '{rrq}'. The third element was not of type timedelta.")
709 else:
710 logger.error(f"Your on_register_report handler did not return the correct tuple. "
711 "It should return a (callback, sampling_interval) or "
712 "(callback, sampling_interval, reporting_interval) tuple, where "
713 "sampling_interval and reporting_interval are of type datetime.timedelta. "
714 f"It returned: '{rrq[1:]}'. The second element was not of type timedelta.")
716 # Check if the third element is a timedelta (if it exists)
717 elif len(rrq) == 4 and not isinstance(rrq[3], timedelta):
718 report_requests[i] = None
719 if mode == 'full':
720 logger.error(f"Your on_register_report handler did not return the correct tuple. "
721 "It should return a list of (r_id, callback, sampling_interval) or "
722 "(r_id, callback, sampling_interval, reporting_interval) tuples, where "
723 "sampling_interval and reporting_interval are of type datetime.timedelta. "
724 f"It returned: '{rrq}'. The fourth element was not of type timedelta.")
725 else:
726 logger.error(f"Your on_register_report handler did not return the correct tuple. "
727 "It should return a (callback, sampling_interval) or "
728 "(callback, sampling_interval, reporting_interval) tuple, where "
729 "sampling_interval and reporting_interval are of type datetime.timedelta. "
730 f"It returned: '{rrq[1:]}'. The third element was not of type timedelta.")
733async def await_if_required(result):
734 if asyncio.iscoroutine(result):
735 result = await result
736 return result
739async def gather_if_required(results):
740 if results is None:
741 return results
742 if len(results) > 0:
743 if not any([asyncio.iscoroutine(r) for r in results]):
744 results = results
745 elif all([asyncio.iscoroutine(r) for r in results]):
746 results = await asyncio.gather(*results)
747 else:
748 results = [await await_if_required(result) for result in results]
749 return results
752def order_events(events, limit=None, offset=None):
753 """
754 Order the events according to the OpenADR rules:
755 - active events before inactive events
756 - high priority before low priority
757 - earlier before later
758 """
759 def event_priority(event):
760 # The default and lowest priority is 0, which we should interpret as a high value.
761 priority = getmember(event, 'event_descriptor.priority', missing=float('inf'))
762 if priority == 0:
763 priority = float('inf')
764 return priority
766 if events is None:
767 return None
768 if isinstance(events, objects.Event):
769 events = [events]
770 elif isinstance(events, dict):
771 events = [events]
773 # Update the event statuses
774 for event in events:
775 if getmember(event, 'event_descriptor.event_status') != enums.EVENT_STATUS.CANCELLED:
776 event_status = determine_event_status(getmember(event, 'active_period'))
777 setmember(event, 'event_descriptor.event_status', event_status)
779 # Short circuit if we only have one event:
780 if len(events) == 1:
781 return events
783 # Get all the active events first
784 active_events = [event for event in events
785 if getmember(event, 'event_descriptor.event_status') == 'active']
786 other_events = [event for event in events
787 if getmember(event, 'event_descriptor.event_status') != 'active']
789 # Sort the active events by priority
790 active_events.sort(key=lambda e: event_priority(e))
792 # Sort the active events by start date
793 active_events.sort(key=lambda e: getmember(e, 'active_period.dtstart'))
795 # Sort the non-active events by their start date
796 other_events.sort(key=lambda e: getmember(e, 'active_period.dtstart'))
798 ordered_events = active_events + other_events
799 if limit and offset:
800 return ordered_events[offset:offset+limit]
801 return ordered_events
804def increment_event_modification_number(event):
805 """
806 Increments the modification number of the event by 1 and returns the new modification number.
807 """
808 modification_number = getmember(event, 'event_descriptor.modification_number') + 1
809 setmember(event, 'event_descriptor.modification_number', modification_number)
810 return modification_number