Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: Apache-2.0
3# Copyright 2020 Contributors to OpenLEADR
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
9# http://www.apache.org/licenses/LICENSE-2.0
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
17from . import service, handler, VTNService
18from asyncio import iscoroutine
19from openleadr import objects, utils
20import logging
21import inspect
22logger = logging.getLogger('openleadr')
24# ╔══════════════════════════════════════════════════════════════════════════╗
25# ║ REPORT SERVICE ║
26# ╚══════════════════════════════════════════════════════════════════════════╝
27# ┌──────────────────────────────────────────────────────────────────────────┐
28# │ The VEN can register its reporting capabilities. │
29# │ │
30# │ ┌────┐ ┌────┐ │
31# │ │VEN │ │VTN │ │
32# │ └─┬──┘ └─┬──┘ │
33# │ │───────────────oadrRegisterReport(METADATA Report)──────────────▶│ │
34# │ │ │ │
35# │ │◀ ─ ─ ─ ─oadrRegisteredReport(optional oadrReportRequest) ─ ─ ─ ─│ │
36# │ │ │ │
37# │ │ │ │
38# │ │─────────────oadrCreatedReport(if report requested)─────────────▶│ │
39# │ │ │ │
40# │ │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ oadrResponse()─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │
41# │ │ │ │
42# │ │
43# └──────────────────────────────────────────────────────────────────────────┘
44# ┌──────────────────────────────────────────────────────────────────────────┐
45# │ A report can also be canceled │
46# │ │
47# │ ┌────┐ ┌────┐ │
48# │ │VEN │ │VTN │ │
49# │ └─┬──┘ └─┬──┘ │
50# │ │───────────────oadrRegisterReport(METADATA Report)──────────────▶│ │
51# │ │ │ │
52# │ │◀ ─ ─ ─ ─oadrRegisteredReport(optional oadrReportRequest) ─ ─ ─ ─│ │
53# │ │ │ │
54# │ │ │ │
55# │ │─────────────oadrCreatedReport(if report requested)─────────────▶│ │
56# │ │ │ │
57# │ │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ oadrResponse()─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ │
58# │ │ │ │
59# │ │
60# └──────────────────────────────────────────────────────────────────────────┘
63@service('EiReport')
64class ReportService(VTNService):
66 def __init__(self, vtn_id):
67 super().__init__(vtn_id)
68 self.report_callbacks = {}
69 self.registered_reports = {}
71 @handler('oadrRegisterReport')
72 async def register_report(self, payload):
73 """
74 Handle the VENs reporting capabilities.
75 """
76 report_requests = []
77 args = inspect.signature(self.on_register_report).parameters
78 if all(['ven_id' in args, 'resource_id' in args, 'measurement' in args,
79 'min_sampling_interval' in args, 'max_sampling_interval' in args,
80 'unit' in args, 'scale' in args]):
81 mode = 'compact'
82 else:
83 mode = 'full'
85 if payload['reports'] is None:
86 return
88 for report in payload['reports']:
89 if report['report_name'] == 'METADATA_TELEMETRY_STATUS':
90 if mode == 'compact':
91 results = [self.on_register_report(ven_id=payload['ven_id'],
92 resource_id=rd.get('report_data_source', {}).get('resource_id'),
93 measurement='Status',
94 unit=None,
95 scale=None,
96 min_sampling_interval=rd['sampling_rate']['min_period'],
97 max_sampling_interval=rd['sampling_rate']['max_period'])
98 for rd in report['report_descriptions']]
99 results = await utils.gather_if_required(results)
100 elif mode == 'full':
101 results = await utils.await_if_required(self.on_register_report(report))
102 elif report['report_name'] == 'METADATA_TELEMETRY_USAGE':
103 if mode == 'compact':
104 results = [self.on_register_report(ven_id=payload['ven_id'],
105 resource_id=rd.get('report_data_source', {}).get('resource_id'),
106 measurement=rd['measurement']['description'],
107 unit=rd['measurement']['unit'],
108 scale=rd['measurement']['scale'],
109 min_sampling_interval=rd['sampling_rate']['min_period'],
110 max_sampling_interval=rd['sampling_rate']['max_period'])
111 for rd in report['report_descriptions']]
112 results = await utils.gather_if_required(results)
113 elif mode == 'full':
114 results = await utils.await_if_required(self.on_register_report(report))
115 elif report['report_name'] in ('METADATA_HISTORY_USAGE', 'METADATA_HISTORY_GREENBUTTON'):
116 if payload['ven_id'] not in self.registered_reports:
117 self.registered_reports[payload['ven_id']] = []
118 report['report_name'] = report['report_name'][9:]
119 self.registered_reports[payload['ven_id']].append(report)
120 report_requests.append(None)
121 continue
122 else:
123 logger.warning("Reports other than TELEMETRY_USAGE, TELEMETRY_STATUS, "
124 "HISTORY_USAGE and HISTORY_GREENBUTTON are not yet supported. "
125 f"Skipping report with name {report['report_name']}.")
126 report_requests.append(None)
127 continue
129 # Perform some rudimentary checks on the returned type
130 if results is not None:
131 if not isinstance(results, list):
132 logger.error("Your on_register_report handler must return a list of tuples or None; "
133 f"it returned '{results}' ({results.__class__.__name__}).")
134 results = None
135 else:
136 for i, r in enumerate(results):
137 if r is None:
138 continue
139 if not isinstance(r, tuple):
140 if mode == 'compact':
141 logger.error("Your on_register_report handler must return a tuple or None; "
142 f"it returned '{r}' ({r.__class__.__name__}).")
143 elif mode == 'full':
144 logger.error("Your on_register_report handler must return a list of tuples or None; "
145 f"The first item from the list was '{r}' ({r.__class__.__name__}).")
146 results[i] = None
147 # If we used compact mode, prepend the r_id to each result
148 # (this is already there when using the full mode)
149 if mode == 'compact':
150 results = [(report['report_descriptions'][i]['r_id'], *results[i])
151 for i in range(len(report['report_descriptions'])) if isinstance(results[i], tuple)]
152 report_requests.append(results)
153 utils.validate_report_request_tuples(report_requests, mode=mode)
155 for i, report_request in enumerate(report_requests):
156 if report_request is None or len(report_request) == 0 or all(rrq is None for rrq in report_request):
157 continue
158 # Check if all sampling rates per report_request are the same
159 sampling_interval = min(rrq[2] for rrq in report_request if isinstance(rrq, tuple))
160 if not all(rrq is not None and report_request[0][2] == sampling_interval for rrq in report_request):
161 logger.error("OpenADR does not support multiple different sampling rates per "
162 "report. OpenLEADR will set all sampling rates to "
163 f"{sampling_interval}")
165 # Form the report request
166 oadr_report_requests = []
167 for i, report_request in enumerate(report_requests):
168 if report_request is None or len(report_request) == 0 or all(rrq is None for rrq in report_request):
169 continue
171 orig_report = payload['reports'][i]
172 report_specifier_id = orig_report['report_specifier_id']
173 report_request_id = utils.generate_id()
174 specifier_payloads = []
175 for rrq in report_request:
176 if len(rrq) == 3:
177 r_id, callback, sampling_interval = rrq
178 report_interval = sampling_interval
179 elif len(rrq) == 4:
180 r_id, callback, sampling_interval, report_interval = rrq
182 report_description = utils.find_by(orig_report['report_descriptions'], 'r_id', r_id)
183 reading_type = report_description['reading_type']
184 specifier_payloads.append(objects.SpecifierPayload(r_id=r_id,
185 reading_type=reading_type))
186 # Append the callback to our list of known callbacks
187 self.report_callbacks[(report_request_id, r_id)] = callback
189 # Add the ReportSpecifier to the ReportRequest
190 report_specifier = objects.ReportSpecifier(report_specifier_id=report_specifier_id,
191 granularity=sampling_interval,
192 report_back_duration=report_interval,
193 specifier_payloads=specifier_payloads)
195 # Add the ReportRequest to our outgoing message
196 oadr_report_requests.append(objects.ReportRequest(report_request_id=report_request_id,
197 report_specifier=report_specifier))
199 # Put the report requests back together
200 response_type = 'oadrRegisteredReport'
201 response_payload = {'report_requests': oadr_report_requests}
202 return response_type, response_payload
204 async def on_register_report(self, payload):
205 """
206 Pre-handler for a oadrOnRegisterReport message. This will call your own handler (if defined)
207 to allow for requesting the offered reports.
208 """
209 logger.warning("You should implement and register your own on_register_report handler "
210 "if you want to receive reports from a VEN. This handler will receive the "
211 "following arguments: ven_id, resource_id, measurement, unit, scale, "
212 "min_sampling_interval, max_sampling_interval and should return either "
213 "None or (callback, sampling_interval) or "
214 "(callback, sampling_interval, reporting_interval).")
215 return None
217 @handler('oadrUpdateReport')
218 async def update_report(self, payload):
219 """
220 Handle a report that we received from the VEN.
221 """
222 for report in payload['reports']:
223 report_request_id = report['report_request_id']
224 if not self.report_callbacks:
225 result = self.on_update_report(report)
226 if iscoroutine(result):
227 result = await result
228 continue
229 for r_id, values in utils.group_by(report['intervals'], 'report_payload.r_id').items():
230 # Find the callback that was registered.
231 if (report_request_id, r_id) in self.report_callbacks:
232 # Collect the values
233 values = [(ri['dtstart'], ri['report_payload']['value']) for ri in values]
234 # Call the callback function to deliver the values
235 result = self.report_callbacks[(report_request_id, r_id)](values)
236 if iscoroutine(result):
237 result = await result
239 response_type = 'oadrUpdatedReport'
240 response_payload = {}
241 return response_type, response_payload
243 async def on_update_report(self, payload):
244 """
245 Placeholder for the on_update_report handler.
246 """
247 logger.warning("You should implement and register your own on_update_report handler "
248 "to deal with reports that your receive from the VEN. This handler will "
249 "receive either a complete oadrReport dict, or a list of (datetime, value) "
250 "tuples that you can then process how you see fit. You don't "
251 "need to return anything from that handler.")
252 return None