Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# SPDX-License-Identifier: Apache-2.0 

2 

3# Copyright 2020 Contributors to OpenLEADR 

4 

5# Licensed under the Apache License, Version 2.0 (the "License"); 

6# you may not use this file except in compliance with the License. 

7# You may obtain a copy of the License at 

8 

9# http://www.apache.org/licenses/LICENSE-2.0 

10 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16 

17from aiohttp import web 

18from openleadr.service import EventService, PollService, RegistrationService, ReportService, \ 

19 VTNService 

20from openleadr.messaging import create_message 

21from openleadr import objects, enums, utils 

22from functools import partial 

23from datetime import datetime, timedelta, timezone 

24import asyncio 

25import inspect 

26import logging 

27import ssl 

28import re 

29logger = logging.getLogger('openleadr') 

30 

31 

class OpenADRServer:
    """
    An OpenADR VTN (server) that exposes the OpenADR services over HTTP via aiohttp.

    The server owns one instance of each service (event, report, poll, registration),
    registers a POST route for each of them, and offers convenience methods for queueing
    events and attaching handlers.
    """

    # Maps every supported handler name to the key in ``self.services`` of the service
    # on which that handler is installed.
    _MAP = {'on_created_event': 'event_service',
            'on_request_event': 'event_service',

            'on_register_report': 'report_service',
            'on_create_report': 'report_service',
            'on_created_report': 'report_service',
            'on_request_report': 'report_service',
            'on_update_report': 'report_service',

            'on_poll': 'poll_service',

            'on_query_registration': 'registration_service',
            'on_create_party_registration': 'registration_service',
            'on_cancel_party_registration': 'registration_service'}

    def __init__(self, vtn_id, cert=None, key=None, passphrase=None, fingerprint_lookup=None,
                 show_fingerprint=True, http_port=8080, http_host='127.0.0.1', http_cert=None,
                 http_key=None, http_key_passphrase=None, http_path_prefix='/OpenADR2/Simple/2.0b',
                 requested_poll_freq=timedelta(seconds=10), http_ca_file=None, ven_lookup=None):
        """
        Create a new OpenADR VTN (Server).

        :param str vtn_id: An identifier string for this VTN. This is how you identify yourself
                           to the VENs that talk to you.
        :param str cert: Path to the PEM-formatted certificate file that is used to sign outgoing
                         messages
        :param str key: Path to the PEM-formatted private key file that is used to sign outgoing
                        messages
        :param str passphrase: The passphrase used to decrypt the private key file
        :param callable fingerprint_lookup: Deprecated in favour of ``ven_lookup``. A callable
                                            that receives a ven_id and should return the
                                            registered fingerprint for that VEN. You should
                                            receive these fingerprints outside of OpenADR and
                                            configure them manually.
        :param bool show_fingerprint: Whether to print the fingerprint to your stdout on startup.
                                      Only has an effect when both cert and key are given.
                                      Defaults to True.
        :param int http_port: The port that the web server is exposed on (default: 8080)
        :param str http_host: The host or IP address to bind the server to (default: 127.0.0.1).
        :param str http_cert: The path to the PEM certificate for securing HTTP traffic.
        :param str http_key: The path to the PEM private key for securing HTTP traffic.
        :param str http_key_passphrase: The passphrase for the HTTP private key.
        :param str http_path_prefix: The URL path under which the service endpoints are
                                     registered. A single trailing slash is removed.
        :param timedelta requested_poll_freq: Passed to the RegistrationService as its
                                              ``poll_freq`` (default: 10 seconds).
        :param str http_ca_file: The path to the CA-file that client certificates are checked against.
        :param ven_lookup: A callback that takes a ven_id and returns a dict containing the
                           ven_id, ven_name, fingerprint and registration_id.
        """
        # Create the aiohttp application and one instance of every OpenADR service
        self.app = web.Application()
        self.services = {}
        self.services['event_service'] = EventService(vtn_id)
        self.services['report_service'] = ReportService(vtn_id)
        self.services['poll_service'] = PollService(vtn_id)
        self.services['registration_service'] = RegistrationService(vtn_id, poll_freq=requested_poll_freq)

        # Register the other services with the poll service
        self.services['poll_service'].event_service = self.services['event_service']
        self.services['poll_service'].report_service = self.services['report_service']

        # Set up the HTTP handlers for the services.
        # endswith() (rather than indexing [-1]) keeps an empty prefix from raising IndexError.
        if http_path_prefix.endswith("/"):
            http_path_prefix = http_path_prefix[:-1]
        self.app.add_routes([web.post(f"{http_path_prefix}/{s.__service_name__}", s.handler)
                             for s in self.services.values()])

        # Add a reference to the openadr VTN to the aiohttp 'app'
        self.app['server'] = self

        # Configure the web server
        self.http_port = http_port
        self.http_host = http_host
        self.http_path_prefix = http_path_prefix

        # Create SSL context for running the server; client certificates are required
        # whenever TLS is enabled.
        if http_cert and http_key:
            self.ssl_context = ssl.create_default_context(cafile=http_ca_file,
                                                          purpose=ssl.Purpose.CLIENT_AUTH)
            self.ssl_context.verify_mode = ssl.CERT_REQUIRED
            self.ssl_context.load_cert_chain(http_cert, http_key, http_key_passphrase)
        else:
            self.ssl_context = None

        # Configure message signing: from here on 'cert' and 'key' hold the file contents
        if cert and key:
            with open(cert, "rb") as file:
                cert = file.read()
            with open(key, "rb") as file:
                key = file.read()
            if show_fingerprint:
                print("")
                print("*" * 80)
                print("Your VTN Certificate Fingerprint is "
                      f"{utils.certificate_fingerprint(cert)}".center(80))
                print("Please deliver this fingerprint to the VENs that connect to you.".center(80))
                print("You do not need to keep this a secret.".center(80))
                print("*" * 80)
                print("")

        # NOTE: the attributes below are set on the VTNService *class*, so they are shared by
        # every service and every OpenADRServer instance in the process.
        # The partial is wrapped in staticmethod() so that it is never bound as a method when
        # looked up through a service instance (the same treatment the lookups below get).
        VTNService._create_message = staticmethod(partial(create_message, cert=cert, key=key,
                                                          passphrase=passphrase))
        if fingerprint_lookup is not None:
            logger.warning("DeprecationWarning: the argument 'fingerprint_lookup' is deprecated and "
                           "is replaced by 'ven_lookup'. 'fingerprint_lookup' will be removed in a "
                           "future version of OpenLEADR. Please see "
                           "https://openleadr.org/docs/server.html#things-you-should-implement.")
            VTNService.fingerprint_lookup = staticmethod(fingerprint_lookup)
        if ven_lookup is None:
            logger.warning("If you provide a 'ven_lookup' to your OpenADRServer() init, OpenLEADR can "
                           "automatically issue ReregistrationRequests for VENs that don't exist in "
                           "your system. Please see https://openleadr.org/docs/server.html#things-you-should-implement.")
        else:
            VTNService.ven_lookup = staticmethod(ven_lookup)

        # NOTE(review): this only stores an instance attribute named '__setattr__'. Python looks
        # special methods up on the type, so 'server.on_poll = func' is NOT routed through
        # add_handler by this line. Kept for backward compatibility; use add_handler() explicitly.
        self.__setattr__ = self.add_handler

    async def run(self):
        """
        Starts the server in an already-running asyncio loop.
        """
        self.app_runner = web.AppRunner(self.app)
        await self.app_runner.setup()
        site = web.TCPSite(self.app_runner,
                           port=self.http_port,
                           host=self.http_host,
                           ssl_context=self.ssl_context)
        await site.start()
        protocol = 'https' if self.ssl_context else 'http'
        print("")
        print("*" * 80)
        print("Your VTN Server is now running at ".center(80))
        print(f"{protocol}://{self.http_host}:{self.http_port}{self.http_path_prefix}".center(80))
        print("*" * 80)
        print("")

    async def run_async(self):
        """
        Alias for :meth:`run`; simply awaits it.
        """
        await self.run()

    async def stop(self):
        """
        Stop the server in a graceful manner.

        Does nothing if the server was never started with :meth:`run`.
        """
        runner = getattr(self, 'app_runner', None)
        if runner is None:
            return
        await runner.cleanup()

    def add_event(self, ven_id, signal_name, signal_type, intervals, callback=None, event_id=None,
                  targets=None, targets_by_type=None, target=None, response_required='always',
                  market_context="oadr://unknown.context", notification_period=None,
                  ramp_up_period=None, recovery_period=None, signal_target_mrid=None):
        """
        Convenience method to add an event with a single signal.

        :param str ven_id: The ven_id to whom this event must be delivered.
        :param str signal_name: The OpenADR name of the signal; one of openleadr.enums.SIGNAL_NAME,
                                or a custom name that starts with 'x-'.
        :param str signal_type: The OpenADR type of the signal; one of openleadr.enums.SIGNAL_TYPE
        :param list intervals: A non-empty list (or tuple) of intervals with a dtstart, duration
                               and payload member.
        :param callable callback: A callback function for when your event has been accepted (optIn)
                                  or refused (optOut).
        :param str event_id: The id for this event. A new id is generated if omitted.
        :param list targets: A list of Targets that this Event applies to.
        :param target: A single target for this event.
        :param dict targets_by_type: A dict of targets, grouped by type.
        :param str response_required: Either 'always' or 'never'.
        :param str market_context: A URI for the DR program that this event belongs to.
        :param timedelta notification_period: The Notification period for the Event's Active Period.
        :param timedelta ramp_up_period: The Ramp Up period for the Event's Active Period.
        :param timedelta recovery_period: The Recovery period for the Event's Active Period.
        :param signal_target_mrid: Accepted for interface compatibility; not used by this method.

        If you don't provide a target using any of the three arguments, the target will be set to
        the given ven_id. When several are given, 'target' takes precedence over
        'targets_by_type', which takes precedence over 'targets'.

        :return: The event_id of the new event, or None if the event was not added because a
                 custom on_poll handler is in use.
        :raises ValueError: If market_context, response_required, signal_type, signal_name or
                            intervals fail validation.
        """
        if self.services['event_service'].polling_method == 'external':
            logger.error("You cannot use the add_event method after you assign your own on_poll "
                         "handler. If you use your own on_poll handler, you are responsible for "
                         "delivering events from that handler. If you want to use OpenLEADRs "
                         "message queuing system, you should not assign an on_poll handler. "
                         "Your Event will NOT be added.")
            return
        # NOTE(review): every group in this pattern is optional, so it matches any string;
        # the check is kept as-is so that previously accepted values stay accepted.
        if not re.match(r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?", market_context):
            raise ValueError("The Market Context must be a valid URI.")
        event_id = event_id or utils.generate_id()

        if response_required not in ('always', 'never'):
            raise ValueError("'response_required' should be either 'always' or 'never'; "
                             f"you provided '{response_required}'.")

        # Figure out the target for this Event
        if target is None and targets is None and targets_by_type is None:
            targets = [{'ven_id': ven_id}]
        elif target is not None:
            targets = [target]
        elif targets_by_type is not None:
            targets = utils.ungroup_targets_by_type(targets_by_type)
        if not isinstance(targets, list):
            targets = [targets]
        if signal_type not in enums.SIGNAL_TYPE.values:
            raise ValueError(f"""The signal_type must be one of '{"', '".join(enums.SIGNAL_TYPE.values)}', """
                             f"""you specified: '{signal_type}'.""")
        if signal_name not in enums.SIGNAL_NAME.values and not signal_name.startswith('x-'):
            # List the valid signal *names* here (the message used to list the signal types)
            raise ValueError(f"""The signal_name must be one of '{"', '".join(enums.SIGNAL_NAME.values)}', """
                             f"""or it must begin with 'x-'. You specified: '{signal_name}'""")
        if not intervals or not isinstance(intervals, (list, tuple)):
            raise ValueError(f"The intervals must be a list of intervals, you specified: {intervals}")

        event_descriptor = objects.EventDescriptor(event_id=event_id,
                                                   modification_number=0,
                                                   market_context=market_context,
                                                   event_status="far",
                                                   created_date_time=datetime.now(timezone.utc))
        event_signal = objects.EventSignal(intervals=intervals,
                                           signal_name=signal_name,
                                           signal_type=signal_type,
                                           signal_id=utils.generate_id())

        # Make sure the intervals carry timezone-aware timestamps
        for interval in intervals:
            if utils.getmember(interval, 'dtstart').tzinfo is None:
                # astimezone() on a naive datetime treats it as local time
                utils.setmember(interval, 'dtstart',
                                utils.getmember(interval, 'dtstart').astimezone(timezone.utc))
                logger.warning("You supplied a naive datetime object to your interval's dtstart. "
                               "This will be interpreted as a timestamp in your local timezone "
                               "and then converted to UTC before sending. Please supply timezone-"
                               "aware timestamps like datetime.datetime.now(timezone.utc) or "
                               "datetime.datetime(..., tzinfo=datetime.timezone.utc)")
        active_period = utils.get_active_period_from_intervals(intervals, False)
        active_period.ramp_up_period = ramp_up_period
        active_period.notification_period = notification_period
        active_period.recovery_period = recovery_period
        event = objects.Event(active_period=active_period,
                              event_descriptor=event_descriptor,
                              event_signals=[event_signal],
                              targets=targets,
                              response_required=response_required)
        self.add_raw_event(ven_id=ven_id, event=event, callback=callback)
        return event_id

    def add_raw_event(self, ven_id, event, callback=None):
        """
        Add a new event to the queue for a specific VEN.

        :param str ven_id: The ven_id to which this event should be distributed.
        :param dict event: The event (as a dict or as a objects.Event instance)
                           that contains the event details.
        :param callable callback: A callback that will receive the opt status for this event.
                                  This callback receives ven_id, event_id, opt_type as its arguments.
                                  A future is accepted as well and is not signature-checked.
        :return: The event_id taken from the event's event_descriptor.
        :raises ValueError: If a response is required and the callback lacks one of the
                            parameters 'ven_id', 'event_id' or 'opt_type'.
        """
        if utils.getmember(event, 'response_required') == 'always':
            if callback is None:
                logger.warning("You did not provide a 'callback', which means you won't know if the "
                               "VEN will opt in or opt out of your event. You should consider adding "
                               "a callback for this.")
            elif not asyncio.isfuture(callback):
                args = inspect.signature(callback).parameters
                if not all(['ven_id' in args, 'event_id' in args, 'opt_type' in args]):
                    raise ValueError("The 'callback' must have at least the following parameters: "
                                     "'ven_id' (str), 'event_id' (str), 'opt_type' (str). Please fix "
                                     "your 'callback' handler.")

        event_id = utils.getmember(event, 'event_descriptor.event_id')
        # Create the event queue if it does not exist yet
        if ven_id not in self.events:
            self.events[ven_id] = []

        # Add event to the queue and flag the VEN's events as updated on the poll service
        self.events[ven_id].append(event)
        self.events_updated[ven_id] = True

        # Add the callback for the response to this event
        if callback is not None:
            self.event_callbacks[event_id] = (event, callback)
        return event_id

    def cancel_event(self, ven_id, event_id):
        """
        Mark the indicated event as cancelled.

        If the event cannot be found for this VEN (including when the VEN has no event queue
        at all), an error is logged and nothing is changed.

        :param str ven_id: The ven_id whose event queue holds the event.
        :param str event_id: The event_id of the event to cancel.
        """
        # A VEN without a queue is treated like a VEN with an empty queue instead of a KeyError
        known_events = self.events[ven_id] if ven_id in self.events else []
        event = utils.find_by(known_events, 'event_descriptor.event_id', event_id)
        if not event:
            known_ids = [utils.getmember(e, 'event_descriptor.event_id') for e in known_events]
            logger.error("The event you tried to cancel was not found. "
                         f"Was looking for event_id {event_id} for ven {ven_id}. "
                         f"Only found these: {known_ids}")
            return

        # Set the Event Status to cancelled
        utils.setmember(event, 'event_descriptor.event_status', enums.EVENT_STATUS.CANCELLED)
        utils.increment_event_modification_number(event)
        self.events_updated[ven_id] = True

    def add_handler(self, name, func):
        """
        Add a handler to the OpenADRServer.

        :param str name: The name for this handler. Should be one of: on_created_event,
                         on_request_event, on_register_report, on_create_report,
                         on_created_report, on_request_report, on_update_report, on_poll,
                         on_query_registration, on_create_party_registration,
                         on_cancel_party_registration.
        :param callable func: A function or coroutine that handles this type of occurrence.
                              It receives the message, and should return the contents of a response.
        :raises NameError: If name is not one of the supported handler names.
        """
        logger.debug(f"Adding handler: {name} {func}")
        if name in self._MAP:
            setattr(self.services[self._MAP[name]], name, func)
            if name == 'on_poll':
                # A custom on_poll handler switches off the built-in queueing (see add_event)
                self.services['poll_service'].polling_method = 'external'
                self.services['event_service'].polling_method = 'external'
        else:
            raise NameError(f"""Unknown handler '{name}'. """
                            f"""Correct handler names are: '{"', '".join(self._MAP.keys())}'.""")

    @property
    def registered_reports(self):
        """The ``registered_reports`` attribute of the report service."""
        return self.services['report_service'].registered_reports

    @property
    def events(self):
        """The ``events`` attribute of the event service (per-VEN event queues)."""
        return self.services['event_service'].events

    @property
    def events_updated(self):
        """The ``events_updated`` attribute of the poll service (per-VEN update flags)."""
        return self.services['poll_service'].events_updated

    @property
    def event_callbacks(self):
        """The ``event_callbacks`` attribute of the event service, keyed by event_id."""
        return self.services['event_service'].event_callbacks