# SPDX-License-Identifier: Apache-2.0

# Copyright 2020 Contributors to OpenLEADR

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime, timedelta, timezone
from dataclasses import is_dataclass, asdict
from collections import OrderedDict
from openleadr import enums, objects
import asyncio
import re
import ssl
import hashlib
import uuid
import logging

logger = logging.getLogger('openleadr')

DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
DATETIME_FORMAT_NO_MICROSECONDS = "%Y-%m-%dT%H:%M:%SZ"

def generate_id(*args, **kwargs):
    """
    Generate a string that can be used as an identifier in OpenADR messages.
    """
    return str(uuid.uuid4())

def flatten_xml(message):
    """
    Flatten the entire XML structure.
    """
    lines = [line.strip() for line in message.split("\n") if line.strip() != ""]
    # Collapse any remaining runs of whitespace inside each line
    lines = [re.sub(r'\s\s+', ' ', line) for line in lines]
    return "".join(lines)
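# A quick usage sketch (the input below is illustrative, not an actual OpenADR payload):
#
#     flatten_xml("<oadrResponse>\n    <responseCode>200</responseCode>\n</oadrResponse>")
#     # -> '<oadrResponse><responseCode>200</responseCode></oadrResponse>'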

def normalize_dict(ordered_dict):
    """
    Main conversion function for the output of xmltodict to the OpenLEADR
    representation of OpenADR contents.

    :param ordered_dict dict: The OrderedDict, dict or dataclass that you wish to convert.
    """
    if is_dataclass(ordered_dict):
        ordered_dict = asdict(ordered_dict)

    def normalize_key(key):
        if key.startswith('oadr'):
            key = key[4:]
        elif key.startswith('ei'):
            key = key[2:]
        # Don't normalize the measurement descriptions
        if key in enums._MEASUREMENT_NAMESPACES:
            return key
        key = re.sub(r'([a-z])([A-Z])', r'\1_\2', key)
        if '-' in key:
            key = key.replace('-', '_')
        return key.lower()

    d = {}
    for key, value in ordered_dict.items():
        # Interpret values from the dict
        if key.startswith("@"):
            continue
        key = normalize_key(key)

        if isinstance(value, (OrderedDict, dict)):
            d[key] = normalize_dict(value)

        elif isinstance(value, list):
            d[key] = []
            for item in value:
                if isinstance(item, (OrderedDict, dict)):
                    dict_item = normalize_dict(item)
                    d[key].append(dict_item)
                else:
                    d[key].append(item)
        elif key in ("duration", "startafter", "max_period", "min_period"):
            d[key] = parse_duration(value)
        elif ("date_time" in key or key == "dtstart") and isinstance(value, str):
            d[key] = parse_datetime(value)
        elif value in ('true', 'false'):
            d[key] = parse_boolean(value)
        elif isinstance(value, str):
            if re.match(r'^-?\d+$', value):
                d[key] = int(value)
            elif re.match(r'^-?[\d.]+$', value):
                d[key] = float(value)
            else:
                d[key] = value
        else:
            d[key] = value

        # Do our best to make the dictionary structure as pythonic as possible
        if key.startswith("x_ei_"):
            d[key[5:]] = d.pop(key)
            key = key[5:]

        # Group all targets as a list of dicts under the key "target"
        if key == 'target':
            targets = d.pop(key)
            new_targets = []
            if targets:
                for ikey in targets:
                    if isinstance(targets[ikey], list):
                        new_targets.extend([{ikey: value} for value in targets[ikey]])
                    else:
                        new_targets.append({ikey: targets[ikey]})
            d[key + "s"] = new_targets
            key = key + "s"

            # Also add a targets_by_type element to this dict
            # to access the targets in a more convenient way.
            d['targets_by_type'] = group_targets_by_type(new_targets)

        # Group all reports as a list of dicts under the key "pending_reports"
        if key == "pending_reports":
            if isinstance(d[key], dict) and 'report_request_id' in d[key] \
                    and isinstance(d[key]['report_request_id'], list):
                d['pending_reports'] = [{'request_id': rrid}
                                        for rrid in d['pending_reports']['report_request_id']]

        # Group all events as a list of dicts under the key "events"
        elif key == "event" and isinstance(d[key], list):
            events = d.pop("event")
            new_events = []
            for event in events:
                new_event = event['event']
                new_event['response_required'] = event['response_required']
                new_events.append(new_event)
            d["events"] = new_events

        # If there's only one event, also put it into a list
        elif key == "event" and isinstance(d[key], dict) and "event" in d[key]:
            oadr_event = d.pop('event')
            ei_event = oadr_event['event']
            ei_event['response_required'] = oadr_event['response_required']
            d['events'] = [ei_event]

        elif key in ("request_event", "created_event") and isinstance(d[key], dict):
            d = d[key]

        # Pluralize some lists
        elif key in ('report_request', 'report', 'specifier_payload'):
            if isinstance(d[key], list):
                d[key + 's'] = d.pop(key)
            else:
                d[key + 's'] = [d.pop(key)]

        elif key in ('report_description', 'event_signal'):
            descriptions = d.pop(key)
            if not isinstance(descriptions, list):
                descriptions = [descriptions]
            for description in descriptions:
                # We want to make the identification of the measurement universal
                for measurement in enums._MEASUREMENT_NAMESPACES:
                    if measurement in description:
                        name, item = measurement, description.pop(measurement)
                        break
                else:
                    break
                item['description'] = item.pop('item_description', None)
                item['unit'] = item.pop('item_units', None)
                if 'si_scale_code' in item:
                    item['scale'] = item.pop('si_scale_code')
                if 'pulse_factor' in item:
                    item['pulse_factor'] = item.pop('pulse_factor')
                description['measurement'] = {'name': name,
                                              **item}
            d[key + 's'] = descriptions

        # Promote the contents of the Qualified Event ID
        elif key == "qualified_event_id" and isinstance(d['qualified_event_id'], dict):
            qeid = d.pop('qualified_event_id')
            d['event_id'] = qeid['event_id']
            d['modification_number'] = qeid['modification_number']

        # Durations are encapsulated in their own object, remove this nesting
        elif isinstance(d[key], dict) and "duration" in d[key] and len(d[key]) == 1:
            d[key] = d[key]["duration"]

        # In general, remove all double nesting
        elif isinstance(d[key], dict) and key in d[key] and len(d[key]) == 1:
            d[key] = d[key][key]

        # In general, remove the double nesting of lists of items
        elif isinstance(d[key], dict) and key[:-1] in d[key] and len(d[key]) == 1:
            if isinstance(d[key][key[:-1]], list):
                d[key] = d[key][key[:-1]]
            else:
                d[key] = [d[key][key[:-1]]]

        # Payload values are wrapped in an object according to their type. We don't need that.
        elif key in ("signal_payload", "current_value"):
            value = d[key]
            if isinstance(d[key], dict):
                if 'payload_float' in d[key] and 'value' in d[key]['payload_float'] \
                        and d[key]['payload_float']['value'] is not None:
                    d[key] = float(d[key]['payload_float']['value'])
                elif 'payload_int' in d[key] and 'value' in d[key]['payload_int'] \
                        and d[key]['payload_int']['value'] is not None:
                    d[key] = int(d[key]['payload_int']['value'])

        # Report payloads contain an r_id and a type-wrapped payload_float
        elif key == 'report_payload':
            if 'payload_float' in d[key] and 'value' in d[key]['payload_float']:
                v = d[key].pop('payload_float')
                d[key]['value'] = float(v['value'])
            elif 'payload_int' in d[key] and 'value' in d[key]['payload_int']:
                v = d[key].pop('payload_int')
                d[key]['value'] = int(v['value'])

        # All values other than 'false' must be interpreted as True for testEvent (rule 006)
        elif key == 'test_event' and not isinstance(d[key], bool):
            d[key] = True

        # Promote the 'text' item
        elif isinstance(d[key], dict) and "text" in d[key] and len(d[key]) == 1:
            if key == 'uid':
                d[key] = int(d[key]["text"])
            else:
                d[key] = d[key]["text"]

        # Promote a 'date-time' item
        elif isinstance(d[key], dict) and "date_time" in d[key] and len(d[key]) == 1:
            d[key] = d[key]["date_time"]

        # Promote 'properties' item, discard the unused? 'components' item
        elif isinstance(d[key], dict) and "properties" in d[key] and len(d[key]) <= 2:
            d[key] = d[key]["properties"]

        # Remove all empty dicts
        elif isinstance(d[key], dict) and len(d[key]) == 0:
            d.pop(key)
    return d
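# A rough sketch of the conversion (hand-written input; not a complete OpenADR message):
#
#     normalize_dict({'oadrResponse': {'eiResponse': {'responseCode': '200',
#                                                     'responseDescription': 'OK'}}})
#     # -> {'response': {'response_code': 200, 'response_description': 'OK'}}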

def parse_datetime(value):
    """
    Parse an ISO8601 datetime into a datetime.datetime object.
    """
    matches = re.match(r'(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})\.?(\d{1,6})?\d*Z', value)
    if matches:
        year, month, day, hour, minute, second = (int(value) for value in matches.groups()[:-1])
        micro = matches.groups()[-1]
        if micro is None:
            micro = 0
        else:
            micro = int(micro + "0" * (6 - len(micro)))
        return datetime(year, month, day, hour, minute, second, micro, tzinfo=timezone.utc)
    else:
        logger.warning(f"parse_datetime: {value} did not match format")
        return value
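# Example (illustrative values):
#
#     parse_datetime("2021-01-01T12:00:00.123Z")
#     # -> datetime(2021, 1, 1, 12, 0, 0, 123000, tzinfo=timezone.utc)
#     parse_datetime("2021-01-01T12:00:00Z")
#     # -> datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc)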

def parse_duration(value):
    """
    Parse a RFC5545 duration.
    """
    if isinstance(value, timedelta):
        return value
    regex = r'(\+|\-)?P(?:(?:(\d+)Y)?(?:(\d+)M)?(?:(\d+)D)?T?(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)|(?:(\d+)W)'
    matches = re.match(regex, value)
    if not matches:
        raise ValueError(f"The duration '{value}' did not match the requested format")
    years, months, days, hours, minutes, seconds, weeks = (int(g) if g else 0 for g in matches.groups()[1:])
    if years != 0:
        logger.warning("Received a duration that specifies years, which is not a determinate duration. "
                       "It will be interpreted as 1 year = 365 days.")
        days = days + 365 * years
    if months != 0:
        logger.warning("Received a duration that specifies months, which is not a determinate duration. "
                       "It will be interpreted as 1 month = 30 days.")
        days = days + 30 * months
    duration = timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds)
    if matches.groups()[0] == "-":
        duration = -1 * duration
    return duration
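# Example (illustrative values):
#
#     parse_duration("PT1H30M")   # -> timedelta(hours=1, minutes=30)
#     parse_duration("P1DT12H")   # -> timedelta(days=1, hours=12)
#     parse_duration("-PT10M")    # -> timedelta(minutes=-10)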

def parse_boolean(value):
    if value == 'true':
        return True
    else:
        return False

def datetimeformat(value, format=DATETIME_FORMAT):
    """
    Format a given datetime as a UTC RFC 3339 string.
    """
    if not isinstance(value, datetime):
        return value
    return value.astimezone(timezone.utc).strftime(format)
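# Example (illustrative value):
#
#     datetimeformat(datetime(2021, 1, 1, 12, 0, tzinfo=timezone.utc))
#     # -> '2021-01-01T12:00:00.000000Z'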

def timedeltaformat(value):
    """
    Format a timedelta to a RFC5545 Duration.
    """
    if not isinstance(value, timedelta):
        return value
    days = value.days
    hours, seconds = divmod(value.seconds, 3600)
    minutes, seconds = divmod(seconds, 60)
    formatted = "P"
    if days:
        formatted += f"{days}D"
    if hours or minutes or seconds:
        formatted += "T"
    if hours:
        formatted += f"{hours}H"
    if minutes:
        formatted += f"{minutes}M"
    if seconds:
        formatted += f"{seconds}S"
    return formatted
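# Example (illustrative values):
#
#     timedeltaformat(timedelta(hours=1, minutes=30))   # -> 'PT1H30M'
#     timedeltaformat(timedelta(days=2))                # -> 'P2D'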

def booleanformat(value):
    """
    Format a boolean value.
    """
    if isinstance(value, bool):
        if value is True:
            return "true"
        elif value is False:
            return "false"
    elif value in ("true", "false"):
        return value
    else:
        raise ValueError(f"A boolean value must be provided, not {value}.")

def ensure_bytes(obj):
    """
    Converts a utf-8 str object to bytes.
    """
    if obj is None:
        return obj
    if isinstance(obj, bytes):
        return obj
    if isinstance(obj, str):
        return bytes(obj, 'utf-8')
    else:
        raise TypeError("Must be bytes or str")

def ensure_str(obj):
    """
    Converts bytes to a utf-8 string.
    """
    if obj is None:
        return None
    if isinstance(obj, str):
        return obj
    if isinstance(obj, bytes):
        return obj.decode('utf-8')
    else:
        raise TypeError("Must be bytes or str")

def certificate_fingerprint_from_der(der_bytes):
    hash = hashlib.sha256(der_bytes).digest().hex()
    return ":".join([hash[i-2:i].upper() for i in range(-20, 0, 2)])


def certificate_fingerprint(certificate_str):
    """
    Calculate the fingerprint for the given certificate, as defined by OpenADR.
    """
    der_bytes = ssl.PEM_cert_to_DER_cert(ensure_str(certificate_str))
    return certificate_fingerprint_from_der(der_bytes)


def extract_pem_cert(tree):
    """
    Extract a given X509 certificate inside an XML tree and return the standard
    form of a PEM-encoded certificate.

    :param tree lxml.etree: The tree that contains the X509 element. This is
                            usually the KeyInfo element from the XMLDsig Signature
                            part of the message.
    """
    cert = tree.find('.//{http://www.w3.org/2000/09/xmldsig#}X509Certificate').text
    return "-----BEGIN CERTIFICATE-----\n" + cert + "-----END CERTIFICATE-----\n"

def find_by(dict_or_list, key, value, *args):
    """
    Find a dict inside a dict or list by key, value properties.
    You can search for a nesting by separating the levels with a period (.).
    """
    search_params = [(key, value)]
    if args:
        search_params += [(args[i], args[i+1]) for i in range(0, len(args), 2)]
    if isinstance(dict_or_list, dict):
        dict_or_list = dict_or_list.values()
    for item in dict_or_list:
        for key, value in search_params:
            _item = item
            keys = key.split(".")
            for key in keys[:-1]:
                if not hasmember(_item, key):
                    break
                _item = getmember(_item, key)
            key = keys[-1]
            if isinstance(value, tuple):
                if not hasmember(_item, key) or getmember(_item, key) not in value:
                    break
            else:
                if not hasmember(_item, key) or getmember(_item, key) != value:
                    break
        else:
            return item
    else:
        return None
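# Example (illustrative data):
#
#     reports = [{'r_id': 'meter1', 'value': 10}, {'r_id': 'meter2', 'value': 20}]
#     find_by(reports, 'r_id', 'meter2')   # -> {'r_id': 'meter2', 'value': 20}
#     find_by(reports, 'r_id', 'meter3')   # -> None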

def group_by(list_, key, pop_key=False):
    """
    Return a dict that groups the items of the list by the given (possibly nested) key.
    """
    grouped = {}
    key_path = key.split(".")
    for item in list_:
        value = item
        for key in key_path:
            value = value.get(key)
        if value not in grouped:
            grouped[value] = []
        grouped[value].append(item)
    return grouped
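# Example (illustrative data):
#
#     group_by([{'type': 'a', 'v': 1}, {'type': 'b', 'v': 2}, {'type': 'a', 'v': 3}], 'type')
#     # -> {'a': [{'type': 'a', 'v': 1}, {'type': 'a', 'v': 3}],
#     #     'b': [{'type': 'b', 'v': 2}]}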

def pop_by(list_, key, value, *args):
    """
    Pop the first item that satisfies the search params from the given list.
    """
    item = find_by(list_, key, value, *args)
    if item:
        index = list_.index(item)
        list_.pop(index)
    return item

def cron_config(interval, randomize_seconds=False):
    """
    Returns a dict with cron settings for the given interval.
    """
    if interval < timedelta(minutes=1):
        second = f"*/{interval.seconds}"
        minute = "*"
        hour = "*"
    elif interval < timedelta(hours=1):
        second = "0"
        minute = f"*/{int(interval.total_seconds()/60)}"
        hour = "*"
    elif interval < timedelta(hours=24):
        second = "0"
        minute = "0"
        hour = f"*/{int(interval.total_seconds()/3600)}"
    else:
        second = "0"
        minute = "0"
        hour = "0"
    cron_config = {"second": second, "minute": minute, "hour": hour}
    if randomize_seconds:
        jitter = min(int(interval.total_seconds() / 10), 300)
        cron_config['jitter'] = jitter
    return cron_config
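# Example (illustrative intervals):
#
#     cron_config(timedelta(minutes=15))
#     # -> {'second': '0', 'minute': '*/15', 'hour': '*'}
#     cron_config(timedelta(hours=2), randomize_seconds=True)
#     # -> {'second': '0', 'minute': '0', 'hour': '*/2', 'jitter': 300}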

def get_cert_fingerprint_from_request(request):
    ssl_object = request.transport.get_extra_info('ssl_object')
    if ssl_object:
        der_bytes = ssl_object.getpeercert(binary_form=True)
        if der_bytes:
            return certificate_fingerprint_from_der(der_bytes)

def group_targets_by_type(list_of_targets):
    targets_by_type = {}
    for target in list_of_targets:
        for key, value in target.items():
            if value is None:
                continue
            if key not in targets_by_type:
                targets_by_type[key] = []
            targets_by_type[key].append(value)
    return targets_by_type

def ungroup_targets_by_type(targets_by_type):
    ungrouped_targets = []
    for target_type, targets in targets_by_type.items():
        if isinstance(targets, list):
            for target in targets:
                ungrouped_targets.append({target_type: target})
        elif isinstance(targets, str):
            ungrouped_targets.append({target_type: targets})
    return ungrouped_targets
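# Example of the two directions (illustrative target dicts):
#
#     targets = [{'ven_id': 'ven1'}, {'ven_id': 'ven2'}, {'group_id': 'group1'}]
#     group_targets_by_type(targets)
#     # -> {'ven_id': ['ven1', 'ven2'], 'group_id': ['group1']}
#     ungroup_targets_by_type({'ven_id': ['ven1', 'ven2'], 'group_id': ['group1']})
#     # -> [{'ven_id': 'ven1'}, {'ven_id': 'ven2'}, {'group_id': 'group1'}]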

def validate_report_measurement_dict(measurement):
    from openleadr.enums import _ACCEPTABLE_UNITS, _MEASUREMENT_DESCRIPTIONS

    if 'name' not in measurement \
            or 'description' not in measurement \
            or 'unit' not in measurement:
        raise ValueError("The measurement dict must contain the following keys: "
                         "'name', 'description', 'unit'. Please correct this.")

    name = measurement['name']
    description = measurement['description']
    unit = measurement['unit']

    # Validate that the item name and description match
    if name in _MEASUREMENT_DESCRIPTIONS:
        required_description = _MEASUREMENT_DESCRIPTIONS[name]
        if description != required_description:
            if description.lower() == required_description.lower():
                logger.warning(f"The description for the measurement with name '{name}' "
                               f"was not in the correct case; you provided '{description}' but "
                               f"it should be '{required_description}'. "
                               "This was automatically corrected.")
                measurement['description'] = required_description
            else:
                raise ValueError(f"The measurement's description '{description}' "
                                 f"did not match the expected description for this type "
                                 f"('{required_description}'). Please correct this, or use "
                                 "'customUnit' as the name.")
        if unit not in _ACCEPTABLE_UNITS[name]:
            raise ValueError(f"The unit '{unit}' is not acceptable for measurement '{name}'. Allowed "
                             f"units are: '" + "', '".join(_ACCEPTABLE_UNITS[name]) + "'.")
    else:
        if name != 'customUnit':
            logger.warning(f"You provided a measurement with an unknown name {name}. "
                           "This was corrected to 'customUnit'. Please correct this in your "
                           "report definition.")
            measurement['name'] = 'customUnit'

    if 'power' in name:
        if 'power_attributes' in measurement:
            power_attributes = measurement['power_attributes']
            if 'voltage' not in power_attributes \
                    or 'ac' not in power_attributes \
                    or 'hertz' not in power_attributes:
                raise ValueError("The power_attributes of the measurement must contain the "
                                 "following keys: 'voltage' (int), 'ac' (bool), 'hertz' (int).")
        else:
            raise ValueError("A 'power' related measurement must contain a "
                             "'power_attributes' section that contains the following "
                             "keys: 'voltage' (int), 'ac' (boolean), 'hertz' (int)")

def get_active_period_from_intervals(intervals, as_dict=True):
    if is_dataclass(intervals[0]):
        intervals = [asdict(i) for i in intervals]
    period_start = min([i['dtstart'] for i in intervals])
    period_duration = max([i['dtstart'] + i['duration'] - period_start for i in intervals])
    if as_dict:
        return {'dtstart': period_start,
                'duration': period_duration}
    else:
        from openleadr.objects import ActivePeriod
        return ActivePeriod(dtstart=period_start, duration=period_duration)
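# Example with two back-to-back 10-minute intervals (values are illustrative):
#
#     intervals = [{'dtstart': datetime(2021, 1, 1, 12, 0, tzinfo=timezone.utc),
#                   'duration': timedelta(minutes=10)},
#                  {'dtstart': datetime(2021, 1, 1, 12, 10, tzinfo=timezone.utc),
#                   'duration': timedelta(minutes=10)}]
#     get_active_period_from_intervals(intervals)
#     # -> {'dtstart': datetime(2021, 1, 1, 12, 0, tzinfo=timezone.utc),
#     #     'duration': timedelta(minutes=20)}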

def determine_event_status(active_period):
    now = datetime.now(timezone.utc)
    active_period_start = getmember(active_period, 'dtstart')
    if active_period_start.tzinfo is None:
        active_period_start = active_period_start.astimezone(timezone.utc)
        setmember(active_period, 'dtstart', active_period_start)
    active_period_end = active_period_start + getmember(active_period, 'duration')
    if now >= active_period_end:
        return 'completed'
    if now >= active_period_start:
        return 'active'
    if getmember(active_period, 'ramp_up_period', missing=None) is not None:
        ramp_up_start = active_period_start - getmember(active_period, 'ramp_up_period')
        if now >= ramp_up_start:
            return 'near'
    return 'far'

def hasmember(obj, member):
    """
    Check if a dict or dataclass has the given member.
    """
    if is_dataclass(obj):
        if hasattr(obj, member):
            return True
    else:
        if member in obj:
            return True
    return False

def getmember(obj, member, missing='_RAISE_'):
    """
    Get a member from a dict or dataclass. Nesting is possible.
    """
    def getmember_inner(obj, member, missing='_RAISE_'):
        if is_dataclass(obj):
            if not missing == '_RAISE_' and not hasattr(obj, member):
                return missing
            else:
                return getattr(obj, member)
        else:
            if missing == '_RAISE_':
                return obj[member]
            else:
                return obj.get(member, missing)

    for m in member.split("."):
        obj = getmember_inner(obj, m, missing=missing)
    return obj

def setmember(obj, member, value):
    """
    Set a member of a dict or dataclass. Nesting is possible.
    """
    if '.' in member:
        members = member.split('.')
        obj = getmember(obj, ".".join(members[:-1]))
        member = members[-1]

    if is_dataclass(obj):
        setattr(obj, member, value)
    else:
        obj[member] = value
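# Example with a plain dict (dataclasses work the same way; values are illustrative):
#
#     event = {'event_descriptor': {'event_id': 'event001', 'priority': 1}}
#     getmember(event, 'event_descriptor.event_id')                    # -> 'event001'
#     getmember(event, 'event_descriptor.vtn_comment', missing=None)   # -> None
#     setmember(event, 'event_descriptor.priority', 2)
#     # event['event_descriptor']['priority'] is now 2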

def validate_report_request_tuples(list_of_report_requests, mode='full'):
    if len(list_of_report_requests) == 0:
        return
    for report_requests in list_of_report_requests:
        if report_requests is None:
            continue
        for i, rrq in enumerate(report_requests):
            if rrq is None:
                continue

            # Check if it is a tuple
            elif not isinstance(rrq, tuple):
                report_requests[i] = None
                if mode == 'full':
                    logger.error("Your on_register_report handler did not return a list of tuples. "
                                 f"The first item from the list was '{rrq}' ({rrq.__class__.__name__}).")
                else:
                    logger.error("Your on_register_report handler did not return a tuple. "
                                 f"It returned '{rrq}'. Please see the documentation for the correct format.")

            # Check if it has the correct length
            elif not len(rrq) in (3, 4):
                report_requests[i] = None
                if mode == 'full':
                    logger.error("Your on_register_report handler returned tuples of the wrong length. "
                                 f"It should be 3 or 4. It returned: '{rrq}'.")
                else:
                    logger.error("Your on_register_report handler returned a tuple of the wrong length. "
                                 f"It should be 2 or 3. It returned: '{rrq[1:]}'.")

            # Check if the callback element is callable
            elif not callable(rrq[1]):
                report_requests[i] = None
                if mode == 'full':
                    logger.error(f"Your on_register_report handler did not return the correct tuple. "
                                 "It should return a list of (r_id, callback, sampling_interval) or "
                                 "(r_id, callback, sampling_interval, reporting_interval) tuples, where "
                                 "the r_id is a string, callback is a callable function or coroutine, and "
                                 "sampling_interval and reporting_interval are of type datetime.timedelta. "
                                 f"It returned: '{rrq}'. The second element was not callable.")
                else:
                    logger.error(f"Your on_register_report handler did not return the correct tuple. "
                                 "It should return a (callback, sampling_interval) or "
                                 "(callback, sampling_interval, reporting_interval) tuple, where "
                                 "the callback is a callable function or coroutine, and "
                                 "sampling_interval and reporting_interval are of type datetime.timedelta. "
                                 f"It returned: '{rrq[1:]}'. The first element was not callable.")

            # Check if the sampling_interval element is a timedelta
            elif not isinstance(rrq[2], timedelta):
                report_requests[i] = None
                if mode == 'full':
                    logger.error(f"Your on_register_report handler did not return the correct tuple. "
                                 "It should return a list of (r_id, callback, sampling_interval) or "
                                 "(r_id, callback, sampling_interval, reporting_interval) tuples, where "
                                 "sampling_interval and reporting_interval are of type datetime.timedelta. "
                                 f"It returned: '{rrq}'. The third element was not of type timedelta.")
                else:
                    logger.error(f"Your on_register_report handler did not return the correct tuple. "
                                 "It should return a (callback, sampling_interval) or "
                                 "(callback, sampling_interval, reporting_interval) tuple, where "
                                 "sampling_interval and reporting_interval are of type datetime.timedelta. "
                                 f"It returned: '{rrq[1:]}'. The second element was not of type timedelta.")

            # Check if the reporting_interval element is a timedelta (if it exists)
            elif len(rrq) == 4 and not isinstance(rrq[3], timedelta):
                report_requests[i] = None
                if mode == 'full':
                    logger.error(f"Your on_register_report handler did not return the correct tuple. "
                                 "It should return a list of (r_id, callback, sampling_interval) or "
                                 "(r_id, callback, sampling_interval, reporting_interval) tuples, where "
                                 "sampling_interval and reporting_interval are of type datetime.timedelta. "
                                 f"It returned: '{rrq}'. The fourth element was not of type timedelta.")
                else:
                    logger.error(f"Your on_register_report handler did not return the correct tuple. "
                                 "It should return a (callback, sampling_interval) or "
                                 "(callback, sampling_interval, reporting_interval) tuple, where "
                                 "sampling_interval and reporting_interval are of type datetime.timedelta. "
                                 f"It returned: '{rrq[1:]}'. The third element was not of type timedelta.")

async def await_if_required(result):
    if asyncio.iscoroutine(result):
        result = await result
    return result


async def gather_if_required(results):
    if results is None:
        return results
    if len(results) > 0:
        if not any([asyncio.iscoroutine(r) for r in results]):
            results = results
        elif all([asyncio.iscoroutine(r) for r in results]):
            results = await asyncio.gather(*results)
        else:
            results = [await await_if_required(result) for result in results]
    return results
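# Example inside a coroutine (the handler name is illustrative):
#
#     async def handler():
#         return 42
#
#     await await_if_required(handler())         # -> 42
#     await await_if_required(42)                # -> 42 (plain values pass through)
#     await gather_if_required([handler(), 42])  # -> [42, 42] (mixed results are awaited one by one)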

def order_events(events, limit=None, offset=None):
    """
    Order the events according to the OpenADR rules:
    - active events before inactive events
    - high priority before low priority
    - earlier before later
    """
    def event_priority(event):
        # The default and lowest priority is 0, which we should interpret as a high value.
        priority = getmember(event, 'event_descriptor.priority', missing=float('inf'))
        if priority == 0:
            priority = float('inf')
        return priority

    if events is None:
        return None
    if isinstance(events, objects.Event):
        events = [events]
    elif isinstance(events, dict):
        events = [events]

    # Update the event statuses
    for event in events:
        if getmember(event, 'event_descriptor.event_status') != enums.EVENT_STATUS.CANCELLED:
            event_status = determine_event_status(getmember(event, 'active_period'))
            setmember(event, 'event_descriptor.event_status', event_status)

    # Short circuit if we only have one event:
    if len(events) == 1:
        return events

    # Get all the active events first
    active_events = [event for event in events
                     if getmember(event, 'event_descriptor.event_status') == 'active']
    other_events = [event for event in events
                    if getmember(event, 'event_descriptor.event_status') != 'active']

    # Sort the active events by priority
    active_events.sort(key=lambda e: event_priority(e))

    # Sort the active events by start date
    active_events.sort(key=lambda e: getmember(e, 'active_period.dtstart'))

    # Sort the non-active events by their start date
    other_events.sort(key=lambda e: getmember(e, 'active_period.dtstart'))

    ordered_events = active_events + other_events
    if limit and offset:
        return ordered_events[offset:offset+limit]
    return ordered_events

def increment_event_modification_number(event):
    """
    Increments the modification number of the event by 1 and returns the new modification number.
    """
    modification_number = getmember(event, 'event_descriptor.modification_number') + 1
    setmember(event, 'event_descriptor.modification_number', modification_number)
    return modification_number
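# Example (illustrative event dict):
#
#     event = {'event_descriptor': {'modification_number': 0}}
#     increment_event_modification_number(event)   # -> 1, and the event is updated in place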