Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: Apache-2.0
3# Copyright 2020 Contributors to OpenLEADR
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
9# http://www.apache.org/licenses/LICENSE-2.0
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
17from datetime import datetime, timedelta, timezone
18from dataclasses import asdict, is_dataclass
19from openleadr import enums, utils
20import logging
21logger = logging.getLogger('openleadr')
24def preflight_message(message_type, message_payload):
25 """
26 Tests message contents before sending them. It will correct benign errors
27 and warn you about them. Uncorrectable errors will raise an Exception. It
28 changes the message_payload dict in-place.
30 :param message_type string: The type of message you are sending
31 :param message_payload dict: The contents of the message
32 """
33 if f'_preflight_{message_type}' in globals():
34 message_payload = message_payload.copy()
35 for key, value in message_payload.items():
36 if isinstance(value, list):
37 message_payload[key] = [asdict(item) if is_dataclass(item) else item
38 for item in value]
39 else:
40 message_payload[key] = asdict(value) if is_dataclass(value) else value
41 globals()[f'_preflight_{message_type}'](message_payload)
42 return message_payload
45def _preflight_oadrRegisterReport(message_payload):
46 for report in message_payload['reports']:
47 # Check that the report name is preceded by METADATA_ when registering reports
48 if report['report_name'] in enums.REPORT_NAME.values \
49 and not report['report_name'].startswith("METADATA"):
50 report['report_name'] = 'METADATA_' + report['report_name']
52 # Check that the measurement name and description match according to the schema
53 for report_description in report['report_descriptions']:
54 if 'measurement' in report_description and report_description['measurement'] is not None:
55 utils.validate_report_measurement_dict(report_description['measurement'])
57 # Add the correct namespace to the measurement
58 for report_description in report['report_descriptions']:
59 if 'measurement' in report_description and report_description['measurement'] is not None:
60 if report_description['measurement']['name'] in enums._MEASUREMENT_NAMESPACES:
61 measurement_name = report_description['measurement']['name']
62 measurement_ns = enums._MEASUREMENT_NAMESPACES[measurement_name]
63 report_description['measurement']['ns'] = measurement_ns
64 else:
65 raise ValueError("The Measurement Name is unknown")
68def _preflight_oadrDistributeEvent(message_payload):
69 if 'parse_duration' not in globals():
70 from .utils import parse_duration
71 # Check that the total event_duration matches the sum of the interval durations (rule 8)
72 for event in message_payload['events']:
73 active_period_duration = event['active_period']['duration']
74 signal_durations = []
75 for signal in event['event_signals']:
76 signal_durations.append(sum([parse_duration(i['duration'])
77 for i in signal['intervals']], timedelta(seconds=0)))
79 if not all([d == active_period_duration for d in signal_durations]):
80 if not all([d == signal_durations[0] for d in signal_durations]):
81 raise ValueError("The different EventSignals have different total durations. "
82 "Please correct this.")
83 else:
84 logger.warning(f"The active_period duration for event "
85 f"{event['event_descriptor']['event_id']} ({active_period_duration})"
86 f" differs from the sum of the interval's durations "
87 f"({signal_durations[0]}). The active_period duration has been "
88 f"adjusted to ({signal_durations[0]}).")
89 event['active_period']['duration'] = signal_durations[0]
91 # Check that payload values with signal name SIMPLE are constricted (rule 9)
92 for event in message_payload['events']:
93 for event_signal in event['event_signals']:
94 if event_signal['signal_name'] == "SIMPLE":
95 for interval in event_signal['intervals']:
96 if interval['signal_payload'] not in (0, 1, 2, 3):
97 raise ValueError("Payload Values used with Signal Name SIMPLE "
98 "must be one of 0, 1, 2 or 3")
100 # Check that the current_value is 0 for SIMPLE events that are not yet active (rule 14)
101 for event in message_payload['events']:
102 for event_signal in event['event_signals']:
103 if 'current_value' in event_signal and event_signal['current_value'] != 0:
104 if event_signal['signal_name'] == "SIMPLE" \
105 and event['event_descriptor']['event_status'] != "ACTIVE":
106 logger.warning("The current_value for a SIMPLE event "
107 "that is not yet active must be 0. "
108 "This will be corrected.")
109 event_signal['current_value'] = 0
111 # Add the correct namespace to the measurement
112 for event in message_payload['events']:
113 for event_signal in event['event_signals']:
114 if 'measurement' in event_signal and event_signal['measurement'] is not None:
115 if event_signal['measurement']['name'] in enums._MEASUREMENT_NAMESPACES:
116 measurement_name = event_signal['measurement']['name']
117 measurement_ns = enums._MEASUREMENT_NAMESPACES[measurement_name]
118 event_signal['measurement']['ns'] = measurement_ns
119 else:
120 raise ValueError("The Measurement Name is unknown")
122 # Check that there is a valid oadrResponseRequired value for each Event
123 for event in message_payload['events']:
124 if 'response_required' not in event:
125 event['response_required'] = 'always'
126 elif event['response_required'] not in ('never', 'always'):
127 logger.warning(f"The response_required property in an Event "
128 f"should be 'never' or 'always', not "
129 f"{event['response_required']}. Changing to 'always'.")
130 event['response_required'] = 'always'
132 # Check that there is a valid oadrResponseRequired value for each Event
133 for event in message_payload['events']:
134 if 'created_date_time' not in event['event_descriptor'] \
135 or not event['event_descriptor']['created_date_time']:
136 logger.warning("Your event descriptor did not contain a created_date_time. "
137 "This will be automatically added.")
138 event['event_descriptor']['created_date_time'] = datetime.now(timezone.utc)
140 # Check that the target designations are correct and consistent
141 for event in message_payload['events']:
142 if 'targets' in event and 'targets_by_type' in event:
143 if utils.group_targets_by_type(event['targets']) != event['targets_by_type']:
144 raise ValueError("You assigned both 'targets' and 'targets_by_type' in your event, "
145 "but the two were not consistent with each other. "
146 f"You supplied 'targets' = {event['targets']} and "
147 f"'targets_by_type' = {event['targets_by_type']}")
148 elif 'targets_by_type' in event and 'targets' not in event:
149 event['targets'] = utils.ungroup_targets_by_type(event['targets_by_type'])