-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda_function.py
More file actions
65 lines (53 loc) · 1.92 KB
/
lambda_function.py
File metadata and controls
65 lines (53 loc) · 1.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import datetime
import json
import boto3
import time
import csv
STREAM_NAME = "mp11v2_ds"
def lambda_handler(event, context):
# Initialize S3 client
s3 = boto3.client("s3")
# Set bucket name and file key
bucket_name = "mp11v2-flink-s3"
key = "AMDprices2021-2022.csv"
# TODO: Get CSV file from S3
response = s3.get_object(Bucket=bucket_name, Key=key)
lines = response["Body"].read().decode("utf-8").split("\n")
# TODO: Split data by lines and extract column names from first row
csv_reader = csv.reader(lines)
column_names = next(csv_reader)
# TODO: Iterate over rows and generate data for Kinesis stream
kinesis_client = boto3.client("kinesis")
for line in csv_reader:
data_dict = dict(zip(column_names, line))
if not data_dict:
return {"statusCode": 200, "body": json.dumps("No data to process")}
generate(STREAM_NAME, kinesis_client, data_dict)
# Return response
return {"statusCode": 200, "body": json.dumps("Hello from Lambda!")}
def get_data(data_dict):
# Generate data dictionary for Kinesis stream
return {
"date": data_dict["Date"],
"ticker": "AMD",
"open_price": data_dict["Open"],
"high": data_dict["High"],
"low": data_dict["Low"],
"close_price": data_dict["Close"],
"adjclose": data_dict["Adj Close"],
"volume": data_dict["Volume"],
"event_time": datetime.datetime.now().isoformat(),
}
def generate(stream_name, kinesis_client, data_dict):
# Get data dictionary and print it
data = get_data(data_dict)
print(data)
# TODO: Put record to Kinesis stream
response = kinesis_client.put_record(
StreamName=stream_name, Data=json.dumps(data), PartitionKey=data["ticker"]
)
# Test with
# {
# "bucketName": "mp11v2-flink-s3",
# "fileName": "AMDprices2021-2022.csv"
# }