Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: Apache-2.0
3# Copyright 2020 Contributors to OpenLEADR
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
9# http://www.apache.org/licenses/LICENSE-2.0
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
17from aiohttp import web
18from openleadr.service import EventService, PollService, RegistrationService, ReportService, \
19 VTNService
20from openleadr.messaging import create_message
21from openleadr import objects, enums, utils
22from functools import partial
23from datetime import datetime, timedelta, timezone
24import asyncio
25import inspect
26import logging
27import ssl
28import re
29logger = logging.getLogger('openleadr')
32class OpenADRServer:
33 _MAP = {'on_created_event': 'event_service',
34 'on_request_event': 'event_service',
36 'on_register_report': 'report_service',
37 'on_create_report': 'report_service',
38 'on_created_report': 'report_service',
39 'on_request_report': 'report_service',
40 'on_update_report': 'report_service',
42 'on_poll': 'poll_service',
44 'on_query_registration': 'registration_service',
45 'on_create_party_registration': 'registration_service',
46 'on_cancel_party_registration': 'registration_service'}
48 def __init__(self, vtn_id, cert=None, key=None, passphrase=None, fingerprint_lookup=None,
49 show_fingerprint=True, http_port=8080, http_host='127.0.0.1', http_cert=None,
50 http_key=None, http_key_passphrase=None, http_path_prefix='/OpenADR2/Simple/2.0b',
51 requested_poll_freq=timedelta(seconds=10), http_ca_file=None, ven_lookup=None):
52 """
53 Create a new OpenADR VTN (Server).
55 :param str vtn_id: An identifier string for this VTN. This is how you identify yourself
56 to the VENs that talk to you.
57 :param str cert: Path to the PEM-formatted certificate file that is used to sign outgoing
58 messages
59 :param str key: Path to the PEM-formatted private key file that is used to sign outgoing
60 messages
61 :param str passphrase: The passphrase used to decrypt the private key file
62 :param callable fingerprint_lookup: A callable that receives a ven_id and should return the
63 registered fingerprint for that VEN. You should receive
64 these fingerprints outside of OpenADR and configure them
65 manually.
66 :param bool show_fingerprint: Whether to print the fingerprint to your stdout on startup.
67 Defaults to True.
68 :param int http_port: The port that the web server is exposed on (default: 8080)
69 :param str http_host: The host or IP address to bind the server to (default: 127.0.0.1).
70 :param str http_cert: The path to the PEM certificate for securing HTTP traffic.
71 :param str http_key: The path to the PEM private key for securing HTTP traffic.
72 :param str http_ca_file: The path to the CA-file that client certificates are checked against.
73 :param str http_key_passphrase: The passphrase for the HTTP private key.
74 :param ven_lookup: A callback that takes a ven_id and returns a dict containing the
75 ven_id, ven_name, fingerprint and registration_id.
76 """
77 # Set up the message queues
79 self.app = web.Application()
80 self.services = {}
81 self.services['event_service'] = EventService(vtn_id)
82 self.services['report_service'] = ReportService(vtn_id)
83 self.services['poll_service'] = PollService(vtn_id)
84 self.services['registration_service'] = RegistrationService(vtn_id, poll_freq=requested_poll_freq)
86 # Register the other services with the poll service
87 self.services['poll_service'].event_service = self.services['event_service']
88 self.services['poll_service'].report_service = self.services['report_service']
90 # Set up the HTTP handlers for the services
91 if http_path_prefix[-1] == "/":
92 http_path_prefix = http_path_prefix[:-1]
93 self.app.add_routes([web.post(f"{http_path_prefix}/{s.__service_name__}", s.handler)
94 for s in self.services.values()])
96 # Add a reference to the openadr VTN to the aiohttp 'app'
97 self.app['server'] = self
99 # Configure the web server
100 self.http_port = http_port
101 self.http_host = http_host
102 self.http_path_prefix = http_path_prefix
104 # Create SSL context for running the server
105 if http_cert and http_key:
106 self.ssl_context = ssl.create_default_context(cafile=http_ca_file,
107 purpose=ssl.Purpose.CLIENT_AUTH)
108 self.ssl_context.verify_mode = ssl.CERT_REQUIRED
109 self.ssl_context.load_cert_chain(http_cert, http_key, http_key_passphrase)
110 else:
111 self.ssl_context = None
113 # Configure message signing
114 if cert and key:
115 with open(cert, "rb") as file:
116 cert = file.read()
117 with open(key, "rb") as file:
118 key = file.read()
119 if show_fingerprint:
120 print("")
121 print("*" * 80)
122 print("Your VTN Certificate Fingerprint is "
123 f"{utils.certificate_fingerprint(cert)}".center(80))
124 print("Please deliver this fingerprint to the VENs that connect to you.".center(80))
125 print("You do not need to keep this a secret.".center(80))
126 print("*" * 80)
127 print("")
128 VTNService._create_message = partial(create_message, cert=cert, key=key,
129 passphrase=passphrase)
130 if fingerprint_lookup is not None:
131 logger.warning("DeprecationWarning: the argument 'fingerprint_lookup' is deprecated and "
132 "is replaced by 'ven_lookup'. 'fingerprint_lookup' will be removed in a "
133 "future version of OpenLEADR. Please see "
134 "https://openleadr.org/docs/server.html#things-you-should-implement.")
135 VTNService.fingerprint_lookup = staticmethod(fingerprint_lookup)
136 if ven_lookup is None:
137 logger.warning("If you provide a 'ven_lookup' to your OpenADRServer() init, OpenLEADR can "
138 "automatically issue ReregistrationRequests for VENs that don't exist in "
139 "your system. Please see https://openleadr.org/docs/server.html#things-you-should-implement.")
140 else:
141 VTNService.ven_lookup = staticmethod(ven_lookup)
142 self.__setattr__ = self.add_handler
144 async def run(self):
145 """
146 Starts the server in an already-running asyncio loop.
147 """
148 self.app_runner = web.AppRunner(self.app)
149 await self.app_runner.setup()
150 site = web.TCPSite(self.app_runner,
151 port=self.http_port,
152 host=self.http_host,
153 ssl_context=self.ssl_context)
154 await site.start()
155 protocol = 'https' if self.ssl_context else 'http'
156 print("")
157 print("*" * 80)
158 print("Your VTN Server is now running at ".center(80))
159 print(f"{protocol}://{self.http_host}:{self.http_port}{self.http_path_prefix}".center(80))
160 print("*" * 80)
161 print("")
163 async def run_async(self):
164 await self.run()
166 async def stop(self):
167 """
168 Stop the server in a graceful manner.
169 """
170 await self.app_runner.cleanup()
172 def add_event(self, ven_id, signal_name, signal_type, intervals, callback=None, event_id=None,
173 targets=None, targets_by_type=None, target=None, response_required='always',
174 market_context="oadr://unknown.context", notification_period=None,
175 ramp_up_period=None, recovery_period=None, signal_target_mrid=None):
176 """
177 Convenience method to add an event with a single signal.
179 :param str ven_id: The ven_id to whom this event must be delivered.
180 :param str signal_name: The OpenADR name of the signal; one of openleadr.objects.SIGNAL_NAME
181 :param str signal_type: The OpenADR type of the signal; one of openleadr.objects.SIGNAL_TYPE
182 :param str intervals: A list of intervals with a dtstart, duration and payload member.
183 :param str callback: A callback function for when your event has been accepted (optIn) or refused (optOut).
184 :param list targets: A list of Targets that this Event applies to.
185 :param target: A single target for this event.
186 :param dict targets_by_type: A dict of targets, grouped by type.
187 :param str market_context: A URI for the DR program that this event belongs to.
188 :param timedelta notification_period: The Notification period for the Event's Active Period.
189 :param timedelta ramp_up_period: The Ramp Up period for the Event's Active Period.
190 :param timedelta recovery_period: The Recovery period for the Event's Active Period.
192 If you don't provide a target using any of the three arguments, the target will be set to the given ven_id.
193 """
194 if self.services['event_service'].polling_method == 'external':
195 logger.error("You cannot use the add_event method after you assign your own on_poll "
196 "handler. If you use your own on_poll handler, you are responsible for "
197 "delivering events from that handler. If you want to use OpenLEADRs "
198 "message queuing system, you should not assign an on_poll handler. "
199 "Your Event will NOT be added.")
200 return
201 if not re.match(r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?", market_context):
202 raise ValueError("The Market Context must be a valid URI.")
203 event_id = event_id or utils.generate_id()
205 if response_required not in ('always', 'never'):
206 raise ValueError("'response_required' should be either 'always' or 'never'; "
207 f"you provided '{response_required}'.")
209 # Figure out the target for this Event
210 if target is None and targets is None and targets_by_type is None:
211 targets = [{'ven_id': ven_id}]
212 elif target is not None:
213 targets = [target]
214 elif targets_by_type is not None:
215 targets = utils.ungroup_targets_by_type(targets_by_type)
216 if not isinstance(targets, list):
217 targets = [targets]
218 if signal_type not in enums.SIGNAL_TYPE.values:
219 raise ValueError(f"""The signal_type must be one of '{"', '".join(enums.SIGNAL_TYPE.values)}', """
220 f"""you specified: '{signal_type}'.""")
221 if signal_name not in enums.SIGNAL_NAME.values and not signal_name.startswith('x-'):
222 raise ValueError(f"""The signal_name must be one of '{"', '".join(enums.SIGNAL_TYPE.values)}', """
223 f"""or it must begin with 'x-'. You specified: '{signal_name}'""")
224 if not intervals or not isinstance(intervals, (list, tuple)) or len(intervals) == 0:
225 raise ValueError(f"The intervals must be a list of intervals, you specified: {intervals}")
227 event_descriptor = objects.EventDescriptor(event_id=event_id,
228 modification_number=0,
229 market_context=market_context,
230 event_status="far",
231 created_date_time=datetime.now(timezone.utc))
232 event_signal = objects.EventSignal(intervals=intervals,
233 signal_name=signal_name,
234 signal_type=signal_type,
235 signal_id=utils.generate_id())
237 # Make sure the intervals carry timezone-aware timestamps
238 for interval in intervals:
239 if utils.getmember(interval, 'dtstart').tzinfo is None:
240 utils.setmember(interval, 'dtstart',
241 utils.getmember(interval, 'dtstart').astimezone(timezone.utc))
242 logger.warning("You supplied a naive datetime object to your interval's dtstart. "
243 "This will be interpreted as a timestamp in your local timezone "
244 "and then converted to UTC before sending. Please supply timezone-"
245 "aware timestamps like datetime.datetime.new(timezone.utc) or "
246 "datetime.datetime(..., tzinfo=datetime.timezone.utc)")
247 active_period = utils.get_active_period_from_intervals(intervals, False)
248 active_period.ramp_up_period = ramp_up_period
249 active_period.notification_period = notification_period
250 active_period.recovery_period = recovery_period
251 event = objects.Event(active_period=active_period,
252 event_descriptor=event_descriptor,
253 event_signals=[event_signal],
254 targets=targets,
255 response_required=response_required)
256 self.add_raw_event(ven_id=ven_id, event=event, callback=callback)
257 return event_id
259 def add_raw_event(self, ven_id, event, callback=None):
260 """
261 Add a new event to the queue for a specific VEN.
262 :param str ven_id: The ven_id to which this event should be distributed.
263 :param dict event: The event (as a dict or as a objects.Event instance)
264 that contains the event details.
265 :param callable callback: A callback that will receive the opt status for this event.
266 This callback receives ven_id, event_id, opt_type as its arguments.
267 """
268 if utils.getmember(event, 'response_required') == 'always':
269 if callback is None:
270 logger.warning("You did not provide a 'callback', which means you won't know if the "
271 "VEN will opt in or opt out of your event. You should consider adding "
272 "a callback for this.")
273 elif not asyncio.isfuture(callback):
274 args = inspect.signature(callback).parameters
275 if not all(['ven_id' in args, 'event_id' in args, 'opt_type' in args]):
276 raise ValueError("The 'callback' must have at least the following parameters: "
277 "'ven_id' (str), 'event_id' (str), 'opt_type' (str). Please fix "
278 "your 'callback' handler.")
280 event_id = utils.getmember(event, 'event_descriptor.event_id')
281 # Create the event queue if it does not exist yet
282 if ven_id not in self.events:
283 self.events[ven_id] = []
285 # Add event to the queue
286 self.events[ven_id].append(event)
287 self.events_updated[ven_id] = True
289 # Add the callback for the response to this event
290 if callback is not None:
291 self.event_callbacks[event_id] = (event, callback)
292 return event_id
294 def cancel_event(self, ven_id, event_id):
295 """
296 Mark the indicated event as cancelled.
297 """
298 event = utils.find_by(self.events[ven_id], 'event_descriptor.event_id', event_id)
299 if not event:
300 logger.error("""The event you tried to cancel was not found. """
301 """Was looking for event_id {event_id} for ven {ven_id}."""
302 """Only found these: [getmember(e, 'event_descriptor.event_id')
303 for e in self.events[ven_id]]""")
304 return
306 # Set the Event Status to cancelled
307 utils.setmember(event, 'event_descriptor.event_status', enums.EVENT_STATUS.CANCELLED)
308 utils.increment_event_modification_number(event)
309 self.events_updated[ven_id] = True
311 def add_handler(self, name, func):
312 """
313 Add a handler to the OpenADRServer.
315 :param str name: The name for this handler. Should be one of: on_created_event,
316 on_request_event, on_register_report, on_create_report,
317 on_created_report, on_request_report, on_update_report, on_poll,
318 on_query_registration, on_create_party_registration,
319 on_cancel_party_registration.
320 :param callable func: A function or coroutine that handles this type of occurrence.
321 It receives the message, and should return the contents of a response.
322 """
323 logger.debug(f"Adding handler: {name} {func}")
324 if name in self._MAP:
325 setattr(self.services[self._MAP[name]], name, func)
326 if name == 'on_poll':
327 self.services['poll_service'].polling_method = 'external'
328 self.services['event_service'].polling_method = 'external'
329 else:
330 raise NameError(f"""Unknown handler '{name}'. """
331 f"""Correct handler names are: '{"', '".join(self._MAP.keys())}'.""")
333 @property
334 def registered_reports(self):
335 return self.services['report_service'].registered_reports
337 @property
338 def events(self):
339 return self.services['event_service'].events
341 @property
342 def events_updated(self):
343 return self.services['poll_service'].events_updated
345 @property
346 def event_callbacks(self):
347 return self.services['event_service'].event_callbacks