-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtargon_utils.py
More file actions
159 lines (137 loc) · 5.04 KB
/
targon_utils.py
File metadata and controls
159 lines (137 loc) · 5.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import asyncio
from typing import Callable
import httpx
from loguru import logger
from targon.client.serverless import ServerlessResourceListItem
from targon_client import ContainerDeployConfig, TargonClient, TargonClientError
def _log(msg: str, echo: Callable[[str], None] | None, level: str = "info") -> None:
"""Log message using echo if provided, otherwise logger."""
if echo:
echo(msg)
else:
getattr(logger, level)(msg)
async def wait_for_visible(
    client: TargonClient,
    name: str,
    *,
    timeout: float,  # noqa: ASYNC109
    check_interval: float = 5.0,
    echo: Callable[[str], None] | None = None,
) -> ServerlessResourceListItem | None:
    """
    Wait for the container to become visible (stage 1).

    Targon containers may not appear immediately after deployment, so this
    polls ``client.get_container(name)`` until it returns an object with a
    truthy ``url``.

    Args:
        client: Client used to look the container up by name.
        name: Name of the container to look for.
        timeout: Maximum number of seconds to keep polling. The container is
            always looked up at least once, even when this is zero or negative.
        check_interval: Seconds to wait between polls. The final wait is
            shortened so the function does not sleep past the deadline.
        echo: Optional callback that receives log messages instead of the logger.

    Returns:
        The container once it is visible, or ``None`` if the timeout elapsed.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout

    while True:
        container = await client.get_container(name)
        # A listing without a URL is treated as "not visible yet".
        if container and container.url:
            return container

        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        # Never sleep past the deadline; the last poll happens right at it.
        await asyncio.sleep(min(check_interval, remaining))

    _log(
        f"Container {name} not visible within {timeout}s. Timeout reached.",
        echo,
        "warning",
    )
    return None
async def wait_for_healthy(
    url: str,
    *,
    timeout: float,  # noqa: ASYNC109
    check_interval: float = 5.0,
    health_check_path: str = "/health",
    echo: Callable[[str], None] | None = None,
) -> bool:
    """
    Wait for the container health endpoint to return 200 (stage 2).

    Args:
        url: Base URL of the container.
        timeout: Maximum number of seconds to keep probing. The endpoint is
            always probed at least once, even when this is zero or negative.
        check_interval: Seconds to wait between probes. The final wait is
            shortened so the function does not sleep past the timeout.
        health_check_path: Either a path joined onto ``url`` or a full
            ``http://`` / ``https://`` URL that is used as-is.
        echo: Optional callback that receives log messages instead of the logger.

    Returns:
        ``True`` as soon as a probe answers with status 200, ``False`` if the
        timeout elapsed first.
    """
    # If health_check_path is a full URL, use it directly; otherwise join it
    # onto the container URL.
    if health_check_path.startswith(("http://", "https://")):
        health_url = health_check_path
    elif health_check_path:
        # Join with exactly one slash so "base/" + "/health" does not become
        # "base//health" and "base" + "health" does not become "basehealth".
        health_url = f"{url.rstrip('/')}/{health_check_path.lstrip('/')}"
    else:
        health_url = url

    loop = asyncio.get_running_loop()
    async with httpx.AsyncClient(timeout=30.0) as http:
        start = loop.time()
        while True:
            try:
                response = await http.get(health_url)
                healthy = response.status_code == 200
            except Exception:
                # Deliberately best-effort: while the container is starting,
                # any failure to get a response just means "not ready yet".
                healthy = False

            if healthy:
                _log(f"Container at {url} healthy", echo, "info")
                return True

            time_elapsed = loop.time() - start
            if time_elapsed >= timeout:
                break
            _log(
                f"Container not ready yet: {time_elapsed:.1f}/{timeout:.1f}s",
                echo,
                "info",
            )
            # Never sleep past the timeout; the last probe happens right at it.
            await asyncio.sleep(min(check_interval, timeout - time_elapsed))

    _log(
        f"Container at {url} not healthy within {timeout}s. Timeout reached.",
        echo,
        "error",
    )
    return False
async def ensure_running_container(
    client: TargonClient,
    name: str,
    config: ContainerDeployConfig,
    *,
    deploy_timeout: float = 600.0,
    warmup_timeout: float = 3600.0,
    check_interval: float = 10.0,
    health_check_path: str = "/health",
    echo: Callable[[str], None] | None = None,
) -> ServerlessResourceListItem | None:
    """
    Deploy a container and wait until it is healthy.

    Stages:
    1. Wait for the container to become visible (Targon-specific delay)
    2. Wait for a health check to pass

    Args:
        client: Client used to deploy, look up and delete the container.
        name: Name to deploy the container under.
        config: Deployment configuration passed to ``client.deploy_container``.
        deploy_timeout: Seconds to wait for the container to become visible.
        warmup_timeout: Seconds to wait for the health check to pass.
        check_interval: Seconds between polls in both stages.
        health_check_path: Path (or full URL) of the health endpoint.
        echo: Optional callback that receives log messages instead of the logger.

    Returns:
        The container if it became visible and healthy. ``None`` if the deploy
        call raised ``TargonClientError``, the container never became visible,
        or the health check did not pass (in which case the container is
        deleted before returning).
    """
    loop = asyncio.get_running_loop()

    # Deploy
    deploy_start = loop.time()
    _log("Deploying container with config.", echo)
    try:
        await client.deploy_container(name, config)
    except TargonClientError as exc:
        # Report the failure instead of returning None with no trace of why.
        _log(f"Container deployment failed: {exc}", echo, "error")
        return None

    # Stage 1: visibility
    _log("Waiting for container to become visible.", echo)
    container = await wait_for_visible(
        client,
        name,
        timeout=deploy_timeout,
        check_interval=check_interval,
        echo=echo,
    )
    if not container:
        # NOTE(review): nothing is cleaned up on this path - confirm whether a
        # deployment that never became visible needs to be deleted.
        _log("Container failed to become visible", echo, "error")
        return None

    deploy_time = loop.time() - deploy_start
    _log(f"Container ({container.uid}) visible in {deploy_time:.1f}s", echo)
    _log(f"Container URL: {container.url}", echo)
    _log(f"Health check path: {health_check_path}", echo)

    # Stage 2: health
    warmup_start = loop.time()
    _log(f"Waiting {warmup_timeout}s for container to become healthy.", echo)
    healthy = await wait_for_healthy(
        container.url,
        timeout=warmup_timeout,
        check_interval=check_interval,
        health_check_path=health_check_path,
        echo=echo,
    )
    if not healthy:
        _log("Container failed health check, deleting", echo, "error")
        await client.delete_container(container.uid)
        return None

    warmup_time = loop.time() - warmup_start
    _log(f"Container healthy in {warmup_time:.1f}s", echo)
    return container