forked from DLC-link/cbtc-lib
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbatch_with_callback.rs
More file actions
129 lines (113 loc) · 4.96 KB
/
batch_with_callback.rs
File metadata and controls
129 lines (113 loc) · 4.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use std::future::Future;
/// Example: Batch distribution with callback
///
/// This example demonstrates how to use the callback mechanism to handle
/// transfer results as they complete. The callback can be used for:
/// - Logging transfer results to a file or database
/// - Sending notifications for failed transfers
/// - Tracking progress in real-time
/// - Implementing custom retry logic
use std::pin::Pin;
/// Runs the batch-distribution example end to end.
///
/// Steps:
/// 1. Loads `.env` (if present) and initialises `env_logger`.
/// 2. Reads configuration from the environment. `CSV_PATH` falls back to
///    `recipients.csv`; `PARTY_ID`, `LEDGER_HOST`, `REGISTRY_URL`,
///    `DECENTRALIZED_PARTY_ID`, `KEYCLOAK_CLIENT_ID`, `KEYCLOAK_USERNAME`,
///    `KEYCLOAK_PASSWORD`, `KEYCLOAK_HOST` and `KEYCLOAK_REALM` are required.
/// 3. Deserialises every CSV row into a `cbtc::distribute::Recipient`.
/// 4. Builds a callback that appends one line per transfer result to a
///    timestamped `transfer_results_*.log` file and echoes it to stdout.
/// 5. Calls `cbtc::distribute::submit` and prints the success/failure counts.
///
/// # Errors
///
/// Returns `Err(String)` when a required environment variable cannot be read,
/// when the CSV file cannot be opened or a row fails to parse, or when
/// `cbtc::distribute::submit` returns an error. Failures while writing the log
/// file do not abort the run; they are reported on stderr.
#[tokio::main]
async fn main() -> Result<(), String> {
    dotenvy::dotenv().ok();
    env_logger::init();

    let csv_path = std::env::var("CSV_PATH").unwrap_or_else(|_| "recipients.csv".to_string());
    let sender = required_env("PARTY_ID")?;
    let ledger_host = required_env("LEDGER_HOST")?;
    let registry_url = required_env("REGISTRY_URL")?;
    let decentralized_party_id = required_env("DECENTRALIZED_PARTY_ID")?;
    let keycloak_client_id = required_env("KEYCLOAK_CLIENT_ID")?;
    let keycloak_username = required_env("KEYCLOAK_USERNAME")?;
    let keycloak_password = required_env("KEYCLOAK_PASSWORD")?;
    let keycloak_host = required_env("KEYCLOAK_HOST")?;
    let keycloak_realm = required_env("KEYCLOAK_REALM")?;
    let keycloak_url = keycloak::login::password_url(&keycloak_host, &keycloak_realm);

    // Read CSV file
    println!("Reading CSV from: {}", csv_path);
    let mut reader =
        csv::Reader::from_path(&csv_path).map_err(|e| format!("Failed to read CSV file: {}", e))?;

    // Collect fallibly: the first row that fails to parse aborts the whole run.
    let recipients = reader
        .deserialize::<CsvRecord>()
        .map(|row| {
            row.map(|record| cbtc::distribute::Recipient {
                receiver: record.receiver,
                amount: record.amount,
            })
            .map_err(|e| format!("Failed to parse CSV record: {}", e))
        })
        .collect::<Result<Vec<_>, String>>()?;
    println!("Found {} recipients", recipients.len());

    // Create log file path
    let log_file = format!(
        "transfer_results_{}.log",
        chrono::Utc::now().format("%Y%m%d_%H%M%S")
    );
    println!("Logging transfer results to: {}", log_file);

    // Create a callback that writes one line per transfer to a file
    let callback = Box::new(
        move |result: cbtc::transfer::TransferResult| -> Pin<Box<dyn Future<Output = ()> + Send>> {
            // The `async move` block takes ownership of what it captures, so each
            // returned future gets its own copy of the path.
            let log_file = log_file.clone();
            Box::pin(async move {
                use std::fs::OpenOptions;
                use std::io::Write;

                // Build a single line with all relevant info
                let status = if result.success { "SUCCESS" } else { "FAILED" };
                let reference = result.reference.as_deref().unwrap_or("N/A");
                let offer_cid = result.transfer_offer_cid.as_deref().unwrap_or("N/A");
                let update_id = result.update_id.as_deref().unwrap_or("N/A");
                let error = result.error.as_deref().unwrap_or("N/A");
                let log_line = format!(
                    "{} | {} | idx={} | to={} | amount={} | ref={} | offer={} | update_id={} | error={}\n",
                    chrono::Utc::now().to_rfc3339(),
                    status,
                    result.transfer_index,
                    result.receiver,
                    result.amount,
                    reference,
                    offer_cid,
                    update_id,
                    error
                );

                // Write to file. Logging stays best-effort (a failure must not abort
                // the batch), but the failure is surfaced instead of being dropped.
                // NOTE(review): `std::fs` is blocking I/O inside an async block; fine
                // for a short append in an example, reconsider for heavy use.
                let written = OpenOptions::new()
                    .create(true)
                    .append(true)
                    .open(&log_file)
                    .and_then(|mut file| file.write_all(log_line.as_bytes()));
                if let Err(e) = written {
                    eprintln!(
                        "Warning: failed to write transfer result to {}: {}",
                        log_file, e
                    );
                }

                // Also print to console
                print!("{}", log_line);
            })
        },
    ) as Box<cbtc::transfer::TransferResultCallback>;

    // Execute batch distribution with callback
    let result = cbtc::distribute::submit(cbtc::distribute::Params {
        recipients,
        sender,
        instrument_id: common::transfer::InstrumentId {
            admin: decentralized_party_id.clone(),
            id: "CBTC".to_string(),
        },
        ledger_host,
        registry_url,
        decentralized_party_id,
        keycloak_client_id,
        keycloak_username,
        keycloak_password,
        keycloak_url,
        reference_base: Some(format!("batch-{}", chrono::Utc::now().timestamp())),
        on_transfer_complete: Some(callback),
    })
    .await?;

    println!("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
    println!("Batch distribution complete!");
    println!("✓ Successful: {}", result.successful_count);
    println!("✗ Failed: {}", result.failed_count);
    Ok(())
}

/// Reads the environment variable `name`, turning a missing or non-Unicode
/// value into an error message of the form `"<name> must be set: <reason>"`.
fn required_env(name: &str) -> Result<String, String> {
    std::env::var(name).map_err(|e| format!("{} must be set: {}", name, e))
}
/// One row of the recipients CSV, as deserialized in `main`.
///
/// Both fields are copied unchanged into a `cbtc::distribute::Recipient`.
// NOTE(review): with the `csv` crate's default header handling the columns are
// presumably matched by the header names `receiver` and `amount` — confirm
// against the input file before renaming or reordering fields.
#[derive(Debug, serde::Deserialize)]
struct CsvRecord {
// Destination of the transfer (presumably a party ID — confirm with `cbtc::distribute`).
receiver: String,
// Amount to send; kept as text and not parsed or validated here.
amount: String,
}