-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda_function.py
More file actions
41 lines (37 loc) · 1.62 KB
/
lambda_function.py
File metadata and controls
41 lines (37 loc) · 1.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import asyncio
import os
from slack_sdk import WebClient
from client.saxo_auth_client import SaxoAuthClient
from saxo_order.commands.alerting import run_alerting
from saxo_order.commands.snapshot import execute_snapshot
from saxo_order.commands.workflow import execute_workflow
from utils.configuration import Configuration
from utils.exception import SaxoException
def handler(event, _):
try:
match event.get("command"):
case "refresh_token":
configuration = Configuration(os.getenv("SAXO_CONFIG"))
auth_client = SaxoAuthClient(configuration)
access_token, refresh_token = auth_client.refresh_token()
configuration.save_tokens(access_token, refresh_token)
return {"result": "ok", "message": "token has been refreshed"}
case "alerting":
asyncio.run(run_alerting(os.getenv("SAXO_CONFIG")))
case "workflows":
asyncio.run(execute_workflow(os.getenv("SAXO_CONFIG")))
case "snapshot":
execute_snapshot(os.getenv("SAXO_CONFIG"))
case _:
raise SaxoException(
f"Command {event.get('command')} not found"
)
except Exception as e:
configuration = Configuration(os.getenv("SAXO_CONFIG"))
slack_client = WebClient(token=configuration.slack_token)
slack_client.chat_postMessage(
channel="#errors",
text=f"Command {event.get('command')} fails:\n```{e}```",
)
return {"result": "ko", "message": "can't refresh the token"}
return {"result": "ok"}