forked from bitcart/bitcart
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.py
More file actions
151 lines (128 loc) · 5.67 KB
/
worker.py
File metadata and controls
151 lines (128 loc) · 5.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import asyncio
import contextlib
import functools
import platform
import signal
import sys
from datetime import timedelta
from multiprocessing import Process
import sqlalchemy
from alembic import config, script
from alembic.runtime import migration
from dishka import AsyncContainer
from dishka.integrations.taskiq import setup_dishka
from taskiq import TaskiqEvents, TaskiqState
from taskiq.api import run_receiver_task, run_scheduler_task
from api import constants
from api.db import create_async_engine
from api.ioc import build_container
from api.ioc.worker import WorkerProvider
from api.logfire import configure_logfire
from api.logging import configure as configure_logging
from api.logging import get_logger
from api.logserver import main as start_logserver
from api.logserver import wait_for_port
from api.sentry import configure_sentry
from api.services.coins import CoinService
from api.services.notification_manager import NotificationManager
from api.services.plugin_registry import PluginRegistry
from api.settings import Settings
from api.tasks import broker, scheduler
from api.types import TasksBroker
from api.utils.common import excepthook_handler, handle_event_loop_exception
# Module-level logger; "worker" names the logging channel for this process.
logger = get_logger("worker")
def check_revision(conn: sqlalchemy.engine.Connection, script_dir: script.ScriptDirectory) -> bool:
    """Return True when the connected database sits at the alembic head revision."""
    current = migration.MigrationContext.configure(conn).get_current_revision()
    return current == script_dir.get_current_head()
async def check_db() -> bool:
    """Return True when the database is reachable and migrated to the alembic head.

    Always True under testing settings. Any failure (DB unreachable, bad
    alembic config, pending migrations) is reported as False rather than
    raised, since callers poll this in a retry loop.
    """
    try:
        settings = Settings(IS_WORKER=True)
        if settings.is_testing():
            return True
        engine = create_async_engine(settings, "migrations")
        alembic_cfg = config.Config("alembic.ini")
        script_dir = script.ScriptDirectory.from_config(alembic_cfg)
        try:
            async with engine.begin() as conn:
                return await conn.run_sync(functools.partial(check_revision, script_dir=script_dir))
        finally:
            # Original code disposed the engine after `return`, i.e. never;
            # dispose in `finally` so the connection pool is always released.
            await engine.dispose()
    except Exception:
        return False
async def lifespan_start(container: AsyncContainer, state: TaskiqState) -> None:
    """Taskiq WORKER_STARTUP hook: install error hooks, start plugins, warm up
    services, log environment info, and launch the in-process scheduler.

    The scheduler task handle is stashed on ``state`` so ``lifespan_stop``
    can cancel it on shutdown.
    """
    # Route uncaught exceptions (sync and event-loop) through our logger.
    sys.excepthook = excepthook_handler(logger, sys.excepthook)
    asyncio.get_running_loop().set_exception_handler(
        lambda *args, **kwargs: handle_event_loop_exception(logger, *args, **kwargs)
    )
    plugin_registry = await container.get(PluginRegistry)
    await plugin_registry.startup()
    await plugin_registry.worker_setup()
    # Eagerly resolve services the worker needs up-front (side-effectful init).
    for service in WorkerProvider.TO_PRELOAD:
        await container.get(service)
    settings = await container.get(Settings)
    coin_service = await container.get(CoinService)
    notification_manager = await container.get(NotificationManager)
    logger.info(f"Bitcart version: {constants.VERSION} - {constants.WEBSITE} - {constants.GIT_REPO_URL}")
    logger.info(f"Python version: {sys.version}. On platform: {platform.platform()}")
    logger.info(
        f"BITCART_CRYPTOS={','.join(list(settings.ENABLED_CRYPTOS))}; IN_DOCKER={settings.DOCKER_ENV}; "
        f"LOG_FILE={settings.LOG_FILE_NAME}"
    )
    logger.info(f"Successfully loaded {len(coin_service.cryptos)} cryptos")
    logger.info(f"{len(notification_manager.notifiers)} notification providers available")
    # Run the taskiq scheduler inside this worker process (1s poll interval).
    state.scheduler_task = asyncio.create_task(run_scheduler_task(scheduler, interval=timedelta(seconds=1)))
async def lifespan_stop(container: AsyncContainer, state: TaskiqState) -> None:
    """Taskiq WORKER_SHUTDOWN hook: stop the scheduler, shut down plugins,
    close the DI container, and terminate the log-server child process."""
    state.scheduler_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await state.scheduler_task
    plugin_registry = await container.get(PluginRegistry)
    await plugin_registry.shutdown()
    await container.close()
    # NOTE(review): this reads the module-global `process` assigned in the
    # __main__ guard — not the `process` parameter passed to get_app() (which
    # is unused there). Raises NameError if this module is run any other way.
    process.terminate()
def get_app(process: Process) -> TasksBroker:
    """Build and return the configured task broker.

    Configures Sentry/Logfire/logging, builds the DI container with the
    worker provider, and registers the startup/shutdown lifecycle hooks.

    NOTE(review): the `process` parameter is accepted but never used here;
    `lifespan_stop` terminates the module-global `process` instead.
    """
    worker_settings = Settings(IS_WORKER=True)
    configure_sentry(worker_settings)
    configure_logfire(worker_settings, "worker")
    configure_logging(settings=worker_settings, logfire=True)
    container = build_container(worker_settings, extra_providers=(WorkerProvider(),))
    lifecycle_hooks = (
        (TaskiqEvents.WORKER_STARTUP, lifespan_start),
        (TaskiqEvents.WORKER_SHUTDOWN, lifespan_stop),
    )
    for event, handler in lifecycle_hooks:
        broker.add_event_handler(event, functools.partial(handler, container))
    setup_dishka(container, broker)
    return broker
async def _run_broker_core(broker: TasksBroker, stop_event: asyncio.Event | None) -> None:
    """Start the broker's receiver and run until it exits or `stop_event` fires.

    With ``stop_event=None`` the receiver runs until interrupted
    (KeyboardInterrupt/cancellation are suppressed); otherwise the first of
    {receiver finished, stop_event set} wins. The receiver is always
    cancelled-and-awaited and the broker shut down on the way out.
    """
    broker.is_worker_process = True
    await broker.startup()
    receiver_task = asyncio.create_task(run_receiver_task(broker))
    try:
        if stop_event is None:
            with contextlib.suppress(KeyboardInterrupt, asyncio.CancelledError):
                await receiver_task
        else:
            stopper_task = asyncio.create_task(stop_event.wait())
            try:
                await asyncio.wait({receiver_task, stopper_task}, return_when=asyncio.FIRST_COMPLETED)
            finally:
                # Fix: if the receiver finishes first, the waiter task was
                # previously leaked pending ("Task was destroyed but it is
                # pending"); always cancel and reap it.
                stopper_task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await stopper_task
    finally:
        receiver_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await receiver_task
        await broker.shutdown()
async def start_broker_basic(broker: TasksBroker) -> None:
    """Run the broker with no signal handling (no stop event installed)."""
    await _run_broker_core(broker, None)
async def start_broker(broker: TasksBroker) -> None:
    """Run the broker until SIGINT or SIGTERM requests a graceful shutdown."""
    stop_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            # Preferred inside a running loop: the callback is dispatched via
            # the event loop, avoiding signal-handler reentrancy pitfalls.
            loop.add_signal_handler(sig, stop_event.set)
        except NotImplementedError:
            # Fallback (e.g. Windows event loops): original plain handler.
            signal.signal(sig, lambda *_: stop_event.set())
    await _run_broker_core(broker, stop_event=stop_event)
async def wait_loop() -> None:
    """Poll once per second until check_db() reports the DB ready/migrated."""
    while not await check_db():
        print("Database not available/not migrated, waiting...")  # noqa: T201
        await asyncio.sleep(1)
# TODO: use CLI from taskiq
if __name__ == "__main__":
    # Start the log server in a daemon child so it dies with this process.
    process = Process(target=start_logserver)
    process.daemon = True
    process.start()
    # Wait until the log server accepts connections before logging anything.
    wait_for_port()
    # Block until the database is reachable and migrated (see wait_loop).
    asyncio.run(wait_loop())
    broker = get_app(process)
    asyncio.run(start_broker(broker))