Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# SPDX-License-Identifier: Apache-2.0 

2 

3# Copyright 2020 Contributors to OpenLEADR 

4 

5# Licensed under the Apache License, Version 2.0 (the "License"); 

6# you may not use this file except in compliance with the License. 

7# You may obtain a copy of the License at 

8 

9# http://www.apache.org/licenses/LICENSE-2.0 

10 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16 

17from . import service, handler, VTNService 

18import asyncio 

19from openleadr import utils, errors, enums 

20import logging 

21logger = logging.getLogger('openleadr') 

22 

23 

24@service('EiEvent') 

25class EventService(VTNService): 

26 

27 def __init__(self, vtn_id, polling_method='internal'): 

28 super().__init__(vtn_id) 

29 self.polling_method = polling_method 

30 self.events = {} 

31 self.completed_event_ids = {} # Holds the ids of completed events 

32 self.event_callbacks = {} 

33 self.event_opt_types = {} 

34 

35 @handler('oadrRequestEvent') 

36 async def request_event(self, payload): 

37 """ 

38 The VEN requests us to send any events we have. 

39 """ 

40 ven_id = payload['ven_id'] 

41 if self.polling_method == 'internal': 

42 if ven_id in self.events and self.events[ven_id]: 

43 events = utils.order_events(self.events[ven_id]) 

44 for event in events: 

45 event_status = utils.getmember(event, 'event_descriptor.event_status') 

46 # Pop the event from the events so that this is the last time it is communicated 

47 if event_status == enums.EVENT_STATUS.COMPLETED: 

48 self.events[ven_id].pop(self.events[ven_id].index(event)) 

49 else: 

50 events = None 

51 else: 

52 result = self.on_request_event(ven_id=payload['ven_id']) 

53 if asyncio.iscoroutine(result): 

54 result = await result 

55 if result is None: 

56 events = None 

57 else: 

58 events = utils.order_events(result) 

59 

60 if events is None: 

61 return 'oadrResponse', {} 

62 else: 

63 return 'oadrDistributeEvent', {'events': events} 

64 return 'oadrResponse', result 

65 

66 def on_request_event(self, ven_id): 

67 """ 

68 Placeholder for the on_request_event handler. 

69 """ 

70 logger.warning("You should implement and register your own on_request_event handler " 

71 "that returns the next event for a VEN. This handler will receive a " 

72 "ven_id as its only argument, and should return None (if no events are " 

73 "available), a single Event, or a list of Events.") 

74 return None 

75 

76 @handler('oadrCreatedEvent') 

77 async def created_event(self, payload): 

78 """ 

79 The VEN informs us that they created an EiEvent. 

80 """ 

81 ven_id = payload['ven_id'] 

82 if self.polling_method == 'internal': 

83 for event_response in payload['event_responses']: 

84 event_id = event_response['event_id'] 

85 modification_number = event_response['modification_number'] 

86 opt_type = event_response['opt_type'] 

87 event = utils.find_by(self.events[ven_id], 

88 'event_descriptor.event_id', event_id, 

89 'event_descriptor.modification_number', modification_number) 

90 if not event: 

91 if event_id not in self.completed_event_ids.get(ven_id, []): 

92 logger.warning(f"""Got an oadrCreatedEvent message from ven '{ven_id}' """ 

93 f"""for event '{event_id}' with modification number """ 

94 f"""{modification_number} that does not exist.""") 

95 raise errors.InvalidIdError 

96 # Remove the event from the events list if the cancellation is confirmed. 

97 if utils.getmember(event, 'event_descriptor.event_status') == enums.EVENT_STATUS.CANCELLED: 

98 utils.pop_by(self.events[ven_id], 'event_descriptor.event_id', event_id) 

99 if event_response['event_id'] in self.event_callbacks: 

100 event, callback = self.event_callbacks.pop(event_id) 

101 if isinstance(callback, asyncio.Future): 

102 if callback.done(): 

103 logger.warning(f"Got a second response '{opt_type}' from ven '{ven_id}' " 

104 f"to event '{event_id}', which we cannot use because the " 

105 "callback future you provided was already completed during " 

106 "the first response.") 

107 else: 

108 callback.set_result(opt_type) 

109 else: 

110 result = callback(ven_id=ven_id, event_id=event_id, opt_type=opt_type) 

111 if asyncio.iscoroutine(result): 

112 result = await result 

113 else: 

114 result = self.on_created_event(ven_id=ven_id, event_id=event_id, opt_type=opt_type) 

115 if asyncio.iscoroutine(result): 

116 result = await(result) 

117 return 'oadrResponse', {} 

118 

119 def on_created_event(self, ven_id, event_id, opt_type): 

120 """ 

121 Placeholder for the on_created_event handler. 

122 """ 

123 logger.warning("You should implement and register you own on_created_event handler " 

124 "to receive the opt status for an Event that you sent to the VEN. This " 

125 "handler will receive a ven_id, event_id and opt_status. " 

126 "You don't need to return anything from this handler.") 

127 return None