Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# SPDX-License-Identifier: Apache-2.0 

2 

3# Copyright 2020 Contributors to OpenLEADR 

4 

5# Licensed under the Apache License, Version 2.0 (the "License"); 

6# you may not use this file except in compliance with the License. 

7# You may obtain a copy of the License at 

8 

9# http://www.apache.org/licenses/LICENSE-2.0 

10 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16 

17from . import service, handler, VTNService 

18from asyncio import iscoroutine 

19from openleadr import objects, utils 

20import logging 

21import inspect 

22logger = logging.getLogger('openleadr') 

23 

24# ╔══════════════════════════════════════════════════════════════════════════╗ 

25# ║ REPORT SERVICE ║ 

26# ╚══════════════════════════════════════════════════════════════════════════╝ 

27# ┌──────────────────────────────────────────────────────────────────────────┐ 

28# │ The VEN can register its reporting capabilities. │ 

29# │ │ 

30# │ ┌────┐ ┌────┐ │ 

31# │ │VEN │ │VTN │ │ 

32# │ └─┬──┘ └─┬──┘ │ 

33# │ │───────────────oadrRegisterReport(METADATA Report)──────────────▶│ │ 

34# │ │ │ │ 

35# │ │◀ ─ ─ ─ ─oadrRegisteredReport(optional oadrReportRequest) ─ ─ ─ ─│ │ 

36# │ │ │ │ 

37# │ │ │ │ 

38# │ │─────────────oadrCreatedReport(if report requested)─────────────▶│ │ 

39# │ │ │ │ 

40# │ │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ oadrResponse()─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │ 

41# │ │ │ │ 

42# │ │ 

43# └──────────────────────────────────────────────────────────────────────────┘ 

44# ┌──────────────────────────────────────────────────────────────────────────┐ 

45# │ A report can also be canceled │ 

46# │ │ 

47# │ ┌────┐ ┌────┐ │ 

48# │ │VEN │ │VTN │ │ 

49# │ └─┬──┘ └─┬──┘ │ 

50# │ │───────────────oadrRegisterReport(METADATA Report)──────────────▶│ │ 

51# │ │ │ │ 

52# │ │◀ ─ ─ ─ ─oadrRegisteredReport(optional oadrReportRequest) ─ ─ ─ ─│ │ 

53# │ │ │ │ 

54# │ │ │ │ 

55# │ │─────────────oadrCreatedReport(if report requested)─────────────▶│ │ 

56# │ │ │ │ 

57# │ │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ oadrResponse()─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │ 

58# │ │ │ │ 

59# │ │ 

60# └──────────────────────────────────────────────────────────────────────────┘ 

61 

62 

63@service('EiReport') 

64class ReportService(VTNService): 

65 

66 def __init__(self, vtn_id): 

67 super().__init__(vtn_id) 

68 self.report_callbacks = {} 

69 self.registered_reports = {} 

70 

71 @handler('oadrRegisterReport') 

72 async def register_report(self, payload): 

73 """ 

74 Handle the VENs reporting capabilities. 

75 """ 

76 report_requests = [] 

77 args = inspect.signature(self.on_register_report).parameters 

78 if all(['ven_id' in args, 'resource_id' in args, 'measurement' in args, 

79 'min_sampling_interval' in args, 'max_sampling_interval' in args, 

80 'unit' in args, 'scale' in args]): 

81 mode = 'compact' 

82 else: 

83 mode = 'full' 

84 

85 if payload['reports'] is None: 

86 return 

87 

88 for report in payload['reports']: 

89 if report['report_name'] == 'METADATA_TELEMETRY_STATUS': 

90 if mode == 'compact': 

91 results = [self.on_register_report(ven_id=payload['ven_id'], 

92 resource_id=rd.get('report_data_source', {}).get('resource_id'), 

93 measurement='Status', 

94 unit=None, 

95 scale=None, 

96 min_sampling_interval=rd['sampling_rate']['min_period'], 

97 max_sampling_interval=rd['sampling_rate']['max_period']) 

98 for rd in report['report_descriptions']] 

99 results = await utils.gather_if_required(results) 

100 elif mode == 'full': 

101 results = await utils.await_if_required(self.on_register_report(report)) 

102 elif report['report_name'] == 'METADATA_TELEMETRY_USAGE': 

103 if mode == 'compact': 

104 results = [self.on_register_report(ven_id=payload['ven_id'], 

105 resource_id=rd.get('report_data_source', {}).get('resource_id'), 

106 measurement=rd['measurement']['description'], 

107 unit=rd['measurement']['unit'], 

108 scale=rd['measurement']['scale'], 

109 min_sampling_interval=rd['sampling_rate']['min_period'], 

110 max_sampling_interval=rd['sampling_rate']['max_period']) 

111 for rd in report['report_descriptions']] 

112 results = await utils.gather_if_required(results) 

113 elif mode == 'full': 

114 results = await utils.await_if_required(self.on_register_report(report)) 

115 elif report['report_name'] in ('METADATA_HISTORY_USAGE', 'METADATA_HISTORY_GREENBUTTON'): 

116 if payload['ven_id'] not in self.registered_reports: 

117 self.registered_reports[payload['ven_id']] = [] 

118 report['report_name'] = report['report_name'][9:] 

119 self.registered_reports[payload['ven_id']].append(report) 

120 report_requests.append(None) 

121 continue 

122 else: 

123 logger.warning("Reports other than TELEMETRY_USAGE, TELEMETRY_STATUS, " 

124 "HISTORY_USAGE and HISTORY_GREENBUTTON are not yet supported. " 

125 f"Skipping report with name {report['report_name']}.") 

126 report_requests.append(None) 

127 continue 

128 

129 # Perform some rudimentary checks on the returned type 

130 if results is not None: 

131 if not isinstance(results, list): 

132 logger.error("Your on_register_report handler must return a list of tuples or None; " 

133 f"it returned '{results}' ({results.__class__.__name__}).") 

134 results = None 

135 else: 

136 for i, r in enumerate(results): 

137 if r is None: 

138 continue 

139 if not isinstance(r, tuple): 

140 if mode == 'compact': 

141 logger.error("Your on_register_report handler must return a tuple or None; " 

142 f"it returned '{r}' ({r.__class__.__name__}).") 

143 elif mode == 'full': 

144 logger.error("Your on_register_report handler must return a list of tuples or None; " 

145 f"The first item from the list was '{r}' ({r.__class__.__name__}).") 

146 results[i] = None 

147 # If we used compact mode, prepend the r_id to each result 

148 # (this is already there when using the full mode) 

149 if mode == 'compact': 

150 results = [(report['report_descriptions'][i]['r_id'], *results[i]) 

151 for i in range(len(report['report_descriptions'])) if isinstance(results[i], tuple)] 

152 report_requests.append(results) 

153 utils.validate_report_request_tuples(report_requests, mode=mode) 

154 

155 for i, report_request in enumerate(report_requests): 

156 if report_request is None or len(report_request) == 0 or all(rrq is None for rrq in report_request): 

157 continue 

158 # Check if all sampling rates per report_request are the same 

159 sampling_interval = min(rrq[2] for rrq in report_request if isinstance(rrq, tuple)) 

160 if not all(rrq is not None and report_request[0][2] == sampling_interval for rrq in report_request): 

161 logger.error("OpenADR does not support multiple different sampling rates per " 

162 "report. OpenLEADR will set all sampling rates to " 

163 f"{sampling_interval}") 

164 

165 # Form the report request 

166 oadr_report_requests = [] 

167 for i, report_request in enumerate(report_requests): 

168 if report_request is None or len(report_request) == 0 or all(rrq is None for rrq in report_request): 

169 continue 

170 

171 orig_report = payload['reports'][i] 

172 report_specifier_id = orig_report['report_specifier_id'] 

173 report_request_id = utils.generate_id() 

174 specifier_payloads = [] 

175 for rrq in report_request: 

176 if len(rrq) == 3: 

177 r_id, callback, sampling_interval = rrq 

178 report_interval = sampling_interval 

179 elif len(rrq) == 4: 

180 r_id, callback, sampling_interval, report_interval = rrq 

181 

182 report_description = utils.find_by(orig_report['report_descriptions'], 'r_id', r_id) 

183 reading_type = report_description['reading_type'] 

184 specifier_payloads.append(objects.SpecifierPayload(r_id=r_id, 

185 reading_type=reading_type)) 

186 # Append the callback to our list of known callbacks 

187 self.report_callbacks[(report_request_id, r_id)] = callback 

188 

189 # Add the ReportSpecifier to the ReportRequest 

190 report_specifier = objects.ReportSpecifier(report_specifier_id=report_specifier_id, 

191 granularity=sampling_interval, 

192 report_back_duration=report_interval, 

193 specifier_payloads=specifier_payloads) 

194 

195 # Add the ReportRequest to our outgoing message 

196 oadr_report_requests.append(objects.ReportRequest(report_request_id=report_request_id, 

197 report_specifier=report_specifier)) 

198 

199 # Put the report requests back together 

200 response_type = 'oadrRegisteredReport' 

201 response_payload = {'report_requests': oadr_report_requests} 

202 return response_type, response_payload 

203 

204 async def on_register_report(self, payload): 

205 """ 

206 Pre-handler for a oadrOnRegisterReport message. This will call your own handler (if defined) 

207 to allow for requesting the offered reports. 

208 """ 

209 logger.warning("You should implement and register your own on_register_report handler " 

210 "if you want to receive reports from a VEN. This handler will receive the " 

211 "following arguments: ven_id, resource_id, measurement, unit, scale, " 

212 "min_sampling_interval, max_sampling_interval and should return either " 

213 "None or (callback, sampling_interval) or " 

214 "(callback, sampling_interval, reporting_interval).") 

215 return None 

216 

217 @handler('oadrUpdateReport') 

218 async def update_report(self, payload): 

219 """ 

220 Handle a report that we received from the VEN. 

221 """ 

222 for report in payload['reports']: 

223 report_request_id = report['report_request_id'] 

224 if not self.report_callbacks: 

225 result = self.on_update_report(report) 

226 if iscoroutine(result): 

227 result = await result 

228 continue 

229 for r_id, values in utils.group_by(report['intervals'], 'report_payload.r_id').items(): 

230 # Find the callback that was registered. 

231 if (report_request_id, r_id) in self.report_callbacks: 

232 # Collect the values 

233 values = [(ri['dtstart'], ri['report_payload']['value']) for ri in values] 

234 # Call the callback function to deliver the values 

235 result = self.report_callbacks[(report_request_id, r_id)](values) 

236 if iscoroutine(result): 

237 result = await result 

238 

239 response_type = 'oadrUpdatedReport' 

240 response_payload = {} 

241 return response_type, response_payload 

242 

243 async def on_update_report(self, payload): 

244 """ 

245 Placeholder for the on_update_report handler. 

246 """ 

247 logger.warning("You should implement and register your own on_update_report handler " 

248 "to deal with reports that your receive from the VEN. This handler will " 

249 "receive either a complete oadrReport dict, or a list of (datetime, value) " 

250 "tuples that you can then process how you see fit. You don't " 

251 "need to return anything from that handler.") 

252 return None