Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: Apache-2.0
3# Copyright 2020 Contributors to OpenLEADR
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
9# http://www.apache.org/licenses/LICENSE-2.0
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
17from . import service, handler, VTNService
18import asyncio
19from openleadr import utils, errors, enums
20import logging
21logger = logging.getLogger('openleadr')
24@service('EiEvent')
25class EventService(VTNService):
27 def __init__(self, vtn_id, polling_method='internal'):
28 super().__init__(vtn_id)
29 self.polling_method = polling_method
30 self.events = {}
31 self.completed_event_ids = {} # Holds the ids of completed events
32 self.event_callbacks = {}
33 self.event_opt_types = {}
35 @handler('oadrRequestEvent')
36 async def request_event(self, payload):
37 """
38 The VEN requests us to send any events we have.
39 """
40 ven_id = payload['ven_id']
41 if self.polling_method == 'internal':
42 if ven_id in self.events and self.events[ven_id]:
43 events = utils.order_events(self.events[ven_id])
44 for event in events:
45 event_status = utils.getmember(event, 'event_descriptor.event_status')
46 # Pop the event from the events so that this is the last time it is communicated
47 if event_status == enums.EVENT_STATUS.COMPLETED:
48 self.events[ven_id].pop(self.events[ven_id].index(event))
49 else:
50 events = None
51 else:
52 result = self.on_request_event(ven_id=payload['ven_id'])
53 if asyncio.iscoroutine(result):
54 result = await result
55 if result is None:
56 events = None
57 else:
58 events = utils.order_events(result)
60 if events is None:
61 return 'oadrResponse', {}
62 else:
63 return 'oadrDistributeEvent', {'events': events}
64 return 'oadrResponse', result
66 def on_request_event(self, ven_id):
67 """
68 Placeholder for the on_request_event handler.
69 """
70 logger.warning("You should implement and register your own on_request_event handler "
71 "that returns the next event for a VEN. This handler will receive a "
72 "ven_id as its only argument, and should return None (if no events are "
73 "available), a single Event, or a list of Events.")
74 return None
76 @handler('oadrCreatedEvent')
77 async def created_event(self, payload):
78 """
79 The VEN informs us that they created an EiEvent.
80 """
81 ven_id = payload['ven_id']
82 if self.polling_method == 'internal':
83 for event_response in payload['event_responses']:
84 event_id = event_response['event_id']
85 modification_number = event_response['modification_number']
86 opt_type = event_response['opt_type']
87 event = utils.find_by(self.events[ven_id],
88 'event_descriptor.event_id', event_id,
89 'event_descriptor.modification_number', modification_number)
90 if not event:
91 if event_id not in self.completed_event_ids.get(ven_id, []):
92 logger.warning(f"""Got an oadrCreatedEvent message from ven '{ven_id}' """
93 f"""for event '{event_id}' with modification number """
94 f"""{modification_number} that does not exist.""")
95 raise errors.InvalidIdError
96 # Remove the event from the events list if the cancellation is confirmed.
97 if utils.getmember(event, 'event_descriptor.event_status') == enums.EVENT_STATUS.CANCELLED:
98 utils.pop_by(self.events[ven_id], 'event_descriptor.event_id', event_id)
99 if event_response['event_id'] in self.event_callbacks:
100 event, callback = self.event_callbacks.pop(event_id)
101 if isinstance(callback, asyncio.Future):
102 if callback.done():
103 logger.warning(f"Got a second response '{opt_type}' from ven '{ven_id}' "
104 f"to event '{event_id}', which we cannot use because the "
105 "callback future you provided was already completed during "
106 "the first response.")
107 else:
108 callback.set_result(opt_type)
109 else:
110 result = callback(ven_id=ven_id, event_id=event_id, opt_type=opt_type)
111 if asyncio.iscoroutine(result):
112 result = await result
113 else:
114 result = self.on_created_event(ven_id=ven_id, event_id=event_id, opt_type=opt_type)
115 if asyncio.iscoroutine(result):
116 result = await(result)
117 return 'oadrResponse', {}
119 def on_created_event(self, ven_id, event_id, opt_type):
120 """
121 Placeholder for the on_created_event handler.
122 """
123 logger.warning("You should implement and register you own on_created_event handler "
124 "to receive the opt status for an Event that you sent to the VEN. This "
125 "handler will receive a ven_id, event_id and opt_status. "
126 "You don't need to return anything from this handler.")
127 return None