-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathblockchain_adapter.py
More file actions
242 lines (204 loc) · 8.36 KB
/
blockchain_adapter.py
File metadata and controls
242 lines (204 loc) · 8.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
"""
blockchain_adapter.py
=====================
Adapter that wraps MockDeFiProtocol with an optional blockchain audit layer.
Pattern: every state-changing action is executed against MockDeFiProtocol first
(authoritative, fast), then a non-blocking on-chain record call is dispatched
via BlockchainClient. The simulation never waits for blockchain confirmations.
When BLOCKCHAIN_ENABLED=false or the Hardhat node is unavailable, the adapter
falls back to pure MockDeFiProtocol behavior with zero performance impact.
"""
import asyncio
import logging
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List, Optional

from DeFiProtocolInterface import DeFiProtocolInterface
from MockDeFiProtocol import MockDeFiProtocol
from blockchain_client import BlockchainClient
import config
logger = logging.getLogger(__name__)
class BlockchainAdapter(DeFiProtocolInterface):
    """
    Drop-in replacement for MockDeFiProtocol that simultaneously:
    1. Executes actions in the Python mock (fast, authoritative)
    2. Records swaps, liquidations, borrows and supplies via BlockchainClient
       (best-effort: a failing record call is logged, never raised)
    3. Maintains an audit_log list of
       {step, action, agent, tx_hash, on_chain, timestamp} plus any
       action-specific extra fields
    """

    def __init__(
        self,
        mock: Optional[MockDeFiProtocol] = None,
        blockchain: Optional[BlockchainClient] = None,
    ) -> None:
        """Wire the adapter to a mock protocol and a blockchain client.

        Args:
            mock: Protocol implementation that executes every action. A new
                ``MockDeFiProtocol()`` is created when omitted.
            blockchain: Client used to record actions on-chain. A new
                ``BlockchainClient`` pointed at ``config.HARDHAT_RPC_URL`` is
                created when omitted.
        """
        # Compare against None explicitly: an injected object that happens to
        # be falsy (e.g. defines __bool__/__len__) must not be silently
        # replaced by a freshly constructed default.
        self._mock = mock if mock is not None else MockDeFiProtocol()
        self._bc = (
            blockchain
            if blockchain is not None
            else BlockchainClient(rpc_url=config.HARDHAT_RPC_URL)
        )
        # In-memory audit log — see _log_audit for the shape of each entry.
        self.audit_log: List[Dict[str, Any]] = []
        self._current_step = 0
        logger.info(
            "BlockchainAdapter ready | blockchain_available=%s",
            self._bc.available,
        )

    # ── Step tracking ──────────────────────────────────────────────
    def set_step(self, step: int) -> None:
        """Set the step number stamped on subsequent audit entries."""
        self._current_step = step

    # ── Read-only delegations to mock ─────────────────────────────
    def get_token_price(self) -> float:
        """Return the mock's token price."""
        return self._mock.get_token_price()

    def get_oracle_price(self) -> float:
        """Return the mock's oracle price."""
        return self._mock.get_oracle_price()

    def get_pool_liquidity(self) -> Dict[str, float]:
        """Return the mock's pool liquidity mapping."""
        return self._mock.get_pool_liquidity()

    def get_user_health_factor(self, address: str) -> float:
        """Return the mock's health factor for ``address``."""
        return self._mock.get_user_health_factor(address)

    def has_bad_debt(self) -> bool:
        """Return whether the mock reports bad debt."""
        return self._mock.has_bad_debt()

    def get_all_borrower_addresses(self) -> List[str]:
        """Return the borrower addresses known to the mock."""
        return self._mock.get_all_borrower_addresses()

    def get_lending_state(self) -> Dict:
        """Return the mock's lending state."""
        return self._mock.get_lending_state()

    def advance_oracle_price(self) -> float:
        """Delegate to the mock's ``advance_oracle_price`` and return its result.

        NOTE(review): grouped with the read-only delegations, but the name
        suggests it mutates oracle state; it is neither recorded on-chain nor
        added to the audit log — confirm that is intended.
        """
        return self._mock.advance_oracle_price()

    # ── On-chain recording guard ───────────────────────────────────
    def _record_on_chain(
        self,
        record_fn: Callable[..., Optional[str]],
        **kwargs: Any,
    ) -> Optional[str]:
        """Call a BlockchainClient ``record_*`` method without letting it fail the action.

        The mock has already applied the state change by the time this runs,
        so an error here must not propagate: it would abort the caller after
        the authoritative state moved and would skip the audit entry.

        Args:
            record_fn: Bound ``record_*`` method of the blockchain client.
            **kwargs: Keyword arguments forwarded to ``record_fn`` unchanged.

        Returns:
            Whatever ``record_fn`` returns (used as the audit ``tx_hash``),
            or ``None`` when the call raised.
        """
        try:
            return record_fn(**kwargs)
        except Exception:
            # Deliberately broad: this is the best-effort boundary. The full
            # traceback is logged so failures stay visible.
            logger.warning(
                "On-chain recording via %s failed; continuing with mock result",
                getattr(record_fn, "__name__", repr(record_fn)),
                exc_info=True,
            )
            return None

    # ── State-changing actions ─────────────────────────────────────
    def execute_swap(
        self,
        token_in: str,
        amount_in: float,
        caller_address: str,
    ) -> Dict[str, float]:
        """Execute a swap in the mock, record it on-chain and audit it.

        Returns:
            The mock's result dict, unchanged.
        """
        # 1. Execute in mock
        result = self._mock.execute_swap(token_in, amount_in, caller_address)
        # 2. Record on-chain (best-effort); absent result keys fall back to
        #    neutral defaults.
        tx = self._record_on_chain(
            self._bc.record_swap,
            caller=caller_address,
            token_in=token_in,
            amount_in=amount_in,
            token_out=result.get("token_out", ""),
            amount_out=result.get("amount_out", 0.0),
            new_amm_price=result.get("new_price", 0.0),
            price_impact=result.get("price_impact", 0.0),
        )
        self._log_audit("execute_swap", caller_address, tx)
        return result

    def execute_liquidation(
        self,
        borrower_address: str,
        liquidator_address: str,
    ) -> Dict[str, float]:
        """Execute a liquidation in the mock, record it on-chain and audit it.

        The audit entry is attributed to the liquidator and carries the
        borrower and repaid debt as extra fields.

        Returns:
            The mock's result dict, unchanged.
        """
        result = self._mock.execute_liquidation(borrower_address, liquidator_address)
        debt_repaid = result.get("debt_repaid", 0.0)
        tx = self._record_on_chain(
            self._bc.record_liquidation,
            liquidator=liquidator_address,
            borrower=borrower_address,
            debt_repaid=debt_repaid,
            collateral_seized=result.get("collateral_seized", 0.0),
            bonus=result.get("liquidation_bonus", 0.0),
        )
        self._log_audit("execute_liquidation", liquidator_address, tx, extra={
            "borrower": borrower_address,
            "debt_repaid": debt_repaid,
        })
        return result

    def execute_borrow(
        self,
        borrower_address: str,
        amount_usdc: float,
    ) -> Dict[str, float]:
        """Execute a borrow in the mock, record it on-chain and audit it.

        Returns:
            The mock's result dict, unchanged.
        """
        result = self._mock.execute_borrow(borrower_address, amount_usdc)
        # Defaults to 1.0 when the mock result has no "new_health_factor".
        health_factor = result.get("new_health_factor", 1.0)
        tx = self._record_on_chain(
            self._bc.record_borrow,
            borrower=borrower_address,
            amount_usdc=amount_usdc,
            health_factor=health_factor,
        )
        self._log_audit("execute_borrow", borrower_address, tx, extra={
            "amount_usdc": amount_usdc,
            "health_factor": health_factor,
        })
        return result

    def execute_supply(
        self,
        supplier_address: str,
        token: str,
        amount: float,
    ) -> Dict[str, float]:
        """Execute a supply in the mock, record it on-chain and audit it.

        Returns:
            The mock's result dict, unchanged.
        """
        result = self._mock.execute_supply(supplier_address, token, amount)
        tx = self._record_on_chain(
            self._bc.record_supply,
            supplier=supplier_address,
            token_sym=token,
            amount=amount,
            lp_minted=result.get("lp_tokens_minted", 0.0),
        )
        self._log_audit("execute_supply", supplier_address, tx, extra={
            "token": token,
            "amount": amount,
        })
        return result

    def execute_withdraw(
        self,
        withdrawer_address: str,
        fraction: float,
    ) -> Dict[str, float]:
        """Execute a withdrawal in the mock and audit it (no on-chain record).

        Returns:
            The mock's result dict, unchanged.
        """
        result = self._mock.execute_withdraw(withdrawer_address, fraction)
        # No specific on-chain record API for mock withdrawal yet, so the
        # audit entry is written without a transaction hash.
        self._log_audit("execute_withdraw", withdrawer_address, None, extra={
            "fraction": fraction,
            "usdc_withdrawn": result.get("usdc_withdrawn", 0.0),
            "eth_withdrawn": result.get("eth_withdrawn", 0.0),
        })
        return result

    def execute_deposit_collateral(
        self,
        borrower_address: str,
        amount_eth: float,
    ) -> Dict[str, float]:
        """Deposit collateral in the mock and audit it (no on-chain record).

        Returns:
            The mock's result dict, unchanged.
        """
        result = self._mock.execute_deposit_collateral(borrower_address, amount_eth)
        self._log_audit("execute_deposit_collateral", borrower_address, None, extra={
            "amount_eth": amount_eth,
        })
        return result

    def execute_withdraw_collateral(
        self,
        borrower_address: str,
        amount_eth: float,
    ) -> Dict[str, float]:
        """Withdraw collateral in the mock and audit it (no on-chain record).

        Returns:
            The mock's result dict, unchanged.
        """
        result = self._mock.execute_withdraw_collateral(borrower_address, amount_eth)
        self._log_audit("execute_withdraw_collateral", borrower_address, None, extra={
            "amount_eth": amount_eth,
        })
        return result

    def execute_repay(
        self,
        borrower_address: str,
        amount_usdc: float,
    ) -> Dict[str, float]:
        """Repay debt in the mock and audit it (no on-chain record).

        Returns:
            The mock's result dict, unchanged.
        """
        result = self._mock.execute_repay(borrower_address, amount_usdc)
        self._log_audit("execute_repay", borrower_address, None, extra={
            "amount_usdc": amount_usdc,
        })
        return result

    # ── Audit log helpers ──────────────────────────────────────────
    def _log_audit(
        self,
        action: str,
        agent: str,
        tx_hash: Optional[str],
        extra: Optional[Dict] = None,
    ) -> None:
        """Append one entry to ``audit_log``.

        Args:
            action: Name of the adapter method that was executed.
            agent: Address the action is attributed to.
            tx_hash: Value returned by the on-chain record call, or ``None``
                when nothing was recorded; ``on_chain`` is derived from it.
            extra: Optional action-specific fields added after the core ones.
        """
        entry: Dict[str, Any] = {
            "step": self._current_step,
            "action": action,
            "agent": agent,
            "tx_hash": tx_hash,
            "on_chain": tx_hash is not None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        if extra:
            # setdefault keeps the core fields authoritative: an extra key
            # that collides with one of them is ignored instead of
            # overwriting step/action/agent/tx_hash/on_chain/timestamp.
            for key, value in extra.items():
                entry.setdefault(key, value)
        self.audit_log.append(entry)

    def get_audit_log(self) -> List[Dict[str, Any]]:
        """Return the full audit log for this simulation run.

        The returned list is a shallow copy: adding or removing items does not
        affect the adapter, but the entry dicts themselves are shared.
        """
        return list(self.audit_log)

    def get_blockchain_status(self) -> Dict[str, Any]:
        """Return blockchain connection status."""
        return self._bc.get_status()

    def get_transaction(self, tx_hash: str) -> Optional[Dict[str, Any]]:
        """Return the blockchain client's lookup result for ``tx_hash``."""
        return self._bc.get_transaction(tx_hash)