diff --git a/lambda/index.py b/lambda/index.py index 2c89a6d..e831d87 100644 --- a/lambda/index.py +++ b/lambda/index.py @@ -11,6 +11,22 @@ import time from botocore.exceptions import ClientError +from test_sprinkler import DYNAMODB_SPRINKLER_STATE_TABLE + + +# Meta data that device sends during registration +DYNAMODB_DEVICE_METADATA_TABLE = 'device_metadata_table' +# Computed zone-wise aggregated data +DYNAMODB_AGGREGATE_DATA_TABLE = 'aggregate_data_table' +# Alert situation data sent to sprinklers +DYNAMODB_SPRINKLER_ALERTS_TABLE = 'sprinkler_alerts_table' +# Sprinklers state management data +DYNAMODB_SPRINKLER_STATUS_TABLE = 'sprinkler_state_table' + +THRESHOLD_LOW_HUMIDITY = 40.0 +THRESHOLD_HIGH_HUMIDITY = 80.0 +THRESHOLD_TEMP = 15.0 + DBClient = boto3.client("dynamodb", region_name="us-east-1") DBResource = boto3.resource("dynamodb", region_name="us-east-1") @@ -221,9 +237,33 @@ def get_data(table_name): return data # Arranges data into sorted order for aggregate data generation -def get_aggeregate_data(aggregate_data): - print("Arranging data for aggregation .....") - values = {} +def get_aggeregate_data(telemetry_data): + structured_data = create_structured_telemetry_data(telemetry_data) + print("Aggregating data..") + aggregate_data = [] + # Calculate aggregate data + # For each Zone Id Aggregate the data per DeviceType, DataType and TimeStamp + for device in structured_data.keys(): + for devType in structured_data[device].keys(): + # For each minute + # For each datatype + for datatype in structured_data[device][devType].keys(): + # For each minute + for timestamp in structured_data[device][devType][datatype].keys(): + values_list = structured_data[device][devType][datatype][timestamp]["values"] + aggregate_data_entry = {} + + aggregate_data_entry['zoneId'] = device + aggregate_data_entry['deviceType'] = devType + aggregate_data_entry['dataType'] = datatype + aggregate_data_entry['timestamp'] = timestamp + aggregate_data_entry['average'] = Decimal(str(round(sum(values_list) / len(values_list), 2))) + aggregate_data.append(aggregate_data_entry) + + return aggregate_data + +def create_structured_telemetry_data(telemetry_data): + structured_data = {} # Sample of aggregated values # =============================== # { @@ -241,55 +281,31 @@ def get_aggeregate_data(aggregate_data): # }, # } - for entry in aggregate_data: + for entry in telemetry_data: time_truncated_to_minutes = datetime.strptime(entry['timestamp'], '%Y-%m-%dT%H:%M:%SZ').strftime( '%Y-%m-%dT%H:%M:00Z') # if the value list does not exists for zone id create empty list - if entry['zoneId'] not in values.keys(): - values[entry["zoneId"]] = {} + if entry['zoneId'] not in structured_data.keys(): + structured_data[entry["zoneId"]] = {} # if dictionary for the devicetype does not exists then create an empty dictionary - if entry['deviceType'] not in values[entry["zoneId"]].keys(): - values[entry["zoneId"]][entry['deviceType']] = {} + if entry['deviceType'] not in structured_data[entry["zoneId"]].keys(): + structured_data[entry["zoneId"]][entry['deviceType']] = {} # if dictionary for the datatype does not exists then create an empty dictionary - if entry['datatype'] not in values[entry["zoneId"]][entry["deviceType"]].keys(): - values[entry["zoneId"]][entry['deviceType']][entry["datatype"]] = {} + if entry['datatype'] not in structured_data[entry["zoneId"]][entry["deviceType"]].keys(): + structured_data[entry["zoneId"]][entry['deviceType']][entry["datatype"]] = {} # if dictionary for the minute does not exists then create an empty dictionary and empty value list for the timestamp - if time_truncated_to_minutes not in values[entry["zoneId"]][entry['deviceType']][entry['datatype']].keys(): - values[entry["zoneId"]][entry['deviceType']][entry['datatype']][time_truncated_to_minutes] = {} - values[entry["zoneId"]][entry['deviceType']][entry['datatype']][time_truncated_to_minutes]["values"] = [] + if time_truncated_to_minutes not in structured_data[entry["zoneId"]][entry['deviceType']][entry['datatype']].keys(): + structured_data[entry["zoneId"]][entry['deviceType']][entry['datatype']][time_truncated_to_minutes] = {} + structured_data[entry["zoneId"]][entry['deviceType']][entry['datatype']][time_truncated_to_minutes]["values"] = [] - values[entry["zoneId"]][entry['deviceType']][entry['datatype']][time_truncated_to_minutes]["values"].append( + structured_data[entry["zoneId"]][entry['deviceType']][entry['datatype']][time_truncated_to_minutes]["values"].append( float(entry["value"])) - print("values->", values) - - print("Aggregating data..") - # print("input values->", values) - aggregate_data = [] - # Calculate aggregate data - # For each Zone Id Aggregate the data per DeviceType, DataType and TimeStamp - # print("values.keys()->", values.keys()) - for device in values.keys(): - for devType in values[device].keys(): - # For each minute - # For each datatype - for datatype in values[device][devType].keys(): - # For each minute - for timestamp in values[device][devType][datatype].keys(): - values_list = values[device][devType][datatype][timestamp]["values"] - aggregate_data_entry = {} - - aggregate_data_entry['zoneId'] = device - aggregate_data_entry['deviceType'] = devType - aggregate_data_entry['dataType'] = datatype - aggregate_data_entry['timestamp'] = timestamp - aggregate_data_entry['average'] = Decimal(str(round(sum(values_list) / len(values_list), 2))) - aggregate_data.append(aggregate_data_entry) - print("aggregate_data->", aggregate_data) - return aggregate_data + print("values->", structured_data) + return structured_data def publishMQTTMessage(msgTopic, msgPayload): # a3pquorj9tve43-ats.iot.us-east-1.amazonaws.com @@ -301,105 +317,119 @@ def publishMQTTMessage(msgTopic, msgPayload): time.sleep(10) print("After Publishing") + +def create_device_metadata_map(raw_meta_data): + # Format + # { + # "SoilSensor" :{ + # "SMS-001" : {"deviceId":"SMS-001", "deviceType":"<>"...}, + # "SMS-002" : {"deviceId":"SMS-002", "deviceType":"<>"...} + # { + # }, + # "Sprinkler": { + # } + # } + device_meta_data = {} + for m_data in raw_meta_data: + deviceType = m_data['deviceType'] + deviceId = m_data['deviceId'] + # For every new devie type add empty dictionary + if not deviceType in device_meta_data.keys(): + device_meta_data[deviceType] = {} + + if not deviceId in device_meta_data[deviceType].keys(): + device_meta_data[deviceType][deviceId] = m_data + + return device_meta_data + def lambda_handler(event, context): - print("Start Lambda") - # sns_client = boto3.client('sns') - db_name = "agro_agg_data_table" - DBName = checkAndCreateAggrTable(db_name) - db_name1 = "sprinkler_status_on_events_table" - DBName = checkAndCreateSprinklerTable(db_name1) + checkAndCreateAggrTable(DYNAMODB_AGGREGATE_DATA_TABLE) + checkAndCreateSprinklerTable(DYNAMODB_SPRINKLER_ALERTS_TABLE) + checkAndCreateSprinklerStateTable(DYNAMODB_SPRINKLER_STATUS_TABLE) - db_name2 = "sprinkler_zone_state_table" - DBName = checkAndCreateSprinklerStateTable(db_name2) + #Extract device metadata and create a dictinary for easy lookup + device_metadata = get_data(DYNAMODB_DEVICE_METADATA_TABLE) + device_metadata_map = create_device_metadata_map(device_metadata) - table_name = "device_meta_data_table" - data1 = get_data(table_name) - data = [] + + # Get dta from event and append zone info from metadata table + data_from_event = [] for record in event['Records']: - payload = base64.b64decode(record["kinesis"]["data"]) - payload_str1 = str(payload) - payload_str = json.loads(payload) - print("Decoded Payload_str->", payload_str) - deviceId = payload_str["deviceId"] - deviceType = payload_str["deviceType"] - datatype = payload_str["datatype"] - value = payload_str["value"] - timestamp = payload_str["timestamp"] - data.append(json.loads(payload)) - data_with_zone = data - - # Append ZoneId, latitude,longitude to the Input Stream Data - for datarec in data_with_zone: - for datarec1 in data1: - if (datarec['deviceId'] == datarec1['deviceId']): - datarec.update(datarec1) - ## Print the Appended Input STream Data with the ZoneId, latitude, longitude information. - print("table_data_with_zone->", data_with_zone) + payload = base64.b64decode(record['kinesis']['data']).decode('utf-8') + data_entry = json.loads(payload) + + deviceType = data_entry['deviceType'] + deviceId = data_entry['deviceId'] + sensor_metadata = device_metadata_map[deviceType] + + data_entry['zoneId'] = sensor_metadata[deviceId]['zoneId'] + data_entry['lattitude'] = sensor_metadata[deviceId]['lattitude'] + data_entry['longitude'] = sensor_metadata[deviceId]['longitude'] + data_from_event.append(json.loads(payload)) + # Aggregate the Data based on ZoneId, DeviceType, Datatype and Timestamp - aggr_data = get_aggeregate_data(data_with_zone) - print("Output Data is ->", aggr_data) + aggregated_data = get_aggeregate_data(data_from_event) - THRESHOLD_HUMIDITY = 90.0 THRESHOLD_TEMP = 15.0 - for record in aggr_data: + for record in aggregated_data: newRecord = { "zoneId": {"S": str(record['zoneId'])}, "deviceType": {"S": record['deviceType']}, "dataType": {"S": record['dataType']}, - "timestamp": {"S": timestamp}, + "timestamp": {"S": record['timestamp']}, "avgValue": {"S": str(record['average'])}, } - save_stat = insertIntoAggrDynamoTable(db_name, newRecord) - db_name1 = "sprinkler_status_on_events_table" + insertIntoAggrDynamoTable(DYNAMODB_AGGREGATE_DATA_TABLE, newRecord) + currdateandtime = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") - if (float(record['average']) < THRESHOLD_HUMIDITY): + if (float(record['average']) < THRESHOLD_LOW_HUMIDITY): print("Soil Humidity is very Lower - ", record['average'], "% than the Threshold - ", - str(THRESHOLD_HUMIDITY), "%. Switch ON the Sprinkler in Zone - ", record['zoneId']) - info_wthr = get_weather_info(lat, lon) - if (float(info_wthr['celsius']) > THRESHOLD_TEMP): + str(THRESHOLD_LOW_HUMIDITY), "%. Switch ON the Sprinkler in Zone - ", record['zoneId']) + weather_data = get_weather_info(lat, lon) + if (float(weather_data['celsius']) > THRESHOLD_TEMP): newRecord = { "zoneId": {"S": str(record['zoneId'])}, "currdateandtime": {"S": currdateandtime}, "deviceType": {"S": record['deviceType']}, "dataType": {"S": record['dataType']}, - "timestamp": {"S": timestamp}, + "timestamp": {"S": record['timestamp']}, "avgMoistureValue": {"S": str(record['average'])}, - "avgTempValue": {"S": str(info_wthr['celsius'])}, - "thresholdHumidity": {"S": str(THRESHOLD_HUMIDITY)}, + "avgTempValue": {"S": str(weather_data['celsius'])}, + "thresholdHumidity": {"S": str(THRESHOLD_LOW_HUMIDITY)}, "thresholdTemp": {"S": str(THRESHOLD_TEMP)}, "sprinklerStatus": {"S": "ON"} } - save_stat = insertIntoSprinklerDynamoTable(db_name1, newRecord) + insertIntoSprinklerDynamoTable(DYNAMODB_SPRINKLER_ALERTS_TABLE, newRecord) # Update the STate of the Sprinkler by maintaining against the Zone. 1 Row per Zone - return_stat = insertOrUpdateIntoSprinklerStateDynamoTable(db_name2, str(record['zoneId']), "ON") - return_item = get_data_with_key(db_name2, str(record['zoneId'])) + insertOrUpdateIntoSprinklerStateDynamoTable(DYNAMODB_SPRINKLER_STATUS_TABLE, str(record['zoneId']), "ON") + return_item = get_data_with_key(DYNAMODB_SPRINKLER_STATUS_TABLE, str(record['zoneId'])) if (return_item['State'] != "ON"): mesgTopic = "actuator/command/set-state/" + str(record['zoneId']) mesgPayload = json.dumps({"State": "ON"}) publishMQTTMessage(mesgTopic, mesgPayload) - if (float(record['average']) > THRESHOLD_HUMIDITY): + if (float(record['average']) > THRESHOLD_HIGH_HUMIDITY): print("Soil Humidity is Higher - ", record['average'], "% than the Threshold - ", - str(THRESHOLD_HUMIDITY), "%. Switch OFF the Sprinkler in Zone - ", record['zoneId']) - info_wthr = get_weather_info(lat, lon) - if (float(info_wthr['celsius']) < THRESHOLD_TEMP): + str(THRESHOLD_HIGH_HUMIDITY), "%. Switch OFF the Sprinkler in Zone - ", record['zoneId']) + weather_data = get_weather_info(lat, lon) + if (float(weather_data['celsius']) < THRESHOLD_TEMP): newRecord = { "zoneId": {"S": str(record['zoneId'])}, "currdateandtime": {"S": currdateandtime}, "deviceType": {"S": record['deviceType']}, "dataType": {"S": record['dataType']}, - "timestamp": {"S": timestamp}, + "timestamp": {"S": record['timestamp']}, "avgMoistureValue": {"S": str(record['average'])}, - "avgTempValue": {"S": str(info_wthr['celsius'])}, - "thresholdHumidity": {"S": str(THRESHOLD_HUMIDITY)}, + "avgTempValue": {"S": str(weather_data['celsius'])}, + "thresholdHumidity": {"S": str(THRESHOLD_HIGH_HUMIDITY)}, "thresholdTemp": {"S": str(THRESHOLD_TEMP)}, "sprinklerStatus": {"S": "OFF"} } - save_stat = insertIntoSprinklerDynamoTable(db_name1, newRecord) - # Update the STate of the Sprinkler by maintaining against the Zone. 1 Row per Zone - return_stat = insertOrUpdateIntoSprinklerStateDynamoTable(db_name2, str(record['zoneId']), "ON") - return_item = get_data_with_key(db_name2, str(record['zoneId'])) + insertIntoSprinklerDynamoTable(DYNAMODB_SPRINKLER_ALERTS_TABLE, newRecord) + # Update the State of the Sprinkler by maintaining against the Zone. 1 Row per Zone + insertOrUpdateIntoSprinklerStateDynamoTable(DYNAMODB_SPRINKLER_STATUS_TABLE, str(record['zoneId']), "ON") + return_item = get_data_with_key(DYNAMODB_SPRINKLER_STATUS_TABLE, str(record['zoneId'])) if (return_item['State'] != "OFF"): mesgTopic = "actuator/command/set-state/" + str(record['zoneId']) mesgPayload = json.dumps({"State": "OFF"})