Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# SPDX-License-Identifier: Apache-2.0 

2 

3# Copyright 2020 Contributors to OpenLEADR 

4 

5# Licensed under the Apache License, Version 2.0 (the "License"); 

6# you may not use this file except in compliance with the License. 

7# You may obtain a copy of the License at 

8 

9# http://www.apache.org/licenses/LICENSE-2.0 

10 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16 

17import asyncio 

18import inspect 

19import logging 

20import ssl 

21from datetime import datetime, timedelta, timezone 

22from functools import partial 

23from http import HTTPStatus 

24 

25import aiohttp 

26from lxml.etree import XMLSyntaxError 

27from signxml.exceptions import InvalidSignature 

28from apscheduler.schedulers.asyncio import AsyncIOScheduler 

29from openleadr import enums, objects, errors 

30from openleadr.messaging import create_message, parse_message, \ 

31 validate_xml_schema, validate_xml_signature 

32from openleadr import utils 

33 

34logger = logging.getLogger('openleadr') 

35 

36 

37class OpenADRClient: 

38 """ 

39 Main client class. Most of these methods will be called automatically, but 

40 you can always choose to call them manually. 

41 """ 

42 def __init__(self, ven_name, vtn_url, debug=False, cert=None, key=None, 

43 passphrase=None, vtn_fingerprint=None, show_fingerprint=True, ca_file=None, 

44 allow_jitter=True, ven_id=None): 

45 """ 

46 Initializes a new OpenADR Client (Virtual End Node) 

47 

48 :param str ven_name: The name for this VEN 

49 :param str vtn_url: The URL of the VTN (Server) to connect to 

50 :param bool debug: Whether or not to print debugging messages 

51 :param str cert: The path to a PEM-formatted Certificate file to use 

52 for signing messages. 

53 :param str key: The path to a PEM-formatted Private Key file to use 

54 for signing messages. 

55 :param str fingerprint: The fingerprint for the VTN's certificate to 

56 verify incomnig messages 

57 :param str show_fingerprint: Whether to print your own fingerprint 

58 on startup. Defaults to True. 

59 :param str ca_file: The path to the PEM-formatted CA file for validating the VTN server's 

60 certificate. 

61 :param str ven_id: The ID for this VEN. If you leave this blank, 

62 a VEN_ID will be assigned by the VTN. 

63 """ 

64 

65 self.ven_name = ven_name 

66 if vtn_url.endswith("/"): 

67 vtn_url = vtn_url[:-1] 

68 self.vtn_url = vtn_url 

69 self.ven_id = ven_id 

70 self.registration_id = None 

71 self.poll_frequency = None 

72 self.vtn_fingerprint = vtn_fingerprint 

73 self.debug = debug 

74 

75 self.reports = [] 

76 self.report_callbacks = {} # Holds the callbacks for each specific report 

77 self.report_requests = [] # Keep track of the report requests from the VTN 

78 self.incomplete_reports = {} # Holds reports that are being populated over time 

79 self.pending_reports = asyncio.Queue() # Holds reports that are waiting to be sent 

80 self.scheduler = AsyncIOScheduler() 

81 self.client_session = None 

82 self.report_queue_task = None 

83 

84 self.received_events = [] # Holds the events that we received. 

85 self.responded_events = {} # Holds the events that we already saw. 

86 

87 self.cert_path = cert 

88 self.key_path = key 

89 self.passphrase = passphrase 

90 self.ca_file = ca_file 

91 self.allow_jitter = allow_jitter 

92 

93 if cert and key: 

94 with open(cert, 'rb') as file: 

95 cert = file.read() 

96 with open(key, 'rb') as file: 

97 key = file.read() 

98 if show_fingerprint: 

99 print("") 

100 print("*" * 80) 

101 print("Your VEN Certificate Fingerprint is ".center(80)) 

102 print(f"{utils.certificate_fingerprint(cert).center(80)}".center(80)) 

103 print("Please deliver this fingerprint to the VTN.".center(80)) 

104 print("You do not need to keep this a secret.".center(80)) 

105 print("*" * 80) 

106 print("") 

107 

108 self._create_message = partial(create_message, 

109 cert=cert, 

110 key=key, 

111 passphrase=passphrase) 

112 

113 async def run(self): 

114 """ 

115 Run the client in full-auto mode. 

116 """ 

117 # if not hasattr(self, 'on_event'): 

118 # raise NotImplementedError("You must implement on_event.") 

119 self.loop = asyncio.get_event_loop() 

120 await self.create_party_registration(ven_id=self.ven_id) 

121 

122 if not self.ven_id: 

123 logger.error("No VEN ID received from the VTN, aborting.") 

124 await self.stop() 

125 return 

126 

127 if self.reports: 

128 await self.register_reports(self.reports) 

129 self.report_queue_task = self.loop.create_task(self._report_queue_worker()) 

130 

131 await self._poll() 

132 

133 # Set up automatic polling 

134 if self.poll_frequency > timedelta(hours=24): 

135 logger.warning("Polling with intervals of more than 24 hours is not supported. " 

136 "Will use 24 hours as the polling interval.") 

137 self.poll_frequency = timedelta(hours=24) 

138 cron_config = utils.cron_config(self.poll_frequency, randomize_seconds=self.allow_jitter) 

139 

140 self.scheduler.add_job(self._poll, 

141 trigger='cron', 

142 **cron_config) 

143 self.scheduler.add_job(self._event_cleanup, 

144 trigger='interval', 

145 seconds=300) 

146 self.scheduler.start() 

147 

148 async def stop(self): 

149 """ 

150 Cleanly stops the client. Run this coroutine before closing your event loop. 

151 """ 

152 if self.scheduler.running: 

153 self.scheduler.shutdown() 

154 if self.report_queue_task: 

155 self.report_queue_task.cancel() 

156 await self.client_session.close() 

157 await asyncio.sleep(0) 

158 

159 def add_handler(self, handler, callback): 

160 """ 

161 Add a callback for the given situation 

162 """ 

163 if handler not in ('on_event', 'on_update_event'): 

164 logger.error("'handler' must be either on_event or on_update_event") 

165 return 

166 

167 setattr(self, handler, callback) 

168 

169 def add_report(self, callback, resource_id, measurement=None, 

170 data_collection_mode='incremental', 

171 report_specifier_id=None, r_id=None, 

172 report_name=enums.REPORT_NAME.TELEMETRY_USAGE, 

173 reading_type=enums.READING_TYPE.DIRECT_READ, 

174 report_type=enums.REPORT_TYPE.READING, sampling_rate=None, data_source=None, 

175 scale="none", unit=None, power_ac=True, power_hertz=50, power_voltage=230, 

176 market_context=None, end_device_asset_mrid=None, report_data_source=None): 

177 """ 

178 Add a new reporting capability to the client. 

179 

180 :param callable callback: A callback or coroutine that will fetch the value for a specific 

181 report. This callback will be passed the report_id and the r_id 

182 of the requested value. 

183 :param str resource_id: A specific name for this resource within this report. 

184 :param str measurement: The quantity that is being measured (openleadr.enums.MEASUREMENTS). 

185 Optional for TELEMETRY_STATUS reports. 

186 :param str data_collection_mode: Whether you want the data to be collected incrementally 

187 or at once. If the VTN requests the sampling interval to be 

188 higher than the reporting interval, this setting determines 

189 if the callback should be called at the sampling rate (with 

190 no args, assuming it returns the current value), or at the 

191 reporting interval (with date_from and date_to as keyword 

192 arguments). Choose 'incremental' for the former case, or 

193 'full' for the latter case. 

194 :param str report_specifier_id: A unique identifier for this report. Leave this blank for a 

195 random generated id, or fill it in if your VTN depends on 

196 this being a known value, or if it needs to be constant 

197 between restarts of the client. 

198 :param str r_id: A unique identifier for a datapoint in a report. The same remarks apply as 

199 for the report_specifier_id. 

200 :param str report_name: An OpenADR name for this report (one of openleadr.enums.REPORT_NAME) 

201 :param str reading_type: An OpenADR reading type (found in openleadr.enums.READING_TYPE) 

202 :param str report_type: An OpenADR report type (found in openleadr.enums.REPORT_TYPE) 

203 :param datetime.timedelta sampling_rate: The sampling rate for the measurement. 

204 :param str unit: The unit for this measurement. 

205 :param boolean power_ac: Whether the power is AC (True) or DC (False). 

206 Only required when supplying a power-related measurement. 

207 :param int power_hertz: Grid frequency of the power. 

208 Only required when supplying a power-related measurement. 

209 :param int power_voltage: Voltage of the power. 

210 Only required when supplying a power-related measurement. 

211 :param str market_context: The Market Context that this report belongs to. 

212 :param str end_device_asset_mrid: the Meter ID for the end device that is measured by this report. 

213 :param report_data_source: A (list of) target(s) that this report is related to. 

214 """ 

215 

216 # Verify input 

217 if report_name not in enums.REPORT_NAME.values and not report_name.startswith('x-'): 

218 raise ValueError(f"{report_name} is not a valid report_name. Valid options are " 

219 f"{', '.join(enums.REPORT_NAME.values)}", 

220 " or any name starting with 'x-'.") 

221 if reading_type not in enums.READING_TYPE.values and not reading_type.startswith('x-'): 

222 raise ValueError(f"{reading_type} is not a valid reading_type. Valid options are " 

223 f"{', '.join(enums.READING_TYPE.values)}" 

224 " or any name starting with 'x-'.") 

225 if report_type not in enums.REPORT_TYPE.values and not report_type.startswith('x-'): 

226 raise ValueError(f"{report_type} is not a valid report_type. Valid options are " 

227 f"{', '.join(enums.REPORT_TYPE.values)}" 

228 " or any name starting with 'x-'.") 

229 if scale not in enums.SI_SCALE_CODE.values: 

230 raise ValueError(f"{scale} is not a valid scale. Valid options are " 

231 f"{', '.join(enums.SI_SCALE_CODE.values)}") 

232 

233 if sampling_rate is None: 

234 sampling_rate = objects.SamplingRate(min_period=timedelta(seconds=10), 

235 max_period=timedelta(hours=24), 

236 on_change=False) 

237 elif isinstance(sampling_rate, timedelta): 

238 sampling_rate = objects.SamplingRate(min_period=sampling_rate, 

239 max_period=sampling_rate, 

240 on_change=False) 

241 

242 if data_collection_mode not in ('incremental', 'full'): 

243 raise ValueError("The data_collection_mode should be 'incremental' or 'full'.") 

244 

245 if data_collection_mode == 'full': 

246 args = inspect.signature(callback).parameters 

247 if not ('date_from' in args and 'date_to' in args and 'sampling_interval' in args): 

248 raise TypeError("Your callback function must accept the 'date_from', 'date_to' " 

249 "and 'sampling_interval' arguments if used " 

250 "with data_collection_mode 'full'.") 

251 

252 # Determine the correct item name, item description and unit 

253 if report_name == 'TELEMETRY_STATUS': 

254 item_base = None 

255 elif isinstance(measurement, objects.Measurement): 

256 item_base = measurement 

257 elif isinstance(measurement, dict): 

258 utils.validate_report_measurement_dict(measurement) 

259 power_attributes = object.PowerAttributes(**measurement.get('power_attributes')) or None 

260 item_base = objects.Measurement(name=measurement['name'], 

261 description=measurement['description'], 

262 unit=measurement['unit'], 

263 scale=measurement.get('scale'), 

264 power_attributes=power_attributes) 

265 elif measurement.upper() in enums.MEASUREMENTS.members: 

266 item_base = enums.MEASUREMENTS[measurement.upper()] 

267 else: 

268 item_base = objects.Measurement(name='customUnit', 

269 description=measurement, 

270 unit=unit, 

271 scale=scale) 

272 

273 if report_name != 'TELEMETRY_STATUS' and scale is not None: 

274 if item_base.scale is not None: 

275 if scale in enums.SI_SCALE_CODE.values: 

276 item_base.scale = scale 

277 else: 

278 raise ValueError("The 'scale' argument must be one of '{'. ',join(enums.SI_SCALE_CODE.values)}") 

279 

280 # Check if unit is compatible 

281 if unit is not None and unit != item_base.unit and unit not in item_base.acceptable_units: 

282 logger.warning(f"The supplied unit {unit} for measurement {measurement} " 

283 f"will be ignored, {item_base.unit} will be used instead. " 

284 f"Allowed units for this measurement are: " 

285 f"{', '.join(item_base.acceptable_units)}") 

286 

287 # Get or create the relevant Report 

288 if report_specifier_id: 

289 report = utils.find_by(self.reports, 

290 'report_name', report_name, 

291 'report_specifier_id', report_specifier_id) 

292 else: 

293 report = utils.find_by(self.reports, 'report_name', report_name) 

294 

295 if not report: 

296 report_specifier_id = report_specifier_id or utils.generate_id() 

297 report = objects.Report(created_date_time=datetime.now(), 

298 report_name=report_name, 

299 report_specifier_id=report_specifier_id, 

300 data_collection_mode=data_collection_mode) 

301 self.reports.append(report) 

302 

303 # Add the new report description to the report 

304 target = objects.Target(resource_id=resource_id) 

305 r_id = utils.generate_id() 

306 report_description = objects.ReportDescription(r_id=r_id, 

307 reading_type=reading_type, 

308 report_data_source=target, 

309 report_subject=target, 

310 report_type=report_type, 

311 sampling_rate=sampling_rate, 

312 measurement=item_base, 

313 market_context=market_context) 

314 self.report_callbacks[(report.report_specifier_id, r_id)] = callback 

315 report.report_descriptions.append(report_description) 

316 return report_specifier_id, r_id 

317 

318 ########################################################################### 

319 # # 

320 # POLLING METHODS # 

321 # # 

322 ########################################################################### 

323 

324 async def poll(self): 

325 """ 

326 Request the next available message from the Server. This coroutine is called automatically. 

327 """ 

328 service = 'OadrPoll' 

329 message = self._create_message('oadrPoll', ven_id=self.ven_id) 

330 response_type, response_payload = await self._perform_request(service, message) 

331 return response_type, response_payload 

332 

333 ########################################################################### 

334 # # 

335 # REGISTRATION METHODS # 

336 # # 

337 ########################################################################### 

338 

339 async def query_registration(self): 

340 """ 

341 Request information about the VTN. 

342 """ 

343 request_id = utils.generate_id() 

344 service = 'EiRegisterParty' 

345 message = self._create_message('oadrQueryRegistration', request_id=request_id) 

346 response_type, response_payload = await self._perform_request(service, message) 

347 return response_type, response_payload 

348 

349 async def create_party_registration(self, http_pull_model=True, xml_signature=False, 

350 report_only=False, profile_name='2.0b', 

351 transport_name='simpleHttp', transport_address=None, 

352 ven_id=None): 

353 """ 

354 Take the neccessary steps to register this client with the server. 

355 

356 :param bool http_pull_model: Whether to use the 'pull' model for HTTP. 

357 :param bool xml_signature: Whether to sign each XML message. 

358 :param bool report_only: Whether or not this is a reporting-only client 

359 which does not deal with Events. 

360 :param str profile_name: Which OpenADR profile to use. 

361 :param str transport_name: The transport name to use. Either 'simpleHttp' or 'xmpp'. 

362 :param str transport_address: Which public-facing address the server should use 

363 to communicate. 

364 :param str ven_id: The ID for this VEN. If you leave this blank, 

365 a VEN_ID will be assigned by the VTN. 

366 """ 

367 request_id = utils.generate_id() 

368 service = 'EiRegisterParty' 

369 payload = {'ven_name': self.ven_name, 

370 'http_pull_model': http_pull_model, 

371 'xml_signature': xml_signature, 

372 'report_only': report_only, 

373 'profile_name': profile_name, 

374 'transport_name': transport_name, 

375 'transport_address': transport_address} 

376 if ven_id: 

377 payload['ven_id'] = ven_id 

378 message = self._create_message('oadrCreatePartyRegistration', 

379 request_id=request_id, 

380 **payload) 

381 response_type, response_payload = await self._perform_request(service, message) 

382 if response_type is None: 

383 return 

384 if response_payload['response']['response_code'] != 200: 

385 status_code = response_payload['response']['response_code'] 

386 status_description = response_payload['response']['response_description'] 

387 logger.error(f"Got error on Create Party Registration: " 

388 f"{status_code} {status_description}") 

389 return 

390 self.ven_id = response_payload['ven_id'] 

391 self.registration_id = response_payload['registration_id'] 

392 self.poll_frequency = response_payload.get('requested_oadr_poll_freq', 

393 timedelta(seconds=10)) 

394 logger.info(f"VEN is now registered with ID {self.ven_id}") 

395 logger.info(f"The polling frequency is {self.poll_frequency}") 

396 return response_type, response_payload 

397 

398 async def cancel_party_registration(self): 

399 raise NotImplementedError("Cancel Registration is not yet implemented") 

400 

401 ########################################################################### 

402 # # 

403 # EVENT METHODS # 

404 # # 

405 ########################################################################### 

406 

407 async def request_event(self, reply_limit=None): 

408 """ 

409 Request the next Event from the VTN, if it has any. 

410 """ 

411 payload = {'request_id': utils.generate_id(), 

412 'ven_id': self.ven_id, 

413 'reply_limit': reply_limit} 

414 message = self._create_message('oadrRequestEvent', **payload) 

415 service = 'EiEvent' 

416 response_type, response_payload = await self._perform_request(service, message) 

417 return response_type, response_payload 

418 

419 async def created_event(self, request_id, event_id, opt_type, modification_number=0): 

420 """ 

421 Inform the VTN that we created an event. 

422 """ 

423 service = 'EiEvent' 

424 payload = {'ven_id': self.ven_id, 

425 'response': {'response_code': 200, 

426 'response_description': 'OK', 

427 'request_id': request_id}, 

428 'event_responses': [{'response_code': 200, 

429 'response_description': 'OK', 

430 'request_id': request_id, 

431 'event_id': event_id, 

432 'modification_number': modification_number, 

433 'opt_type': opt_type}]} 

434 message = self._create_message('oadrCreatedEvent', **payload) 

435 response_type, response_payload = await self._perform_request(service, message) 

436 

437 ########################################################################### 

438 # # 

439 # REPORTING METHODS # 

440 # # 

441 ########################################################################### 

442 

443 async def register_reports(self, reports): 

444 """ 

445 Tell the VTN about our reports. The VTN miht respond with an 

446 oadrCreateReport message that tells us which reports are to be sent. 

447 """ 

448 request_id = utils.generate_id() 

449 payload = {'request_id': request_id, 

450 'ven_id': self.ven_id, 

451 'reports': reports, 

452 'report_request_id': 0} 

453 

454 service = 'EiReport' 

455 message = self._create_message('oadrRegisterReport', **payload) 

456 response_type, response_payload = await self._perform_request(service, message) 

457 

458 # Handle the subscriptions that the VTN is interested in. 

459 if 'report_requests' in response_payload: 

460 for report_request in response_payload['report_requests']: 

461 await self.create_report(report_request) 

462 

463 message_type = 'oadrCreatedReport' 

464 message_payload = {} 

465 

466 return message_type, message_payload 

467 

468 async def create_report(self, report_request): 

469 """ 

470 Add the requested reports to the reporting mechanism. 

471 This is called when the VTN requests reports from us. 

472 

473 :param report_request dict: The oadrReportRequest dict from the VTN. 

474 """ 

475 # Get the relevant variables from the report requests 

476 report_request_id = report_request['report_request_id'] 

477 report_specifier_id = report_request['report_specifier']['report_specifier_id'] 

478 report_back_duration = report_request['report_specifier'].get('report_back_duration') 

479 granularity = report_request['report_specifier']['granularity'] 

480 

481 # Check if this report actually exists 

482 report = utils.find_by(self.reports, 'report_specifier_id', report_specifier_id) 

483 if not report: 

484 logger.error(f"A non-existant report with report_specifier_id " 

485 f"{report_specifier_id} was requested.") 

486 return False 

487 

488 # Check and collect the requested r_ids for this report 

489 requested_r_ids = [] 

490 for specifier_payload in report_request['report_specifier']['specifier_payloads']: 

491 r_id = specifier_payload['r_id'] 

492 # Check if the requested r_id actually exists 

493 rd = utils.find_by(report.report_descriptions, 'r_id', r_id) 

494 if not rd: 

495 logger.error(f"A non-existant report with r_id {r_id} " 

496 f"inside report with report_specifier_id {report_specifier_id} " 

497 f"was requested.") 

498 continue 

499 

500 # Check if the requested measurement exists and if the correct unit is requested 

501 if 'measurement' in specifier_payload: 

502 measurement = specifier_payload['measurement'] 

503 if measurement['description'] != rd.measurement.description: 

504 logger.error(f"A non-matching measurement description for report with " 

505 f"report_request_id {report_request_id} and r_id {r_id} was given " 

506 f"by the VTN. Offered: {rd.measurement.description}, " 

507 f"requested: {measurement['description']}") 

508 continue 

509 if measurement['unit'] != rd.measurement.unit: 

510 logger.error(f"A non-matching measurement unit for report with " 

511 f"report_request_id {report_request_id} and r_id {r_id} was given " 

512 f"by the VTN. Offered: {rd.measurement.unit}, " 

513 f"requested: {measurement['unit']}") 

514 continue 

515 

516 if granularity is not None: 

517 if not rd.sampling_rate.min_period <= granularity <= rd.sampling_rate.max_period: 

518 logger.error(f"An invalid sampling rate {granularity} was requested for report " 

519 f"with report_specifier_id {report_specifier_id} and r_id {r_id}. " 

520 f"The offered sampling rate was between " 

521 f"{rd.sampling_rate.min_period} and " 

522 f"{rd.sampling_rate.max_period}") 

523 continue 

524 else: 

525 # If no granularity is specified, set it to the lowest sampling rate. 

526 granularity = rd.sampling_rate.max_period 

527 

528 requested_r_ids.append(r_id) 

529 

530 callback = partial(self.update_report, report_request_id=report_request_id) 

531 

532 reporting_interval = report_back_duration or granularity 

533 job = self.scheduler.add_job(func=callback, 

534 trigger='cron', 

535 **utils.cron_config(reporting_interval)) 

536 

537 self.report_requests.append({'report_request_id': report_request_id, 

538 'report_specifier_id': report_specifier_id, 

539 'report_back_duration': report_back_duration, 

540 'r_ids': requested_r_ids, 

541 'granularity': granularity, 

542 'job': job}) 

543 

544 async def create_single_report(self, report_request): 

545 """ 

546 Create a single report in response to a request from the VTN. 

547 """ 

548 

549 async def update_report(self, report_request_id): 

550 """ 

551 Call the previously registered report callback and send the result as a message to the VTN. 

552 """ 

553 logger.debug(f"Running update_report for {report_request_id}") 

554 report_request = utils.find_by(self.report_requests, 'report_request_id', report_request_id) 

555 granularity = report_request['granularity'] 

556 report_back_duration = report_request['report_back_duration'] 

557 report_specifier_id = report_request['report_specifier_id'] 

558 report = utils.find_by(self.reports, 'report_specifier_id', report_specifier_id) 

559 data_collection_mode = report.data_collection_mode 

560 

561 if report_request_id in self.incomplete_reports: 

562 logger.debug("We were already compiling this report") 

563 outgoing_report = self.incomplete_reports[report_request_id] 

564 else: 

565 logger.debug("There is no report in progress") 

566 outgoing_report = objects.Report(report_request_id=report_request_id, 

567 report_specifier_id=report.report_specifier_id, 

568 report_name=report.report_name, 

569 intervals=[]) 

570 

571 intervals = outgoing_report.intervals or [] 

572 if data_collection_mode == 'full': 

573 if report_back_duration is None: 

574 report_back_duration = granularity 

575 date_to = datetime.now(timezone.utc) 

576 date_from = date_to - max(report_back_duration, granularity) 

577 for r_id in report_request['r_ids']: 

578 report_callback = self.report_callbacks[(report_specifier_id, r_id)] 

579 result = report_callback(date_from=date_from, 

580 date_to=date_to, 

581 sampling_interval=granularity) 

582 if asyncio.iscoroutine(result): 

583 result = await result 

584 for dt, value in result: 

585 report_payload = objects.ReportPayload(r_id=r_id, value=value) 

586 intervals.append(objects.ReportInterval(dtstart=dt, 

587 report_payload=report_payload)) 

588 

589 else: 

590 for r_id in report_request['r_ids']: 

591 report_callback = self.report_callbacks[(report_specifier_id, r_id)] 

592 result = report_callback() 

593 if asyncio.iscoroutine(result): 

594 result = await result 

595 if isinstance(result, (int, float)): 

596 result = [(datetime.now(timezone.utc), result)] 

597 for dt, value in result: 

598 logger.info(f"Adding {dt}, {value} to report") 

599 report_payload = objects.ReportPayload(r_id=r_id, value=value) 

600 intervals.append(objects.ReportInterval(dtstart=dt, 

601 report_payload=report_payload)) 

602 outgoing_report.intervals = intervals 

603 logger.info(f"The number of intervals in the report is now {len(outgoing_report.intervals)}") 

604 

605 # Figure out if the report is complete after this sampling 

606 if data_collection_mode == 'incremental' and report_back_duration is not None\ 

607 and report_back_duration > granularity: 

608 report_interval = report_back_duration.total_seconds() 

609 sampling_interval = granularity.total_seconds() 

610 expected_len = len(report_request['r_ids']) * int(report_interval / sampling_interval) 

611 if len(outgoing_report.intervals) == expected_len: 

612 logger.info("The report is now complete with all the values. Will queue for sending.") 

613 await self.pending_reports.put(self.incomplete_reports.pop(report_request_id)) 

614 else: 

615 logger.debug("The report is not yet complete, will hold until it is.") 

616 self.incomplete_reports[report_request_id] = outgoing_report 

617 else: 

618 logger.info("Report will be sent now.") 

619 await self.pending_reports.put(outgoing_report) 

620 

621 async def cancel_report(self, payload): 

622 """ 

623 Cancel this report. 

624 """ 

625 

626 async def _report_queue_worker(self): 

627 """ 

628 A Queue worker that pushes out the pending reports. 

629 """ 

630 try: 

631 while True: 

632 report = await self.pending_reports.get() 

633 service = 'EiReport' 

634 message = self._create_message('oadrUpdateReport', 

635 ven_id=self.ven_id, 

636 request_id=utils.generate_id(), 

637 reports=[report]) 

638 try: 

639 response_type, response_payload = await self._perform_request(service, message) 

640 except Exception as err: 

641 logger.error(f"Unable to send the report to the VTN. Error: {err}") 

642 else: 

643 if 'cancel_report' in response_payload: 

644 await self.cancel_report(response_payload['cancel_report']) 

645 except asyncio.CancelledError: 

646 return 

647 

648 ########################################################################### 

649 # # 

650 # PLACEHOLDER # 

651 # # 

652 ########################################################################### 

653 

654 async def on_event(self, event): 

655 """ 

656 Placeholder for the on_event handler. 

657 """ 

658 logger.warning("You should implement your own on_event handler. This handler receives " 

659 "an Event dict and should return either 'optIn' or 'optOut' based on your " 

660 "choice. Will opt out of the event for now.") 

661 return 'optOut' 

662 

663 async def on_update_event(self, event): 

664 """ 

665 Placeholder for the on_update_event handler. 

666 """ 

667 logger.warning("An Event was updated, but you don't have an on_updated_event handler configured. " 

668 "You should implement your own on_update_event handler. This handler receives " 

669 "an Event dict and should return either 'optIn' or 'optOut' based on your " 

670 "choice. Will re-use the previous opt status for this event_id for now") 

671 if event['event_descriptor']['event_id'] in self.events: 

672 return self.responded_events['event_id'] 

673 

674 async def on_register_report(self, report): 

675 """ 

676 Placeholder for the on_register_report handler. 

677 """ 

678 

679 ########################################################################### 

680 # # 

681 # EMPTY RESPONSES # 

682 # # 

683 ########################################################################### 

684 

685 async def send_response(self, service, response_code=200, response_description="OK", request_id=None): 

686 """ 

687 Send an empty oadrResponse, for instance after receiving oadrRequestReregistration. 

688 """ 

689 msg = self._create_message('oadrResponse', 

690 response={'response_code': response_code, 

691 'response_description': response_description, 

692 'request_id': request_id}) 

693 await self._perform_request(service, msg) 

694 

695 ########################################################################### 

696 # # 

697 # LOW LEVEL # 

698 # # 

699 ########################################################################### 

700 

701 async def _perform_request(self, service, message): 

702 await self._ensure_client_session() 

703 logger.debug(f"Client is sending {message}") 

704 url = f"{self.vtn_url}/{service}" 

705 try: 

706 async with self.client_session.post(url, data=message) as req: 

707 content = await req.read() 

708 if req.status != HTTPStatus.OK: 

709 logger.warning(f"Non-OK status {req.status} when performing a request to {url} " 

710 f"with data {message}: {req.status} {content.decode('utf-8')}") 

711 return None, {} 

712 logger.debug(content.decode('utf-8')) 

713 except aiohttp.client_exceptions.ClientConnectorError as err: 

714 # Could not connect to server 

715 logger.error(f"Could not connect to server with URL {self.vtn_url}:") 

716 logger.error(f"{err.__class__.__name__}: {str(err)}") 

717 return None, {} 

718 except Exception as err: 

719 logger.error(f"Request error {err.__class__.__name__}:{err}") 

720 return None, {} 

721 if len(content) == 0: 

722 return None 

723 try: 

724 tree = validate_xml_schema(content) 

725 if self.vtn_fingerprint: 

726 validate_xml_signature(tree) 

727 message_type, message_payload = parse_message(content) 

728 except XMLSyntaxError as err: 

729 logger.warning(f"Incoming message did not pass XML schema validation: {err}") 

730 return None, {} 

731 except errors.FingerprintMismatch as err: 

732 logger.warning(err) 

733 return None, {} 

734 except InvalidSignature: 

735 logger.warning("Incoming message had invalid signature, ignoring.") 

736 return None, {} 

737 except Exception as err: 

738 logger.error(f"The incoming message could not be parsed or validated: {err}") 

739 return None, {} 

740 if 'response' in message_payload and 'response_code' in message_payload['response']: 

741 if message_payload['response']['response_code'] != 200: 

742 logger.warning("We got a non-OK OpenADR response from the server: " 

743 f"{message_payload['response']['response_code']}: " 

744 f"{message_payload['response']['response_description']}") 

745 return message_type, message_payload 

746 

747 async def _on_event(self, message): 

748 logger.debug("The VEN received an event") 

749 events = message['events'] 

750 try: 

751 results = [] 

752 for event in message['events']: 

753 event_id = event['event_descriptor']['event_id'] 

754 event_status = event['event_descriptor']['event_status'] 

755 modification_number = event['event_descriptor']['modification_number'] 

756 received_event = utils.find_by(self.received_events, 'event_descriptor.event_id', event_id) 

757 if received_event: 

758 if received_event['event_descriptor']['modification_number'] == modification_number: 

759 # Re-submit the same opt type as we already had previously 

760 result = self.responded_events[event_id] 

761 else: 

762 # Replace the event with the fresh copy 

763 utils.pop_by(self.received_events, 'event_descriptor.event_id', event_id) 

764 self.received_events.append(event) 

765 # Wait for the result of the on_update_event handler 

766 result = await utils.await_if_required(self.on_update_event(event)) 

767 else: 

768 # Wait for the result of the on_event 

769 self.received_events.append(event) 

770 result = self.on_event(event) 

771 if asyncio.iscoroutine(result): 

772 result = await result 

773 results.append(result) 

774 if event_status in (enums.EVENT_STATUS.COMPLETED, enums.EVENT_STATUS.CANCELLED): 

775 self.responded_events.pop(event_id) 

776 else: 

777 self.responded_events[event_id] = result 

778 for i, result in enumerate(results): 

779 if result not in ('optIn', 'optOut') and events[i]['response_required'] == 'always': 

780 logger.error("Your on_event or on_update_event handler must return 'optIn' or 'optOut'; " 

781 f"you supplied {result}. Please fix your on_event handler.") 

782 results[i] = 'optOut' 

783 except Exception as err: 

784 logger.error("Your on_event handler encountered an error. Will Opt Out of the event. " 

785 f"The error was {err.__class__.__name__}: {str(err)}") 

786 results = ['optOut'] * len(events) 

787 

788 event_responses = [{'response_code': 200, 

789 'response_description': 'OK', 

790 'opt_type': results[i], 

791 'request_id': message['request_id'], 

792 'modification_number': modification_number, 

793 'event_id': events[i]['event_descriptor']['event_id']} 

794 for i, event in enumerate(events) 

795 if event['response_required'] == 'always' 

796 and not utils.determine_event_status(event['active_period']) == 'completed'] 

797 

798 if len(event_responses) > 0: 

799 response = {'response_code': 200, 

800 'response_description': 'OK', 

801 'request_id': message['request_id']} 

802 message = self._create_message('oadrCreatedEvent', 

803 response=response, 

804 event_responses=event_responses, 

805 ven_id=self.ven_id) 

806 service = 'EiEvent' 

807 response_type, response_payload = await self._perform_request(service, message) 

808 logger.info(response_type, response_payload) 

809 else: 

810 logger.info("Not sending any event responses, because a response was not required/allowed by the VTN.") 

811 

812 async def _event_cleanup(self): 

813 """ 

814 Periodic task that will clean up completed and cancelled events in our memory. 

815 """ 

816 for event in self.received_events: 

817 if event['event_descriptor']['event_status'] == 'cancelled' or \ 

818 utils.determine_event_status(event['active_period']) == 'completed': 

819 logger.info(f"Removing event {event} because it is no longer relevant.") 

820 self.received_events.pop(self.received_events.index(event)) 

821 

822 async def _poll(self): 

823 logger.debug("Now polling for new messages") 

824 response_type, response_payload = await self.poll() 

825 if response_type is None: 

826 return 

827 

828 elif response_type == 'oadrResponse': 

829 logger.debug("Received empty response from the VTN.") 

830 return 

831 

832 elif response_type == 'oadrRequestReregistration': 

833 logger.info("The VTN required us to re-register. Calling the registration procedure.") 

834 await self.send_response(service='EiRegisterParty') 

835 await self.create_party_registration() 

836 if self.reports: 

837 await self.register_reports(self.reports) 

838 

839 elif response_type == 'oadrDistributeEvent': 

840 if 'events' in response_payload and len(response_payload['events']) > 0: 

841 await self._on_event(response_payload) 

842 

843 elif response_type == 'oadrUpdateReport': 

844 await self._on_report(response_payload) 

845 

846 elif response_type == 'oadrCreateReport': 

847 if 'report_requests' in response_payload: 

848 for report_request in response_payload['report_requests']: 

849 await self.create_report(report_request) 

850 

851 elif response_type == 'oadrRegisterReport': 

852 if 'reports' in response_payload and len(response_payload['reports']) > 0: 

853 for report in response_payload['reports']: 

854 await self.register_report(report) 

855 

856 else: 

857 logger.warning(f"No handler implemented for incoming message " 

858 f"of type {response_type}, ignoring.") 

859 

860 # Immediately poll again, because there might be more messages 

861 await self._poll() 

862 

863 async def _ensure_client_session(self): 

864 if not self.client_session: 

865 headers = {'content-type': 'application/xml'} 

866 if self.cert_path: 

867 ssl_context = ssl.create_default_context(cafile=self.ca_file, 

868 purpose=ssl.Purpose.CLIENT_AUTH) 

869 ssl_context.load_cert_chain(self.cert_path, self.key_path, self.passphrase) 

870 ssl_context.check_hostname = False 

871 connector = aiohttp.TCPConnector(ssl=ssl_context) 

872 self.client_session = aiohttp.ClientSession(connector=connector, headers=headers) 

873 else: 

874 self.client_session = aiohttp.ClientSession(headers=headers)