Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# SPDX-License-Identifier: Apache-2.0 

2 

3# Copyright 2020 Contributors to OpenLEADR 

4 

5# Licensed under the Apache License, Version 2.0 (the "License"); 

6# you may not use this file except in compliance with the License. 

7# You may obtain a copy of the License at 

8 

9# http://www.apache.org/licenses/LICENSE-2.0 

10 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16 

17from datetime import datetime, timedelta, timezone 

18from dataclasses import asdict, is_dataclass 

19from openleadr import enums, utils 

20import logging 

21logger = logging.getLogger('openleadr') 

22 

23 

24def preflight_message(message_type, message_payload): 

25 """ 

26 Tests message contents before sending them. It will correct benign errors 

27 and warn you about them. Uncorrectable errors will raise an Exception. It 

28 changes the message_payload dict in-place. 

29 

30 :param message_type string: The type of message you are sending 

31 :param message_payload dict: The contents of the message 

32 """ 

33 if f'_preflight_{message_type}' in globals(): 

34 message_payload = message_payload.copy() 

35 for key, value in message_payload.items(): 

36 if isinstance(value, list): 

37 message_payload[key] = [asdict(item) if is_dataclass(item) else item 

38 for item in value] 

39 else: 

40 message_payload[key] = asdict(value) if is_dataclass(value) else value 

41 globals()[f'_preflight_{message_type}'](message_payload) 

42 return message_payload 

43 

44 

45def _preflight_oadrRegisterReport(message_payload): 

46 for report in message_payload['reports']: 

47 # Check that the report name is preceded by METADATA_ when registering reports 

48 if report['report_name'] in enums.REPORT_NAME.values \ 

49 and not report['report_name'].startswith("METADATA"): 

50 report['report_name'] = 'METADATA_' + report['report_name'] 

51 

52 # Check that the measurement name and description match according to the schema 

53 for report_description in report['report_descriptions']: 

54 if 'measurement' in report_description and report_description['measurement'] is not None: 

55 utils.validate_report_measurement_dict(report_description['measurement']) 

56 

57 # Add the correct namespace to the measurement 

58 for report_description in report['report_descriptions']: 

59 if 'measurement' in report_description and report_description['measurement'] is not None: 

60 if report_description['measurement']['name'] in enums._MEASUREMENT_NAMESPACES: 

61 measurement_name = report_description['measurement']['name'] 

62 measurement_ns = enums._MEASUREMENT_NAMESPACES[measurement_name] 

63 report_description['measurement']['ns'] = measurement_ns 

64 else: 

65 raise ValueError("The Measurement Name is unknown") 

66 

67 

68def _preflight_oadrDistributeEvent(message_payload): 

69 if 'parse_duration' not in globals(): 

70 from .utils import parse_duration 

71 # Check that the total event_duration matches the sum of the interval durations (rule 8) 

72 for event in message_payload['events']: 

73 active_period_duration = event['active_period']['duration'] 

74 signal_durations = [] 

75 for signal in event['event_signals']: 

76 signal_durations.append(sum([parse_duration(i['duration']) 

77 for i in signal['intervals']], timedelta(seconds=0))) 

78 

79 if not all([d == active_period_duration for d in signal_durations]): 

80 if not all([d == signal_durations[0] for d in signal_durations]): 

81 raise ValueError("The different EventSignals have different total durations. " 

82 "Please correct this.") 

83 else: 

84 logger.warning(f"The active_period duration for event " 

85 f"{event['event_descriptor']['event_id']} ({active_period_duration})" 

86 f" differs from the sum of the interval's durations " 

87 f"({signal_durations[0]}). The active_period duration has been " 

88 f"adjusted to ({signal_durations[0]}).") 

89 event['active_period']['duration'] = signal_durations[0] 

90 

91 # Check that payload values with signal name SIMPLE are constricted (rule 9) 

92 for event in message_payload['events']: 

93 for event_signal in event['event_signals']: 

94 if event_signal['signal_name'] == "SIMPLE": 

95 for interval in event_signal['intervals']: 

96 if interval['signal_payload'] not in (0, 1, 2, 3): 

97 raise ValueError("Payload Values used with Signal Name SIMPLE " 

98 "must be one of 0, 1, 2 or 3") 

99 

100 # Check that the current_value is 0 for SIMPLE events that are not yet active (rule 14) 

101 for event in message_payload['events']: 

102 for event_signal in event['event_signals']: 

103 if 'current_value' in event_signal and event_signal['current_value'] != 0: 

104 if event_signal['signal_name'] == "SIMPLE" \ 

105 and event['event_descriptor']['event_status'] != "ACTIVE": 

106 logger.warning("The current_value for a SIMPLE event " 

107 "that is not yet active must be 0. " 

108 "This will be corrected.") 

109 event_signal['current_value'] = 0 

110 

111 # Add the correct namespace to the measurement 

112 for event in message_payload['events']: 

113 for event_signal in event['event_signals']: 

114 if 'measurement' in event_signal and event_signal['measurement'] is not None: 

115 if event_signal['measurement']['name'] in enums._MEASUREMENT_NAMESPACES: 

116 measurement_name = event_signal['measurement']['name'] 

117 measurement_ns = enums._MEASUREMENT_NAMESPACES[measurement_name] 

118 event_signal['measurement']['ns'] = measurement_ns 

119 else: 

120 raise ValueError("The Measurement Name is unknown") 

121 

122 # Check that there is a valid oadrResponseRequired value for each Event 

123 for event in message_payload['events']: 

124 if 'response_required' not in event: 

125 event['response_required'] = 'always' 

126 elif event['response_required'] not in ('never', 'always'): 

127 logger.warning(f"The response_required property in an Event " 

128 f"should be 'never' or 'always', not " 

129 f"{event['response_required']}. Changing to 'always'.") 

130 event['response_required'] = 'always' 

131 

132 # Check that there is a valid oadrResponseRequired value for each Event 

133 for event in message_payload['events']: 

134 if 'created_date_time' not in event['event_descriptor'] \ 

135 or not event['event_descriptor']['created_date_time']: 

136 logger.warning("Your event descriptor did not contain a created_date_time. " 

137 "This will be automatically added.") 

138 event['event_descriptor']['created_date_time'] = datetime.now(timezone.utc) 

139 

140 # Check that the target designations are correct and consistent 

141 for event in message_payload['events']: 

142 if 'targets' in event and 'targets_by_type' in event: 

143 if utils.group_targets_by_type(event['targets']) != event['targets_by_type']: 

144 raise ValueError("You assigned both 'targets' and 'targets_by_type' in your event, " 

145 "but the two were not consistent with each other. " 

146 f"You supplied 'targets' = {event['targets']} and " 

147 f"'targets_by_type' = {event['targets_by_type']}") 

148 elif 'targets_by_type' in event and 'targets' not in event: 

149 event['targets'] = utils.ungroup_targets_by_type(event['targets_by_type'])