-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtarka.py
More file actions
350 lines (285 loc) · 10.8 KB
/
tarka.py
File metadata and controls
350 lines (285 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
#!/usr/bin/env python3
"""
Tarka — adversarial deliberation layer on top of AI agents.
Tarka is a DETERMINISTIC HARNESS, not a third brain. All intelligence
lives inside the agents. Tarka mechanically executes a fixed protocol:
propose → critique × N → synthesize. Given the same agent outputs, it
always routes, formats, and presents identically. No LLM in the
orchestration layer. No smart routing. No AI deciding when to stop.
Usage:
python tarka.py "design the caching layer for our API"
python tarka.py "should we migrate from REST to GraphQL" --rounds 3
python tarka.py "refactor the auth module" --cwd ./project
python tarka.py "pick a database" --agents claude codex --quiet
"""
from __future__ import annotations
import argparse
import subprocess
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
# --- Colors (ANSI, no deps) ---
_RESET = "\033[0m"
_BOLD = "\033[1m"
_DIM = "\033[2m"
_AGENT_COLORS = {
"claude": "\033[38;5;209m", # orange
"codex": "\033[38;5;114m", # green
"gemini": "\033[38;5;75m", # blue
}
def _color(agent_name: str, text: str) -> str:
c = _AGENT_COLORS.get(agent_name.lower(), "")
return f"{c}{text}{_RESET}" if c else text
def _dim(text: str) -> str:
return f"{_DIM}{text}{_RESET}"
def _bold(text: str) -> str:
return f"{_BOLD}{text}{_RESET}"
# --- Agents ---
TIMEOUT = 180
@dataclass(frozen=True)
class Agent:
"""An AI agent accessible via CLI."""
name: str
command: list[str] # must contain {prompt} placeholder
def ask(self, prompt: str, cwd: str = ".", stream: bool = False) -> str:
"""Run the agent. If stream=True, print output to terminal as it arrives."""
cmd = [part.replace("{prompt}", prompt) for part in self.command]
proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, cwd=cwd,
)
# Kill the process if it exceeds the timeout, even during streaming.
timer = threading.Timer(TIMEOUT, proc.kill)
timer.start()
try:
if stream:
chunks: list[str] = []
for line in proc.stdout:
chunks.append(line)
sys.stdout.write(line)
sys.stdout.flush()
proc.wait()
if proc.returncode and proc.returncode < 0:
raise RuntimeError(f"{self.name} timed out after {TIMEOUT}s")
return "".join(chunks).strip()
else:
stdout, stderr = proc.communicate()
if proc.returncode and proc.returncode < 0:
raise RuntimeError(f"{self.name} timed out after {TIMEOUT}s")
if proc.returncode != 0:
raise RuntimeError(f"{self.name} failed: {stderr.strip()}")
return stdout.strip()
finally:
timer.cancel()
CLAUDE = Agent("Claude", ["claude", "-p", "{prompt}", "--no-input"])
CODEX = Agent("Codex", ["codex", "exec", "{prompt}"])
GEMINI = Agent("Gemini", ["gemini", "-p", "{prompt}"])
AGENTS = {"claude": CLAUDE, "codex": CODEX, "gemini": GEMINI}
# --- Prompts ---
#
# These are static templates with deterministic substitution. The prompts
# do the work — they are the product, not the code. Anti-sycophancy
# hardening is baked in: without it, RLHF-trained models perform
# "debate theater" and converge to the safe, conventional answer.
PROPOSE = """\
Propose an approach for the following task. Focus on key decisions,
tradeoffs, and risks. Be specific and concrete.
Commit to a clear position. Do not hedge with "it depends" — pick the
best path given what you know and defend it.
TASK: {task}"""
CRITIQUE = """\
You are reviewing another AI agent's proposal. Your job is genuine
adversarial review, not polite agreement.
Rules:
- You MUST find at least one substantive flaw or unstated assumption.
- If you agree with everything, you have FAILED at your job.
- Do not soften your critique. Be direct about what is wrong.
- Say what is missing, what breaks under pressure, and what you would do
differently.
- If their proposal is genuinely strong, attack the implementation
details, timeline assumptions, or edge cases.
Lock in your position: state clearly what you would change and why.
Do NOT concede unless presented with a logical refutation you cannot
counter.
TASK: {task}
THEIR PROPOSAL:
{proposal}"""
SYNTHESIZE = """\
Two experts debated the approach below. Synthesize their positions into
a concrete, actionable plan.
Rules:
- Where they disagree, pick the stronger argument and say WHY the other
is wrong. Do not split the difference or hedge.
- Where both missed something, add it.
- Structure your output as:
1. RECOMMENDATION: The approach to take (1-2 paragraphs)
2. KEY DECISIONS: Bullet list of decisions made and rationale
3. DISSENT: Where the losing argument had merit worth noting
4. NEXT STEPS: Concrete actions, ordered
TASK: {task}
DEBATE:
{debate}"""
# --- Deliberation ---
#
# The protocol is fixed and deterministic:
# 1. Both agents propose independently (parallel)
# 2. Each agent critiques the other's proposal (parallel within round,
# serial across rounds — round N+1 sees round N's output)
# 3. First agent synthesizes the full debate log (serial, streamed)
#
# Tarka makes zero decisions. The user controls agents, rounds, and
# timeout. Tarka executes the protocol exactly as specified.
def _parallel(agents: list[Agent], prompts: list[str], cwd: str) -> list[str | None]:
"""Ask multiple agents in parallel. Returns responses in input order."""
with ThreadPoolExecutor(max_workers=len(agents)) as pool:
futures = {
pool.submit(agent.ask, prompt, cwd): i
for i, (agent, prompt) in enumerate(zip(agents, prompts))
}
results: list[str | None] = [None] * len(agents)
errors: list[str] = []
for future in as_completed(futures):
idx = futures[future]
try:
results[idx] = future.result()
except Exception as e:
errors.append(f"{agents[idx].name}: {e}")
if errors:
alive = [r for r in results if r is not None]
if not alive:
raise RuntimeError("All agents failed:\n" + "\n".join(errors))
for err in errors:
print(f" {_dim(f'warning: {err}')}")
return results
def _header(label: str) -> None:
bar = "\u2501" * 60
print(f"\n{bar}")
print(f" {_bold(label)}")
print(f"{bar}\n")
def _agent_block(agent: Agent, label: str, text: str) -> None:
tag = _color(agent.name, f"[{agent.name}]")
sep = _dim("\u2500" * 56)
print(f"\n{tag} {_dim(label)}")
print(sep)
print(text)
print()
def deliberate(
task: str,
agents: tuple[Agent, Agent] = (CLAUDE, CODEX),
rounds: int = 2,
cwd: str = ".",
quiet: bool = False,
) -> str:
a, b = agents
t0 = time.time()
# --- Phase 1: independent proposals (parallel) ---
_header(f"Phase 1 \u2014 Proposals ({a.name} + {b.name})")
t1 = time.time()
pos_a, pos_b = _parallel(
[a, b],
[PROPOSE.format(task=task)] * 2,
cwd,
)
print(_dim(f" done ({time.time() - t1:.0f}s)"))
if pos_a is None and pos_b is None:
raise RuntimeError("Both agents failed to propose.")
if pos_a is None:
pos_a = f"[{a.name} failed \u2014 echoing {b.name}'s proposal]\n{pos_b}"
if pos_b is None:
pos_b = f"[{b.name} failed \u2014 echoing {a.name}'s proposal]\n{pos_a}"
if not quiet:
_agent_block(a, "proposal", pos_a)
_agent_block(b, "proposal", pos_b)
log = [
f"[{a.name} \u2014 proposal]\n{pos_a}",
f"[{b.name} \u2014 proposal]\n{pos_b}",
]
# --- Phase 2: adversarial critique (parallel per round, serial across rounds) ---
for r in range(1, rounds + 1):
_header(f"Phase 2 \u2014 Round {r}/{rounds}")
t1 = time.time()
crit_a, crit_b = _parallel(
[a, b],
[
CRITIQUE.format(task=task, proposal=pos_b),
CRITIQUE.format(task=task, proposal=pos_a),
],
cwd,
)
print(_dim(f" done ({time.time() - t1:.0f}s)"))
if crit_a is not None:
pos_a = crit_a
if crit_b is not None:
pos_b = crit_b
if not quiet:
if crit_a:
_agent_block(a, f"round {r} critique", crit_a)
if crit_b:
_agent_block(b, f"round {r} critique", crit_b)
log.append(f"[{a.name} \u2014 round {r}]\n{pos_a}")
log.append(f"[{b.name} \u2014 round {r}]\n{pos_b}")
# --- Phase 3: synthesis (streamed to terminal) ---
_header(f"Phase 3 \u2014 Synthesis ({a.name})")
t1 = time.time()
result = a.ask(
SYNTHESIZE.format(task=task, debate="\n\n---\n\n".join(log)),
cwd,
stream=True,
)
total = time.time() - t0
synth_time = time.time() - t1
print(f"\n{'\u2501' * 60}")
print(_dim(f" synthesis {synth_time:.0f}s | total {total:.0f}s"))
print(f"{'\u2501' * 60}")
return result
# --- CLI ---
def main() -> None:
parser = argparse.ArgumentParser(
description="Deterministic deliberation harness for AI agent CLIs.",
)
parser.add_argument("task", help="the task to deliberate on")
parser.add_argument(
"--rounds", type=int, default=2,
help="number of debate rounds (default: 2)",
)
parser.add_argument("--cwd", default=".", help="working directory for agents")
parser.add_argument(
"--agents", nargs=2, default=["claude", "codex"],
choices=list(AGENTS.keys()), metavar="AGENT",
help=f"which two agents to use (choices: {', '.join(AGENTS)}; default: claude codex)",
)
parser.add_argument(
"--quiet", action="store_true",
help="only show synthesis, not the full debate",
)
parser.add_argument(
"--timeout", type=int, default=180,
help="per-agent call timeout in seconds (default: 180)",
)
args = parser.parse_args()
global TIMEOUT
TIMEOUT = args.timeout
agent_a = AGENTS[args.agents[0]]
agent_b = AGENTS[args.agents[1]]
print(
f"\n{_bold('Tarka')} "
f"{_dim(f'{agent_a.name} vs {agent_b.name} \u2022 {args.rounds} rounds')}"
)
try:
deliberate(
args.task,
agents=(agent_a, agent_b),
rounds=args.rounds,
cwd=args.cwd,
quiet=args.quiet,
)
except RuntimeError as e:
print(f"\n{_bold('Error:')} {e}", file=sys.stderr)
sys.exit(1)
except KeyboardInterrupt:
print(f"\n{_dim('interrupted')}")
sys.exit(130)
if __name__ == "__main__":
main()