-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathtest_script.py
More file actions
155 lines (121 loc) · 4.71 KB
/
test_script.py
File metadata and controls
155 lines (121 loc) · 4.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import os
import tempfile
import time
import scaledown as sd
# Optional Optimizers (Lazy Loaded)
from scaledown.optimizer import HasteOptimizer, SemanticOptimizer
from scaledown.exceptions import AuthenticationError, APIError
# Read the API key from the environment. SECURITY: never hard-code a real
# credential as the fallback — use the placeholder so the warning below
# actually triggers when the env var is missing.
API_KEY = os.environ.get("SCALEDOWN_API_KEY", "your_api_key_here")
sd.set_api_key(API_KEY)
if API_KEY == "your_api_key_here":
    print("Warning: Using placeholder API key. API calls will fail.")
    print("Export your key: export SCALEDOWN_API_KEY='sk_...'\n")
# Sample module fed to the optimizers/compressor. Properly indented so the
# embedded snippet is itself valid, executable Python.
TEST_CODE = """
def calculate_sum(numbers):
    \"\"\"Calculate sum of numbers.\"\"\"
    total = 0
    for num in numbers:
        total += num
    return total


def calculate_average(numbers):
    \"\"\"Calculate average of numbers.\"\"\"
    if len(numbers) == 0:
        return 0
    return calculate_sum(numbers) / len(numbers)


class DataProcessor:
    def __init__(self, data):
        self.data = data

    def process(self):
        return calculate_average(self.data)
"""
def print_header(title):
    """Print *title* as a section header between 60-char horizontal rules."""
    print("\n" + "-" * 60)
    print(f"{title}")
    print("-" * 60)
def print_step_details(step_name, content, metrics):
    """Print a short report (token counts, latency, preview) for one step.

    Args:
        step_name: Label shown in brackets above the report.
        content: The step's output text; a 150-char preview is shown.
        metrics: Dict of step metrics; several key conventions are accepted
            ('original_tokens'/'input_tokens', 'optimized_tokens'/
            'compressed_tokens'/'output_tokens', optional 'latency_ms').
    """
    print(f"\n[{step_name}]")
    # Handle different metric structures safely — fall back to '?' if absent.
    in_tok = metrics.get('original_tokens', metrics.get('input_tokens', '?'))
    out_tok = metrics.get('optimized_tokens', metrics.get('compressed_tokens', metrics.get('output_tokens', '?')))
    print(f"Tokens: {in_tok} -> {out_tok}")
    if 'latency_ms' in metrics:
        print(f"Latency: {metrics['latency_ms']:.0f}ms")
    # Bug fix: decide the '...' ellipsis on the same stripped text the
    # preview is cut from (previously checked the unstripped length).
    stripped = content.strip()
    preview = stripped[:150].replace('\n', ' ')
    print(f"Preview: {preview}{'...' if len(stripped) > 150 else ''}")
# ---- main test logic ----
# Write the sample module to a real file so file-path-based optimizers can
# read it; delete=False keeps it alive after the with-block (also needed on
# Windows, where an open NamedTemporaryFile cannot be reopened by name).
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as f:
    f.write(TEST_CODE)
    file_path_arg = f.name

try:
    print_header("Component tests")

    # 1. test semantic
    print("\nTesting SemanticOptimizer...", end=" ")
    try:
        opt = SemanticOptimizer(top_k=1)
        res = opt.optimize(context=TEST_CODE, query="DataProcessor", file_path=file_path_arg)
        print("Passed")
        # Metrics may be a plain object; fall back to {} when it has no __dict__.
        metrics_dict = res.metrics.__dict__ if hasattr(res.metrics, '__dict__') else {}
        print_step_details("Semantic output", res.content, metrics_dict)
    except ImportError:
        # Optional dependency not installed — skip, don't fail the run.
        print("Skipped (missing dependencies)")
    except Exception as e:
        print(f"Failed: {e}")

    # 2. test haste
    print("\nTesting HasteOptimizer...", end=" ")
    try:
        opt = HasteOptimizer(top_k=2)
        res = opt.optimize(context=TEST_CODE, query="calculate_average", file_path=file_path_arg, target_model="gpt-4o")
        print("Passed")
        metrics_dict = res.metrics.__dict__ if hasattr(res.metrics, '__dict__') else {}
        print_step_details("Haste output", res.content, metrics_dict)
    except ImportError:
        print("Skipped (missing dependencies)")
    except Exception as e:
        print(f"Failed: {e}")

    # 3. test compressor
    print("\nTesting ScaleDownCompressor...", end=" ")
    try:
        comp = sd.ScaleDownCompressor(target_model="gpt-4o")
        res = comp.compress(context=TEST_CODE, prompt="Summarize")
        print("Passed")
        # Compressor exposes a (before, after) token tuple rather than a
        # metrics object; adapt it to print_step_details' dict shape.
        metrics_dict = {
            "original_tokens": res.tokens[0],
            "compressed_tokens": res.tokens[1],
            "latency_ms": 0
        }
        print_step_details("Compressor output", res.content, metrics_dict)
    except Exception as e:
        print(f"Failed: {e}")

    # 4. full pipeline — only the steps whose dependencies import cleanly.
    print_header("Pipeline integration")
    steps = []
    try:
        steps.append(('haste', HasteOptimizer(top_k=5)))
    except ImportError:
        pass
    try:
        steps.append(('semantic', SemanticOptimizer(top_k=1)))
    except ImportError:
        pass
    steps.append(('compressor', sd.ScaleDownCompressor(target_model="gpt-4o")))
    pipeline = sd.Pipeline(steps)
    print(f"Configuration: {[s[0] for s in steps]}")
    result = pipeline.run(
        context=TEST_CODE,
        query="logic for processing",
        file_path=file_path_arg,
        prompt="Explain logic"
    )
    print("Pipeline finished successfully")

    print("\n--- Trace ---")
    for i, step in enumerate(result.history):
        print(f"\nStep {i+1}: {step.step_name}")
        print(f"  Latency: {step.latency_ms:.0f}ms")
        print(f"  Tokens: {step.input_tokens} -> {step.output_tokens}")

    print_header("Summary")
    print(f"Original size: {len(TEST_CODE)} chars")
    print(f"Final size: {len(result.final_content)} chars")
    print(f"Savings: {result.savings_percent:.1f}%")
except Exception as e:
    # Top-level boundary for the script: report and fall through to cleanup.
    print(f"\nError: {e}")
finally:
    # Always remove the temp file created above.
    if os.path.exists(file_path_arg):
        os.unlink(file_path_arg)