Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 128 additions & 98 deletions lambda/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,22 @@
import time
from botocore.exceptions import ClientError

from test_sprinkler import DYNAMODB_SPRINKLER_STATE_TABLE


# Meta data that device sends during registration
DYNAMODB_DEVICE_METADATA_TABLE = 'device_metadata_table'
# Computed zone-wise aggregated data
DYNAMODB_AGGREGATE_DATA_TABLE = 'aggregate_data_table'
# Alert situation data sent to sprinklers
DYNAMODB_SPRINKLER_ALERTS_TABLE = 'sprinkler_alerts_table'
# Sprinklers state management data
DYNAMODB_SPRINKLER_STATUS_TABLE = 'sprinkler_state_table'

THRESHOLD_LOW_HUMIDITY = 40.0
THRESHOLD_HIGH_HUMIDITY = 80.0
THRESHOLD_TEMP = 15.0

DBClient = boto3.client("dynamodb", region_name="us-east-1")
DBResource = boto3.resource("dynamodb", region_name="us-east-1")

Expand Down Expand Up @@ -221,9 +237,33 @@ def get_data(table_name):
return data

# Arranges data into sorted order for aggregate data generation
def get_aggeregate_data(aggregate_data):
print("Arranging data for aggregation .....")
values = {}
def get_aggeregate_data(telemetry_data):
structured_data = create_structured_telemetry_data(telemetry_data)
print("Aggregating data..")
aggregate_data = []
# Calculate aggregate data
# For each Zone Id Aggregate the data per DeviceType, DataType and TimeStamp
for device in structured_data.keys():
for devType in structured_data[device].keys():
# For each minute
# For each datatype
for datatype in structured_data[device][devType].keys():
# For each minute
for timestamp in structured_data[device][devType][datatype].keys():
values_list = structured_data[device][devType][datatype][timestamp]["values"]
aggregate_data_entry = {}

aggregate_data_entry['zoneId'] = device
aggregate_data_entry['deviceType'] = devType
aggregate_data_entry['dataType'] = datatype
aggregate_data_entry['timestamp'] = timestamp
aggregate_data_entry['average'] = Decimal(str(round(sum(values_list) / len(values_list), 2)))
aggregate_data.append(aggregate_data_entry)

return aggregate_data

def create_structured_telemetry_data(telemetry_data):
structured_data = {}
# Sample of aggregated values
# ===============================
# {
Expand All @@ -241,55 +281,31 @@ def get_aggeregate_data(aggregate_data):
# },
# }

for entry in aggregate_data:
for entry in telemetry_data:
time_truncated_to_minutes = datetime.strptime(entry['timestamp'], '%Y-%m-%dT%H:%M:%SZ').strftime(
'%Y-%m-%dT%H:%M:00Z')
# if the value list does not exists for zone id create empty list
if entry['zoneId'] not in values.keys():
values[entry["zoneId"]] = {}
if entry['zoneId'] not in structured_data.keys():
structured_data[entry["zoneId"]] = {}

# if dictionary for the devicetype does not exists then create an empty dictionary
if entry['deviceType'] not in values[entry["zoneId"]].keys():
values[entry["zoneId"]][entry['deviceType']] = {}
if entry['deviceType'] not in structured_data[entry["zoneId"]].keys():
structured_data[entry["zoneId"]][entry['deviceType']] = {}

# if dictionary for the datatype does not exists then create an empty dictionary
if entry['datatype'] not in values[entry["zoneId"]][entry["deviceType"]].keys():
values[entry["zoneId"]][entry['deviceType']][entry["datatype"]] = {}
if entry['datatype'] not in structured_data[entry["zoneId"]][entry["deviceType"]].keys():
structured_data[entry["zoneId"]][entry['deviceType']][entry["datatype"]] = {}

# if dictionary for the minute does not exists then create an empty dictionary and empty value list for the timestamp
if time_truncated_to_minutes not in values[entry["zoneId"]][entry['deviceType']][entry['datatype']].keys():
values[entry["zoneId"]][entry['deviceType']][entry['datatype']][time_truncated_to_minutes] = {}
values[entry["zoneId"]][entry['deviceType']][entry['datatype']][time_truncated_to_minutes]["values"] = []
if time_truncated_to_minutes not in structured_data[entry["zoneId"]][entry['deviceType']][entry['datatype']].keys():
structured_data[entry["zoneId"]][entry['deviceType']][entry['datatype']][time_truncated_to_minutes] = {}
structured_data[entry["zoneId"]][entry['deviceType']][entry['datatype']][time_truncated_to_minutes]["values"] = []

values[entry["zoneId"]][entry['deviceType']][entry['datatype']][time_truncated_to_minutes]["values"].append(
structured_data[entry["zoneId"]][entry['deviceType']][entry['datatype']][time_truncated_to_minutes]["values"].append(
float(entry["value"]))

print("values->", values)

print("Aggregating data..")
# print("input values->", values)
aggregate_data = []
# Calculate aggregate data
# For each Zone Id Aggregate the data per DeviceType, DataType and TimeStamp
# print("values.keys()->", values.keys())
for device in values.keys():
for devType in values[device].keys():
# For each minute
# For each datatype
for datatype in values[device][devType].keys():
# For each minute
for timestamp in values[device][devType][datatype].keys():
values_list = values[device][devType][datatype][timestamp]["values"]
aggregate_data_entry = {}

aggregate_data_entry['zoneId'] = device
aggregate_data_entry['deviceType'] = devType
aggregate_data_entry['dataType'] = datatype
aggregate_data_entry['timestamp'] = timestamp
aggregate_data_entry['average'] = Decimal(str(round(sum(values_list) / len(values_list), 2)))
aggregate_data.append(aggregate_data_entry)
print("aggregate_data->", aggregate_data)
return aggregate_data
print("values->", structured_data)
return structured_data

def publishMQTTMessage(msgTopic, msgPayload):
# a3pquorj9tve43-ats.iot.us-east-1.amazonaws.com
Expand All @@ -301,105 +317,119 @@ def publishMQTTMessage(msgTopic, msgPayload):
time.sleep(10)
print("After Publishing")


def create_device_metadata_map(raw_meta_data):
# Format
# {
# "SoilSensor" :{
# "SMS-001" : {"deviceId":"SMS-001", "deviceType":"<>"...},
# "SMS-002" : {"deviceId":"SMS-002", "deviceType":"<>"...}
# {
# },
# "Sprinkler": {
# }
# }
device_meta_data = {}
for m_data in raw_meta_data:
deviceType = m_data['deviceType']
deviceId = m_data['deviceId']
# For every new devie type add empty dictionary
if not deviceType in device_meta_data.keys():
device_meta_data[deviceType] = {}

if not deviceId in device_meta_data[deviceType].keys():
device_meta_data[deviceType][deviceId] = m_data

return device_meta_data

def lambda_handler(event, context):
print("Start Lambda")
# sns_client = boto3.client('sns')
db_name = "agro_agg_data_table"
DBName = checkAndCreateAggrTable(db_name)

db_name1 = "sprinkler_status_on_events_table"
DBName = checkAndCreateSprinklerTable(db_name1)
checkAndCreateAggrTable(DYNAMODB_AGGREGATE_DATA_TABLE)
checkAndCreateSprinklerTable(DYNAMODB_SPRINKLER_ALERTS_TABLE)
checkAndCreateSprinklerStateTable(DYNAMODB_SPRINKLER_STATUS_TABLE)

db_name2 = "sprinkler_zone_state_table"
DBName = checkAndCreateSprinklerStateTable(db_name2)
#Extract device metadata and create a dictinary for easy lookup
device_metadata = get_data(DYNAMODB_DEVICE_METADATA_TABLE)
device_metadata_map = create_device_metadata_map(device_metadata)

table_name = "device_meta_data_table"
data1 = get_data(table_name)
data = []

# Get dta from event and append zone info from metadata table
data_from_event = []
for record in event['Records']:
payload = base64.b64decode(record["kinesis"]["data"])
payload_str1 = str(payload)
payload_str = json.loads(payload)
print("Decoded Payload_str->", payload_str)
deviceId = payload_str["deviceId"]
deviceType = payload_str["deviceType"]
datatype = payload_str["datatype"]
value = payload_str["value"]
timestamp = payload_str["timestamp"]
data.append(json.loads(payload))
data_with_zone = data

# Append ZoneId, latitude,longitude to the Input Stream Data
for datarec in data_with_zone:
for datarec1 in data1:
if (datarec['deviceId'] == datarec1['deviceId']):
datarec.update(datarec1)
## Print the Appended Input STream Data with the ZoneId, latitude, longitude information.
print("table_data_with_zone->", data_with_zone)
payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
data_entry = json.loads(payload)

deviceType = data_entry['deviceType']
deviceId = data_entry['deviceId']
sensor_metadata = device_metadata_map[deviceType]

data_entry['zoneId'] = sensor_metadata[deviceId]['zoneId']
data_entry['lattitude'] = sensor_metadata[deviceId]['lattitude']
data_entry['longitude'] = sensor_metadata[deviceId]['longitude']
data_from_event.append(json.loads(payload))

# Aggregate the Data based on ZoneId, DeviceType, Datatype and Timestamp
aggr_data = get_aggeregate_data(data_with_zone)
print("Output Data is ->", aggr_data)
aggregated_data = get_aggeregate_data(data_from_event)

THRESHOLD_HUMIDITY = 90.0
THRESHOLD_TEMP = 15.0
for record in aggr_data:
for record in aggregated_data:
newRecord = {
"zoneId": {"S": str(record['zoneId'])},
"deviceType": {"S": record['deviceType']},
"dataType": {"S": record['dataType']},
"timestamp": {"S": timestamp},
"timestamp": {"S": record['timestamp']},
"avgValue": {"S": str(record['average'])},
}
save_stat = insertIntoAggrDynamoTable(db_name, newRecord)
db_name1 = "sprinkler_status_on_events_table"
insertIntoAggrDynamoTable(DYNAMODB_AGGREGATE_DATA_TABLE, newRecord)

currdateandtime = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
if (float(record['average']) < THRESHOLD_HUMIDITY):
if (float(record['average']) < THRESHOLD_LOW_HUMIDITY):
print("Soil Humidity is very Lower - ", record['average'], "% than the Threshold - ",
str(THRESHOLD_HUMIDITY), "%. Switch ON the Sprinkler in Zone - ", record['zoneId'])
info_wthr = get_weather_info(lat, lon)
if (float(info_wthr['celsius']) > THRESHOLD_TEMP):
str(THRESHOLD_LOW_HUMIDITY), "%. Switch ON the Sprinkler in Zone - ", record['zoneId'])
weather_data = get_weather_info(lat, lon)
if (float(weather_data['celsius']) > THRESHOLD_TEMP):
newRecord = {
"zoneId": {"S": str(record['zoneId'])},
"currdateandtime": {"S": currdateandtime},
"deviceType": {"S": record['deviceType']},
"dataType": {"S": record['dataType']},
"timestamp": {"S": timestamp},
"timestamp": {"S": record['timestamp']},
"avgMoistureValue": {"S": str(record['average'])},
"avgTempValue": {"S": str(info_wthr['celsius'])},
"thresholdHumidity": {"S": str(THRESHOLD_HUMIDITY)},
"avgTempValue": {"S": str(weather_data['celsius'])},
"thresholdHumidity": {"S": str(THRESHOLD_LOW_HUMIDITY)},
"thresholdTemp": {"S": str(THRESHOLD_TEMP)},
"sprinklerStatus": {"S": "ON"}
}
save_stat = insertIntoSprinklerDynamoTable(db_name1, newRecord)
insertIntoSprinklerDynamoTable(DYNAMODB_SPRINKLER_ALERTS_TABLE, newRecord)
# Update the STate of the Sprinkler by maintaining against the Zone. 1 Row per Zone
return_stat = insertOrUpdateIntoSprinklerStateDynamoTable(db_name2, str(record['zoneId']), "ON")
return_item = get_data_with_key(db_name2, str(record['zoneId']))
insertOrUpdateIntoSprinklerStateDynamoTable(DYNAMODB_SPRINKLER_STATUS_TABLE, str(record['zoneId']), "ON")
return_item = get_data_with_key(DYNAMODB_SPRINKLER_STATUS_TABLE, str(record['zoneId']))
if (return_item['State'] != "ON"):
mesgTopic = "actuator/command/set-state/" + str(record['zoneId'])
mesgPayload = json.dumps({"State": "ON"})
publishMQTTMessage(mesgTopic, mesgPayload)

if (float(record['average']) > THRESHOLD_HUMIDITY):
if (float(record['average']) > THRESHOLD_HIGH_HUMIDITY):
print("Soil Humidity is Higher - ", record['average'], "% than the Threshold - ",
str(THRESHOLD_HUMIDITY), "%. Switch OFF the Sprinkler in Zone - ", record['zoneId'])
info_wthr = get_weather_info(lat, lon)
if (float(info_wthr['celsius']) < THRESHOLD_TEMP):
str(THRESHOLD_HIGH_HUMIDITY), "%. Switch OFF the Sprinkler in Zone - ", record['zoneId'])
weather_data = get_weather_info(lat, lon)
if (float(weather_data['celsius']) < THRESHOLD_TEMP):
newRecord = {
"zoneId": {"S": str(record['zoneId'])},
"currdateandtime": {"S": currdateandtime},
"deviceType": {"S": record['deviceType']},
"dataType": {"S": record['dataType']},
"timestamp": {"S": timestamp},
"timestamp": {"S": record['timestamp']},
"avgMoistureValue": {"S": str(record['average'])},
"avgTempValue": {"S": str(info_wthr['celsius'])},
"thresholdHumidity": {"S": str(THRESHOLD_HUMIDITY)},
"avgTempValue": {"S": str(weather_data['celsius'])},
"thresholdHumidity": {"S": str(THRESHOLD_HIGH_HUMIDITY)},
"thresholdTemp": {"S": str(THRESHOLD_TEMP)},
"sprinklerStatus": {"S": "OFF"}
}
save_stat = insertIntoSprinklerDynamoTable(db_name1, newRecord)
# Update the STate of the Sprinkler by maintaining against the Zone. 1 Row per Zone
return_stat = insertOrUpdateIntoSprinklerStateDynamoTable(db_name2, str(record['zoneId']), "ON")
return_item = get_data_with_key(db_name2, str(record['zoneId']))
insertIntoSprinklerDynamoTable(DYNAMODB_SPRINKLER_ALERTS_TABLE, newRecord)
# Update the State of the Sprinkler by maintaining against the Zone. 1 Row per Zone
insertOrUpdateIntoSprinklerStateDynamoTable(DYNAMODB_SPRINKLER_STATUS_TABLE, str(record['zoneId']), "ON")
return_item = get_data_with_key(DYNAMODB_SPRINKLER_STATUS_TABLE, str(record['zoneId']))
if (return_item['State'] != "OFF"):
mesgTopic = "actuator/command/set-state/" + str(record['zoneId'])
mesgPayload = json.dumps({"State": "OFF"})
Expand Down