-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathmain.py
More file actions
executable file
·876 lines (725 loc) · 34.4 KB
/
main.py
File metadata and controls
executable file
·876 lines (725 loc) · 34.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
#!/usr/bin/env python3
# Skip early FFI verification to avoid tokio runtime hang during Python import lock
# Binary self-verification still runs when get_license_status() is called
import os
os.environ.setdefault("CIRIS_SKIP_EARLY_VERIFY", "1")
# Must run before any print() / logger output that might hit the console,
# otherwise Windows cp1252 stdio crashes on the status glyphs used at startup.
from ciris_engine.logic.utils import win_console as _win_console # noqa: E402
_win_console.setup()
# Load environment variables from .env if present
# Load from all standard config paths in priority order
try:
from pathlib import Path
from dotenv import load_dotenv
# Priority order: ./ciris/.env, ./env (highest), ~/ciris/.env, /etc/ciris/.env (lowest)
# Note: ~/.ciris/ is for keys/secrets only, NOT config!
config_paths = [
Path.cwd() / "ciris" / ".env",
Path.cwd() / ".env",
Path.home() / "ciris" / ".env",
Path("/etc/ciris/.env"),
]
for config_path in config_paths:
if config_path.exists():
load_dotenv(config_path, override=False) # Don't override already-set vars
except ImportError:
pass # dotenv is optional; skip if not installed
# =============================================================================
# CRITICAL: Set CIRIS_HOME environment variable BEFORE any ciris_engine imports
# This must happen early because CIRISVerify (verifier_singleton) requires it.
# Supports: Linux, macOS, Windows, WSL, Android, iOS, Docker/managed deployments
# =============================================================================
from ciris_engine.logic.utils.path_resolution import ensure_ciris_home_env
_ciris_home = ensure_ciris_home_env()
import asyncio
import atexit
import json
import logging
import os
import signal
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
import click
from ciris_engine.logic.runtime.ciris_runtime import CIRISRuntime
from ciris_engine.logic.utils.logging_config import setup_basic_logging
from ciris_engine.logic.utils.runtime_utils import load_config
from ciris_engine.schemas.dma.results import ActionSelectionDMAResult
from ciris_engine.schemas.runtime.enums import HandlerActionType, ThoughtStatus
from ciris_engine.schemas.runtime.models import Thought
logger = logging.getLogger(__name__)
# Track if we've logged a proper shutdown reason
_shutdown_reason_logged = {"value": False}
def _atexit_handler() -> None:
"""Log when Python exits - helps diagnose unexpected shutdowns."""
if not _shutdown_reason_logged["value"]:
# Only log if we haven't already logged a shutdown reason
# This catches cases where the process exits without going through signal handlers
logger.critical(f"[ATEXIT] Python interpreter exiting (PID={os.getpid()}) - no shutdown signal logged")
logger.critical("[ATEXIT] This may indicate: SIGKILL, parent process death, or clean exit without signal")
else:
logger.info(f"[ATEXIT] Python interpreter exiting normally (PID={os.getpid()})")
atexit.register(_atexit_handler)
def setup_signal_handlers(runtime: CIRISRuntime) -> None:
"""Setup signal handlers for graceful shutdown."""
shutdown_initiated = {"value": False} # Use dict to allow modification in nested function
# Map signal numbers to names for better logging. SIGHUP/SIGQUIT are
# POSIX-only — Windows Python's signal module doesn't define them, so
# guard every non-portable signal behind hasattr.
signal_names: dict[int, str] = {
int(signal.SIGTERM): "SIGTERM",
int(signal.SIGINT): "SIGINT",
}
for name in ("SIGHUP", "SIGQUIT"):
sig = getattr(signal, name, None)
if sig is not None:
signal_names[int(sig)] = name
def signal_handler(signum: int, frame: Any) -> None:
sig_name = signal_names.get(signum, f"signal {signum}")
_shutdown_reason_logged["value"] = True # Mark that we logged a shutdown reason
if shutdown_initiated["value"]:
logger.critical(f"[SIGNAL] {sig_name} received again (PID={os.getpid()}), forcing immediate exit")
# Don't call sys.exit() in async context - just raise to let Python handle it
raise KeyboardInterrupt("Forced shutdown")
shutdown_initiated["value"] = True
logger.critical(f"[SIGNAL] {sig_name} received (PID={os.getpid()}), requesting graceful shutdown...")
try:
runtime.request_shutdown(f"Signal {sig_name}")
except Exception as e:
logger.critical(f"[SIGNAL] Error during shutdown request: {e}")
# Don't call sys.exit() in async context - raise instead
raise KeyboardInterrupt("Shutdown error") from e
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
for name in ("SIGHUP", "SIGQUIT"):
sig = getattr(signal, name, None)
if sig is not None:
signal.signal(sig, signal_handler)
def setup_global_exception_handler() -> None:
"""Setup global exception handler to catch all uncaught exceptions."""
def handle_exception(exc_type: type[BaseException], exc_value: BaseException, exc_traceback: Any) -> None:
if issubclass(exc_type, KeyboardInterrupt):
# Let KeyboardInterrupt be handled by signal handlers
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
logger.error("UNCAUGHT EXCEPTION:", exc_info=(exc_type, exc_value, exc_traceback))
logger.error("This should never happen - please report this bug!")
sys.excepthook = handle_exception
def _create_thought() -> Thought:
now = datetime.now(timezone.utc).isoformat()
return Thought(
thought_id=str(uuid.uuid4()),
source_task_id=str(uuid.uuid4()),
thought_type="standard",
status=ThoughtStatus.PENDING,
created_at=now,
updated_at=now,
content="manual invocation",
context={},
)
async def _execute_handler(runtime: CIRISRuntime, handler: str, params: Optional[str]) -> None:
if not runtime.agent_processor:
raise RuntimeError("Agent processor not initialized")
handler_type = HandlerActionType[handler.upper()]
dispatcher = runtime.agent_processor.action_dispatcher
handler_instance = dispatcher.handlers.get(handler_type)
if not handler_instance:
raise ValueError(f"Handler {handler} not registered")
payload = json.loads(params) if params else {}
result = ActionSelectionDMAResult(
selected_action=handler_type,
action_parameters=payload,
rationale="manual trigger",
)
thought = _create_thought()
# Create a proper DispatchContext
from ciris_engine.schemas.runtime.contexts import DispatchContext
from ciris_engine.schemas.runtime.system_context import ChannelContext
dispatch_context = DispatchContext(
channel_context=ChannelContext(
channel_id=runtime.startup_channel_id, channel_type="CLI", created_at=datetime.now(timezone.utc)
),
author_id="system",
author_name="System",
origin_service="main",
handler_name=handler,
action_type=handler_type,
thought_id=thought.thought_id,
task_id=thought.source_task_id,
source_task_id=thought.source_task_id,
event_summary=f"Manual trigger: {handler}",
event_timestamp=datetime.now(timezone.utc).isoformat(),
)
await handler_instance.handle(result, thought, dispatch_context)
def _check_mock_llm_env() -> bool:
"""Check if mock LLM should be enabled via environment variable."""
from ciris_engine.logic.config.env_utils import get_env_var
mock_llm_env = get_env_var("CIRIS_MOCK_LLM", "")
if mock_llm_env and mock_llm_env.lower() in ("true", "1", "yes", "on"):
logger.info("CIRIS_MOCK_LLM environment variable detected, enabling mock LLM")
return True
return False
def _check_python_installation() -> None:
"""Check macOS Python installation and exit if invalid."""
from ciris_engine.logic.setup.first_run import check_macos_python
python_valid, python_message = check_macos_python()
if not python_valid:
click.echo("=" * 70, err=True)
click.echo("PYTHON INSTALLATION ISSUE", err=True)
click.echo("=" * 70, err=True)
click.echo(python_message, err=True)
click.echo("=" * 70, err=True)
sys.exit(1)
def _get_adapter_types_from_env() -> list[str]:
"""Get adapter types from environment variable."""
from ciris_engine.logic.config.env_utils import get_env_var
env_adapter = get_env_var("CIRIS_ADAPTER")
if env_adapter:
return [a.strip() for a in env_adapter.split(",")]
return ["api"]
def _show_configuration_required_message() -> None:
"""Display configuration required message for non-interactive CLI mode."""
click.echo("=" * 70, err=True)
click.echo("CONFIGURATION REQUIRED", err=True)
click.echo("=" * 70, err=True)
click.echo("CIRIS is not configured. Please set environment variables:", err=True)
click.echo("", err=True)
click.echo("Required:", err=True)
click.echo(" OPENAI_API_KEY=your_api_key", err=True)
click.echo(" CIRIS_ADAPTER=api", err=True)
click.echo("", err=True)
click.echo("For local LLM (Ollama, LM Studio, etc.):", err=True)
click.echo(" OPENAI_API_KEY=local", err=True)
click.echo(" OPENAI_API_BASE=http://localhost:11434", err=True)
click.echo(" OPENAI_MODEL=llama3", err=True)
click.echo("", err=True)
click.echo("Or mount a .env file at:", err=True)
click.echo(" ~/.ciris/.env", err=True)
click.echo(" ./.env", err=True)
click.echo("=" * 70, err=True)
sys.exit(1)
def _run_cli_setup_wizard() -> None:
"""Run the CLI setup wizard for first-time interactive setup."""
from ciris_engine.logic.setup.wizard import run_setup_wizard
try:
click.echo()
click.echo("=" * 70)
click.echo("First run detected - running CLI setup wizard...")
click.echo("=" * 70)
config_path = run_setup_wizard()
try:
from dotenv import load_dotenv
load_dotenv(config_path)
click.echo(f"Configuration loaded from: {config_path}")
except ImportError:
pass
except KeyboardInterrupt:
click.echo("\nSetup cancelled by user")
sys.exit(1)
except Exception as e:
click.echo(f"\nSetup failed: {e}", err=True)
click.echo("You can configure manually by creating a .env file", err=True)
sys.exit(1)
def _handle_first_run(first_run: bool, adapter_types_list: tuple[str, ...], is_cli_mode: bool) -> None:
"""Handle first-run setup logic."""
from ciris_engine.logic.setup.first_run import is_interactive_environment
if not first_run or adapter_types_list:
return
if is_cli_mode and not is_interactive_environment():
_show_configuration_required_message()
elif is_cli_mode and is_interactive_environment():
_run_cli_setup_wizard()
def _show_api_key_required_message() -> None:
"""Display API key required message."""
click.echo("=" * 70, err=True)
click.echo("LLM API KEY REQUIRED", err=True)
click.echo("=" * 70, err=True)
click.echo("No LLM API key found in environment.", err=True)
click.echo("", err=True)
click.echo("Supported keys:", err=True)
click.echo(" - OPENAI_API_KEY (OpenAI, OpenRouter, local LLMs)", err=True)
click.echo(" - ANTHROPIC_API_KEY (Claude)", err=True)
click.echo(" - GOOGLE_API_KEY (Gemini)", err=True)
click.echo(" - GROQ_API_KEY, TOGETHER_API_KEY", err=True)
click.echo("", err=True)
click.echo("Options:", err=True)
click.echo(" 1. Set one of the above environment variables", err=True)
click.echo(" 2. Add to .env file", err=True)
click.echo(" 3. Use --mock-llm flag for testing only", err=True)
click.echo("=" * 70, err=True)
sys.exit(1)
def _validate_discord_adapter(adapter_type: str, discord_bot_token: Optional[str]) -> bool:
"""Validate Discord adapter has required token. Returns True if valid."""
from ciris_engine.logic.config.env_utils import get_env_var
base_adapter_type, instance_id = (adapter_type.split(":", 1) + [None])[:2]
token_vars = []
if instance_id:
token_vars.extend([f"DISCORD_{instance_id.upper()}_BOT_TOKEN", f"DISCORD_BOT_TOKEN_{instance_id.upper()}"])
token_vars.append("DISCORD_BOT_TOKEN")
has_token = discord_bot_token or any(get_env_var(var) for var in token_vars)
if not has_token:
click.echo(
f"ERROR: No Discord bot token found for {adapter_type}. Discord adapter cannot start without a bot token.",
err=True,
)
click.echo("Please set DISCORD_BOT_TOKEN environment variable or use --discord-bot-token flag.", err=True)
return True # Always return True to continue (adapter will fail properly)
def _validate_adapter_tokens(adapter_types: list[str], discord_bot_token: Optional[str]) -> list[str]:
"""Validate tokens for all adapters that require them."""
validated = []
for adapter_type in adapter_types:
if adapter_type.startswith("discord"):
_validate_discord_adapter(adapter_type, discord_bot_token)
validated.append(adapter_type)
return validated
def _check_modular_service_config(manifest: Any) -> list[str]:
"""Check modular service configuration. Returns list of missing required vars."""
from ciris_engine.logic.config.env_utils import get_env_var
missing_required = []
if manifest.configuration:
for config_key, config_spec in manifest.configuration.items():
env_var = config_spec.env
if env_var and not get_env_var(env_var):
if config_spec.default is None:
missing_required.append(f"{env_var}")
return missing_required
def _categorize_adapters(
adapter_types: list[str], adapter_map: dict[str, Any]
) -> tuple[list[str], list[tuple[str, Any]]]:
"""Categorize adapters into built-in and modular services."""
# ciris_verify is always loaded at bootstrap — treat it as built-in
builtin_adapters = ["cli", "api", "discord", "ciris_verify"]
final_adapter_types = []
adapters_to_load = []
for adapter_type in adapter_types:
base_type = adapter_type.split(":")[0]
if any(base_type.startswith(builtin) for builtin in builtin_adapters):
final_adapter_types.append(adapter_type)
elif base_type.lower() in adapter_map:
manifest = adapter_map[base_type.lower()]
logger.info(f"Found modular service '{manifest.module.name}' for adapter type '{adapter_type}'")
missing = _check_modular_service_config(manifest)
if missing:
click.echo(
f"ERROR: Modular service '{manifest.module.name}' requires configuration:",
err=True,
)
for var in missing:
click.echo(f" - {var}", err=True)
click.echo(
f"Skipping modular service '{manifest.module.name}' due to missing configuration.",
err=True,
)
continue
adapters_to_load.append((adapter_type, manifest))
logger.info(f"Modular service '{manifest.module.name}' validated and will be loaded")
else:
click.echo(
f"WARNING: Unknown adapter type '{adapter_type}'. Not a built-in adapter or modular service.",
err=True,
)
final_adapter_types.append(adapter_type)
return final_adapter_types, adapters_to_load
async def _load_app_config(config_file_path: Optional[str], template: str) -> Any:
"""Load application configuration."""
if config_file_path and not Path(config_file_path).exists():
logger.error(f"Configuration file not found: {config_file_path}")
raise SystemExit(1)
cli_overrides: dict[str, Any] = {}
if template:
# Always set CLI override when --template is provided
# This ensures CLI takes priority over environment variables
cli_overrides["default_template"] = template
return await load_config(config_file_path, cli_overrides)
def _handle_config_load_error(e: Exception) -> None:
"""Handle configuration load errors."""
import time
error_msg = f"Failed to load config: {e}"
logger.error(error_msg)
print(error_msg, file=sys.stderr)
sys.stdout.flush()
sys.stderr.flush()
for log_handler in logger.handlers:
log_handler.flush()
time.sleep(0.1)
if sys.gettrace() is not None or "coverage" in sys.modules:
logger.debug("EXITING NOW VIA os._exit(1) AT _handle_precommit_wrapper coverage")
os._exit(1)
else:
logger.debug("EXITING NOW VIA sys.exit(1) AT _handle_precommit_wrapper")
sys.exit(1)
def _configure_api_adapter(api_host: Optional[str], api_port: Optional[int]) -> tuple[Any, str]:
"""Configure API adapter. Returns (AdapterConfig, channel_id)."""
from ciris_engine.logic.adapters.api.config import APIAdapterConfig
from ciris_engine.schemas.runtime.adapter_management import AdapterConfig
api_config = APIAdapterConfig()
api_config.load_env_vars()
if api_host:
api_config.host = api_host
if api_port:
api_config.port = api_port
adapter_config = AdapterConfig(adapter_type="api", enabled=True, settings=api_config.model_dump())
channel_id = api_config.get_home_channel_id(api_config.host, api_config.port)
return adapter_config, channel_id
def _configure_discord_adapter(
discord_bot_token: Optional[str],
) -> tuple[Any, Optional[str]]:
"""Configure Discord adapter. Returns (AdapterConfig, channel_id or None)."""
from ciris_engine.logic.adapters.discord.config import DiscordAdapterConfig
from ciris_engine.schemas.runtime.adapter_management import AdapterConfig
discord_config = DiscordAdapterConfig()
if discord_bot_token:
discord_config.bot_token = discord_bot_token
discord_config.load_env_vars()
adapter_config = AdapterConfig(
adapter_type="discord",
enabled=True,
settings=discord_config.model_dump(),
)
discord_channel_id = discord_config.get_home_channel_id()
formatted_channel_id = discord_config.get_formatted_startup_channel_id() if discord_channel_id else None
return adapter_config, formatted_channel_id
def _configure_cli_adapter(
cli_interactive: bool,
) -> tuple[Any, str]:
"""Configure CLI adapter. Returns (AdapterConfig, channel_id)."""
from ciris_engine.logic.adapters.cli.config import CLIAdapterConfig
from ciris_engine.schemas.runtime.adapter_management import AdapterConfig
cli_config = CLIAdapterConfig()
if not cli_interactive:
cli_config.interactive = False
adapter_config = AdapterConfig(adapter_type="cli", enabled=True, settings=cli_config.model_dump())
channel_id = cli_config.get_home_channel_id()
return adapter_config, channel_id
def _configure_adapters(
adapter_types: list[str],
app_config: Any,
api_host: Optional[str],
api_port: Optional[int],
discord_bot_token: Optional[str],
cli_interactive: bool,
) -> tuple[dict[str, Any], Optional[str]]:
"""Configure all adapters. Returns (adapter_configs, startup_channel_id)."""
adapter_configs: dict[str, Any] = {}
startup_channel_id: Optional[str] = getattr(app_config, "startup_channel_id", None)
for adapter_type in adapter_types:
channel_id: Optional[str] = None
if adapter_type.startswith("api"):
config, channel_id = _configure_api_adapter(api_host, api_port)
adapter_configs[adapter_type] = config
if not startup_channel_id:
startup_channel_id = channel_id
elif adapter_type.startswith("discord"):
config, channel_id = _configure_discord_adapter(discord_bot_token)
adapter_configs[adapter_type] = config
if channel_id and not startup_channel_id:
startup_channel_id = channel_id
elif adapter_type.startswith("cli"):
config, channel_id = _configure_cli_adapter(cli_interactive)
adapter_configs[adapter_type] = config
if not startup_channel_id:
startup_channel_id = channel_id
return adapter_configs, startup_channel_id
async def _flush_all_handlers() -> None:
"""Flush all output streams and log handlers."""
async def flush_handler(handler: Any) -> None:
try:
await asyncio.to_thread(handler.flush)
except Exception:
pass
flush_tasks = [
asyncio.create_task(asyncio.to_thread(sys.stdout.flush)),
asyncio.create_task(asyncio.to_thread(sys.stderr.flush)),
]
for log_handler in logging.getLogger().handlers:
flush_tasks.append(asyncio.create_task(flush_handler(log_handler)))
await asyncio.gather(*flush_tasks, return_exceptions=True)
async def _create_cli_monitor(runtime: CIRISRuntime) -> asyncio.Task[None]:
"""Create CLI shutdown monitor task."""
shutdown_event = asyncio.Event()
runtime._shutdown_event = shutdown_event
async def monitor_shutdown() -> None:
await shutdown_event.wait()
logger.info("CLI runtime shutdown complete, preparing clean exit")
await asyncio.sleep(0.2)
await _flush_all_handlers()
logger.info("Forcing exit to handle blocking CLI input thread")
return asyncio.create_task(monitor_shutdown())
async def _handle_cli_exit(selected_adapter_types: list[str]) -> None:
"""Handle CLI adapter exit with proper flushing."""
if "cli" not in selected_adapter_types:
return
logger.info("CLI runtime completed, forcing exit")
await asyncio.sleep(0.5)
await _flush_all_handlers()
logger.debug("EXITING NOW VIA os._exit(0) AT CLI runtime completed")
os._exit(0)
def _handle_final_exit() -> None:
"""Handle final exit logic for main function."""
import time
_shutdown_reason_logged["value"] = True # Mark normal exit
sys.stdout.flush()
sys.stderr.flush()
logger.info(f"[EXIT] CIRIS agent exiting cleanly (PID={os.getpid()})")
if "--adapter" in sys.argv and "api" in sys.argv and "--timeout" in sys.argv:
logger.debug("EXITING NOW VIA os._exit(0) AT API mode subprocess tests")
os._exit(0)
if "--adapter" in sys.argv and "cli" in sys.argv:
logger.info("CLI mode completed, forcing exit to handle blocking input thread")
sys.stdout.flush()
sys.stderr.flush()
for log_handler in logging.getLogger().handlers:
log_handler.flush()
time.sleep(0.1)
logger.debug("EXITING NOW VIA os._exit(0) AT CLI mode force exit")
os._exit(0)
logger.debug("EXITING NOW VIA sys.exit(0) AT end of main")
sys.exit(0)
async def _run_runtime(runtime: CIRISRuntime, timeout: Optional[int], num_rounds: Optional[int] = None) -> None:
"""Run the runtime with optional timeout and graceful shutdown."""
logger.debug(f"[DEBUG] _run_runtime called with timeout={timeout}, num_rounds={num_rounds}")
shutdown_called = False
try:
if timeout:
# Create task and handle timeout manually to allow graceful shutdown
logger.debug(f"[DEBUG] Setting up timeout for {timeout} seconds")
runtime_task = asyncio.create_task(runtime.run(num_rounds))
try:
# Wait for either the task to complete or timeout
await asyncio.wait_for(asyncio.shield(runtime_task), timeout=timeout)
except asyncio.TimeoutError:
logger.info(f"Timeout of {timeout} seconds reached, initiating graceful shutdown...")
# Request shutdown but don't cancel the task immediately
runtime.request_shutdown(f"Runtime timeout after {timeout} seconds")
# Give the shutdown processor time to run (up to 30 seconds)
try:
await asyncio.wait_for(runtime_task, timeout=30.0)
logger.info("Graceful shutdown completed within timeout")
except asyncio.TimeoutError:
logger.warning("Graceful shutdown did not complete within 30 seconds, cancelling...")
runtime_task.cancel()
try:
await runtime_task
except asyncio.CancelledError:
# Expected when we cancel the task
pass # NOSONAR - Intentionally not re-raising after timeout cancellation
# Ensure shutdown is called if the task was cancelled
logger.info("Calling shutdown explicitly after task cancellation")
await runtime.shutdown()
shutdown_called = True
else:
# Run without timeout
logger.debug("[DEBUG] Running without timeout")
await runtime.run(num_rounds)
except KeyboardInterrupt:
logger.info("Received interrupt signal, shutting down gracefully...")
runtime.request_shutdown("User interrupt")
# Don't call shutdown here if runtime.run() will handle it
if not shutdown_called:
await runtime.shutdown()
except Exception as e:
logger.error(f"FATAL ERROR: Unhandled exception in runtime: {e}", exc_info=True)
try:
runtime.request_shutdown(f"Fatal error: {e}")
if not shutdown_called:
await runtime.shutdown()
except Exception as shutdown_error:
logger.error(f"Error during emergency shutdown: {shutdown_error}", exc_info=True)
raise # Re-raise to ensure non-zero exit code
@click.command()
@click.option(
"--adapter",
"adapter_types_list",
multiple=True,
default=[],
help="One or more adapters to run. Specify multiple times for multiple adapters (e.g., --adapter cli --adapter api --adapter discord).",
)
@click.option("--template", default="default", help="Agent template name (only used for first-time setup)")
@click.option(
"--identity-update/--no-identity-update",
default=False,
help="Update existing identity from template (admin operation, requires --template)",
)
@click.option("--config", "config_file_path", type=click.Path(), help="Path to app config")
@click.option("--task", multiple=True, help="Task description to add before starting")
@click.option("--timeout", type=int, help="Maximum runtime duration in seconds")
@click.option("--handler", help="Direct handler to execute and exit")
@click.option("--params", help="JSON parameters for handler execution")
@click.option(
"--host",
"api_host",
default=None,
help="API host (default: 127.0.0.1 for security, use 0.0.0.0 for all interfaces)",
)
@click.option("--port", "api_port", default=None, type=int, help="API port (default: 8080)")
@click.option("--debug/--no-debug", default=False, help="Enable debug logging")
@click.option(
"--no-interactive/--interactive", "cli_interactive", default=True, help="Enable/disable interactive CLI input"
)
@click.option(
"--discord-token", "discord_bot_token", default=os.environ.get("DISCORD_BOT_TOKEN"), help="Discord bot token"
)
@click.option("--mock-llm/--no-mock-llm", default=False, help="Use the mock LLM service for offline testing")
@click.option("--num-rounds", type=int, help="Maximum number of processing rounds (default: infinite)")
def main(
adapter_types_list: tuple[str, ...],
template: str,
identity_update: bool,
config_file_path: Optional[str],
task: tuple[str],
timeout: Optional[int],
handler: Optional[str],
params: Optional[str],
api_host: Optional[str],
api_port: Optional[int],
debug: bool,
cli_interactive: bool,
discord_bot_token: Optional[str],
mock_llm: bool,
num_rounds: Optional[int],
) -> None:
"""Unified CIRIS agent entry point."""
# Setup basic console logging first (without file logging)
# File logging will be set up later once TimeService is available
setup_basic_logging(
level=logging.DEBUG if debug else logging.INFO,
log_to_file=False,
console_output=True,
enable_incident_capture=False, # Will be enabled later with TimeService
)
# Log startup with PID for debugging unexpected shutdowns
logger.info(f"[STARTUP] CIRIS agent starting (PID={os.getpid()}, PPID={os.getppid()})")
async def _async_main() -> None:
nonlocal mock_llm
from ciris_engine.logic.config.env_utils import get_env_var
from ciris_engine.logic.runtime.adapter_loader import AdapterLoader
from ciris_engine.logic.setup.first_run import is_first_run
# Check for CIRIS_MOCK_LLM environment variable
if not mock_llm and _check_mock_llm_env():
mock_llm = True
# Check macOS Python installation
_check_python_installation()
# Determine first-run status
first_run = is_first_run()
if os.environ.get("CIRIS_IMPORT_MODE") == "true":
first_run = False
# Handle adapter selection
final_adapter_types_list = list(adapter_types_list) if adapter_types_list else _get_adapter_types_from_env()
is_cli_mode = any(adapter.startswith("cli") for adapter in final_adapter_types_list)
# Handle first-run setup
_handle_first_run(first_run, adapter_types_list, is_cli_mode)
# Check for API key (any supported provider, with CIRIS_ prefix support)
has_api_key = any(
[
get_env_var("CIRIS_OPENAI_API_KEY") or get_env_var("OPENAI_API_KEY"),
get_env_var("CIRIS_ANTHROPIC_API_KEY") or get_env_var("ANTHROPIC_API_KEY"),
get_env_var("CIRIS_GOOGLE_API_KEY") or get_env_var("GOOGLE_API_KEY"),
get_env_var("CIRIS_LLM_API_KEY"),
get_env_var("OPENROUTER_API_KEY"),
get_env_var("GROQ_API_KEY"),
get_env_var("TOGETHER_API_KEY"),
]
)
if not mock_llm and not has_api_key and not first_run:
_show_api_key_required_message()
# Validate adapter tokens
selected_adapter_types = _validate_adapter_tokens(list(final_adapter_types_list), discord_bot_token)
# Discover modular services
adapter_loader = AdapterLoader()
discovered_services = adapter_loader.discover_services()
adapter_map = {svc.module.name.lower().replace("_adapter", ""): svc for svc in discovered_services}
# Categorize adapters
selected_adapter_types, adapters_to_load = _categorize_adapters(selected_adapter_types, adapter_map)
# Load config
try:
app_config = await _load_app_config(config_file_path, template)
except SystemExit:
raise
except Exception as e:
_handle_config_load_error(e)
return # Unreachable but makes type checker happy
# Build modules to load and add modular adapters to selected_adapter_types
modules_to_load = ["mock_llm"] if mock_llm else []
if mock_llm:
logger.info("Mock LLM module will be loaded")
for adapter_type, manifest in adapters_to_load:
modules_to_load.append(f"modular:{manifest.module.name}")
# Add modular adapter to adapter_types so runtime loads it
selected_adapter_types.append(adapter_type)
logger.info(f"Modular service '{manifest.module.name}' added to modules to load")
# Configure adapters
adapter_configs, startup_channel_id = _configure_adapters(
selected_adapter_types, app_config, api_host, api_port, discord_bot_token, cli_interactive
)
# Setup global exception handling
setup_global_exception_handler()
# Create and initialize runtime
runtime = CIRISRuntime(
adapter_types=selected_adapter_types,
essential_config=app_config,
startup_channel_id=startup_channel_id,
adapter_configs=adapter_configs,
interactive=cli_interactive,
host=api_host,
port=api_port,
discord_bot_token=discord_bot_token,
modules=modules_to_load,
identity_update=identity_update,
template_name=template, # Always pass template so CLI takes priority over env vars
)
await runtime.initialize()
setup_signal_handlers(runtime)
# Store preload tasks
preload_tasks = list(task) if task else []
runtime.set_preload_tasks(preload_tasks)
# Handle direct handler execution
if handler:
await _execute_handler(runtime, handler, params)
await runtime.shutdown()
return
# Determine effective num_rounds
effective_num_rounds = num_rounds
if effective_num_rounds is None:
from ciris_engine.logic.utils.constants import DEFAULT_NUM_ROUNDS
effective_num_rounds = DEFAULT_NUM_ROUNDS
# Create CLI monitor if needed
monitor_task = None
if "cli" in selected_adapter_types:
monitor_task = await _create_cli_monitor(runtime)
# Run the runtime
try:
await _run_runtime(runtime, timeout, effective_num_rounds)
finally:
if monitor_task and not monitor_task.done():
logger.debug("Waiting for CLI monitor task to detect shutdown completion...")
try:
await asyncio.wait_for(monitor_task, timeout=5.0)
except asyncio.TimeoutError:
logger.warning("Monitor task did not complete within 5 seconds")
monitor_task.cancel()
except Exception as e:
logger.error(f"Monitor task error: {e}")
# Handle CLI exit
await _handle_cli_exit(selected_adapter_types)
try:
asyncio.run(_async_main())
except KeyboardInterrupt:
_shutdown_reason_logged["value"] = True
logger.info(f"[EXIT] Interrupted by user (PID={os.getpid()}), exiting...")
logger.debug("EXITING NOW VIA sys.exit(0) AT KeyboardInterrupt in main")
sys.exit(0)
except SystemExit:
_shutdown_reason_logged["value"] = True
raise # Re-raise SystemExit to exit with the correct code
except Exception as e:
_shutdown_reason_logged["value"] = True
logger.error(f"[EXIT] Fatal error in main (PID={os.getpid()}): {e}", exc_info=True)
logger.debug("EXITING NOW VIA sys.exit(1) AT Fatal error in main")
sys.exit(1)
# Handle final exit
_handle_final_exit()
if __name__ == "__main__":
main()