-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsample_code.py
More file actions
417 lines (366 loc) · 19.3 KB
/
sample_code.py
File metadata and controls
417 lines (366 loc) · 19.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
import random
import time
import csv
import os
from datetime import datetime
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Tuple
# -------------------------
# Utility Functions
# -------------------------
def safe_int(prompt: str, min_val: Optional[int] = None, max_val: Optional[int] = None) -> int:
while True:
try:
v = int(input(prompt).strip())
if min_val is not None and v < min_val:
print(f"Please enter a value >= {min_val}.")
continue
if max_val is not None and v > max_val:
print(f"Please enter a value <= {max_val}.")
continue
return v
except ValueError:
print("Invalid integer. Try again.")
def choose(prompt: str, options: List[str]) -> str:
opts = ", ".join(options)
while True:
val = input(f"{prompt} ({opts}): ").strip().lower()
if val in [o.lower() for o in options]:
return val
print("Invalid choice. Try again.")
def timestamp() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# -------------------------
# Data Structures
# -------------------------
@dataclass
class OfferRecord:
timestamp: str
round_no: int
speaker: str
price_per_unit: int
quantity: int
note: Optional[str] = ""
@dataclass
class SessionReport:
session_id: str
start_time: str
end_time: Optional[str] = None
rounds: int = 0
offers: List[OfferRecord] = field(default_factory=list)
agreed_price: Optional[int] = None
agreed_quantity: Optional[int] = None
winner: Optional[str] = None
def to_csv(self, path: str):
with open(path, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(["session_id", self.session_id])
writer.writerow(["start_time", self.start_time])
writer.writerow(["end_time", self.end_time or ""])
writer.writerow(["rounds", self.rounds])
writer.writerow([])
writer.writerow(["round_no", "timestamp", "speaker", "price_per_unit", "quantity", "note"])
for o in self.offers:
writer.writerow([o.round_no, o.timestamp, o.speaker, o.price_per_unit, o.quantity, o.note])
writer.writerow([])
writer.writerow(["agreed_price", self.agreed_price])
writer.writerow(["agreed_quantity", self.agreed_quantity])
writer.writerow(["winner", self.winner or ""])
# -------------------------
# Agent Implementations
# -------------------------
class BaseAgent:
def _init_(self, role: str, persona: str = "balanced", budget: Optional[int] = None, min_price: Optional[int] = None, quantity: int = 1):
self.role = role.lower() # 'buyer' or 'seller'
self.persona = persona.lower()
self.budget = budget
self.min_price = min_price
self.quantity = quantity
self.character_score = 0.0
self.last_offer: Optional[int] = None
def propose(self, round_no: int, time_left: float) -> int:
# Baseline behavior depending on role and persona
if self.role == "buyer":
base = self.budget if self.budget is not None else 100
# Buyers start low and move up
aggression = 1.0
if "aggressive" in self.persona:
aggression = 1.4
elif "friendly" in self.persona:
aggression = 0.8
# dynamic concession
step = int((round_no) * (1 + random.random() * 0.5))
offer = max(1, int(base * (0.5 + 0.5 / aggression) + step))
else:
base = self.min_price if self.min_price is not None else 80
# Sellers start higher and reduce
concession = 1.0
if "aggressive" in self.persona:
concession = 1.2
elif "friendly" in self.persona:
concession = 0.8
step = int((round_no) * (1 + random.random() * 0.5))
offer = max(1, int(base * (1.2 * concession) - step))
# Keep last offer
self.last_offer = offer
# Slight personality flavoring score
self.character_score += 0.2
return offer
def evaluate(self, incoming_offer: int, round_no: int, time_left: float) -> str:
# Return 'accept', 'reject', or 'counter'
if self.role == "buyer":
if self.budget is not None and incoming_offer <= self.budget:
# early rounds may reject unless very good
if round_no <= 2 and incoming_offer <= int(self.budget * 0.95):
return "accept"
if round_no > 2 and incoming_offer <= self.budget:
return "accept"
# if time low, accept slightly worse
if time_left < 10 and self.budget is not None and incoming_offer <= int(self.budget * 1.02):
return "accept"
return "counter"
else:
if self.min_price is not None and incoming_offer >= self.min_price:
if round_no <= 2 and incoming_offer >= int(self.min_price * 1.05):
return "accept"
if round_no > 2 and incoming_offer >= self.min_price:
return "accept"
if time_left < 10 and self.min_price is not None and incoming_offer >= int(self.min_price * 0.98):
return "accept"
return "counter"
# -------------------------
# Opponent Factory
# -------------------------
def build_opponents(count: int, role: str, base_budget: int = 120, base_min: int = 80) -> List[BaseAgent]:
personas = ["aggressive", "balanced", "friendly", "calculated", "smooth"]
opponents = []
for i in range(count):
p = random.choice(personas)
if role == "buyer":
budget = int(base_budget * (0.8 + random.random() * 0.8))
opponents.append(BaseAgent(role="buyer", persona=p, budget=budget))
else:
min_price = int(base_min * (0.8 + random.random() * 0.8))
opponents.append(BaseAgent(role="seller", persona=p, min_price=min_price))
return opponents
# -------------------------
# Negotiation Controller
# -------------------------
class NegotiationController:
def _init_(self):
self.session = SessionReport(session_id=str(int(time.time())), start_time=timestamp())
self.user_agent: Optional[BaseAgent] = None
self.opponents: List[BaseAgent] = []
self.max_rounds = 10
self.total_quantity = 1
self.global_time_limit = 180 # seconds
self.start_time = time.time()
def setup(self):
print("Welcome to the Advanced Negotiation Simulator!")
role = choose("Choose your role", ["Buyer", "Seller"]).lower()
persona = choose("Choose your persona", ["Aggressive", "Balanced", "Friendly"]).lower()
self.total_quantity = safe_int("Enter total quantity you want to trade: ", min_val=1)
self.max_rounds = safe_int("Enter maximum negotiation rounds: ", min_val=1, max_val=100)
self.global_time_limit = safe_int("Enter overall time limit (seconds, e.g., 120): ", min_val=10)
if role == "buyer":
budget = safe_int("Enter your maximum total budget (for all units): ", min_val=1)
# compute per-unit budget lookup
per_unit_budget = max(1, budget // self.total_quantity)
self.user_agent =BaseAgent(role="buyer", persona=persona, budget=per_unit_budget, quantity=self.total_quantity)
# opponents are sellers
opp_count = safe_int("How many seller opponents to face? ", min_val=1, max_val=3)
# For opponents, give base min prices around per_unit_budget
self.opponents = build_opponents(opp_count, role="seller", base_min=per_unit_budget)
else:
min_price = safe_int("Enter your minimum acceptable price per unit: ", min_val=1)
self.user_agent = BaseAgent(role="seller", persona=persona, min_price=min_price, quantity=self.total_quantity)
opp_count = safe_int("How many buyer opponents to face? ", min_val=1, max_val=3)
# For opponents, give base budgets around min_price
self.opponents = build_opponents(opp_count, role="buyer", base_budget=min_price * 2)
print("Setup complete. Opponents:")
for i, o in enumerate(self.opponents, 1):
if o.role == "seller":
print(f" Opponent {i}: Seller persona={o.persona}, min_price={o.min_price}")
else:
print(f" Opponent {i}: Buyer persona={o.persona}, budget={o.budget}")
def play(self):
rounds = 0
time_limit_at = time.time() + self.global_time_limit
active_opponents = self.opponents.copy()
# Each opponent negotiated sequentially. User can choose to engage which opponent each round.
while rounds < self.max_rounds and time.time() < time_limit_at and active_opponents:
rounds += 1
print('' + '=' * 40)
print(f"Round {rounds} (Time left: {int(time_limit_at - time.time())}s)")
print('Engage with opponents:')
for idx, opp in enumerate(active_opponents, 1):
if opp.role == 'seller':
print(f" [{idx}] Seller persona={opp.persona}, min={opp.min_price}")
else:
print(f" [{idx}] Buyer persona={opp.persona}, budget={opp.budget}")
# choose opponent
choice = safe_int("Choose an opponent index to engage this round: ", min_val=1, max_val=len(active_opponents))
opponent = active_opponents[choice - 1]
# user action
print("Your options: make_offer / accept / reject / pass / propose_backup")
action = input("Enter action: ").strip().lower()
if action == "make_offer":
price = safe_int("Enter your price per unit: ", min_val=1)
qty = safe_int(f"Enter quantity (1..{self.total_quantity}): ", min_val=1, max_val=self.total_quantity)
self.record_offer(rounds, self.user_agent, price, qty, note="user_offer")
# opponent evaluates
evaluation = opponent.evaluate(price, rounds, time_left=time_limit_at - time.time())
if evaluation == 'accept':
print(f"Opponent accepted your offer of {price} per unit for {qty} units.")
self.finalize_deal(price, qty, winner='user')
break
elif evaluation == 'reject':
print("Opponent rejected your offer outright.")
else:
# opponent counters
counter_price = opponent.propose(rounds, time_left=time_limit_at - time.time())
counter_qty = min(qty, opponent.quantity)
print(f"Opponent counters with {counter_price} per unit for {counter_qty} units.")
self.record_offer(rounds, opponent, counter_price, counter_qty, note='opponent_counter')
# give user option to accept counter
dec = choose("Do you accept opponent's counter?", ["yes", "no"]) # yes/no
if dec == 'yes':
print("You accepted the opponent's counter offer.")
self.finalize_deal(counter_price, counter_qty, winner='user')
break
else:
print("You declined the counter. Negotiation continues.")
elif action == 'accept':
if opponent.last_offer:
print(f"You accepted opponent's last offer: {opponent.last_offer}")
self.record_offer(rounds, self.user_agent, opponent.last_offer, self.user_agent.quantity, note='user_accept')
self.finalize_deal(opponent.last_offer, self.user_agent.quantity, winner='user')
break
else:
print("No active opponent offer to accept.")
elif action == 'reject':
print("You rejected immediately. Opponent removed from this session.")
self.record_offer(rounds, self.user_agent, 0, 0, note='user_reject')
active_opponents.remove(opponent)
elif action == 'pass':
print("You passed this turn. Opponent may act.")
# opponent may make a proactive offer
proactive = random.random() < 0.6
if proactive:
offer_price = opponent.propose(rounds, time_left=time_limit_at - time.time())
offer_qty = min(self.user_agent.quantity, opponent.quantity)
print(f"Opponent proactively offers {offer_price} per unit for {offer_qty} units.")
self.record_offer(rounds, opponent, offer_price, offer_qty, note='opponent_offer')
# user can accept
dec = choose("Do you accept this offer?", ["yes", "no"]) # yes/no
if dec == 'yes':
self.finalize_deal(offer_price, offer_qty, winner='user')
break
else:
print("You declined. Continuing.")
elif action == 'propose_backup':
backup_price = safe_int("Enter backup/fallback price per unit: ", min_val=1)
backup_qty = safe_int("Enter backup quantity: ", min_val=1, max_val=self.total_quantity)
print(f"You proposed a backup deal {backup_price} per unit for {backup_qty} units.")
self.record_offer(rounds, self.user_agent, backup_price, backup_qty, note='user_backup')
# opponent randomly decides to accept or not based on closeness
dist = abs((opponent.min_price or opponent.budget or 0) - backup_price)
accept_prob = max(0.1, 1.0 - dist / max(1, (opponent.min_price or opponent.budget or 50)))
if random.random() < accept_prob:
print("Opponent accepted your backup deal!")
self.finalize_deal(backup_price, backup_qty, winner='user')
break
else:
print("Opponent declined the backup deal.")
else:
print("Unknown action. Try again.")
# After user action / opponent interaction, allow opponent to act if still active
# Opponent may make an offer if there was no deal
if active_opponents and random.random() < 0.7:
opp = opponent
opp_offer = opp.propose(rounds, time_left=time_limit_at - time.time())
opp_qty = min(self.user_agent.quantity, opp.quantity)
print(f"Opponent {opp.persona} made an offer: {opp_offer} per unit for {opp_qty} units.")
self.record_offer(rounds, opp, opp_offer, opp_qty, note='opponent_offer')
# User can accept immediately
dec = choose("Do you accept this opponent offer?", ["yes", "no"]) # yes/no
if dec == 'yes':
self.finalize_deal(opp_offer, opp_qty, winner='user')
break
else:
print("You declined the opponent's offer this round.")
# end while
self.session.rounds = rounds
self.session.end_time = timestamp()
if not self.session.agreed_price:
print('No deal was finalized in the session.')
# Provide final fallback: allow mutual last-offer settlement
self.fallback_resolution()
# Score and print report
self.print_report()
def record_offer(self, round_no: int, agent: BaseAgent, price: int, qty: int, note: str = ""):
r = OfferRecord(timestamp=timestamp(), round_no=round_no, speaker=f"{agent.role}-{agent.persona}", price_per_unit=price, quantity=qty, note=note)
self.session.offers.append(r)
def finalize_deal(self, price: int, qty: int, winner: str):
self.session.agreed_price = price
self.session.agreed_quantity = qty
self.session.winner = winner
print('' + '*' * 30)
print(f"DEAL FINALIZED: {qty} units at {price} per unit. Winner: {winner}")
print('*' * 30 + '')
def fallback_resolution(self):
# Try to find if any last offers from both sides overlap and accept
user_last = next((o for o in reversed(self.session.offers) if o.speaker.startswith(self.user_agent.role)), None)
opp_last = next((o for o in reversed(self.session.offers) if not o.speaker.startswith(self.user_agent.role)), None)
if user_last and opp_last:
# If prices are close, propose midpoint
user_price = user_last.price_per_unit
opp_price = opp_last.price_per_unit
midpoint = (user_price + opp_price) // 2
print(f"Fallback midpoint proposal: {midpoint} per unit.")
dec = choose("Do you accept the midpoint fallback?", ["yes", "no"]) # yes/no
if dec == 'yes':
self.finalize_deal(midpoint, min(user_last.quantity, opp_last.quantity), winner='fallback_user')
return
print("No acceptable fallback found.")
def print_report(self):
print('SESSION REPORT')
print('Session ID:', self.session.session_id)
print('Start:', self.session.start_time)
print('End:', self.session.end_time)
print('Rounds:', self.session.rounds)
print('-' * 40)
for o in self.session.offers:
print(f"Round {o.round_no} | {o.timestamp} | {o.speaker} | {o.price_per_unit} x{o.quantity} | {o.note}")
print('-' * 40)
if self.session.agreed_price:
total = (self.session.agreed_price or 0) * (self.session.agreed_quantity or 0)
print(f"Agreed: {self.session.agreed_quantity} units at {self.session.agreed_price} each. Total: {total}")
print("Winner:", self.session.winner)
# compute simple scoring
if self.user_agent.role == 'buyer':
per_unit_budget = self.user_agent.budget or 0
savings = max(0, per_unit_budget - (self.session.agreed_price or 0))
print(f"Buyer savings per unit: {savings}")
else:
profit = max(0, (self.session.agreed_price or 0) - (self.user_agent.min_price or 0))
print(f"Seller profit per unit: {profit}")
else:
print('No deal.')
# offer to save CSV
save = choose("Save session report to CSV?", ["yes", "no"]) # yes/no
if save == 'yes':
fname = f"negotiation_{self.session.session_id}.csv"
path = os.path.join(os.getcwd(), fname)
self.session.to_csv(path)
print(f"Saved report to {path}")
# -------------------------
# Main
# -------------------------
def main():
random.seed()
controller = NegotiationController()
controller.setup()
controller.play()
if __name__ == '__main__':
main()