-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.py
More file actions
executable file
·127 lines (97 loc) · 3.38 KB
/
worker.py
File metadata and controls
executable file
·127 lines (97 loc) · 3.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/bin/python
import argparse
import asyncio
import importlib
import os
import sys
from dataclasses import dataclass, field
from pathlib import Path

import yaml
from temporalio.client import Client
from temporalio.worker import Worker

from foundry.common import logger


@dataclass
class TemporalConfig:
    """Connection settings for a Temporal server, sourced from the environment.

    Environment-backed fields are read when an instance is created rather
    than when the class is defined. Importing this module (or running the
    CLI with ``--help``) therefore does not require the variables to be set,
    and any value can be overridden per instance through the constructor.

    Raises:
        KeyError: On instantiation, when a field is not passed explicitly and
            its environment variable is unset.
    """

    # Server address; defaults to $TEMPORAL_SERVER_URL.
    server_url: str = field(
        default_factory=lambda: os.environ["TEMPORAL_SERVER_URL"]
    )
    # Root directory; config files are resolved under ``<path>/configs``.
    # Kept as the second field so positional construction
    # ``TemporalConfig(server_url, path)`` keeps its meaning.
    path: Path = Path("/temporal")
    # Defaults to $TEMPORAL_API_KEY. repr=False keeps the credential out of
    # the generated __repr__, so logging a config object does not leak it.
    api_key: str = field(
        default_factory=lambda: os.environ["TEMPORAL_API_KEY"],
        repr=False,
    )
    # Defaults to $TEMPORAL_NAMESPACE.
    namespace: str = field(
        default_factory=lambda: os.environ["TEMPORAL_NAMESPACE"]
    )

    def resolve_config_path(self, filename: str) -> Path:
        """Return the location of *filename* inside the ``configs`` directory.

        Args:
            filename: Name of a config file, relative to ``<path>/configs``.

        Returns:
            ``self.path / "configs" / filename`` as a single ``Path``.
        """
        return self.path / f"configs/{filename}"
async def get_client(temporal_config: TemporalConfig) -> Client:
    """Connect to the Temporal server described by *temporal_config*.

    Args:
        temporal_config: Supplies the server address, API key and namespace.

    Returns:
        The object returned by awaiting ``Client.connect`` (TLS enabled).
    """
    connect_options = {
        "target_host": temporal_config.server_url,
        "api_key": temporal_config.api_key,
        "namespace": temporal_config.namespace,
        "tls": True,
    }
    connected = await Client.connect(**connect_options)
    return connected
async def run_worker(
    config_filename: str = "default.yml",
    task_queue: str = "example-task-queue",
):
    """Connect to Temporal and run a worker built from a YAML config file.

    The config file is looked up via ``TemporalConfig.resolve_config_path``.
    It may contain a ``workflows`` list (entries with ``module`` and
    ``class`` keys) and an ``activities`` list (entries with ``module`` and
    ``function`` keys). Each entry is imported and registered on the worker.

    Args:
        config_filename: Name of the YAML file inside the configs directory.
        task_queue: Task queue the worker polls.

    Raises:
        ValueError: If the YAML document is not a mapping at the top level.
        KeyError: If an entry lacks ``module`` or its ``class``/``function``
            key.
    """
    temporal_config = TemporalConfig()
    logger.info(f"connecting to {temporal_config.server_url}")
    client = await get_client(temporal_config)

    # Load workflow and activities configuration.
    config_path = temporal_config.resolve_config_path(config_filename)
    with config_path.open(encoding="utf-8") as f:
        # safe_load returns None for an empty document; treat it as "no entries".
        config = yaml.safe_load(f) or {}
    if not isinstance(config, dict):
        raise ValueError(
            f"{config_path}: expected a mapping at the top level, "
            f"got {type(config).__name__}"
        )

    # Workflows and activities live under the config root, so make that
    # directory importable before resolving the configured module names.
    import_root = str(temporal_config.path)
    if import_root not in sys.path:
        sys.path.insert(0, import_root)

    # ``or []`` also covers a key that is present but null, e.g. a bare
    # ``workflows:`` line in the YAML.
    workflows = _load_configured_objects(config.get("workflows") or [], "class")
    activities = _load_configured_objects(
        config.get("activities") or [], "function"
    )

    # Start worker.
    worker = Worker(
        client,
        task_queue=task_queue,
        workflows=workflows,
        activities=activities,
    )
    logger.info("starting worker")
    await worker.run()


def _load_configured_objects(entries, attr_key: str) -> list:
    """Import ``entry["module"]`` and return its ``entry[attr_key]`` attribute, per entry."""
    return [
        getattr(importlib.import_module(entry["module"]), entry[attr_key])
        for entry in entries
    ]
async def start_workflow(
    workflow: str,
    *,
    workflow_id: str = "test-workflow-1",
    task_queue: str = "example-task-queue",
):
    """Execute a workflow by name and print its result.

    Args:
        workflow: Name of the workflow to execute.
        workflow_id: Id passed to ``execute_workflow``.
        task_queue: Task queue the workflow is submitted to.

    Raises:
        ValueError: If *workflow* is empty or ``None``.
    """
    # Reject a missing name before reading the environment or opening a
    # connection, so the caller gets a clear error instead of an SDK failure.
    if not workflow:
        raise ValueError("a workflow name is required to start a workflow")

    temporal_config = TemporalConfig()
    client = await get_client(temporal_config)

    result = await client.execute_workflow(
        workflow,
        id=workflow_id,
        task_queue=task_queue,
    )
    print(f"Workflow result: {result}")
def main(argv=None):
    """Parse command-line arguments and run the selected mode.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what running the script does.

    Raises:
        SystemExit: On invalid arguments, including ``--mode workflow``
            given without ``--workflow``.
    """
    parser = _build_parser()
    args = parser.parse_args(argv)

    if args.mode == "worker":
        asyncio.run(run_worker(args.config))
    elif args.mode == "workflow":
        # --workflow has no default; fail with a usage error here rather
        # than passing None on to start_workflow.
        if not args.workflow:
            parser.error("--workflow is required when --mode is 'workflow'")
        asyncio.run(start_workflow(args.workflow))


def _build_parser():
    """Build the argument parser for the worker/workflow command line."""
    parser = argparse.ArgumentParser(
        description="Run configurable Temporal worker or start workflow"
    )
    parser.add_argument(
        "--mode",
        "-m",
        type=str,
        choices=["worker", "workflow"],
        default="worker",
        help="Mode to run: 'worker' to start worker, 'workflow' to execute test workflow (default: worker)",
    )
    parser.add_argument(
        "--workflow",
        "-w",
        type=str,
        help="Name of workflow to execute",
    )
    parser.add_argument(
        "--config",
        "-c",
        type=str,
        default="default.yml",
        help="Path to the worker configuration YAML file (default: default.yml)",
    )
    return parser
# Script entry point: invoke the CLI only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()