diff --git a/.github/workflows/forester-tests.yml b/.github/workflows/forester-tests.yml
index 04d4f94591..5ddee18460 100644
--- a/.github/workflows/forester-tests.yml
+++ b/.github/workflows/forester-tests.yml
@@ -81,3 +81,48 @@ jobs:
- name: Test
run: cargo test --package forester e2e_test -- --nocapture
+
+ compressible-tests:
+ name: Forester compressible tests
+ runs-on: warp-ubuntu-latest-x64-4x
+ timeout-minutes: 60
+
+ services:
+ redis:
+ image: redis:8.0.1
+ ports:
+ - 6379:6379
+ options: >-
+ --health-cmd "redis-cli ping"
+ --health-interval 10s
+ --health-timeout 5s
+ --health-retries 5
+
+ env:
+ RUST_LOG: forester=debug,light_client=debug
+ REDIS_URL: redis://localhost:6379
+
+ steps:
+ - uses: actions/checkout@v6
+
+ - name: Setup and build
+ uses: ./.github/actions/setup-and-build
+ with:
+ skip-components: "go"
+ cache-key: "rust"
+
+ - name: Build CLI
+ run: npx nx build @lightprotocol/zk-compression-cli
+
+ - name: Build test programs
+ run: |
+ cargo build-sbf --manifest-path sdk-tests/csdk-anchor-full-derived-test/Cargo.toml
+
+ - name: Test compressible PDA
+ run: cargo test --package forester --test test_compressible_pda -- --nocapture
+
+ - name: Test compressible Mint
+ run: cargo test --package forester --test test_compressible_mint -- --nocapture
+
+ - name: Test compressible ctoken
+ run: cargo test --package forester --test test_compressible_ctoken -- --nocapture
diff --git a/Cargo.lock b/Cargo.lock
index a17411ce99..2c1b36ad5b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2301,11 +2301,13 @@ dependencies = [
"bs58",
"clap 4.5.53",
"create-address-test-program",
+ "csdk-anchor-full-derived-test",
"dashmap 6.1.0",
"dotenvy",
"env_logger 0.11.8",
"forester-utils",
"futures",
+ "hex",
"itertools 0.14.0",
"lazy_static",
"light-account-checks",
@@ -3661,6 +3663,7 @@ dependencies = [
"solana-msg 2.2.1",
"solana-program-error 2.2.2",
"solana-pubkey 2.4.0",
+ "solana-rent",
"solana-sysvar",
"thiserror 2.0.17",
"zerocopy",
diff --git a/Cargo.toml b/Cargo.toml
index bb645393a4..8c4ca05f39 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -106,6 +106,7 @@ solana-zk-token-sdk = "2.3"
solana-logger = "2.3"
solana-bn254 = "2.2"
solana-sysvar = { version = "2.2" }
+solana-rent = { version = "2.2" }
solana-program-error = { version = "2.2" }
solana-account-info = { version = "2.2" }
solana-transaction = { version = "2.2" }
@@ -240,6 +241,7 @@ light-indexed-array = { path = "program-libs/indexed-array", version = "0.3.0" }
light-array-map = { path = "program-libs/array-map", version = "0.1.1" }
light-program-profiler = { version = "0.1.0" }
create-address-program-test = { path = "program-tests/create-address-test-program", version = "1.0.0" }
+sdk-compressible-test = { path = "sdk-tests/sdk-compressible-test", version = "0.1.0" }
groth16-solana = { version = "0.2.0" }
bytemuck = { version = "1.19.0" }
arrayvec = "0.7"
diff --git a/cli/src/commands/test-validator/index.ts b/cli/src/commands/test-validator/index.ts
index 6f864ff050..d668c90e20 100644
--- a/cli/src/commands/test-validator/index.ts
+++ b/cli/src/commands/test-validator/index.ts
@@ -21,6 +21,7 @@ class SetupCommand extends Command {
"$ light test-validator --geyser-config ./config.json",
'$ light test-validator --validator-args "--limit-ledger-size 50000000"',
"$ light test-validator --sbf-program
",
+ "$ light test-validator --upgradeable-program ",
"$ light test-validator --devnet",
"$ light test-validator --mainnet",
];
@@ -111,6 +112,14 @@ class SetupCommand extends Command {
multiple: true,
summary: "Usage: --sbf-program ",
}),
+ "upgradeable-program": Flags.string({
+ description:
+ "Add an upgradeable SBF program to the genesis configuration. Required for programs that need compressible config initialization. If the ledger already exists then this parameter is silently ignored.",
+ required: false,
+ multiple: true,
+ summary:
+ "Usage: --upgradeable-program ",
+ }),
devnet: Flags.boolean({
description:
"Clone Light Protocol programs and accounts from devnet instead of loading local binaries.",
@@ -134,10 +143,22 @@ class SetupCommand extends Command {
}),
};
- validatePrograms(programs: { address: string; path: string }[]): void {
- // Check for duplicate addresses among provided programs
+ validatePrograms(
+ programs: { address: string; path: string }[],
+ upgradeablePrograms: {
+ address: string;
+ path: string;
+ upgradeAuthority: string;
+ }[],
+ ): void {
+ // Check for duplicate addresses among all provided programs
const addresses = new Set();
- for (const program of programs) {
+ const allPrograms = [
+ ...programs.map((p) => ({ ...p, type: "sbf" })),
+ ...upgradeablePrograms.map((p) => ({ ...p, type: "upgradeable" })),
+ ];
+
+ for (const program of allPrograms) {
if (addresses.has(program.address)) {
this.error(`Duplicate program address detected: ${program.address}`);
}
@@ -192,24 +213,46 @@ class SetupCommand extends Command {
});
this.log("\nTest validator stopped successfully \x1b[32m✔\x1b[0m");
} else {
- const rawValues = flags["sbf-program"] || [];
-
- if (rawValues.length % 2 !== 0) {
+ // Parse --sbf-program flags (2 arguments each: address, path)
+ const rawSbfValues = flags["sbf-program"] || [];
+ if (rawSbfValues.length % 2 !== 0) {
this.error("Each --sbf-program flag must have exactly two arguments");
}
const programs: { address: string; path: string }[] = [];
- for (let i = 0; i < rawValues.length; i += 2) {
+ for (let i = 0; i < rawSbfValues.length; i += 2) {
programs.push({
- address: rawValues[i],
- path: rawValues[i + 1],
+ address: rawSbfValues[i],
+ path: rawSbfValues[i + 1],
+ });
+ }
+
+ // Parse --upgradeable-program flags (3 arguments each: address, path, upgrade_authority)
+ const rawUpgradeableValues = flags["upgradeable-program"] || [];
+ if (rawUpgradeableValues.length % 3 !== 0) {
+ this.error(
+ "Each --upgradeable-program flag must have exactly three arguments: ",
+ );
+ }
+
+ const upgradeablePrograms: {
+ address: string;
+ path: string;
+ upgradeAuthority: string;
+ }[] = [];
+ for (let i = 0; i < rawUpgradeableValues.length; i += 3) {
+ upgradeablePrograms.push({
+ address: rawUpgradeableValues[i],
+ path: rawUpgradeableValues[i + 1],
+ upgradeAuthority: rawUpgradeableValues[i + 2],
});
}
- this.validatePrograms(programs);
+ this.validatePrograms(programs, upgradeablePrograms);
await initTestEnv({
additionalPrograms: programs,
+ upgradeablePrograms: upgradeablePrograms,
checkPhotonVersion: !flags["relax-indexer-version-constraint"],
indexer: !flags["skip-indexer"],
limitLedgerSize: flags["limit-ledger-size"],
diff --git a/cli/src/utils/initTestEnv.ts b/cli/src/utils/initTestEnv.ts
index 6438af9f41..63d42a0da0 100644
--- a/cli/src/utils/initTestEnv.ts
+++ b/cli/src/utils/initTestEnv.ts
@@ -125,6 +125,7 @@ export async function stopTestEnv(options: {
export async function initTestEnv({
additionalPrograms,
+ upgradeablePrograms,
skipSystemAccounts,
indexer = true,
prover = true,
@@ -142,6 +143,11 @@ export async function initTestEnv({
skipReset,
}: {
additionalPrograms?: { address: string; path: string }[];
+ upgradeablePrograms?: {
+ address: string;
+ path: string;
+ upgradeAuthority: string;
+ }[];
skipSystemAccounts?: boolean;
indexer: boolean;
prover: boolean;
@@ -161,6 +167,7 @@ export async function initTestEnv({
// We cannot await this promise directly because it will hang the process
startTestValidator({
additionalPrograms,
+ upgradeablePrograms,
skipSystemAccounts,
limitLedgerSize,
rpcPort,
@@ -175,6 +182,18 @@ export async function initTestEnv({
await confirmServerStability(`http://127.0.0.1:${rpcPort}/health`);
await confirmRpcReadiness(`http://127.0.0.1:${rpcPort}`);
+ if (prover) {
+ const config = getConfig();
+ config.proverUrl = `http://127.0.0.1:${proverPort}`;
+ setConfig(config);
+ try {
+ await startProver(proverPort);
+ } catch (error) {
+ console.error("Failed to start prover:", error);
+ throw error;
+ }
+ }
+
if (indexer) {
const config = getConfig();
config.indexerUrl = `http://127.0.0.1:${indexerPort}`;
@@ -190,20 +209,6 @@ export async function initTestEnv({
proverUrlForIndexer,
);
}
-
- if (prover) {
- const config = getConfig();
- config.proverUrl = `http://127.0.0.1:${proverPort}`;
- setConfig(config);
- try {
- // TODO: check if using redisUrl is better here.
- await startProver(proverPort);
- } catch (error) {
- console.error("Failed to start prover:", error);
- // Prover logs will be automatically displayed by spawnBinary in process.ts
- throw error;
- }
- }
}
export async function initTestEnvIfNeeded({
@@ -277,6 +282,7 @@ export function programFilePath(programName: string): string {
export async function getSolanaArgs({
additionalPrograms,
+ upgradeablePrograms,
skipSystemAccounts,
limitLedgerSize,
rpcPort,
@@ -287,6 +293,11 @@ export async function getSolanaArgs({
skipReset = false,
}: {
additionalPrograms?: { address: string; path: string }[];
+ upgradeablePrograms?: {
+ address: string;
+ path: string;
+ upgradeAuthority: string;
+ }[];
skipSystemAccounts?: boolean;
limitLedgerSize?: number;
rpcPort?: number;
@@ -301,7 +312,7 @@ export async function getSolanaArgs({
const solanaArgs = [
`--limit-ledger-size=${limitLedgerSize}`,
`--rpc-port=${rpcPort}`,
- `--gossip-host=${gossipHost}`,
+ `--bind-address=${gossipHost}`,
"--quiet",
];
@@ -367,6 +378,18 @@ export async function getSolanaArgs({
}
}
+ // Add upgradeable programs (with upgrade authority)
+ if (upgradeablePrograms) {
+ for (const program of upgradeablePrograms) {
+ solanaArgs.push(
+ "--upgradeable-program",
+ program.address,
+ program.path,
+ program.upgradeAuthority,
+ );
+ }
+ }
+
// Load local system accounts only if not cloning from network
if (!skipSystemAccounts && !cloneNetwork) {
const accountsRelPath = "../../accounts";
@@ -379,6 +402,7 @@ export async function getSolanaArgs({
export async function startTestValidator({
additionalPrograms,
+ upgradeablePrograms,
skipSystemAccounts,
limitLedgerSize,
rpcPort,
@@ -390,6 +414,11 @@ export async function startTestValidator({
skipReset,
}: {
additionalPrograms?: { address: string; path: string }[];
+ upgradeablePrograms?: {
+ address: string;
+ path: string;
+ upgradeAuthority: string;
+ }[];
skipSystemAccounts?: boolean;
limitLedgerSize?: number;
rpcPort?: number;
@@ -403,6 +432,7 @@ export async function startTestValidator({
const command = "solana-test-validator";
const solanaArgs = await getSolanaArgs({
additionalPrograms,
+ upgradeablePrograms,
skipSystemAccounts,
limitLedgerSize,
rpcPort,
diff --git a/cli/src/utils/processProverServer.ts b/cli/src/utils/processProverServer.ts
index c163f07dd6..15d47fdf0f 100644
--- a/cli/src/utils/processProverServer.ts
+++ b/cli/src/utils/processProverServer.ts
@@ -102,6 +102,7 @@ export async function startProver(proverPort: number, redisUrl?: string) {
spawnBinary(getProverPathByArch(), args);
await waitForServers([{ port: proverPort, path: "/" }]);
+ await new Promise((r) => setTimeout(r, 5000));
console.log(`Prover started successfully!`);
}
diff --git a/forester/.gitignore b/forester/.gitignore
index f6072a93ee..5a66a2f170 100644
--- a/forester/.gitignore
+++ b/forester/.gitignore
@@ -5,3 +5,5 @@ logs
.env.devnet
*.json
!package.json
+spawn.sh
+spawn_devnet.sh
diff --git a/forester/Cargo.toml b/forester/Cargo.toml
index 4d2a5fa4ee..2a1b177307 100644
--- a/forester/Cargo.toml
+++ b/forester/Cargo.toml
@@ -29,6 +29,7 @@ light-sdk = { workspace = true, features = ["anchor"] }
light-program-test = { workspace = true }
light-compressible = { workspace = true, default-features = false, features = ["solana"] }
light-token-interface = { workspace = true }
+light-token-client = { workspace = true }
light-token = { workspace = true }
solana-rpc-client-api = { workspace = true }
solana-transaction-status = { workspace = true }
@@ -42,6 +43,7 @@ futures = { workspace = true }
thiserror = { workspace = true }
borsh = { workspace = true }
bs58 = { workspace = true }
+hex = "0.4"
env_logger = { workspace = true }
async-trait = { workspace = true }
tracing = { workspace = true }
@@ -64,7 +66,7 @@ serial_test = { workspace = true }
light-prover-client = { workspace = true, features = ["devenv"] }
light-test-utils = { workspace = true }
light-program-test = { workspace = true, features = ["devenv"] }
-light-token-client = { workspace = true }
light-compressed-token = { workspace = true }
rand = { workspace = true }
create-address-test-program = { workspace = true }
+csdk-anchor-full-derived-test = { path = "../sdk-tests/csdk-anchor-full-derived-test", features = ["no-entrypoint"] }
diff --git a/forester/package.json b/forester/package.json
index e1cdfb14f0..cc0173c735 100644
--- a/forester/package.json
+++ b/forester/package.json
@@ -5,6 +5,7 @@
"scripts": {
"build": "cargo build",
"test": "redis-start && TEST_MODE=local TEST_V1_STATE=true TEST_V2_STATE=true TEST_V1_ADDRESS=true TEST_V2_ADDRESS=true RUST_LOG=forester=debug,forester_utils=debug,light_prover_client=debug cargo test --package forester e2e_test -- --nocapture",
+ "test:compressible": "cargo build-sbf -- -p csdk-anchor-full-derived-test && RUST_LOG=forester=debug,light_client=debug cargo test --package forester --test test_compressible_pda --test test_compressible_mint --test test_compressible_ctoken -- --nocapture",
"docker:build": "docker build --tag forester -f Dockerfile .."
},
"devDependencies": {
diff --git a/forester/src/cli.rs b/forester/src/cli.rs
index ff8fe633a2..21c28e6787 100644
--- a/forester/src/cli.rs
+++ b/forester/src/cli.rs
@@ -236,6 +236,14 @@ pub struct StartArgs {
)]
pub enable_compressible: bool,
+ #[arg(
+ long = "compressible-pda-program",
+ env = "COMPRESSIBLE_PDA_PROGRAMS",
+ help = "Compressible PDA programs to track. Format: 'program_id:discriminator_base58'. Can be specified multiple times. Example: 'MyProg1111111111111111111111111111111111111:6kRvHBv2N3F'",
+ value_delimiter = ','
+ )]
+ pub compressible_pda_programs: Vec,
+
#[arg(
long,
env = "LOOKUP_TABLE_ADDRESS",
diff --git a/forester/src/compressible/bootstrap.rs b/forester/src/compressible/bootstrap.rs
deleted file mode 100644
index 802b1f21c1..0000000000
--- a/forester/src/compressible/bootstrap.rs
+++ /dev/null
@@ -1,370 +0,0 @@
-use std::sync::Arc;
-
-use borsh::BorshDeserialize;
-use light_token_interface::{state::Token, LIGHT_TOKEN_PROGRAM_ID};
-use serde_json::json;
-use solana_sdk::pubkey::Pubkey;
-use tokio::sync::oneshot;
-use tracing::{debug, error, info};
-
-use super::state::CompressibleAccountTracker;
-use crate::Result;
-
-const PAGE_SIZE: usize = 10_000;
-
-/// Bootstrap the compressible account tracker by fetching existing accounts
-/// Uses standard getProgramAccounts for localhost, getProgramAccountsV2 for remote networks
-pub async fn bootstrap_compressible_accounts(
- rpc_url: String,
- tracker: Arc,
- shutdown_rx: oneshot::Receiver<()>,
-) -> Result<()> {
- info!("Starting bootstrap of compressible accounts");
-
- let is_localhost = rpc_url.contains("localhost") || rpc_url.contains("127.0.0.1");
-
- if is_localhost {
- info!("Detected localhost, using standard getProgramAccounts");
- bootstrap_with_standard_api(rpc_url, tracker, shutdown_rx).await
- } else {
- info!("Using getProgramAccountsV2 with pagination");
- bootstrap_with_v2_api(rpc_url, tracker, shutdown_rx).await
- }
-}
-
-/// Process a single account from RPC response
-/// Returns Ok(true) if account was inserted, Ok(false) if skipped, Err on critical failure
-fn process_account(
- account_value: &serde_json::Value,
- tracker: &CompressibleAccountTracker,
-) -> Result {
- // Extract pubkey
- let pubkey_str = match account_value.get("pubkey").and_then(|v| v.as_str()) {
- Some(s) => s,
- None => {
- debug!("Skipping account with missing pubkey");
- return Ok(false);
- }
- };
-
- let pubkey = match pubkey_str.parse::() {
- Ok(pk) => pk,
- Err(e) => {
- debug!("Failed to parse pubkey {}: {:?}", pubkey_str, e);
- return Ok(false);
- }
- };
-
- // Extract account data
- let account_obj = match account_value.get("account") {
- Some(obj) => obj,
- None => {
- debug!("Skipping account {} with missing account object", pubkey);
- return Ok(false);
- }
- };
-
- // Check lamports - skip closed accounts (lamports == 0)
- let lamports = match account_obj.get("lamports").and_then(|v| v.as_u64()) {
- Some(0) => {
- debug!("Skipping closed account {} (lamports == 0)", pubkey);
- return Ok(false);
- }
- Some(lamports) => lamports,
- None => {
- debug!("Skipping account {} with missing lamports field", pubkey);
- return Ok(false);
- }
- };
-
- let data_array = match account_obj.get("data").and_then(|v| v.as_array()) {
- Some(arr) if !arr.is_empty() => arr,
- _ => {
- debug!("Skipping account {} with missing or empty data", pubkey);
- return Ok(false);
- }
- };
-
- let data_str = match data_array[0].as_str() {
- Some(s) => s,
- None => {
- debug!("Skipping account {} with invalid data format", pubkey);
- return Ok(false);
- }
- };
-
- let data_bytes = match base64::decode(data_str) {
- Ok(bytes) => bytes,
- Err(e) => {
- debug!("Failed to decode base64 for account {}: {:?}", pubkey, e);
- return Ok(false);
- }
- };
-
- // Deserialize Token
- let ctoken = match Token::try_from_slice(&data_bytes) {
- Ok(token) => token,
- Err(e) => {
- debug!(
- "Failed to deserialize Token for account {}: {:?}",
- pubkey, e
- );
- return Ok(false);
- }
- };
-
- // Check if account is a valid Token account (account_type == 2)
- if !ctoken.is_token_account() {
- debug!("Skipping account {} without compressible config", pubkey);
- return Ok(false);
- }
-
- // Use tracker's update_from_account to calculate compressible_slot
- if let Err(e) = tracker.update_from_account(pubkey, &data_bytes, lamports) {
- debug!("Failed to insert account {}: {:?}", pubkey, e);
- return Ok(false);
- }
-
- Ok(true)
-}
-
-/// Send RPC request with shutdown handling
-async fn send_rpc_request(
- client: &reqwest::Client,
- rpc_url: &str,
- payload: &serde_json::Value,
- shutdown_rx: &mut oneshot::Receiver<()>,
-) -> Result {
- let response_result = tokio::select! {
- response = client.post(rpc_url).json(payload).send() => response,
- _ = shutdown_rx => {
- return Err(anyhow::anyhow!("Shutdown requested"));
- }
- };
-
- let response = match response_result {
- Ok(resp) => resp,
- Err(e) => {
- error!("Bootstrap request error: {:?}", e);
- return Err(anyhow::anyhow!("Request failed: {:?}", e));
- }
- };
-
- if !response.status().is_success() {
- error!("Bootstrap HTTP error: {}", response.status());
- return Err(anyhow::anyhow!("HTTP error: {}", response.status()));
- }
-
- let json_response: serde_json::Value = match response.json().await {
- Ok(json) => json,
- Err(e) => {
- error!("Bootstrap failed to parse response: {:?}", e);
- return Err(anyhow::anyhow!("Parse error: {:?}", e));
- }
- };
-
- // Check for RPC error
- if let Some(error) = json_response.get("error") {
- error!("Bootstrap RPC error: {:?}", error);
- return Err(anyhow::anyhow!("RPC error: {:?}", error));
- }
-
- json_response
- .get("result")
- .cloned()
- .ok_or_else(|| anyhow::anyhow!("Unexpected response format"))
-}
-
-/// Bootstrap using Helius getProgramAccountsV2 with cursor pagination
-async fn bootstrap_with_v2_api(
- rpc_url: String,
- tracker: Arc,
- mut shutdown_rx: oneshot::Receiver<()>,
-) -> Result<()> {
- let client = reqwest::Client::new();
- let program_id = Pubkey::new_from_array(LIGHT_TOKEN_PROGRAM_ID);
-
- let mut total_fetched = 0;
- let mut total_inserted = 0;
- let mut page_count = 0;
- let mut cursor: Option = None;
-
- loop {
- page_count += 1;
-
- // Build request payload
- // Filter for accounts with account_type = 2 at position 165
- // This indicates a Light Token account with extensions (e.g., Compressible)
- let mut params = json!([
- program_id.to_string(),
- {
- "encoding": "base64",
- "commitment": "confirmed",
- "filters": [
- {"memcmp": {"offset": 165, "bytes": "3"}}
- ],
- "limit": PAGE_SIZE
- }
- ]);
-
- // Add cursor for pagination
- if let Some(ref c) = cursor {
- params[1]["paginationKey"] = json!(c);
- }
-
- let payload = json!({
- "jsonrpc": "2.0",
- "id": page_count,
- "method": "getProgramAccountsV2",
- "params": params
- });
-
- // Send request
- let result = match send_rpc_request(&client, &rpc_url, &payload, &mut shutdown_rx).await {
- Ok(res) => res,
- Err(e) if e.to_string().contains("Shutdown requested") => {
- info!(
- "Bootstrap shutting down at page {}, {} accounts inserted",
- page_count, total_inserted
- );
- return Ok(());
- }
- Err(e) => {
- error!("Bootstrap failed on page {}: {:?}", page_count, e);
- return Err(e);
- }
- };
-
- // Extract accounts array
- let accounts_array = if let Some(arr) = result.get("accounts").and_then(|v| v.as_array()) {
- arr
- } else if let Some(arr) = result.as_array() {
- arr
- } else if let Some(value) = result.get("value").and_then(|v| v.as_array()) {
- value
- } else {
- error!(
- "Bootstrap could not find accounts array on page {}",
- page_count
- );
- return Err(anyhow::anyhow!("Could not find accounts array"));
- };
-
- let accounts_count = accounts_array.len();
-
- if accounts_count == 0 {
- info!("Bootstrap complete: no more accounts (page {})", page_count);
- break;
- }
-
- total_fetched += accounts_count;
-
- // Process each account
- let mut page_inserted = 0;
- for account_value in accounts_array {
- if let Ok(true) = process_account(account_value, &tracker) {
- page_inserted += 1;
- total_inserted += 1;
- }
- }
-
- info!(
- "Bootstrap page {}: fetched {} accounts, inserted {} compressible accounts (total: {})",
- page_count, accounts_count, page_inserted, total_inserted
- );
-
- // Get cursor for next page
- cursor = result
- .get("paginationKey")
- .and_then(|c| c.as_str())
- .map(|s| s.to_string());
-
- // If no cursor, we've reached the end
- if cursor.is_none() {
- info!(
- "Bootstrap complete: reached end of results at page {}",
- page_count
- );
- break;
- }
-
- // Add small delay between requests to avoid rate limiting
- tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
- }
-
- info!(
- "Bootstrap finished: {} pages, {} total fetched, {} compressible accounts inserted",
- page_count, total_fetched, total_inserted
- );
-
- Ok(())
-}
-
-/// Bootstrap using standard getProgramAccounts (for localhost/test validator)
-async fn bootstrap_with_standard_api(
- rpc_url: String,
- tracker: Arc,
- mut shutdown_rx: oneshot::Receiver<()>,
-) -> Result<()> {
- let client = reqwest::Client::new();
- let program_id = Pubkey::new_from_array(LIGHT_TOKEN_PROGRAM_ID);
-
- // Filter for accounts with account_type = 2 at position 165
- // This indicates a Light Token account with extensions (e.g., Compressible)
- let payload = json!({
- "jsonrpc": "2.0",
- "id": 1,
- "method": "getProgramAccounts",
- "params": [
- program_id.to_string(),
- {
- "encoding": "base64",
- "commitment": "confirmed",
- "filters": [
- {"memcmp": {"offset": 165, "bytes": "3"}}
- ]
- }
- ]
- });
-
- // Send request
- let result = match send_rpc_request(&client, &rpc_url, &payload, &mut shutdown_rx).await {
- Ok(res) => res,
- Err(e) if e.to_string().contains("Shutdown requested") => {
- info!("Bootstrap shutting down before request");
- return Ok(());
- }
- Err(e) => {
- error!("Bootstrap failed: {:?}", e);
- return Err(e);
- }
- };
-
- // Standard API returns array directly
- let accounts_array = match result.as_array() {
- Some(arr) => arr,
- None => {
- error!("Bootstrap could not find accounts array");
- return Err(anyhow::anyhow!("Could not find accounts array"));
- }
- };
-
- let total_fetched = accounts_array.len();
- let mut total_inserted = 0;
-
- info!("Bootstrap fetched {} total accounts", total_fetched);
-
- // Process each account
- for account_value in accounts_array {
- if let Ok(true) = process_account(account_value, &tracker) {
- total_inserted += 1;
- }
- }
-
- info!(
- "Bootstrap complete: {} total fetched, {} compressible accounts inserted",
- total_fetched, total_inserted
- );
-
- Ok(())
-}
diff --git a/forester/src/compressible/bootstrap_helpers.rs b/forester/src/compressible/bootstrap_helpers.rs
new file mode 100644
index 0000000000..c358bacbfc
--- /dev/null
+++ b/forester/src/compressible/bootstrap_helpers.rs
@@ -0,0 +1,346 @@
+//! Shared bootstrap helpers for fetching program accounts from RPC.
+//!
+//! This module provides common functionality used by both token and PDA bootstrap:
+//! - RPC request sending with shutdown handling and timeout
+//! - Account field extraction from JSON responses
+//! - Standard and V2 API patterns
+
+use std::time::Duration;
+
+use serde_json::json;
+use solana_sdk::pubkey::Pubkey;
+use tokio::time::timeout;
+use tracing::debug;
+
+use super::config::{DEFAULT_PAGE_SIZE, DEFAULT_PAGINATION_DELAY_MS};
+use crate::Result;
+
+const RPC_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
+
+/// Re-export page size for use in other modules
+pub const PAGE_SIZE: usize = DEFAULT_PAGE_SIZE;
+
+/// Raw account data extracted from RPC response
+pub struct RawAccountData {
+ pub pubkey: Pubkey,
+ pub lamports: u64,
+ pub data: Vec,
+}
+
+pub async fn send_rpc_request(
+ client: &reqwest::Client,
+ rpc_url: &str,
+ payload: &serde_json::Value,
+) -> Result {
+ let result = timeout(RPC_REQUEST_TIMEOUT, async {
+ let response = client
+ .post(rpc_url)
+ .json(payload)
+ .send()
+ .await
+ .map_err(|e| anyhow::anyhow!("Request failed: {:?}", e))?;
+
+ if !response.status().is_success() {
+ return Err(anyhow::anyhow!("HTTP error: {}", response.status()));
+ }
+
+ let json_response: serde_json::Value = response
+ .json()
+ .await
+ .map_err(|e| anyhow::anyhow!("Parse error: {:?}", e))?;
+
+ // Check for RPC error
+ if let Some(error) = json_response.get("error") {
+ return Err(anyhow::anyhow!("RPC error: {:?}", error));
+ }
+
+ json_response
+ .get("result")
+ .cloned()
+ .ok_or_else(|| anyhow::anyhow!("Unexpected response format"))
+ })
+ .await;
+
+ match result {
+ Ok(inner) => inner,
+ Err(_) => Err(anyhow::anyhow!(
+ "RPC request timed out after {}s",
+ RPC_REQUEST_TIMEOUT.as_secs()
+ )),
+ }
+}
+
+/// Extract raw account data from a JSON account value
+/// Returns None if account should be skipped (missing fields, closed, etc.)
+pub fn extract_account_fields(account_value: &serde_json::Value) -> Option {
+ // Extract pubkey
+ let pubkey_str = account_value.get("pubkey").and_then(|v| v.as_str())?;
+ let pubkey = match pubkey_str.parse::() {
+ Ok(pk) => pk,
+ Err(e) => {
+ debug!("Failed to parse pubkey {}: {:?}", pubkey_str, e);
+ return None;
+ }
+ };
+
+ // Extract account data
+ let account_obj = account_value.get("account")?;
+
+ // Check lamports - skip closed accounts (lamports == 0)
+ let lamports = match account_obj.get("lamports").and_then(|v| v.as_u64()) {
+ Some(0) => {
+ debug!("Skipping closed account {} (lamports == 0)", pubkey);
+ return None;
+ }
+ Some(lamports) => lamports,
+ None => {
+ debug!("Skipping account {} with missing lamports field", pubkey);
+ return None;
+ }
+ };
+
+ let data_array = match account_obj.get("data").and_then(|v| v.as_array()) {
+ Some(arr) if !arr.is_empty() => arr,
+ _ => {
+ debug!("Skipping account {} with missing or empty data", pubkey);
+ return None;
+ }
+ };
+
+ let data_str = data_array.first()?.as_str()?;
+ let data = match base64::decode(data_str) {
+ Ok(bytes) => bytes,
+ Err(e) => {
+ debug!("Failed to decode base64 for account {}: {:?}", pubkey, e);
+ return None;
+ }
+ };
+
+ Some(RawAccountData {
+ pubkey,
+ lamports,
+ data,
+ })
+}
+
+/// Fetch current slot from RPC with timeout
+pub async fn get_current_slot(client: &reqwest::Client, rpc_url: &str) -> Result {
+ let payload = json!({
+ "jsonrpc": "2.0",
+ "id": 0,
+ "method": "getSlot",
+ "params": [{"commitment": "confirmed"}]
+ });
+
+ let result = send_rpc_request(client, rpc_url, &payload).await?;
+
+ result
+ .as_u64()
+ .ok_or_else(|| anyhow::anyhow!("Failed to extract slot from response: expected u64"))
+}
+
+/// Extract accounts array from V2 API response (handles various response formats)
+pub fn extract_accounts_array(result: &serde_json::Value) -> Option<&Vec> {
+ // Try different possible locations
+ if let Some(arr) = result.get("accounts").and_then(|v| v.as_array()) {
+ return Some(arr);
+ }
+ if let Some(arr) = result.as_array() {
+ return Some(arr);
+ }
+ if let Some(value) = result.get("value").and_then(|v| v.as_array()) {
+ return Some(value);
+ }
+ None
+}
+
+/// Extract pagination cursor from response
+pub fn extract_pagination_cursor(result: &serde_json::Value) -> Option {
+ result
+ .get("paginationKey")
+ .and_then(|c| c.as_str())
+ .map(|s| s.to_string())
+}
+
+/// Build payload for standard getProgramAccounts request
+pub fn build_standard_api_payload(
+ program_id: &Pubkey,
+ filters: Option>,
+) -> serde_json::Value {
+ let mut params = json!({
+ "encoding": "base64",
+ "commitment": "confirmed"
+ });
+
+ if let Some(filters) = filters {
+ params["filters"] = json!(filters);
+ }
+
+ json!({
+ "jsonrpc": "2.0",
+ "id": 1,
+ "method": "getProgramAccounts",
+ "params": [program_id.to_string(), params]
+ })
+}
+
+/// Build payload for V2 getProgramAccountsV2 request with pagination
+pub fn build_v2_api_payload(
+ program_id: &Pubkey,
+ page_id: i32,
+ cursor: Option<&str>,
+ filters: Option>,
+) -> serde_json::Value {
+ let mut params = json!({
+ "encoding": "base64",
+ "commitment": "confirmed",
+ "limit": PAGE_SIZE
+ });
+
+ if let Some(filters) = filters {
+ params["filters"] = json!(filters);
+ }
+
+ if let Some(c) = cursor {
+ params["paginationKey"] = json!(c);
+ }
+
+ json!({
+ "jsonrpc": "2.0",
+ "id": page_id,
+ "method": "getProgramAccountsV2",
+ "params": [program_id.to_string(), params]
+ })
+}
+
+/// Check if URL is localhost
+pub fn is_localhost(rpc_url: &str) -> bool {
+ rpc_url.contains("localhost") || rpc_url.contains("127.0.0.1")
+}
+
+/// Generic bootstrap using standard getProgramAccounts API
+///
+/// Calls `process_fn` for each account that passes initial extraction.
+/// Returns (total_fetched, total_inserted) counts.
+pub async fn bootstrap_standard_api(
+ client: &reqwest::Client,
+ rpc_url: &str,
+ program_id: &Pubkey,
+ filters: Option>,
+ shutdown_flag: Option<&std::sync::atomic::AtomicBool>,
+ mut process_fn: F,
+) -> Result<(usize, usize)>
+where
+ F: FnMut(RawAccountData) -> bool,
+{
+ let payload = build_standard_api_payload(program_id, filters);
+ let result = send_rpc_request(client, rpc_url, &payload).await?;
+
+ let accounts_array = result
+ .as_array()
+ .ok_or_else(|| anyhow::anyhow!("Could not find accounts array"))?;
+
+ let total_fetched = accounts_array.len();
+ let mut total_inserted = 0;
+
+ for account_value in accounts_array {
+ if let Some(flag) = shutdown_flag {
+ if flag.load(std::sync::atomic::Ordering::SeqCst) {
+ break;
+ }
+ }
+
+ if let Some(raw_data) = extract_account_fields(account_value) {
+ if process_fn(raw_data) {
+ total_inserted += 1;
+ }
+ }
+ }
+
+ Ok((total_fetched, total_inserted))
+}
+
+/// Generic bootstrap using V2 getProgramAccountsV2 API with pagination
+///
+/// Calls `process_fn` for each account that passes initial extraction.
+/// Returns (total_pages, total_fetched, total_inserted) counts.
+pub async fn bootstrap_v2_api(
+ client: &reqwest::Client,
+ rpc_url: &str,
+ program_id: &Pubkey,
+ filters: Option>,
+ shutdown_flag: Option<&std::sync::atomic::AtomicBool>,
+ mut process_fn: F,
+) -> Result<(usize, usize, usize)>
+where
+ F: FnMut(RawAccountData) -> bool,
+{
+ let mut total_fetched = 0;
+ let mut total_inserted = 0;
+ let mut page_count = 0;
+ let mut cursor: Option = None;
+
+ // Build the base payload once before the loop to avoid cloning filters on each iteration.
+ // We'll update only the page id and pagination cursor per iteration.
+ let mut payload = build_v2_api_payload(program_id, 1, None, filters);
+
+ loop {
+ if let Some(flag) = shutdown_flag {
+ if flag.load(std::sync::atomic::Ordering::SeqCst) {
+ break;
+ }
+ }
+
+ page_count += 1;
+
+ // Update only the page-specific fields
+ payload["id"] = json!(page_count as i32);
+ if let Some(ref c) = cursor {
+ payload["params"][1]["paginationKey"] = json!(c);
+ }
+
+ let result = send_rpc_request(client, rpc_url, &payload).await?;
+
+ let accounts_array = extract_accounts_array(&result)
+ .ok_or_else(|| anyhow::anyhow!("Could not find accounts array"))?;
+
+ let accounts_count = accounts_array.len();
+ if accounts_count == 0 {
+ debug!(
+ "Pagination returned 0 accounts on page {}, ending pagination",
+ page_count
+ );
+ break;
+ }
+
+ total_fetched += accounts_count;
+
+ for account_value in accounts_array {
+ if let Some(flag) = shutdown_flag {
+ if flag.load(std::sync::atomic::Ordering::SeqCst) {
+ break;
+ }
+ }
+
+ if let Some(raw_data) = extract_account_fields(account_value) {
+ if process_fn(raw_data) {
+ total_inserted += 1;
+ }
+ }
+ }
+
+ // Get cursor for next page
+ cursor = extract_pagination_cursor(&result);
+ if cursor.is_none() {
+ break;
+ }
+
+ // Rate limit between paginated requests
+ tokio::time::sleep(tokio::time::Duration::from_millis(
+ DEFAULT_PAGINATION_DELAY_MS,
+ ))
+ .await;
+ }
+
+ Ok((page_count, total_fetched, total_inserted))
+}
diff --git a/forester/src/compressible/config.rs b/forester/src/compressible/config.rs
index ac10229a86..46b65e35b9 100644
--- a/forester/src/compressible/config.rs
+++ b/forester/src/compressible/config.rs
@@ -1,4 +1,139 @@
+use std::str::FromStr;
+
use serde::{Deserialize, Serialize};
+use solana_sdk::pubkey::Pubkey;
+
+// =============================================================================
+// Shared Constants
+// =============================================================================
+
+/// Registry program ID for compress_and_close operations
+pub const REGISTRY_PROGRAM_ID: &str = "Lighton6oQpVkeewmo2mcPTQQp7kYHr4fWpAgJyEmDX";
+
+/// Offset in CToken/Mint account data where account_type byte is stored.
+/// Used for memcmp filters to identify decompressed accounts.
+pub const ACCOUNT_TYPE_OFFSET: usize = 165;
+
+/// Base58-encoded byte value for decompressed CToken accounts (account_type = 2).
+/// In base58: "3" represents the byte value 2.
+pub const CTOKEN_ACCOUNT_TYPE_FILTER: &str = "3";
+
+/// Base58-encoded byte value for decompressed Mint accounts (account_type = 1).
+/// In base58: "2" represents the byte value 1.
+pub const MINT_ACCOUNT_TYPE_FILTER: &str = "2";
+
+/// Default page size for bootstrap pagination (number of accounts per RPC request)
+pub const DEFAULT_PAGE_SIZE: usize = 10_000;
+
+/// Default delay between paginated RPC requests (milliseconds)
+pub const DEFAULT_PAGINATION_DELAY_MS: u64 = 100;
+
+// =============================================================================
+// Configuration Structs
+// =============================================================================
+
+/// Configuration for a compressible PDA program.
+///
+/// Can be specified via CLI (using `program_id:discriminator_base58` format)
+/// or via config file using the serialized struct format.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PdaProgramConfig {
+ /// Program ID that owns the compressible PDAs (base58 string)
+ #[serde(with = "pubkey_string")]
+ pub program_id: Pubkey,
+ /// Discriminator for the compress_accounts_idempotent instruction (base58 string)
+ #[serde(with = "discriminator_base58")]
+ pub discriminator: [u8; 8],
+}
+
+mod pubkey_string {
+ use std::str::FromStr;
+
+ use serde::{Deserialize, Deserializer, Serializer};
+ use solana_sdk::pubkey::Pubkey;
+
+ pub fn serialize(pubkey: &Pubkey, serializer: S) -> Result
+ where
+ S: Serializer,
+ {
+ serializer.serialize_str(&pubkey.to_string())
+ }
+
+ pub fn deserialize<'de, D>(deserializer: D) -> Result
+ where
+ D: Deserializer<'de>,
+ {
+ let s = String::deserialize(deserializer)?;
+ Pubkey::from_str(&s).map_err(serde::de::Error::custom)
+ }
+}
+
+mod discriminator_base58 {
+ use serde::{Deserialize, Deserializer, Serializer};
+
+ pub fn serialize(discriminator: &[u8; 8], serializer: S) -> Result
+ where
+ S: Serializer,
+ {
+ serializer.serialize_str(&bs58::encode(discriminator).into_string())
+ }
+
+ pub fn deserialize<'de, D>(deserializer: D) -> Result<[u8; 8], D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ let s = String::deserialize(deserializer)?;
+ let bytes = bs58::decode(&s)
+ .into_vec()
+ .map_err(serde::de::Error::custom)?;
+ bytes
+ .try_into()
+ .map_err(|_| serde::de::Error::custom("discriminator must be exactly 8 bytes"))
+ }
+}
+
+impl PdaProgramConfig {
+ pub fn new(program_id: Pubkey, discriminator: [u8; 8]) -> Self {
+ Self {
+ program_id,
+ discriminator,
+ }
+ }
+}
+
+impl FromStr for PdaProgramConfig {
+ type Err = String;
+
+ /// Parse from string format: "program_id:discriminator_base58"
+ /// Example: "MyProgram1111111111111111111111111111111:6kRvHBv2N3F"
+ fn from_str(s: &str) -> Result {
+ let parts: Vec<&str> = s.split(':').collect();
+ if parts.len() != 2 {
+ return Err(format!(
+ "Invalid format. Expected 'program_id:discriminator_base58', got '{}'",
+ s
+ ));
+ }
+
+ let program_id = parts[0]
+ .parse::()
+ .map_err(|e| format!("Invalid program ID '{}': {}", parts[0], e))?;
+
+ let disc_bytes = bs58::decode(parts[1])
+ .into_vec()
+ .map_err(|e| format!("Invalid discriminator base58 '{}': {}", parts[1], e))?;
+
+ let disc_len = disc_bytes.len();
+ let discriminator: [u8; 8] = disc_bytes
+ .try_into()
+ .map_err(|_| format!("Discriminator must be exactly 8 bytes, got {}", disc_len))?;
+
+ Ok(Self {
+ program_id,
+ discriminator,
+ })
+ }
+}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressibleConfig {
@@ -10,6 +145,11 @@ pub struct CompressibleConfig {
/// Maximum number of concurrent compression batches
#[serde(default = "default_max_concurrent_batches")]
pub max_concurrent_batches: usize,
+ /// Compressible PDA programs to track and compress.
+ /// Can be specified in config file or via CLI `--pda-program` flags.
+ /// CLI values are merged with config file values.
+ #[serde(default)]
+ pub pda_programs: Vec,
}
fn default_batch_size() -> usize {
@@ -26,6 +166,12 @@ impl CompressibleConfig {
ws_url,
batch_size: default_batch_size(),
max_concurrent_batches: default_max_concurrent_batches(),
+ pda_programs: Vec::new(),
}
}
+
+ pub fn with_pda_programs(mut self, pda_programs: Vec) -> Self {
+ self.pda_programs = pda_programs;
+ self
+ }
}
diff --git a/forester/src/compressible/ctoken/bootstrap.rs b/forester/src/compressible/ctoken/bootstrap.rs
new file mode 100644
index 0000000000..caf03b79d9
--- /dev/null
+++ b/forester/src/compressible/ctoken/bootstrap.rs
@@ -0,0 +1,126 @@
+use std::{sync::Arc, time::Duration};
+
+use borsh::BorshDeserialize;
+use light_token_interface::{state::Token, LIGHT_TOKEN_PROGRAM_ID};
+use serde_json::json;
+use solana_sdk::pubkey::Pubkey;
+use tokio::sync::oneshot;
+use tracing::{debug, info};
+
+use super::state::CTokenAccountTracker;
+use crate::{
+ compressible::{
+ bootstrap_helpers::{
+ bootstrap_standard_api, bootstrap_v2_api, is_localhost, RawAccountData,
+ },
+ config::{ACCOUNT_TYPE_OFFSET, CTOKEN_ACCOUNT_TYPE_FILTER},
+ },
+ Result,
+};
+
+/// Bootstrap the CToken account tracker by fetching existing accounts
+/// Uses standard getProgramAccounts for localhost, getProgramAccountsV2 for remote networks
+pub async fn bootstrap_ctoken_accounts(
+ rpc_url: String,
+ tracker: Arc,
+ shutdown_rx: Option>,
+) -> Result<()> {
+ info!("Starting bootstrap of CToken accounts");
+
+ let program_id = Pubkey::new_from_array(LIGHT_TOKEN_PROGRAM_ID);
+
+ // Set up shutdown flag
+ let shutdown_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
+
+ if let Some(rx) = shutdown_rx {
+ let shutdown_flag_clone = shutdown_flag.clone();
+ tokio::spawn(async move {
+ let _ = rx.await;
+ shutdown_flag_clone.store(true, std::sync::atomic::Ordering::SeqCst);
+ });
+ }
+
+ // Filter for decompressed CToken accounts (account_type = 2)
+ let filters = vec![json!({
+ "memcmp": {
+ "offset": ACCOUNT_TYPE_OFFSET,
+ "bytes": CTOKEN_ACCOUNT_TYPE_FILTER,
+ "encoding": "base58"
+ }
+ })];
+
+ let client = reqwest::Client::builder()
+ .timeout(Duration::from_secs(30))
+ .build()?;
+
+ // Process function that deserializes Token and updates tracker
+ let process_account = |raw_data: RawAccountData| -> bool {
+ // Deserialize Token
+ let ctoken = match Token::try_from_slice(&raw_data.data) {
+ Ok(token) => token,
+ Err(e) => {
+ debug!(
+ "Failed to deserialize Token for account {}: {:?}",
+ raw_data.pubkey, e
+ );
+ return false;
+ }
+ };
+
+ // Check if account is a valid Token account (account_type == 2)
+ if !ctoken.is_token_account() {
+ debug!(
+ "Skipping account {} - not a token account (is_token_account() == false)",
+ raw_data.pubkey
+ );
+ return false;
+ }
+
+ // Use tracker's update_from_token to avoid re-deserializing the Token
+ let account_size = raw_data.data.len();
+ if let Err(e) =
+ tracker.update_from_token(raw_data.pubkey, ctoken, raw_data.lamports, account_size)
+ {
+ debug!("Failed to insert account {}: {:?}", raw_data.pubkey, e);
+ return false;
+ }
+
+ true
+ };
+
+ if is_localhost(&rpc_url) {
+ info!("Detected localhost, using standard getProgramAccounts");
+ let (total_fetched, total_inserted) = bootstrap_standard_api(
+ &client,
+ &rpc_url,
+ &program_id,
+ Some(filters),
+ Some(&shutdown_flag),
+ process_account,
+ )
+ .await?;
+
+ info!(
+ "Bootstrap complete: {} total fetched, {} CToken accounts inserted",
+ total_fetched, total_inserted
+ );
+ } else {
+ info!("Using getProgramAccountsV2 with pagination");
+ let (page_count, total_fetched, total_inserted) = bootstrap_v2_api(
+ &client,
+ &rpc_url,
+ &program_id,
+ Some(filters),
+ Some(&shutdown_flag),
+ process_account,
+ )
+ .await?;
+
+ info!(
+ "Bootstrap finished: {} pages, {} total fetched, {} CToken accounts inserted",
+ page_count, total_fetched, total_inserted
+ );
+ }
+
+ Ok(())
+}
diff --git a/forester/src/compressible/compressor.rs b/forester/src/compressible/ctoken/compressor.rs
similarity index 79%
rename from forester/src/compressible/compressor.rs
rename to forester/src/compressible/ctoken/compressor.rs
index d460aab7a3..450d3c9484 100644
--- a/forester/src/compressible/compressor.rs
+++ b/forester/src/compressible/ctoken/compressor.rs
@@ -2,17 +2,15 @@ use std::{str::FromStr, sync::Arc};
use anchor_lang::{InstructionData, ToAccountMetas};
use forester_utils::rpc_pool::SolanaRpcPool;
-use light_client::{indexer::TreeInfo, rpc::Rpc};
-use light_compressed_account::TreeType;
+use light_client::{indexer::Indexer, rpc::Rpc};
use light_compressible::config::CompressibleConfig;
use light_registry::{
accounts::CompressAndCloseContext, compressible::compressed_token::CompressAndCloseIndices,
instruction::CompressAndClose,
};
use light_sdk::instruction::PackedAccounts;
-use light_token::compressed_token::compress_and_close::CompressAndCloseAccounts as CTokenAccounts;
+use light_token::compressed_token::CompressAndCloseAccounts as CTokenAccounts;
use light_token_interface::LIGHT_TOKEN_PROGRAM_ID;
-use solana_pubkey::pubkey;
use solana_sdk::{
instruction::Instruction,
pubkey::Pubkey,
@@ -21,19 +19,20 @@ use solana_sdk::{
};
use tracing::{debug, info};
-use super::{state::CompressibleAccountTracker, types::CompressibleAccountState};
-use crate::Result;
-
-const REGISTRY_PROGRAM_ID_STR: &str = "Lighton6oQpVkeewmo2mcPTQQp7kYHr4fWpAgJyEmDX";
+use super::{state::CTokenAccountTracker, types::CTokenAccountState};
+use crate::{
+ compressible::{config::REGISTRY_PROGRAM_ID, traits::CompressibleTracker},
+ Result,
+};
-/// Compression executor that builds and sends compress_and_close transactions via registry program
-pub struct Compressor {
+/// Compression executor for CToken accounts via the registry program's compress_and_close instruction.
+pub struct CTokenCompressor {
rpc_pool: Arc>,
- tracker: Arc,
+ tracker: Arc,
payer_keypair: Keypair,
}
-impl Clone for Compressor {
+impl Clone for CTokenCompressor {
fn clone(&self) -> Self {
Self {
rpc_pool: Arc::clone(&self.rpc_pool),
@@ -43,10 +42,10 @@ impl Clone for Compressor {
}
}
-impl Compressor {
+impl CTokenCompressor {
pub fn new(
rpc_pool: Arc>,
- tracker: Arc,
+ tracker: Arc,
payer_keypair: Keypair,
) -> Self {
Self {
@@ -58,10 +57,10 @@ impl Compressor {
pub async fn compress_batch(
&self,
- account_states: &[CompressibleAccountState],
+ account_states: &[CTokenAccountState],
registered_forester_pda: Pubkey,
) -> Result {
- let registry_program_id = Pubkey::from_str(REGISTRY_PROGRAM_ID_STR)?;
+ let registry_program_id = Pubkey::from_str(REGISTRY_PROGRAM_ID)?;
let compressed_token_program_id = Pubkey::new_from_array(LIGHT_TOKEN_PROGRAM_ID);
// Derive compression_authority PDA deterministically (version = 1)
@@ -86,21 +85,13 @@ impl Compressor {
// Get output tree from RPC
let mut rpc = self.rpc_pool.get_connection().await?;
- // FIXME: Use latest active state tree after updating lookup tables
- // rpc.get_latest_active_state_trees()
- // .await
- // .map_err(|e| anyhow::anyhow!("Failed to get state tree info: {}", e))?;
- // let output_tree_info = rpc
- // .get_random_state_tree_info()
- // .map_err(|e| anyhow::anyhow!("Failed to get state tree info: {}", e))?;
-
- let output_tree_info = TreeInfo {
- tree: pubkey!("bmt1LryLZUMmF7ZtqESaw7wifBXLfXHQYoE4GAmrahU"),
- queue: pubkey!("oq1na8gojfdUhsfCpyjNt6h4JaDWtHf1yQj4koBWfto"),
- cpi_context: Some(pubkey!("cpi15BoVPKgEPw5o8wc2T816GE7b378nMXnhH3Xbq4y")),
- tree_type: TreeType::StateV2,
- next_tree_info: None,
- };
+ // Fetch latest active state trees and get a random one
+ rpc.get_latest_active_state_trees()
+ .await
+ .map_err(|e| anyhow::anyhow!("Failed to get state tree info: {}", e))?;
+ let output_tree_info = rpc
+ .get_random_state_tree_info()
+ .map_err(|e| anyhow::anyhow!("Failed to get random state tree info: {}", e))?;
let output_queue = output_tree_info
.get_output_pubkey()
@@ -224,8 +215,6 @@ impl Compressor {
};
// Send transaction
- // Note: Account removal from tracker is handled by LogSubscriber which parses
- // the "compress_and_close:" logs emitted by the registry program
let signature = rpc
.create_and_send_transaction(
&[ix],
@@ -241,6 +230,26 @@ impl Compressor {
signature
);
+ // Wait for confirmation before removing from tracker
+ let confirmed = rpc
+ .confirm_transaction(signature)
+ .await
+ .map_err(|e| anyhow::anyhow!("Failed to confirm transaction: {}", e))?;
+
+ if confirmed {
+ // Only remove from tracker after confirmed
+ for account_state in account_states {
+ self.tracker.remove(&account_state.pubkey);
+ }
+ info!("compress_and_close tx confirmed: {}", signature);
+ } else {
+ // Transaction not confirmed - keep accounts in tracker for retry
+ tracing::warn!(
+ "compress_and_close tx not confirmed: {} - accounts kept in tracker for retry",
+ signature
+ );
+ }
+
Ok(signature)
}
}
diff --git a/forester/src/compressible/ctoken/mod.rs b/forester/src/compressible/ctoken/mod.rs
new file mode 100644
index 0000000000..272d630d91
--- /dev/null
+++ b/forester/src/compressible/ctoken/mod.rs
@@ -0,0 +1,9 @@
+mod bootstrap;
+mod compressor;
+mod state;
+mod types;
+
+pub use bootstrap::bootstrap_ctoken_accounts;
+pub use compressor::CTokenCompressor;
+pub use state::CTokenAccountTracker;
+pub use types::CTokenAccountState;
diff --git a/forester/src/compressible/ctoken/state.rs b/forester/src/compressible/ctoken/state.rs
new file mode 100644
index 0000000000..eaf0272fac
--- /dev/null
+++ b/forester/src/compressible/ctoken/state.rs
@@ -0,0 +1,144 @@
+use borsh::BorshDeserialize;
+use dashmap::DashMap;
+use light_compressible::rent::{get_rent_exemption_lamports, SLOTS_PER_EPOCH};
+use light_token_interface::state::Token;
+use solana_sdk::pubkey::Pubkey;
+use tracing::{debug, warn};
+
+use super::types::CTokenAccountState;
+use crate::{
+ compressible::traits::{CompressibleTracker, SubscriptionHandler},
+ Result,
+};
+
+fn calculate_compressible_slot(account: &Token, lamports: u64, account_size: usize) -> Result {
+ use light_token_interface::state::extensions::ExtensionStruct;
+
+ let rent_exemption = get_rent_exemption_lamports(account_size as u64)
+ .map_err(|e| anyhow::anyhow!("Failed to get rent exemption: {:?}", e))?;
+
+ let compression_info = account
+ .extensions
+ .as_ref()
+ .and_then(|exts| {
+ exts.iter().find_map(|ext| match ext {
+ ExtensionStruct::Compressible(comp) => Some(&comp.info),
+ _ => None,
+ })
+ })
+ .ok_or_else(|| anyhow::anyhow!("Missing Compressible extension on Token account"))?;
+
+ let last_funded_epoch = compression_info
+ .get_last_funded_epoch(account_size as u64, lamports, rent_exemption)
+ .map_err(|e| {
+ anyhow::anyhow!(
+ "Failed to calculate last funded epoch for account with {} lamports: {:?}",
+ lamports,
+ e
+ )
+ })?;
+
+ Ok(last_funded_epoch * SLOTS_PER_EPOCH)
+}
+
+#[derive(Debug)]
+pub struct CTokenAccountTracker {
+ accounts: DashMap,
+}
+
+impl CTokenAccountTracker {
+ pub fn new() -> Self {
+ Self {
+ accounts: DashMap::new(),
+ }
+ }
+
+ /// Returns all tracked token accounts (not mints), ignoring compressible_slot.
+ /// Use `get_ready_to_compress(current_slot)` to get only accounts ready for compression.
+ pub fn get_all_token_accounts(&self) -> Vec {
+ self.get_ready_to_compress(u64::MAX)
+ .into_iter()
+ .filter(|state| state.account.is_token_account())
+ .collect()
+ }
+
+ pub fn update_from_account(
+ &self,
+ pubkey: Pubkey,
+ account_data: &[u8],
+ lamports: u64,
+ ) -> Result<()> {
+ let ctoken = Token::try_from_slice(account_data)
+ .map_err(|e| anyhow::anyhow!("Failed to deserialize Token: {:?}", e))?;
+
+ self.update_from_token(pubkey, ctoken, lamports, account_data.len())
+ }
+
+ /// Update tracker with an already-deserialized Token.
+ /// Use this to avoid double deserialization when the Token is already available.
+ pub fn update_from_token(
+ &self,
+ pubkey: Pubkey,
+ ctoken: Token,
+ lamports: u64,
+ account_size: usize,
+ ) -> Result<()> {
+ let compressible_slot = match calculate_compressible_slot(&ctoken, lamports, account_size) {
+ Ok(slot) => slot,
+ Err(e) => {
+ warn!(
+ "Failed to calculate compressible slot for {}: {}",
+ pubkey, e
+ );
+ return Ok(());
+ }
+ };
+
+ let state = CTokenAccountState {
+ pubkey,
+ account: ctoken,
+ lamports,
+ compressible_slot,
+ };
+
+ debug!(
+ "Updated account {}: mint={:?}, owner={:?}, amount={}, compressible_slot={}",
+ pubkey,
+ state.account.mint,
+ state.account.owner,
+ state.account.amount,
+ compressible_slot
+ );
+
+ self.insert(state);
+ Ok(())
+ }
+}
+
+impl CompressibleTracker for CTokenAccountTracker {
+ fn accounts(&self) -> &DashMap {
+ &self.accounts
+ }
+}
+
+impl Default for CTokenAccountTracker {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl SubscriptionHandler for CTokenAccountTracker {
+ fn handle_update(
+ &self,
+ pubkey: Pubkey,
+ _program_id: Pubkey,
+ data: &[u8],
+ lamports: u64,
+ ) -> Result<()> {
+ self.update_from_account(pubkey, data, lamports)
+ }
+
+ fn handle_removal(&self, pubkey: &Pubkey) {
+ self.remove(pubkey);
+ }
+}
diff --git a/forester/src/compressible/ctoken/types.rs b/forester/src/compressible/ctoken/types.rs
new file mode 100644
index 0000000000..21ab03d3d1
--- /dev/null
+++ b/forester/src/compressible/ctoken/types.rs
@@ -0,0 +1,27 @@
+use light_token_interface::state::Token;
+use solana_sdk::pubkey::Pubkey;
+
+use crate::compressible::traits::CompressibleState;
+
+#[derive(Clone, Debug)]
+pub struct CTokenAccountState {
+ pub pubkey: Pubkey,
+ pub account: Token,
+ pub lamports: u64,
+ /// Ready to compress when current_slot > compressible_slot
+ pub compressible_slot: u64,
+}
+
+impl CompressibleState for CTokenAccountState {
+ fn pubkey(&self) -> &Pubkey {
+ &self.pubkey
+ }
+
+ fn lamports(&self) -> u64 {
+ self.lamports
+ }
+
+ fn compressible_slot(&self) -> u64 {
+ self.compressible_slot
+ }
+}
diff --git a/forester/src/compressible/mint/bootstrap.rs b/forester/src/compressible/mint/bootstrap.rs
new file mode 100644
index 0000000000..b20aebfe9d
--- /dev/null
+++ b/forester/src/compressible/mint/bootstrap.rs
@@ -0,0 +1,108 @@
+use std::{sync::Arc, time::Duration};
+
+use tokio::sync::oneshot;
+use tracing::{debug, info};
+
+use super::state::MintAccountTracker;
+use crate::{
+ compressible::{
+ bootstrap_helpers::{
+ bootstrap_standard_api, bootstrap_v2_api, is_localhost, RawAccountData,
+ },
+ config::{ACCOUNT_TYPE_OFFSET, MINT_ACCOUNT_TYPE_FILTER},
+ traits::CompressibleTracker,
+ },
+ Result,
+};
+
+/// Bootstrap the Mint account tracker by fetching decompressed mints
+pub async fn bootstrap_mint_accounts(
+ rpc_url: String,
+ tracker: Arc,
+ shutdown_rx: Option>,
+) -> Result<()> {
+ info!("Starting bootstrap of decompressed Mint accounts");
+
+ // Set up shutdown flag
+ let shutdown_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
+
+ if let Some(rx) = shutdown_rx {
+ let shutdown_flag_clone = shutdown_flag.clone();
+ tokio::spawn(async move {
+ let _ = rx.await;
+ shutdown_flag_clone.store(true, std::sync::atomic::Ordering::SeqCst);
+ });
+ }
+
+ let client = reqwest::Client::builder()
+ .timeout(Duration::from_secs(30))
+ .build()?;
+
+ // Light Token Program ID
+ let program_id =
+ solana_sdk::pubkey::Pubkey::new_from_array(light_token_interface::LIGHT_TOKEN_PROGRAM_ID);
+
+ info!(
+ "Bootstrapping decompressed Mint accounts from program {}",
+ program_id
+ );
+
+ // Process function that updates tracker
+ let process_account = |raw_data: RawAccountData| -> bool {
+ if let Err(e) =
+ tracker.update_from_account(raw_data.pubkey, &raw_data.data, raw_data.lamports)
+ {
+ debug!("Failed to insert mint {}: {:?}", raw_data.pubkey, e);
+ return false;
+ }
+ true
+ };
+
+ // Filter for decompressed Mint accounts (account_type = 1)
+ let filters = Some(vec![serde_json::json!({
+ "memcmp": {
+ "offset": ACCOUNT_TYPE_OFFSET,
+ "bytes": MINT_ACCOUNT_TYPE_FILTER,
+ "encoding": "base58"
+ }
+ })]);
+
+ if is_localhost(&rpc_url) {
+ let (total_fetched, total_inserted) = bootstrap_standard_api(
+ &client,
+ &rpc_url,
+ &program_id,
+ filters,
+ Some(&shutdown_flag),
+ process_account,
+ )
+ .await?;
+
+ info!(
+ "Mint bootstrap complete: {} fetched, {} decompressed mints tracked",
+ total_fetched, total_inserted
+ );
+ } else {
+ let (page_count, total_fetched, total_inserted) = bootstrap_v2_api(
+ &client,
+ &rpc_url,
+ &program_id,
+ filters,
+ Some(&shutdown_flag),
+ process_account,
+ )
+ .await?;
+
+ info!(
+ "Mint bootstrap finished: {} pages, {} fetched, {} decompressed mints tracked",
+ page_count, total_fetched, total_inserted
+ );
+ }
+
+ info!(
+ "Mint bootstrap finished: {} total mints tracked",
+ tracker.len()
+ );
+
+ Ok(())
+}
diff --git a/forester/src/compressible/mint/compressor.rs b/forester/src/compressible/mint/compressor.rs
new file mode 100644
index 0000000000..259416f05d
--- /dev/null
+++ b/forester/src/compressible/mint/compressor.rs
@@ -0,0 +1,278 @@
+use std::sync::{
+ atomic::{AtomicBool, Ordering},
+ Arc,
+};
+
+use forester_utils::rpc_pool::SolanaRpcPool;
+use futures::StreamExt;
+use light_client::{indexer::Indexer, rpc::Rpc};
+use light_token_client::instructions::mint_action::{
+ create_mint_action_instruction, MintActionParams, MintActionType,
+};
+use solana_sdk::{
+ instruction::Instruction,
+ signature::{Keypair, Signature},
+ signer::Signer,
+};
+use tracing::{debug, info};
+
+use super::{state::MintAccountTracker, types::MintAccountState};
+use crate::{compressible::traits::CompressibleTracker, Result};
+
+/// Compressor for decompressed Mint accounts - builds and sends CompressAndCloseMint transactions.
+pub struct MintCompressor {
+ rpc_pool: Arc>,
+ tracker: Arc,
+ payer_keypair: Keypair,
+}
+
+impl Clone for MintCompressor {
+ fn clone(&self) -> Self {
+ Self {
+ rpc_pool: Arc::clone(&self.rpc_pool),
+ tracker: Arc::clone(&self.tracker),
+ payer_keypair: self.payer_keypair.insecure_clone(),
+ }
+ }
+}
+
+impl MintCompressor {
+ pub fn new(
+ rpc_pool: Arc>,
+ tracker: Arc,
+ payer_keypair: Keypair,
+ ) -> Self {
+ Self {
+ rpc_pool,
+ tracker,
+ payer_keypair,
+ }
+ }
+
+ /// Compress multiple Mint accounts in a single transaction.
+ pub async fn compress_batch(&self, mint_states: &[MintAccountState]) -> Result {
+ if mint_states.is_empty() {
+ return Err(anyhow::anyhow!("No mints to compress"));
+ }
+
+ debug!(
+ "Building {} CompressAndCloseMint instructions in parallel",
+ mint_states.len()
+ );
+
+ // Build all instructions in parallel
+ let instruction_futures = mint_states.iter().map(|mint_state| {
+ let rpc_pool = self.rpc_pool.clone();
+ let payer = self.payer_keypair.pubkey();
+ let mint_seed = mint_state.mint_seed;
+ let compressed_address = mint_state.compressed_address;
+ let mint_pda = mint_state.pubkey;
+
+ async move {
+ let mut rpc = rpc_pool.get_connection().await?;
+
+ let params = MintActionParams {
+ compressed_mint_address: compressed_address,
+ mint_seed,
+ authority: payer,
+ payer,
+ actions: vec![MintActionType::CompressAndCloseMint { idempotent: true }],
+ new_mint: None,
+ };
+
+ let ix = create_mint_action_instruction(&mut *rpc, params)
+ .await
+ .map_err(|e| {
+ anyhow::anyhow!(
+ "Failed to build CompressAndCloseMint instruction for {}: {:?}",
+ mint_pda,
+ e
+ )
+ })?;
+
+ Ok::(ix)
+ }
+ });
+
+ // Wait for all instructions to be built
+ let instructions: Vec =
+ futures::future::try_join_all(instruction_futures).await?;
+
+ debug!(
+ "Built {} instructions, sending in single transaction",
+ instructions.len()
+ );
+
+ // Send all instructions in a single transaction
+ let mut rpc = self.rpc_pool.get_connection().await?;
+ let signature = rpc
+ .create_and_send_transaction(
+ &instructions,
+ &self.payer_keypair.pubkey(),
+ &[&self.payer_keypair],
+ )
+ .await
+ .map_err(|e| {
+ anyhow::anyhow!(
+ "Failed to send batched CompressAndCloseMint transaction: {:?}",
+ e
+ )
+ })?;
+
+ info!(
+ "Batched CompressAndCloseMint tx for {} mints sent: {}",
+ mint_states.len(),
+ signature
+ );
+
+ // Wait for confirmation before removing from tracker
+ let confirmed = rpc
+ .confirm_transaction(signature)
+ .await
+ .map_err(|e| anyhow::anyhow!("Failed to confirm transaction: {:?}", e))?;
+
+ if confirmed {
+ // Only remove from tracker after confirmed
+ for mint_state in mint_states {
+ self.tracker.remove(&mint_state.pubkey);
+ }
+ info!("Batched CompressAndCloseMint tx confirmed: {}", signature);
+ Ok(signature)
+ } else {
+ tracing::warn!(
+ "Batch CompressAndCloseMint tx not confirmed: {} - accounts kept in tracker for retry",
+ signature
+ );
+ Err(anyhow::anyhow!(
+ "Batch CompressAndCloseMint tx not confirmed: {}",
+ signature
+ ))
+ }
+ }
+
+ /// Compress a batch of decompressed Mint accounts with concurrent execution.
+ ///
+ /// Each mint gets its own transaction, executed concurrently with cancellation support.
+ /// Successfully compressed accounts are removed from the tracker.
+ /// Use this when you need fine-grained control over individual compressions.
+ pub async fn compress_batch_concurrent(
+ &self,
+ mint_states: &[MintAccountState],
+ max_concurrent: usize,
+ cancelled: Arc,
+ ) -> Vec>
+ {
+ if mint_states.is_empty() {
+ return Vec::new();
+ }
+
+ // Guard against max_concurrent == 0 to avoid buffer_unordered panic
+ if max_concurrent == 0 {
+ return mint_states
+ .iter()
+ .cloned()
+ .map(|mint_state| Err((mint_state, anyhow::anyhow!("max_concurrent must be > 0"))))
+ .collect();
+ }
+
+ // Create futures for each mint
+ let compression_futures = mint_states.iter().cloned().map(|mint_state| {
+ let compressor = self.clone();
+ let cancelled = cancelled.clone();
+ async move {
+ // Check cancellation before processing
+ if cancelled.load(Ordering::Relaxed) {
+ return Err((mint_state, anyhow::anyhow!("Cancelled")));
+ }
+
+ match compressor.compress(&mint_state).await {
+ Ok(sig) => Ok((sig, mint_state)),
+ Err(e) => Err((mint_state, e)),
+ }
+ }
+ });
+
+ // Execute concurrently with limit
+ let results: Vec<_> = futures::stream::iter(compression_futures)
+ .buffer_unordered(max_concurrent)
+ .collect()
+ .await;
+
+ // Remove successfully compressed mints from tracker
+ for (_, mint_state) in results.iter().flatten() {
+ self.tracker.remove(&mint_state.pubkey);
+ }
+
+ results
+ }
+
+ /// Compress a single decompressed Mint account.
+ async fn compress(&self, mint_state: &MintAccountState) -> Result {
+ let mint_pda = &mint_state.pubkey;
+ let mint_seed = &mint_state.mint_seed;
+ let compressed_address = mint_state.compressed_address;
+
+ debug!(
+ "Compressing Mint PDA {} (seed: {}, compressed_address: {:?})",
+ mint_pda, mint_seed, compressed_address
+ );
+
+ let mut rpc = self.rpc_pool.get_connection().await?;
+
+ // Build the CompressAndCloseMint instruction using the mint action builder
+ // This is idempotent - succeeds silently if mint doesn't exist or is already compressed
+ let params = MintActionParams {
+ compressed_mint_address: compressed_address,
+ mint_seed: *mint_seed,
+ authority: self.payer_keypair.pubkey(),
+ payer: self.payer_keypair.pubkey(),
+ actions: vec![MintActionType::CompressAndCloseMint { idempotent: true }],
+ new_mint: None,
+ };
+
+ let ix = create_mint_action_instruction(&mut *rpc, params)
+ .await
+ .map_err(|e| {
+ anyhow::anyhow!("Failed to build CompressAndCloseMint instruction: {:?}", e)
+ })?;
+
+ debug!(
+ "Built CompressAndCloseMint instruction for Mint {}",
+ mint_pda
+ );
+
+ // Send transaction
+ let signature = rpc
+ .create_and_send_transaction(
+ &[ix],
+ &self.payer_keypair.pubkey(),
+ &[&self.payer_keypair],
+ )
+ .await
+ .map_err(|e| {
+ anyhow::anyhow!("Failed to send CompressAndCloseMint transaction: {:?}", e)
+ })?;
+
+ info!(
+ "CompressAndCloseMint tx for Mint {} sent: {}",
+ mint_pda, signature
+ );
+
+ // Wait for confirmation
+ let confirmed = rpc
+ .confirm_transaction(signature)
+ .await
+ .map_err(|e| anyhow::anyhow!("Failed to confirm transaction: {:?}", e))?;
+
+ if confirmed {
+ info!("CompressAndCloseMint tx for Mint {} confirmed", mint_pda);
+ Ok(signature)
+ } else {
+ Err(anyhow::anyhow!(
+ "Transaction {} not confirmed for Mint {}",
+ signature,
+ mint_pda
+ ))
+ }
+ }
+}
diff --git a/forester/src/compressible/mint/mod.rs b/forester/src/compressible/mint/mod.rs
new file mode 100644
index 0000000000..4687f8be16
--- /dev/null
+++ b/forester/src/compressible/mint/mod.rs
@@ -0,0 +1,9 @@
+mod bootstrap;
+mod compressor;
+mod state;
+mod types;
+
+pub use bootstrap::bootstrap_mint_accounts;
+pub use compressor::MintCompressor;
+pub use state::MintAccountTracker;
+pub use types::MintAccountState;
diff --git a/forester/src/compressible/mint/state.rs b/forester/src/compressible/mint/state.rs
new file mode 100644
index 0000000000..db19e6dd10
--- /dev/null
+++ b/forester/src/compressible/mint/state.rs
@@ -0,0 +1,144 @@
+use borsh::BorshDeserialize;
+use dashmap::DashMap;
+use light_compressible::rent::{
+ get_last_funded_epoch, get_rent_exemption_lamports, SLOTS_PER_EPOCH,
+};
+use light_token_interface::state::{Mint, ACCOUNT_TYPE_MINT};
+use solana_sdk::pubkey::Pubkey;
+use tracing::{debug, warn};
+
+use super::types::MintAccountState;
+use crate::{
+ compressible::traits::{CompressibleTracker, SubscriptionHandler},
+ Result,
+};
+
+pub const ACCOUNT_TYPE_OFFSET: usize = 165;
+
+fn calculate_compressible_slot(mint: &Mint, lamports: u64, account_size: usize) -> Result {
+ let rent_exemption = get_rent_exemption_lamports(account_size as u64)
+ .map_err(|e| anyhow::anyhow!("Failed to get rent exemption: {:?}", e))?;
+ let compression_info = &mint.compression;
+
+ let last_funded_epoch = get_last_funded_epoch(
+ account_size as u64,
+ lamports,
+ compression_info.last_claimed_slot,
+ &compression_info.rent_config,
+ rent_exemption,
+ );
+
+ Ok(last_funded_epoch * SLOTS_PER_EPOCH)
+}
+
+#[derive(Debug)]
+pub struct MintAccountTracker {
+ accounts: DashMap,
+}
+
+impl MintAccountTracker {
+ pub fn new() -> Self {
+ Self {
+ accounts: DashMap::new(),
+ }
+ }
+
+ pub fn update_from_account(
+ &self,
+ pubkey: Pubkey,
+ account_data: &[u8],
+ lamports: u64,
+ ) -> Result<()> {
+ if account_data.len() <= ACCOUNT_TYPE_OFFSET {
+ debug!("Mint account {} too short, skipping", pubkey);
+ return Ok(());
+ }
+
+ if account_data[ACCOUNT_TYPE_OFFSET] != ACCOUNT_TYPE_MINT {
+ debug!("Account {} is not a Mint, skipping", pubkey);
+ return Ok(());
+ }
+
+ let mint = match Mint::try_from_slice(account_data) {
+ Ok(m) => m,
+ Err(e) => {
+ debug!("Failed to deserialize Mint {}: {:?}", pubkey, e);
+ return Ok(());
+ }
+ };
+
+ if !mint.metadata.mint_decompressed {
+ debug!("Mint {} is not decompressed, skipping", pubkey);
+ return Ok(());
+ }
+
+ let expected_mint = Pubkey::new_from_array(mint.metadata.mint.to_bytes());
+ if expected_mint != pubkey {
+ warn!(
+ "Mint PDA mismatch: expected {} but found {}",
+ expected_mint, pubkey
+ );
+ return Ok(());
+ }
+
+ let compressible_slot =
+ match calculate_compressible_slot(&mint, lamports, account_data.len()) {
+ Ok(slot) => slot,
+ Err(e) => {
+ warn!(
+ "Failed to calculate compressible slot for {}: {:?}",
+ pubkey, e
+ );
+ return Ok(());
+ }
+ };
+
+ let mint_seed = Pubkey::new_from_array(mint.metadata.mint_signer);
+ let compressed_address = mint.metadata.compressed_address();
+
+ let state = MintAccountState {
+ pubkey,
+ mint_seed,
+ compressed_address,
+ mint,
+ lamports,
+ compressible_slot,
+ };
+
+ debug!(
+ "Updated Mint {}: mint_seed={}, compressible_slot={}",
+ pubkey, mint_seed, compressible_slot
+ );
+
+ self.insert(state);
+ Ok(())
+ }
+}
+
+impl CompressibleTracker for MintAccountTracker {
+ fn accounts(&self) -> &DashMap {
+ &self.accounts
+ }
+}
+
+impl Default for MintAccountTracker {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl SubscriptionHandler for MintAccountTracker {
+ fn handle_update(
+ &self,
+ pubkey: Pubkey,
+ _program_id: Pubkey,
+ data: &[u8],
+ lamports: u64,
+ ) -> Result<()> {
+ self.update_from_account(pubkey, data, lamports)
+ }
+
+ fn handle_removal(&self, pubkey: &Pubkey) {
+ self.remove(pubkey);
+ }
+}
diff --git a/forester/src/compressible/mint/types.rs b/forester/src/compressible/mint/types.rs
new file mode 100644
index 0000000000..655e7532eb
--- /dev/null
+++ b/forester/src/compressible/mint/types.rs
@@ -0,0 +1,29 @@
+use light_token_interface::state::Mint;
+use solana_sdk::pubkey::Pubkey;
+
+use crate::compressible::traits::CompressibleState;
+
+#[derive(Clone, Debug)]
+pub struct MintAccountState {
+ pub pubkey: Pubkey,
+ pub mint_seed: Pubkey,
+ pub compressed_address: [u8; 32],
+ pub mint: Mint,
+ pub lamports: u64,
+ /// Ready to compress when current_slot > compressible_slot
+ pub compressible_slot: u64,
+}
+
+impl CompressibleState for MintAccountState {
+ fn pubkey(&self) -> &Pubkey {
+ &self.pubkey
+ }
+
+ fn lamports(&self) -> u64 {
+ self.lamports
+ }
+
+ fn compressible_slot(&self) -> u64 {
+ self.compressible_slot
+ }
+}
diff --git a/forester/src/compressible/mod.rs b/forester/src/compressible/mod.rs
index 2cd8d30b5b..849f42990b 100644
--- a/forester/src/compressible/mod.rs
+++ b/forester/src/compressible/mod.rs
@@ -1,13 +1,21 @@
-pub mod bootstrap;
-pub mod compressor;
+pub mod bootstrap_helpers;
pub mod config;
-pub mod state;
+pub mod ctoken;
+pub mod mint;
+pub mod pda;
pub mod subscriber;
-pub mod types;
+pub mod traits;
+pub mod validation;
-pub use bootstrap::bootstrap_compressible_accounts;
-pub use compressor::Compressor;
-pub use config::CompressibleConfig;
-pub use state::CompressibleAccountTracker;
-pub use subscriber::{AccountSubscriber, LogSubscriber};
-pub use types::CompressibleAccountState;
+pub use config::{
+ CompressibleConfig, PdaProgramConfig, ACCOUNT_TYPE_OFFSET, CTOKEN_ACCOUNT_TYPE_FILTER,
+ DEFAULT_PAGE_SIZE, DEFAULT_PAGINATION_DELAY_MS, MINT_ACCOUNT_TYPE_FILTER, REGISTRY_PROGRAM_ID,
+};
+pub use ctoken::{
+ bootstrap_ctoken_accounts, CTokenAccountState, CTokenAccountTracker, CTokenCompressor,
+};
+pub use mint::{bootstrap_mint_accounts, MintAccountState, MintAccountTracker, MintCompressor};
+pub use pda::{PdaAccountState, PdaAccountTracker, PdaCompressor};
+pub use subscriber::{AccountSubscriber, MemcmpFilter, ReconnectConfig, SubscriptionConfig};
+pub use traits::SubscriptionHandler;
+pub use validation::validate_compressible_config;
diff --git a/forester/src/compressible/pda/bootstrap.rs b/forester/src/compressible/pda/bootstrap.rs
new file mode 100644
index 0000000000..0142b56570
--- /dev/null
+++ b/forester/src/compressible/pda/bootstrap.rs
@@ -0,0 +1,144 @@
+use std::{sync::Arc, time::Duration};
+
+use tokio::sync::oneshot;
+use tracing::{debug, error, info};
+
+use super::state::PdaAccountTracker;
+use crate::{
+ compressible::{
+ bootstrap_helpers::{
+ bootstrap_standard_api, bootstrap_v2_api, is_localhost, RawAccountData,
+ },
+ config::PdaProgramConfig,
+ traits::CompressibleTracker,
+ },
+ Result,
+};
+
+/// Bootstrap the PDA account tracker by fetching existing accounts for all configured programs
+pub async fn bootstrap_pda_accounts(
+ rpc_url: String,
+ tracker: Arc,
+ shutdown_rx: Option>,
+) -> Result<()> {
+ info!("Starting bootstrap of compressible PDA accounts");
+
+ let programs = tracker.programs().to_vec();
+ if programs.is_empty() {
+ info!("No PDA programs configured, skipping PDA bootstrap");
+ return Ok(());
+ }
+
+ let shutdown_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
+
+ if let Some(rx) = shutdown_rx {
+ let shutdown_flag_clone = shutdown_flag.clone();
+ tokio::spawn(async move {
+ let _ = rx.await;
+ shutdown_flag_clone.store(true, std::sync::atomic::Ordering::SeqCst);
+ });
+ }
+
+ let client = reqwest::Client::builder()
+ .timeout(Duration::from_secs(30))
+ .build()?;
+
+ for program_config in programs {
+ if shutdown_flag.load(std::sync::atomic::Ordering::SeqCst) {
+ info!("Shutdown requested, stopping PDA bootstrap");
+ break;
+ }
+
+ info!(
+ "Bootstrapping PDA accounts for program {}",
+ program_config.program_id
+ );
+
+ let result =
+ bootstrap_program(&client, &rpc_url, &tracker, &program_config, &shutdown_flag).await;
+
+ if let Err(e) = result {
+ error!(
+ "Failed to bootstrap program {}: {:?}",
+ program_config.program_id, e
+ );
+ // Continue with other programs
+ }
+ }
+
+ info!(
+ "PDA bootstrap finished: {} total accounts tracked",
+ tracker.len()
+ );
+
+ Ok(())
+}
+
+/// Bootstrap a single program's accounts
+async fn bootstrap_program(
+ client: &reqwest::Client,
+ rpc_url: &str,
+ tracker: &PdaAccountTracker,
+ program_config: &PdaProgramConfig,
+ shutdown_flag: &std::sync::atomic::AtomicBool,
+) -> Result<()> {
+ let program_id = &program_config.program_id;
+
+ // Process function that updates tracker
+ let process_account = |raw_data: RawAccountData| -> bool {
+ if let Err(e) = tracker.update_from_account(
+ raw_data.pubkey,
+ *program_id,
+ &raw_data.data,
+ raw_data.lamports,
+ ) {
+ debug!("Failed to insert account {}: {:?}", raw_data.pubkey, e);
+ return false;
+ }
+ true
+ };
+
+ // Build memcmp filter for discriminator at offset 0
+ let discriminator_base58 = bs58::encode(&program_config.discriminator).into_string();
+ let filters = Some(vec![serde_json::json!({
+ "memcmp": {
+ "offset": 0,
+ "bytes": discriminator_base58,
+ "encoding": "base58"
+ }
+ })]);
+
+ if is_localhost(rpc_url) {
+ let (total_fetched, total_inserted) = bootstrap_standard_api(
+ client,
+ rpc_url,
+ program_id,
+ filters,
+ Some(shutdown_flag),
+ process_account,
+ )
+ .await?;
+
+ info!(
+ "Bootstrap complete for program {}: {} fetched, {} compressible",
+ program_id, total_fetched, total_inserted
+ );
+ } else {
+ let (page_count, total_fetched, total_inserted) = bootstrap_v2_api(
+ client,
+ rpc_url,
+ program_id,
+ filters,
+ Some(shutdown_flag),
+ process_account,
+ )
+ .await?;
+
+ info!(
+ "Bootstrap finished for program {}: {} pages, {} fetched, {} compressible",
+ program_id, page_count, total_fetched, total_inserted
+ );
+ }
+
+ Ok(())
+}
diff --git a/forester/src/compressible/pda/compressor.rs b/forester/src/compressible/pda/compressor.rs
new file mode 100644
index 0000000000..5b875c997b
--- /dev/null
+++ b/forester/src/compressible/pda/compressor.rs
@@ -0,0 +1,405 @@
+use std::sync::{
+ atomic::{AtomicBool, Ordering},
+ Arc,
+};
+
+use borsh::BorshDeserialize;
+use forester_utils::rpc_pool::SolanaRpcPool;
+use futures::StreamExt;
+use light_client::{
+ indexer::Indexer,
+ interface::instructions::{
+ build_compress_accounts_idempotent, COMPRESS_ACCOUNTS_IDEMPOTENT_DISCRIMINATOR,
+ },
+ rpc::Rpc,
+};
+use light_compressed_account::address::derive_address;
+use light_sdk::interface::config::LightConfig;
+use solana_sdk::{
+ instruction::AccountMeta,
+ pubkey::Pubkey,
+ signature::{Keypair, Signature},
+ signer::Signer,
+};
+use tracing::{debug, info};
+
+use super::{state::PdaAccountTracker, types::PdaAccountState};
+use crate::{
+ compressible::{config::PdaProgramConfig, traits::CompressibleTracker},
+ Result,
+};
+
+/// Cached program configuration to avoid repeated RPC calls
+#[derive(Clone)]
+pub struct CachedProgramConfig {
+ pub config_pda: Pubkey,
+ pub rent_sponsor: Pubkey,
+ pub compression_authority: Pubkey,
+ pub address_tree: Pubkey,
+ pub program_metas: Vec,
+}
+
+/// Compressor for PDA accounts - builds and sends compress_accounts_idempotent transactions
+/// with concurrent execution support and config caching.
+pub struct PdaCompressor {
+ rpc_pool: Arc>,
+ tracker: Arc,
+ payer_keypair: Keypair,
+}
+
+impl Clone for PdaCompressor {
+ fn clone(&self) -> Self {
+ Self {
+ rpc_pool: Arc::clone(&self.rpc_pool),
+ tracker: Arc::clone(&self.tracker),
+ payer_keypair: self.payer_keypair.insecure_clone(),
+ }
+ }
+}
+
+impl PdaCompressor {
+ pub fn new(
+ rpc_pool: Arc>,
+ tracker: Arc,
+ payer_keypair: Keypair,
+ ) -> Self {
+ Self {
+ rpc_pool,
+ tracker,
+ payer_keypair,
+ }
+ }
+
+ /// Fetch and cache the program configuration.
+ /// This should be called once per program before processing accounts.
+ pub async fn fetch_program_config(
+ &self,
+ program_config: &PdaProgramConfig,
+ ) -> Result {
+ let program_id = &program_config.program_id;
+
+ // Get the compressible config PDA for this program (config_bump = 0)
+ let (config_pda, _) = LightConfig::derive_pda(program_id, 0);
+
+ // Fetch the config to get rent_sponsor and address_space
+ let rpc = self.rpc_pool.get_connection().await?;
+ let config_account = rpc
+ .get_account(config_pda)
+ .await
+ .map_err(|e| anyhow::anyhow!("Failed to get config account: {:?}", e))?
+ .ok_or_else(|| {
+ anyhow::anyhow!("Config account not found for program {}", program_id)
+ })?;
+
+ // LightConfig is stored with raw Borsh serialization (no Anchor discriminator)
+ let config = LightConfig::try_from_slice(&config_account.data)
+ .map_err(|e| anyhow::anyhow!("Failed to deserialize config: {:?}", e))?;
+
+ // Validate config at startup to fail fast on misconfigurations
+ config.validate().map_err(|e| {
+ anyhow::anyhow!(
+ "LightConfig validation failed for program {}: {:?}",
+ program_id,
+ e
+ )
+ })?;
+
+ let rent_sponsor = config.rent_sponsor;
+ let compression_authority = config.compression_authority;
+ let address_tree = *config
+ .address_space
+ .first()
+ .ok_or_else(|| anyhow::anyhow!("Config has no address space"))?;
+
+ // CompressAccountsIdempotent expects 4 accounts:
+ // 1. fee_payer (signer, writable)
+ // 2. config (read-only)
+ // 3. rent_sponsor (writable)
+ // 4. compression_authority (writable) - must match config.compression_authority
+ let program_metas = vec![
+ AccountMeta::new(self.payer_keypair.pubkey(), true), // fee_payer
+ AccountMeta::new_readonly(config_pda, false), // config
+ AccountMeta::new(rent_sponsor, false), // rent_sponsor
+ AccountMeta::new(compression_authority, false), // compression_authority
+ ];
+
+ Ok(CachedProgramConfig {
+ config_pda,
+ rent_sponsor,
+ compression_authority,
+ address_tree,
+ program_metas,
+ })
+ }
+
+ /// Compress a batch of PDA accounts with concurrent execution.
+ ///
+ /// Successfully compressed accounts are removed from the tracker.
+ pub async fn compress_batch_concurrent(
+ &self,
+ account_states: &[PdaAccountState],
+ program_config: &PdaProgramConfig,
+ cached_config: &CachedProgramConfig,
+ max_concurrent: usize,
+ cancelled: Arc,
+ ) -> Vec>
+ {
+ if account_states.is_empty() {
+ return Vec::new();
+ }
+
+ // Create futures for each account
+ let compression_futures = account_states.iter().cloned().map(|account_state| {
+ let compressor = self.clone();
+ let program_config = program_config.clone();
+ let cached_config = cached_config.clone();
+ let cancelled = cancelled.clone();
+
+ async move {
+ // Check cancellation before processing
+ if cancelled.load(Ordering::Relaxed) {
+ return Err((account_state, anyhow::anyhow!("Cancelled")));
+ }
+
+ match compressor
+ .compress(&account_state, &program_config, &cached_config)
+ .await
+ {
+ Ok(sig) => Ok((sig, account_state)),
+ Err(e) => Err((account_state, e)),
+ }
+ }
+ });
+
+ // Execute concurrently with limit
+ let results: Vec<_> = futures::stream::iter(compression_futures)
+ .buffer_unordered(max_concurrent)
+ .collect()
+ .await;
+
+ // Remove successfully compressed PDAs from tracker
+ for (_, pda_state) in results.iter().flatten() {
+ self.tracker.remove(&pda_state.pubkey);
+ }
+
+ results
+ }
+
+ /// Compress multiple PDA accounts in a single transaction.
+ ///
+ /// This method:
+ /// 1. Fetches all compressed accounts in parallel
+ /// 2. Gets a single validity proof for all accounts
+ /// 3. Builds a single instruction with all accounts
+ /// 4. Sends a single transaction
+ ///
+ /// Returns the transaction signature on success.
+ pub async fn compress_batch(
+ &self,
+ account_states: &[PdaAccountState],
+ program_config: &PdaProgramConfig,
+ ) -> Result {
+ if account_states.is_empty() {
+ return Err(anyhow::anyhow!("No accounts to compress"));
+ }
+
+ let program_id = &program_config.program_id;
+
+ // Fetch and cache config
+ let cached_config = self.fetch_program_config(program_config).await?;
+
+ let mut rpc = self.rpc_pool.get_connection().await?;
+
+ // Derive compressed addresses for all accounts
+ let compressed_addresses: Vec<[u8; 32]> = account_states
+ .iter()
+ .map(|state| {
+ derive_address(
+ &state.pubkey.to_bytes(),
+ &cached_config.address_tree.to_bytes(),
+ &program_id.to_bytes(),
+ )
+ })
+ .collect();
+
+ // Fetch all compressed accounts in parallel
+ let compressed_account_futures = compressed_addresses.iter().map(|addr| {
+ let rpc_clone = self.rpc_pool.clone();
+ let addr = *addr;
+ async move {
+ let rpc = rpc_clone.get_connection().await?;
+ rpc.get_compressed_account(addr, None)
+ .await
+ .map_err(|e| anyhow::anyhow!("Failed to get compressed account: {:?}", e))?
+ .value
+ .ok_or_else(|| anyhow::anyhow!("Compressed account not found: {:?}", addr))
+ }
+ });
+
+ let compressed_accounts: Vec<_> = futures::future::try_join_all(compressed_account_futures)
+ .await
+ .map_err(|e| anyhow::anyhow!("Failed to fetch compressed accounts: {:?}", e))?;
+
+ // Collect all hashes for a single validity proof request
+ let hashes: Vec<[u8; 32]> = compressed_accounts.iter().map(|acc| acc.hash).collect();
+
+ debug!(
+ "Fetching batched validity proof for {} accounts",
+ hashes.len()
+ );
+
+ // Get single validity proof for all accounts
+ let proof_with_context = rpc
+ .get_validity_proof(hashes, vec![], None)
+ .await
+ .map_err(|e| anyhow::anyhow!("Failed to get validity proof: {:?}", e))?
+ .value;
+
+ // Build pubkeys array
+ let pubkeys: Vec = account_states.iter().map(|s| s.pubkey).collect();
+
+ // Build single batched instruction
+ let ix = build_compress_accounts_idempotent(
+ program_id,
+ &COMPRESS_ACCOUNTS_IDEMPOTENT_DISCRIMINATOR,
+ &pubkeys,
+ &cached_config.program_metas,
+ proof_with_context,
+ )
+ .map_err(|e| anyhow::anyhow!("Failed to build instruction: {:?}", e))?;
+
+ debug!(
+ "Built batched compress_accounts_idempotent for {} PDAs (program {})",
+ account_states.len(),
+ program_id
+ );
+
+ // Send single transaction
+ let signature = rpc
+ .create_and_send_transaction(
+ &[ix],
+ &self.payer_keypair.pubkey(),
+ &[&self.payer_keypair],
+ )
+ .await
+ .map_err(|e| anyhow::anyhow!("Failed to send transaction: {:?}", e))?;
+
+ info!(
+ "Batched compress_accounts_idempotent tx for {} PDAs sent: {}",
+ account_states.len(),
+ signature
+ );
+
+ // Wait for confirmation before removing from tracker
+ let confirmed = rpc
+ .confirm_transaction(signature)
+ .await
+ .map_err(|e| anyhow::anyhow!("Failed to confirm transaction: {:?}", e))?;
+
+ if confirmed {
+ // Only remove from tracker after confirmed
+ for state in account_states {
+ self.tracker.remove(&state.pubkey);
+ }
+ info!(
+ "Batched compress_accounts_idempotent tx confirmed: {}",
+ signature
+ );
+ } else {
+ tracing::warn!(
+ "compress_accounts_idempotent tx not confirmed: {} - accounts kept in tracker for retry",
+ signature
+ );
+ }
+
+ Ok(signature)
+ }
+
+ /// Compress a single PDA account using cached config
+ async fn compress(
+ &self,
+ account_state: &PdaAccountState,
+ program_config: &PdaProgramConfig,
+ cached_config: &CachedProgramConfig,
+ ) -> Result {
+ let program_id = &program_config.program_id;
+ let pda = &account_state.pubkey;
+
+ // Derive the compressed address
+ let compressed_address = derive_address(
+ &pda.to_bytes(),
+ &cached_config.address_tree.to_bytes(),
+ &program_id.to_bytes(),
+ );
+
+ let mut rpc = self.rpc_pool.get_connection().await?;
+
+ // Get the compressed account
+ let compressed_account = rpc
+ .get_compressed_account(compressed_address, None)
+ .await
+ .map_err(|e| anyhow::anyhow!("Failed to get compressed account: {:?}", e))?
+ .value
+ .ok_or_else(|| {
+ anyhow::anyhow!(
+ "Compressed account not found for PDA {}. Address: {:?}",
+ pda,
+ compressed_address
+ )
+ })?;
+
+ // Get validity proof
+ let proof_with_context = rpc
+ .get_validity_proof(vec![compressed_account.hash], vec![], None)
+ .await
+ .map_err(|e| anyhow::anyhow!("Failed to get validity proof: {:?}", e))?
+ .value;
+
+ // Build compress_accounts_idempotent instruction
+ let ix = build_compress_accounts_idempotent(
+ program_id,
+ &COMPRESS_ACCOUNTS_IDEMPOTENT_DISCRIMINATOR,
+ &[*pda],
+ &cached_config.program_metas,
+ proof_with_context,
+ )
+ .map_err(|e| anyhow::anyhow!("Failed to build instruction: {:?}", e))?;
+
+ debug!(
+ "Built compress_accounts_idempotent for PDA {} (program {})",
+ pda, program_id
+ );
+
+ // Send transaction
+ let signature = rpc
+ .create_and_send_transaction(
+ &[ix],
+ &self.payer_keypair.pubkey(),
+ &[&self.payer_keypair],
+ )
+ .await
+ .map_err(|e| anyhow::anyhow!("Failed to send transaction: {:?}", e))?;
+
+ info!(
+ "compress_accounts_idempotent tx for PDA {} sent: {}",
+ pda, signature
+ );
+
+ // Wait for confirmation
+ let confirmed = rpc
+ .confirm_transaction(signature)
+ .await
+ .map_err(|e| anyhow::anyhow!("Failed to confirm transaction: {:?}", e))?;
+
+ if confirmed {
+ info!("compress_accounts_idempotent tx for PDA {} confirmed", pda);
+ Ok(signature)
+ } else {
+ Err(anyhow::anyhow!(
+ "Transaction {} not confirmed for PDA {}",
+ signature,
+ pda
+ ))
+ }
+ }
+}
diff --git a/forester/src/compressible/pda/mod.rs b/forester/src/compressible/pda/mod.rs
new file mode 100644
index 0000000000..e518b3bf6a
--- /dev/null
+++ b/forester/src/compressible/pda/mod.rs
@@ -0,0 +1,11 @@
+mod bootstrap;
+mod compressor;
+mod state;
+mod types;
+
+pub use bootstrap::bootstrap_pda_accounts;
+pub use compressor::{CachedProgramConfig, PdaCompressor};
+pub use state::PdaAccountTracker;
+pub use types::PdaAccountState;
+
+pub use super::config::PdaProgramConfig;
diff --git a/forester/src/compressible/pda/state.rs b/forester/src/compressible/pda/state.rs
new file mode 100644
index 0000000000..df8c0c3ae3
--- /dev/null
+++ b/forester/src/compressible/pda/state.rs
@@ -0,0 +1,175 @@
+use borsh::BorshDeserialize;
+use dashmap::DashMap;
+use light_compressible::rent::{
+ get_last_funded_epoch, get_rent_exemption_lamports, SLOTS_PER_EPOCH,
+};
+use light_sdk::compressible::compression_info::CompressionInfo;
+use solana_sdk::pubkey::Pubkey;
+use tracing::{debug, warn};
+
+use super::types::PdaAccountState;
+use crate::{
+ compressible::{
+ config::PdaProgramConfig,
+ traits::{CompressibleTracker, SubscriptionHandler},
+ },
+ Result,
+};
+
+/// Layout: [8-byte discriminator][Option][rest of data]
+fn extract_compression_info(data: &[u8]) -> Option {
+ const DISCRIMINATOR_SIZE: usize = 8;
+ if data.len() <= DISCRIMINATOR_SIZE {
+ return None;
+ }
+ Option::::deserialize(&mut &data[DISCRIMINATOR_SIZE..]).ok()?
+}
+
+fn calculate_compressible_slot(
+ compression_info: &CompressionInfo,
+ lamports: u64,
+ account_size: usize,
+) -> Result {
+ let rent_exemption = get_rent_exemption_lamports(account_size as u64)
+ .map_err(|e| anyhow::anyhow!("Failed to get rent exemption: {:?}", e))?;
+
+ let last_funded_epoch = get_last_funded_epoch(
+ account_size as u64,
+ lamports,
+ compression_info.last_claimed_slot(),
+ &compression_info.rent_config,
+ rent_exemption,
+ );
+
+ Ok(last_funded_epoch * SLOTS_PER_EPOCH)
+}
+
+#[derive(Debug)]
+pub struct PdaAccountTracker {
+ accounts: DashMap,
+ programs: Vec,
+}
+
+impl PdaAccountTracker {
+ pub fn new(programs: Vec) -> Self {
+ Self {
+ accounts: DashMap::new(),
+ programs,
+ }
+ }
+
+ pub fn programs(&self) -> &[PdaProgramConfig] {
+ &self.programs
+ }
+
+ pub fn get_ready_to_compress_for_program(
+ &self,
+ program_id: &Pubkey,
+ current_slot: u64,
+ ) -> Vec {
+ self.get_ready_to_compress(current_slot)
+ .into_iter()
+ .filter(|state| state.program_id == *program_id)
+ .collect()
+ }
+
+ pub fn update_from_account(
+ &self,
+ pubkey: Pubkey,
+ program_id: Pubkey,
+ account_data: &[u8],
+ lamports: u64,
+ ) -> Result<()> {
+ const DISCRIMINATOR_SIZE: usize = 8;
+
+ if account_data.len() < DISCRIMINATOR_SIZE {
+ debug!("Account {} too short, skipping", pubkey);
+ return Ok(());
+ }
+
+ if let Some(program_config) = self.programs.iter().find(|c| c.program_id == program_id) {
+ let account_discriminator: [u8; 8] = account_data[..DISCRIMINATOR_SIZE]
+ .try_into()
+ .map_err(|_| anyhow::anyhow!("Failed to convert discriminator slice"))?;
+
+ if account_discriminator != program_config.discriminator {
+ debug!("Account {} discriminator mismatch, skipping", pubkey);
+ return Ok(());
+ }
+ } else {
+ debug!("No config for program {}, skipping {}", program_id, pubkey);
+ return Ok(());
+ }
+
+ let compression_info = match extract_compression_info(account_data) {
+ Some(info) => info,
+ None => {
+ debug!("Account {} has no CompressionInfo, skipping", pubkey);
+ return Ok(());
+ }
+ };
+
+ if compression_info.is_compressed() {
+ debug!(
+ "Account {} is already compressed; skipping re-compression",
+ pubkey
+ );
+ return Ok(());
+ }
+
+ let compressible_slot =
+ match calculate_compressible_slot(&compression_info, lamports, account_data.len()) {
+ Ok(slot) => slot,
+ Err(e) => {
+ warn!(
+ "Failed to calculate compressible slot for {}: {}",
+ pubkey, e
+ );
+ return Ok(());
+ }
+ };
+
+ let state = PdaAccountState {
+ pubkey,
+ program_id,
+ lamports,
+ compressible_slot,
+ };
+
+ debug!(
+ "Updated PDA {}: program={}, slot={}",
+ pubkey, program_id, compressible_slot
+ );
+
+ self.insert(state);
+ Ok(())
+ }
+}
+
+impl CompressibleTracker for PdaAccountTracker {
+ fn accounts(&self) -> &DashMap {
+ &self.accounts
+ }
+}
+
+impl Default for PdaAccountTracker {
+ fn default() -> Self {
+ Self::new(Vec::new())
+ }
+}
+
+impl SubscriptionHandler for PdaAccountTracker {
+ fn handle_update(
+ &self,
+ pubkey: Pubkey,
+ program_id: Pubkey,
+ data: &[u8],
+ lamports: u64,
+ ) -> Result<()> {
+ self.update_from_account(pubkey, program_id, data, lamports)
+ }
+
+ fn handle_removal(&self, pubkey: &Pubkey) {
+ self.remove(pubkey);
+ }
+}
diff --git a/forester/src/compressible/pda/types.rs b/forester/src/compressible/pda/types.rs
new file mode 100644
index 0000000000..a93010ff94
--- /dev/null
+++ b/forester/src/compressible/pda/types.rs
@@ -0,0 +1,28 @@
+use solana_sdk::pubkey::Pubkey;
+
+use crate::compressible::traits::CompressibleState;
+
+/// Stores only metadata. The compressor fetches the compressed account
+/// from the indexer to get validity proofs before closing the on-chain PDA.
+#[derive(Clone, Debug)]
+pub struct PdaAccountState {
+ pub pubkey: Pubkey,
+ pub program_id: Pubkey,
+ pub lamports: u64,
+ /// Ready to compress when current_slot > compressible_slot
+ pub compressible_slot: u64,
+}
+
+impl CompressibleState for PdaAccountState {
+ fn pubkey(&self) -> &Pubkey {
+ &self.pubkey
+ }
+
+ fn lamports(&self) -> u64 {
+ self.lamports
+ }
+
+ fn compressible_slot(&self) -> u64 {
+ self.compressible_slot
+ }
+}
diff --git a/forester/src/compressible/state.rs b/forester/src/compressible/state.rs
deleted file mode 100644
index ed27ad07fe..0000000000
--- a/forester/src/compressible/state.rs
+++ /dev/null
@@ -1,160 +0,0 @@
-use std::sync::Arc;
-
-use borsh::BorshDeserialize;
-use dashmap::DashMap;
-use light_token_interface::state::Token;
-use solana_sdk::{pubkey::Pubkey, rent::Rent};
-use tracing::{debug, warn};
-
-use super::types::CompressibleAccountState;
-use crate::Result;
-
-/// Calculate the slot at which an account becomes compressible
-/// Returns the last funded slot; accounts are compressible when current_slot > this value
-fn calculate_compressible_slot(account: &Token, lamports: u64, account_size: usize) -> Result {
- use light_compressible::rent::SLOTS_PER_EPOCH;
- use light_token_interface::state::extensions::ExtensionStruct;
-
- // Calculate rent exemption dynamically
- let rent_exemption = Rent::default().minimum_balance(account_size);
-
- // Get CompressionInfo from Compressible extension
- let compression_info = account
- .extensions
- .as_ref()
- .and_then(|exts| {
- exts.iter().find_map(|ext| match ext {
- ExtensionStruct::Compressible(comp) => Some(&comp.info),
- _ => None,
- })
- })
- .ok_or_else(|| anyhow::anyhow!("Missing Compressible extension on Token account"))?;
-
- // Calculate last funded epoch using embedded compression info
- let last_funded_epoch = compression_info
- .get_last_funded_epoch(account_size as u64, lamports, rent_exemption)
- .map_err(|e| {
- anyhow::anyhow!(
- "Failed to calculate last funded epoch for account with {} lamports: {:?}",
- lamports,
- e
- )
- })?;
-
- // Convert to slot
- Ok(last_funded_epoch * SLOTS_PER_EPOCH)
-}
-
-/// Tracker for compressible Light Token accounts
-#[derive(Debug)]
-pub struct CompressibleAccountTracker {
- accounts: Arc>,
-}
-
-impl CompressibleAccountTracker {
- /// Create a new tracker
- pub fn new() -> Self {
- Self {
- accounts: Arc::new(DashMap::new()),
- }
- }
-
- /// Insert or update an account state
- pub fn insert(&self, state: CompressibleAccountState) {
- self.accounts.insert(state.pubkey, state);
- }
-
- /// Remove an account from tracking
- pub fn remove(&self, pubkey: &Pubkey) -> Option {
- self.accounts.remove(pubkey).map(|(_, v)| v)
- }
-
- /// Get all accounts with compressible configuration
- pub fn get_compressible_accounts(&self) -> Vec {
- self.accounts
- .iter()
- .filter(|entry| {
- let state = entry.value();
- // Check if account is a valid Token (account_type == 2)
- state.account.is_token_account()
- })
- .map(|entry| entry.value().clone())
- .collect()
- }
-
- /// Get accounts that are ready to be compressed (rent expired)
- pub fn get_ready_to_compress(&self, current_slot: u64) -> Vec {
- self.accounts
- .iter()
- .filter(|entry| {
- let state = entry.value();
- // Account is compressible if current slot is past the compressible slot
- state.compressible_slot < current_slot
- })
- .map(|entry| entry.value().clone())
- .collect()
- }
-
- /// Get total number of tracked accounts
- pub fn len(&self) -> usize {
- self.accounts.len()
- }
-
- /// Check if tracker is empty
- pub fn is_empty(&self) -> bool {
- self.accounts.is_empty()
- }
-
- /// Update account state from raw account data
- pub fn update_from_account(
- &self,
- pubkey: Pubkey,
- account_data: &[u8],
- lamports: u64,
- ) -> Result<()> {
- // Deserialize Token using borsh
- let ctoken = Token::try_from_slice(account_data)
- .map_err(|e| anyhow::anyhow!("Failed to deserialize Token with borsh: {:?}", e))?;
-
- // Calculate compressible slot
- let compressible_slot =
- match calculate_compressible_slot(&ctoken, lamports, account_data.len()) {
- Ok(slot) => slot,
- Err(e) => {
- warn!(
- "Failed to calculate compressible slot for account {}: {}. Skipping account.",
- pubkey, e
- );
- return Ok(());
- }
- };
-
- // Create state with full Light Token account
- let state = CompressibleAccountState {
- pubkey,
- account: ctoken,
- lamports,
- compressible_slot,
- };
-
- debug!(
- "Updated account {}: mint={:?}, owner={:?}, amount={}, compressible_slot={}",
- pubkey,
- state.account.mint,
- state.account.owner,
- state.account.amount,
- compressible_slot
- );
-
- // Store in DashMap
- self.insert(state);
-
- Ok(())
- }
-}
-
-impl Default for CompressibleAccountTracker {
- fn default() -> Self {
- Self::new()
- }
-}
diff --git a/forester/src/compressible/subscriber.rs b/forester/src/compressible/subscriber.rs
index 5cf2044e96..096f457bb4 100644
--- a/forester/src/compressible/subscriber.rs
+++ b/forester/src/compressible/subscriber.rs
@@ -1,70 +1,217 @@
-use std::{str::FromStr, sync::Arc};
+use std::{str::FromStr, sync::Arc, time::Duration};
use futures::StreamExt;
use light_token_interface::LIGHT_TOKEN_PROGRAM_ID;
use solana_account_decoder::UiAccountEncoding;
use solana_client::{
nonblocking::pubsub_client::PubsubClient,
- rpc_config::{
- RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionLogsConfig,
- RpcTransactionLogsFilter,
- },
- rpc_response::{Response as RpcResponse, RpcKeyedAccount, RpcLogsResponse},
+ rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
+ rpc_response::{Response as RpcResponse, RpcKeyedAccount},
};
use solana_rpc_client_api::filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use tokio::sync::broadcast;
-use tracing::{debug, error, info};
+use tracing::{debug, error, info, warn};
-use super::state::CompressibleAccountTracker;
+use super::{
+ config::{ACCOUNT_TYPE_OFFSET, CTOKEN_ACCOUNT_TYPE_FILTER, MINT_ACCOUNT_TYPE_FILTER},
+ traits::SubscriptionHandler,
+};
use crate::Result;
-/// Registry program ID for subscribing to compress_and_close logs
-const REGISTRY_PROGRAM_ID_STR: &str = "Lighton6oQpVkeewmo2mcPTQQp7kYHr4fWpAgJyEmDX";
+/// Configuration for a program subscription
+#[derive(Debug, Clone)]
+pub struct SubscriptionConfig {
+ /// Program ID to subscribe to
+ pub program_id: Pubkey,
+ /// Optional memcmp filter (offset and base58-encoded bytes)
+ /// None means no filter (subscribe to all accounts)
+ pub filter: Option,
+ /// Human-readable name for logging
+ pub name: String,
+}
+
+/// Memcmp filter configuration
+#[derive(Debug, Clone)]
+pub struct MemcmpFilter {
+ pub offset: usize,
+ pub bytes: String, // Base58-encoded
+}
-/// Log prefix emitted by registry program when closing accounts
-const COMPRESS_AND_CLOSE_LOG_PREFIX: &str = "compress_and_close:";
+/// Configuration for WebSocket reconnection with exponential backoff
+#[derive(Debug, Clone)]
+pub struct ReconnectConfig {
+ /// Initial delay before first reconnection attempt
+ pub initial_delay: Duration,
+ /// Maximum delay between reconnection attempts
+ pub max_delay: Duration,
+ /// Multiplier for exponential backoff (e.g., 2.0 doubles delay each attempt)
+ pub backoff_multiplier: f64,
+}
-/// Subscribes to account changes for all compressible Light Token accounts
-pub struct AccountSubscriber {
+impl Default for ReconnectConfig {
+ fn default() -> Self {
+ Self {
+ initial_delay: Duration::from_secs(1),
+ max_delay: Duration::from_secs(60),
+ backoff_multiplier: 2.0,
+ }
+ }
+}
+
+/// Result of a single connection session
+enum ConnectionResult {
+ /// Shutdown signal received
+ Shutdown,
+ /// Stream closed unexpectedly (should reconnect)
+ StreamClosed,
+}
+
+impl SubscriptionConfig {
+ /// Create subscription config for Light Token accounts (ctokens)
+ pub fn ctoken() -> Self {
+ Self {
+ program_id: Pubkey::new_from_array(LIGHT_TOKEN_PROGRAM_ID),
+ filter: Some(MemcmpFilter {
+ offset: ACCOUNT_TYPE_OFFSET,
+ bytes: CTOKEN_ACCOUNT_TYPE_FILTER.to_string(),
+ }),
+ name: "ctoken".to_string(),
+ }
+ }
+
+ /// Create subscription config for Light Mint accounts
+ pub fn mint() -> Self {
+ Self {
+ program_id: Pubkey::new_from_array(LIGHT_TOKEN_PROGRAM_ID),
+ filter: Some(MemcmpFilter {
+ offset: ACCOUNT_TYPE_OFFSET,
+ bytes: MINT_ACCOUNT_TYPE_FILTER.to_string(),
+ }),
+ name: "mint".to_string(),
+ }
+ }
+
+ /// Create subscription config for a PDA program with discriminator filter.
+ /// The discriminator is an 8-byte value at the start of the account data (offset 0).
+ pub fn pda(program_id: Pubkey, discriminator: [u8; 8], name: String) -> Self {
+ // Convert discriminator to base58 for the memcmp filter
+ let discriminator_base58 = bs58::encode(&discriminator).into_string();
+
+ Self {
+ program_id,
+ filter: Some(MemcmpFilter {
+ offset: 0,
+ bytes: discriminator_base58,
+ }),
+ name,
+ }
+ }
+}
+
+/// Generic subscriber for account changes.
+/// Works with any tracker that implements SubscriptionHandler.
+/// Automatically reconnects with exponential backoff on connection loss.
+pub struct AccountSubscriber {
ws_url: String,
- tracker: Arc,
+ handler: Arc,
+ config: SubscriptionConfig,
+ reconnect_config: ReconnectConfig,
shutdown_rx: broadcast::Receiver<()>,
}
-impl AccountSubscriber {
+impl AccountSubscriber {
pub fn new(
ws_url: String,
- tracker: Arc,
+ handler: Arc,
+ config: SubscriptionConfig,
shutdown_rx: broadcast::Receiver<()>,
) -> Self {
Self {
ws_url,
- tracker,
+ handler,
+ config,
+ reconnect_config: ReconnectConfig::default(),
shutdown_rx,
}
}
+ pub fn with_reconnect_config(mut self, reconnect_config: ReconnectConfig) -> Self {
+ self.reconnect_config = reconnect_config;
+ self
+ }
+
pub async fn run(&mut self) -> Result<()> {
- info!("Starting account subscriber at {}", self.ws_url);
+ info!(
+ "Starting {} account subscriber at {}",
+ self.config.name, self.ws_url
+ );
+
+ let mut current_delay = self.reconnect_config.initial_delay;
+ let mut attempt: u32 = 0;
+
+ loop {
+ match self.run_connection().await {
+ Ok(ConnectionResult::Shutdown) => {
+ info!("{} subscriber stopped", self.config.name);
+ return Ok(());
+ }
+ Ok(ConnectionResult::StreamClosed) => {
+ attempt += 1;
+ warn!(
+ "{} connection lost (attempt {}), reconnecting in {:?}...",
+ self.config.name, attempt, current_delay
+ );
+ }
+ Err(e) => {
+ attempt += 1;
+ warn!(
+ "{} connection error (attempt {}): {:?}, reconnecting in {:?}...",
+ self.config.name, attempt, e, current_delay
+ );
+ }
+ }
+ // Wait with backoff, but check for shutdown signal
+ tokio::select! {
+ _ = tokio::time::sleep(current_delay) => {}
+ _ = self.shutdown_rx.recv() => {
+ info!("Shutdown signal received for {} subscriber during reconnect backoff", self.config.name);
+ return Ok(());
+ }
+ }
+
+ // Exponential backoff
+ current_delay = Duration::from_secs_f64(
+ (current_delay.as_secs_f64() * self.reconnect_config.backoff_multiplier)
+ .min(self.reconnect_config.max_delay.as_secs_f64()),
+ );
+ }
+ }
+
+ /// Runs a single connection session. Returns when:
+ /// - Shutdown signal received (Ok(Shutdown))
+ /// - Stream closed unexpectedly (Ok(StreamClosed))
+ /// - Connection/subscription error (Err)
+ async fn run_connection(&mut self) -> Result {
// Connect to WebSocket
let pubsub_client = PubsubClient::new(&self.ws_url)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to WebSocket: {}", e))?;
- let program_id = Pubkey::new_from_array(LIGHT_TOKEN_PROGRAM_ID);
- // Subscribe to compressed token program accounts with filter for account_type = 2 at position 165
- // This indicates a Light Token account with extensions (e.g., Compressible)
- // "3" is base58 encoding of byte value 2 (ACCOUNT_TYPE_TOKEN_ACCOUNT)
+ // Build filters based on config
+ let filters = self.config.filter.as_ref().map(|f| {
+ vec![RpcFilterType::Memcmp(Memcmp::new(
+ f.offset,
+ MemcmpEncodedBytes::Base58(f.bytes.clone()),
+ ))]
+ });
+
let (mut subscription, unsubscribe) = pubsub_client
.program_subscribe(
- &program_id,
+ &self.config.program_id,
Some(RpcProgramAccountsConfig {
- filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new(
- 165,
- MemcmpEncodedBytes::Base58("3".to_string()),
- ))]),
+ filters,
account_config: RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::confirmed()),
@@ -79,8 +226,8 @@ impl AccountSubscriber {
.map_err(|e| anyhow::anyhow!("Failed to subscribe to program accounts: {}", e))?;
info!(
- "Account subscription established for program {}",
- program_id
+ "{} subscription established for program {}",
+ self.config.name, self.config.program_id
);
// Process subscription messages
@@ -92,22 +239,19 @@ impl AccountSubscriber {
self.handle_account_update(response).await;
}
None => {
- error!("Account subscription stream closed unexpectedly");
+ error!("{} subscription stream closed unexpectedly", self.config.name);
unsubscribe().await;
- return Err(anyhow::anyhow!("Account subscription stream closed"));
+ return Ok(ConnectionResult::StreamClosed);
}
}
}
_ = self.shutdown_rx.recv() => {
- info!("Shutdown signal received");
+ info!("Shutdown signal received for {} subscriber", self.config.name);
unsubscribe().await;
- break;
+ return Ok(ConnectionResult::Shutdown);
}
}
}
-
- info!("Account subscriber stopped");
- Ok(())
}
async fn handle_account_update(&self, response: RpcResponse) {
@@ -142,143 +286,25 @@ impl AccountSubscriber {
}
};
- // Update tracker
- match self.tracker.update_from_account(
+ // Call handler
+ match self.handler.handle_update(
pubkey,
+ self.config.program_id,
&account_data,
response.value.account.lamports,
) {
Ok(()) => {
debug!(
- "Updated account {} at slot {}",
- pubkey, response.context.slot
+ "Updated {} account {} at slot {}",
+ self.config.name, pubkey, response.context.slot
);
}
Err(e) => {
- error!("Failed to update tracker for {}: {}", pubkey, e);
- }
- }
- }
-}
-
-/// Subscribes to registry program logs to detect compress_and_close operations
-/// and remove closed accounts from the tracker by parsing log messages directly
-pub struct LogSubscriber {
- ws_url: String,
- tracker: Arc,
- shutdown_rx: broadcast::Receiver<()>,
-}
-
-impl LogSubscriber {
- pub fn new(
- ws_url: String,
- tracker: Arc,
- shutdown_rx: broadcast::Receiver<()>,
- ) -> Self {
- Self {
- ws_url,
- tracker,
- shutdown_rx,
- }
- }
-
- pub async fn run(&mut self) -> Result<()> {
- info!("Starting log subscriber at {}", self.ws_url);
-
- // Connect to WebSocket
- let pubsub_client = PubsubClient::new(&self.ws_url)
- .await
- .map_err(|e| anyhow::anyhow!("Failed to connect to WebSocket: {}", e))?;
-
- let registry_program_id = Pubkey::from_str(REGISTRY_PROGRAM_ID_STR)
- .map_err(|e| anyhow::anyhow!("Invalid registry program ID: {}", e))?;
-
- // Subscribe to logs mentioning the registry program
- let filter = RpcTransactionLogsFilter::Mentions(vec![registry_program_id.to_string()]);
- let config = RpcTransactionLogsConfig {
- commitment: Some(CommitmentConfig::confirmed()),
- };
-
- let (mut subscription, unsubscribe) = pubsub_client
- .logs_subscribe(filter, config)
- .await
- .map_err(|e| anyhow::anyhow!("Failed to subscribe to logs: {}", e))?;
-
- info!(
- "Log subscription established for registry program {}",
- registry_program_id
- );
-
- // Process subscription messages
- loop {
- tokio::select! {
- result = subscription.next() => {
- match result {
- Some(response) => {
- self.handle_log_notification(response);
- }
- None => {
- error!("Log subscription stream closed unexpectedly");
- unsubscribe().await;
- return Err(anyhow::anyhow!("Log subscription stream closed"));
- }
- }
- }
- _ = self.shutdown_rx.recv() => {
- info!("Shutdown signal received for log subscriber");
- unsubscribe().await;
- break;
- }
- }
- }
-
- info!("Log subscriber stopped");
- Ok(())
- }
-
- fn handle_log_notification(&self, response: RpcResponse) {
- let logs_response = response.value;
-
- // Skip failed transactions
- if logs_response.err.is_some() {
- debug!("Skipping failed transaction {}", logs_response.signature);
- return;
- }
-
- // Parse logs looking for compress_and_close entries
- let mut removed_count = 0;
- for log in &logs_response.logs {
- // Look for our log prefix: "Program log: compress_and_close:"
- // The actual log format is "Program log: compress_and_close:"
- if let Some(pubkey_str) = log
- .strip_prefix("Program log: ")
- .and_then(|s| s.strip_prefix(COMPRESS_AND_CLOSE_LOG_PREFIX))
- {
- match Pubkey::from_str(pubkey_str) {
- Ok(pubkey) => {
- if self.tracker.remove(&pubkey).is_some() {
- debug!(
- "Removed closed account {} from tracker (compress_and_close log)",
- pubkey
- );
- removed_count += 1;
- }
- }
- Err(e) => {
- error!(
- "Invalid pubkey in compress_and_close log '{}': {}",
- pubkey_str, e
- );
- }
- }
+ error!(
+ "Failed to update {} tracker for {}: {}",
+ self.config.name, pubkey, e
+ );
}
}
-
- if removed_count > 0 {
- info!(
- "Removed {} closed accounts from transaction {}",
- removed_count, logs_response.signature
- );
- }
}
}
diff --git a/forester/src/compressible/traits.rs b/forester/src/compressible/traits.rs
new file mode 100644
index 0000000000..d7df33abb3
--- /dev/null
+++ b/forester/src/compressible/traits.rs
@@ -0,0 +1,58 @@
+//! Shared traits for compressible account tracking.
+
+use dashmap::DashMap;
+use solana_sdk::pubkey::Pubkey;
+
+use crate::Result;
+
+pub trait CompressibleState: Clone + Send + Sync {
+ fn pubkey(&self) -> &Pubkey;
+ fn lamports(&self) -> u64;
+ fn compressible_slot(&self) -> u64;
+
+ fn is_ready_to_compress(&self, current_slot: u64) -> bool {
+ current_slot > self.compressible_slot()
+ }
+}
+
+/// Implementors only need to provide `accounts()` - all other methods have default implementations.
+pub trait CompressibleTracker: Send + Sync {
+ fn accounts(&self) -> &DashMap;
+
+ fn insert(&self, state: S) {
+ self.accounts().insert(*state.pubkey(), state);
+ }
+
+ fn remove(&self, pubkey: &Pubkey) -> Option {
+ self.accounts().remove(pubkey).map(|(_, v)| v)
+ }
+
+ fn len(&self) -> usize {
+ self.accounts().len()
+ }
+
+ fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ fn get_ready_to_compress(&self, current_slot: u64) -> Vec {
+ self.accounts()
+ .iter()
+ .filter(|entry| entry.value().is_ready_to_compress(current_slot))
+ .map(|entry| entry.value().clone())
+ .collect()
+ }
+}
+
+/// Allows AccountSubscriber to work with any tracker type.
+pub trait SubscriptionHandler: Send + Sync {
+ fn handle_update(
+ &self,
+ pubkey: Pubkey,
+ program_id: Pubkey,
+ data: &[u8],
+ lamports: u64,
+ ) -> Result<()>;
+
+ fn handle_removal(&self, pubkey: &Pubkey);
+}
diff --git a/forester/src/compressible/types.rs b/forester/src/compressible/types.rs
deleted file mode 100644
index 75c4831034..0000000000
--- a/forester/src/compressible/types.rs
+++ /dev/null
@@ -1,14 +0,0 @@
-use light_token_interface::state::Token;
-use solana_sdk::pubkey::Pubkey;
-
-/// State of a compressible Token account
-#[derive(Clone, Debug)]
-pub struct CompressibleAccountState {
- /// Account public key
- pub pubkey: Pubkey,
- pub account: Token,
- pub lamports: u64,
- /// The slot at which this account becomes compressible (last_funded_epoch * SLOTS_PER_EPOCH)
- /// Accounts are ready to compress when current_slot > compressible_slot
- pub compressible_slot: u64,
-}
diff --git a/forester/src/compressible/validation.rs b/forester/src/compressible/validation.rs
new file mode 100644
index 0000000000..7378966084
--- /dev/null
+++ b/forester/src/compressible/validation.rs
@@ -0,0 +1,73 @@
+//! Startup validation for compressible configurations.
+//!
+//! This module provides functions to validate on-chain configuration accounts
+//! at forester startup, allowing fail-fast behavior on misconfigurations.
+
+use std::str::FromStr;
+
+use anchor_lang::AccountDeserialize;
+use light_compressible::config::CompressibleConfig as OnChainCompressibleConfig;
+use solana_client::nonblocking::rpc_client::RpcClient;
+use solana_sdk::pubkey::Pubkey;
+
+use super::config::REGISTRY_PROGRAM_ID;
+use crate::Result;
+
+/// Validates the on-chain CompressibleConfig for CToken/Mint compression.
+///
+/// Fetches the CompressibleConfig PDA from the registry program and validates:
+/// - Account exists
+/// - State is not Inactive (0 = paused)
+///
+/// Active (1) and Deprecated (2) states are both valid for compression operations.
+///
+/// # Errors
+///
+/// Returns an error if:
+/// - The config account doesn't exist
+/// - The config state is Inactive (paused)
+/// - RPC communication fails
+pub async fn validate_compressible_config(rpc_url: &str) -> Result<()> {
+ let registry_program_id = Pubkey::from_str(REGISTRY_PROGRAM_ID)?;
+
+ // Derive the CompressibleConfig PDA
+ let (config_pda, _) = OnChainCompressibleConfig::derive_v1_config_pda(®istry_program_id);
+
+ // Fetch the account
+ let rpc_client = RpcClient::new(rpc_url.to_string());
+ let account = rpc_client.get_account(&config_pda).await.map_err(|e| {
+ anyhow::anyhow!(
+ "Failed to fetch CompressibleConfig at {}: {}",
+ config_pda,
+ e
+ )
+ })?;
+
+ // Deserialize using AccountDeserialize to validate the discriminator
+ let config = OnChainCompressibleConfig::try_deserialize(&mut account.data.as_slice())
+ .map_err(|e| anyhow::anyhow!("Failed to deserialize CompressibleConfig: {:?}", e))?;
+
+ // Validate state is not Inactive (0)
+ // State values: 0 = Inactive (paused), 1 = Active, 2 = Deprecated
+ // Both Active and Deprecated are valid for compression operations
+ config.validate_not_inactive().map_err(|e| {
+ anyhow::anyhow!(
+ "CompressibleConfig validation failed: {:?}. Config PDA: {}",
+ e,
+ config_pda
+ )
+ })?;
+
+ tracing::info!(
+ "CompressibleConfig validated: PDA={}, state={} ({})",
+ config_pda,
+ config.state,
+ match config.state {
+ 1 => "Active",
+ 2 => "Deprecated",
+ _ => "Unknown",
+ }
+ );
+
+ Ok(())
+}
diff --git a/forester/src/config.rs b/forester/src/config.rs
index 2a345589f5..460343c903 100644
--- a/forester/src/config.rs
+++ b/forester/src/config.rs
@@ -362,16 +362,34 @@ impl ForesterConfig {
derivation_pubkey: derivation,
address_tree_data: vec![],
state_tree_data: vec![],
- compressible_config: if args.enable_compressible {
+ compressible_config: if args.enable_compressible
+ || !args.compressible_pda_programs.is_empty()
+ {
match &args.ws_rpc_url {
- Some(ws_url) => Some(crate::compressible::config::CompressibleConfig::new(
- ws_url.clone(),
- )),
+ Some(ws_url) => {
+ // Parse PDA program configurations
+ let pda_programs: Vec = args
+ .compressible_pda_programs
+ .iter()
+ .map(|s| {
+ s.parse::()
+ .map_err(|e| ConfigError::InvalidArguments {
+ field: "compressible_pda_programs",
+ invalid_values: vec![e],
+ })
+ })
+ .collect::, _>>()?;
+
+ Some(
+ crate::compressible::config::CompressibleConfig::new(ws_url.clone())
+ .with_pda_programs(pda_programs),
+ )
+ }
None => {
return Err(ConfigError::InvalidArguments {
- field: "enable_compressible",
+ field: "ws_rpc_url",
invalid_values: vec![
- "--ws-rpc-url is required when --enable-compressible is true"
+ "--ws-rpc-url is required when --enable-compressible is true or --compressible-pda-program is specified"
.to_string(),
],
}
diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs
index b02ad2d74b..a365885e73 100644
--- a/forester/src/epoch_manager.rs
+++ b/forester/src/epoch_manager.rs
@@ -16,7 +16,7 @@ use forester_utils::{
};
use futures::future::join_all;
use light_client::{
- indexer::{MerkleProof, NewAddressProofWithContext},
+ indexer::{Indexer, MerkleProof, NewAddressProofWithContext},
rpc::{LightClient, LightClientConfig, RetryConfig, Rpc, RpcError},
};
use light_compressed_account::TreeType;
@@ -46,7 +46,7 @@ use tokio::{
use tracing::{debug, error, info, info_span, instrument, trace, warn};
use crate::{
- compressible::{CompressibleAccountTracker, Compressor},
+ compressible::{traits::CompressibleTracker, CTokenAccountTracker, CTokenCompressor},
errors::{
ChannelError, ForesterError, InitializationError, RegistrationError, WorkReportError,
},
@@ -189,7 +189,7 @@ pub enum MerkleProofType {
}
#[derive(Debug)]
-pub struct EpochManager {
+pub struct EpochManager {
config: Arc,
protocol_config: Arc,
rpc_pool: Arc>,
@@ -207,13 +207,15 @@ pub struct EpochManager {
proof_caches: Arc>>,
state_processors: StateBatchProcessorMap,
address_processors: AddressBatchProcessorMap,
- compressible_tracker: Option>,
+ compressible_tracker: Option>,
+ pda_tracker: Option>,
+ mint_tracker: Option>,
/// Cached zkp_batch_size per tree to filter queue updates below threshold
zkp_batch_sizes: Arc>,
address_lookup_tables: Arc>,
}
-impl Clone for EpochManager {
+impl Clone for EpochManager {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
@@ -233,13 +235,15 @@ impl Clone for EpochManager {
state_processors: self.state_processors.clone(),
address_processors: self.address_processors.clone(),
compressible_tracker: self.compressible_tracker.clone(),
+ pda_tracker: self.pda_tracker.clone(),
+ mint_tracker: self.mint_tracker.clone(),
zkp_batch_sizes: self.zkp_batch_sizes.clone(),
address_lookup_tables: self.address_lookup_tables.clone(),
}
}
}
-impl EpochManager {
+impl EpochManager {
#[allow(clippy::too_many_arguments)]
pub async fn new(
config: Arc,
@@ -251,7 +255,9 @@ impl EpochManager {
new_tree_sender: broadcast::Sender,
tx_cache: Arc>,
ops_cache: Arc>,
- compressible_tracker: Option>,
+ compressible_tracker: Option>,
+ pda_tracker: Option>,
+ mint_tracker: Option>,
address_lookup_tables: Arc>,
) -> Result {
let authority = Arc::new(config.payer_keypair.insecure_clone());
@@ -273,6 +279,8 @@ impl EpochManager {
state_processors: Arc::new(DashMap::new()),
address_processors: Arc::new(DashMap::new()),
compressible_tracker,
+ pda_tracker,
+ mint_tracker,
zkp_batch_sizes: Arc::new(DashMap::new()),
address_lookup_tables,
})
@@ -1780,7 +1788,7 @@ impl EpochManager {
config.batch_size
);
- let compressor = Compressor::new(
+ let compressor = CTokenCompressor::new(
self.rpc_pool.clone(),
tracker.clone(),
self.config.payer_keypair.insecure_clone(),
@@ -1901,9 +1909,219 @@ impl EpochManager {
}
info!(
- "Completed compression for epoch {}: compressed {} accounts",
+ "Completed ctoken compression for epoch {}: compressed {} accounts",
epoch_info.epoch, total_compressed
);
+
+ // Process PDA compression if configured
+ let pda_compressed = self
+ .dispatch_pda_compression(consecutive_eligibility_end)
+ .await
+ .unwrap_or_else(|e| {
+ error!("PDA compression failed: {:?}", e);
+ 0
+ });
+
+ // Process Mint compression
+ let mint_compressed = self
+ .dispatch_mint_compression(consecutive_eligibility_end)
+ .await
+ .unwrap_or_else(|e| {
+ error!("Mint compression failed: {:?}", e);
+ 0
+ });
+
+ let total = total_compressed + pda_compressed + mint_compressed;
+ info!(
+ "Completed all compression for epoch {}: {} ctoken + {} PDA + {} Mint = {} total",
+ epoch_info.epoch, total_compressed, pda_compressed, mint_compressed, total
+ );
+ Ok(total)
+ }
+
+ async fn dispatch_pda_compression(&self, consecutive_eligibility_end: u64) -> Result {
+ let pda_tracker = match &self.pda_tracker {
+ Some(tracker) => tracker,
+ None => return Ok(0),
+ };
+
+ let config = match &self.config.compressible_config {
+ Some(cfg) => cfg,
+ None => return Ok(0),
+ };
+
+ if config.pda_programs.is_empty() {
+ return Ok(0);
+ }
+
+ let current_slot = self.slot_tracker.estimated_current_slot();
+ if current_slot >= consecutive_eligibility_end {
+ debug!(
+ "Skipping PDA compression: forester no longer eligible (current_slot={}, eligibility_end={})",
+ current_slot, consecutive_eligibility_end
+ );
+ return Ok(0);
+ }
+
+ let mut total_compressed = 0;
+
+ // Shared cancellation flag across all programs
+ let cancelled = Arc::new(AtomicBool::new(false));
+
+ // Process each configured PDA program
+ for program_config in &config.pda_programs {
+ // Check cancellation at program level
+ if cancelled.load(Ordering::Relaxed) {
+ break;
+ }
+
+ let accounts = pda_tracker
+ .get_ready_to_compress_for_program(&program_config.program_id, current_slot);
+
+ if accounts.is_empty() {
+ trace!(
+ "No compressible PDA accounts ready for program {}",
+ program_config.program_id
+ );
+ continue;
+ }
+
+ info!(
+ "Processing {} compressible PDA accounts for program {}",
+ accounts.len(),
+ program_config.program_id
+ );
+
+ let pda_compressor = crate::compressible::pda::PdaCompressor::new(
+ self.rpc_pool.clone(),
+ pda_tracker.clone(),
+ self.config.payer_keypair.insecure_clone(),
+ );
+
+ // Fetch and cache config once per program
+ let cached_config = match pda_compressor.fetch_program_config(program_config).await {
+ Ok(cfg) => cfg,
+ Err(e) => {
+ error!(
+ "Failed to fetch config for program {}: {:?}",
+ program_config.program_id, e
+ );
+ continue;
+ }
+ };
+
+ // Check eligibility before processing
+ let current_slot = self.slot_tracker.estimated_current_slot();
+ if current_slot >= consecutive_eligibility_end {
+ cancelled.store(true, Ordering::Relaxed);
+ warn!(
+ "Stopping PDA compression: forester no longer eligible (current_slot={}, eligibility_end={})",
+ current_slot, consecutive_eligibility_end
+ );
+ break;
+ }
+
+ // Process all accounts for this program concurrently
+ let results = pda_compressor
+ .compress_batch_concurrent(
+ &accounts,
+ program_config,
+ &cached_config,
+ config.max_concurrent_batches,
+ cancelled.clone(),
+ )
+ .await;
+
+ // Process results (tracker cleanup already done by compressor)
+ for result in results {
+ match result {
+ Ok((sig, account_state)) => {
+ debug!(
+ "Compressed PDA {} for program {}: {}",
+ account_state.pubkey, program_config.program_id, sig
+ );
+ total_compressed += 1;
+ }
+ Err((account_state, e)) => {
+ if e.to_string() != "Cancelled" {
+ error!(
+ "Failed to compress PDA {} for program {}: {:?}",
+ account_state.pubkey, program_config.program_id, e
+ );
+ }
+ }
+ }
+ }
+ }
+
+ info!("Completed PDA compression: {} accounts", total_compressed);
+ Ok(total_compressed)
+ }
+
+ async fn dispatch_mint_compression(&self, consecutive_eligibility_end: u64) -> Result {
+ let mint_tracker = match &self.mint_tracker {
+ Some(tracker) => tracker,
+ None => return Ok(0),
+ };
+
+ let config = match &self.config.compressible_config {
+ Some(cfg) => cfg,
+ None => return Ok(0),
+ };
+
+ let current_slot = self.slot_tracker.estimated_current_slot();
+ if current_slot >= consecutive_eligibility_end {
+ debug!(
+ "Skipping Mint compression: forester no longer eligible (current_slot={}, eligibility_end={})",
+ current_slot, consecutive_eligibility_end
+ );
+ return Ok(0);
+ }
+
+ let accounts = mint_tracker.get_ready_to_compress(current_slot);
+
+ if accounts.is_empty() {
+ trace!("No compressible Mint accounts ready");
+ return Ok(0);
+ }
+
+ info!(
+ "Processing {} compressible Mint accounts concurrently (max_concurrent={})",
+ accounts.len(),
+ config.max_concurrent_batches
+ );
+
+ let mint_compressor = crate::compressible::mint::MintCompressor::new(
+ self.rpc_pool.clone(),
+ mint_tracker.clone(),
+ self.config.payer_keypair.insecure_clone(),
+ );
+
+ // Shared cancellation flag
+ let cancelled = Arc::new(AtomicBool::new(false));
+
+ // Process all mints concurrently
+ let results = mint_compressor
+ .compress_batch_concurrent(&accounts, config.max_concurrent_batches, cancelled)
+ .await;
+
+ // Process results (tracker cleanup already done by compressor)
+ let mut total_compressed = 0;
+ for result in results {
+ match result {
+ Ok((sig, mint_state)) => {
+ debug!("Compressed Mint {}: {}", mint_state.pubkey, sig);
+ total_compressed += 1;
+ }
+ Err((mint_state, e)) => {
+ if e.to_string() != "Cancelled" {
+ error!("Failed to compress Mint {}: {:?}", mint_state.pubkey, e);
+ }
+ }
+ }
+ }
+
+ info!("Completed Mint compression: {} accounts", total_compressed);
Ok(total_compressed)
}
@@ -2841,7 +3059,7 @@ fn should_skip_tree(config: &ForesterConfig, tree_type: &TreeType) -> bool {
fields(forester = %config.payer_keypair.pubkey())
)]
#[allow(clippy::too_many_arguments)]
-pub async fn run_service(
+pub async fn run_service(
config: Arc,
protocol_config: Arc,
rpc_pool: Arc>,
@@ -2850,7 +3068,9 @@ pub async fn run_service(
slot_tracker: Arc,
tx_cache: Arc>,
ops_cache: Arc>,
- compressible_tracker: Option>,
+ compressible_tracker: Option>,
+ pda_tracker: Option>,
+ mint_tracker: Option>,
) -> Result<()> {
info_span!("run_service", forester = %config.payer_keypair.pubkey())
.in_scope(|| async {
@@ -3038,6 +3258,8 @@ pub async fn run_service(
tx_cache.clone(),
ops_cache.clone(),
compressible_tracker.clone(),
+ pda_tracker.clone(),
+ mint_tracker.clone(),
address_lookup_tables,
)
.await
diff --git a/forester/src/lib.rs b/forester/src/lib.rs
index 85f136db0c..2167db96c2 100644
--- a/forester/src/lib.rs
+++ b/forester/src/lib.rs
@@ -28,7 +28,10 @@ use forester_utils::{
forester_epoch::TreeAccounts, rate_limiter::RateLimiter, rpc_pool::SolanaRpcPoolBuilder,
};
use itertools::Itertools;
-use light_client::rpc::{LightClient, LightClientConfig, Rpc};
+use light_client::{
+ indexer::Indexer,
+ rpc::{LightClient, LightClientConfig, Rpc},
+};
use light_compressed_account::TreeType;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::sync::{mpsc, oneshot, Mutex};
@@ -43,7 +46,7 @@ use crate::{
print_state_v2_output_queue_info,
},
slot_tracker::SlotTracker,
- utils::{get_protocol_config_with_retry, get_slot_with_retry},
+ utils::{get_protocol_config_with_retry, get_slot_with_retry, retry_with_backoff, RetryConfig},
};
pub async fn run_queue_info(
@@ -138,7 +141,7 @@ pub async fn run_queue_info(
Ok(())
}
-pub async fn run_pipeline(
+pub async fn run_pipeline(
config: Arc,
rpc_rate_limiter: Option,
send_tx_rate_limiter: Option,
@@ -207,66 +210,203 @@ pub async fn run_pipeline(
config.transaction_config.ops_cache_ttl_seconds,
)));
- let compressible_tracker = if let Some(compressible_config) = &config.compressible_config {
+ let (compressible_tracker, pda_tracker, mint_tracker) = if let Some(compressible_config) =
+ &config.compressible_config
+ {
+ // Validate on-chain CompressibleConfig at startup (fail fast on misconfiguration)
+ compressible::validate_compressible_config(&config.external_services.rpc_url).await?;
+
if let Some(shutdown_rx) = shutdown_compressible {
- let tracker = Arc::new(compressible::CompressibleAccountTracker::new());
- let tracker_clone = tracker.clone();
- let ws_url = compressible_config.ws_url.clone();
+ // Create all shutdown receivers upfront (before any are moved)
+ let shutdown_rx_ctoken = shutdown_rx.resubscribe();
+ let shutdown_rx_mint = shutdown_rx.resubscribe();
+ let shutdown_rx_mint_bootstrap = shutdown_rx.resubscribe();
+ // Keep original for PDA subscriptions (will resubscribe per-program)
+ let shutdown_rx_pda_base = shutdown_rx;
- // Create a second receiver for the log subscriber
- let shutdown_rx_log = shutdown_rx.resubscribe();
+ // Create ctoken tracker
+ let ctoken_tracker = Arc::new(compressible::CTokenAccountTracker::new());
+ let tracker_clone = ctoken_tracker.clone();
+ let ws_url = compressible_config.ws_url.clone();
- // Spawn account subscriber
+ // Spawn account subscriber for ctokens
tokio::spawn(async move {
- let mut subscriber =
- compressible::AccountSubscriber::new(ws_url, tracker_clone, shutdown_rx);
+ let mut subscriber = compressible::AccountSubscriber::new(
+ ws_url,
+ tracker_clone,
+ compressible::SubscriptionConfig::ctoken(),
+ shutdown_rx_ctoken,
+ );
if let Err(e) = subscriber.run().await {
tracing::error!("Compressible subscriber error: {:?}", e);
}
});
- // Spawn log subscriber to detect compress_and_close operations
- let tracker_clone_log = tracker.clone();
- let ws_url_log = compressible_config.ws_url.clone();
+ // Spawn bootstrap task for ctokens with shutdown support
+ if let Some(mut shutdown_bootstrap_rx) = shutdown_bootstrap {
+ let tracker_clone = ctoken_tracker.clone();
+ let rpc_url = config.external_services.rpc_url.clone();
- tokio::spawn(async move {
- let mut log_subscriber = compressible::LogSubscriber::new(
- ws_url_log,
- tracker_clone_log,
- shutdown_rx_log,
- );
- if let Err(e) = log_subscriber.run().await {
- tracing::error!("Log subscriber error: {:?}", e);
+ tokio::spawn(async move {
+ let retry_config = RetryConfig::new("CToken bootstrap")
+ .with_max_attempts(3)
+ .with_initial_delay(Duration::from_secs(5));
+
+ let bootstrap_future = retry_with_backoff(retry_config, || {
+ let rpc_url = rpc_url.clone();
+ let tracker = tracker_clone.clone();
+ async move {
+ compressible::bootstrap_ctoken_accounts(rpc_url, tracker, None).await
+ }
+ });
+
+ tokio::select! {
+ result = bootstrap_future => {
+ match result {
+ Ok(()) => tracing::info!("CToken bootstrap complete"),
+ Err(e) => tracing::error!("CToken bootstrap failed after retries: {:?}", e),
+ }
+ }
+ _ = &mut shutdown_bootstrap_rx => {
+ tracing::info!("CToken bootstrap interrupted by shutdown signal");
+ }
+ }
+ });
+ }
+
+ // Create PDA tracker if there are PDA programs configured
+ let pda_tracker = if !compressible_config.pda_programs.is_empty() {
+ let pda_tracker = Arc::new(compressible::pda::PdaAccountTracker::new(
+ compressible_config.pda_programs.clone(),
+ ));
+
+ // Spawn account subscribers for each PDA program
+ for pda_config in &compressible_config.pda_programs {
+ let pda_tracker_sub = pda_tracker.clone();
+ let ws_url_pda = compressible_config.ws_url.clone();
+ let shutdown_rx_pda = shutdown_rx_pda_base.resubscribe();
+ let program_id = pda_config.program_id;
+ let discriminator = pda_config.discriminator;
+ let program_name = format!(
+ "pda-{}",
+ program_id.to_string().chars().take(8).collect::()
+ );
+
+ tokio::spawn(async move {
+ let mut subscriber = compressible::AccountSubscriber::new(
+ ws_url_pda,
+ pda_tracker_sub,
+ compressible::SubscriptionConfig::pda(
+ program_id,
+ discriminator,
+ program_name.clone(),
+ ),
+ shutdown_rx_pda,
+ );
+ if let Err(e) = subscriber.run().await {
+ tracing::error!("PDA subscriber error for {}: {:?}", program_name, e);
+ }
+ });
}
- });
- // Spawn bootstrap task
- if let Some(shutdown_bootstrap_rx) = shutdown_bootstrap {
- let tracker_clone = tracker.clone();
+ // Spawn bootstrap task for PDAs with shutdown support
+ let pda_tracker_clone = pda_tracker.clone();
let rpc_url = config.external_services.rpc_url.clone();
+ let mut shutdown_rx_pda_bootstrap = shutdown_rx_pda_base.resubscribe();
tokio::spawn(async move {
- if let Err(e) = compressible::bootstrap_compressible_accounts(
- rpc_url,
- tracker_clone,
- shutdown_bootstrap_rx,
- )
- .await
- {
- tracing::error!("Bootstrap failed: {:?}", e);
- } else {
- tracing::info!("Bootstrap complete");
+ let retry_config = RetryConfig::new("PDA bootstrap")
+ .with_max_attempts(3)
+ .with_initial_delay(Duration::from_secs(5));
+
+ let bootstrap_future = retry_with_backoff(retry_config, || {
+ let rpc_url = rpc_url.clone();
+ let tracker = pda_tracker_clone.clone();
+ async move {
+ compressible::pda::bootstrap_pda_accounts(rpc_url, tracker, None).await
+ }
+ });
+
+ tokio::select! {
+ result = bootstrap_future => {
+ match result {
+ Ok(()) => tracing::info!("PDA bootstrap complete"),
+ Err(e) => tracing::error!("PDA bootstrap failed after retries: {:?}", e),
+ }
+ }
+ _ = shutdown_rx_pda_bootstrap.recv() => {
+ tracing::info!("PDA bootstrap interrupted by shutdown signal");
+ }
}
});
- }
- Some(tracker)
+ Some(pda_tracker)
+ } else {
+ None
+ };
+
+ // Create Mint tracker and spawn subscriptions + bootstrap
+ let mint_tracker = {
+ let mint_tracker = Arc::new(compressible::mint::MintAccountTracker::new());
+
+ // Spawn account subscriber for mints
+ let mint_tracker_sub = mint_tracker.clone();
+ let ws_url_mint = compressible_config.ws_url.clone();
+
+ tokio::spawn(async move {
+ let mut subscriber = compressible::AccountSubscriber::new(
+ ws_url_mint,
+ mint_tracker_sub,
+ compressible::SubscriptionConfig::mint(),
+ shutdown_rx_mint,
+ );
+ if let Err(e) = subscriber.run().await {
+ tracing::error!("Mint subscriber error: {:?}", e);
+ }
+ });
+
+ // Spawn bootstrap task for Mints with shutdown support
+ let mint_tracker_clone = mint_tracker.clone();
+ let rpc_url = config.external_services.rpc_url.clone();
+ let mut shutdown_rx_mint_bootstrap = shutdown_rx_mint_bootstrap;
+
+ tokio::spawn(async move {
+ let retry_config = RetryConfig::new("Mint bootstrap")
+ .with_max_attempts(3)
+ .with_initial_delay(Duration::from_secs(5));
+
+ let bootstrap_future = retry_with_backoff(retry_config, || {
+ let rpc_url = rpc_url.clone();
+ let tracker = mint_tracker_clone.clone();
+ async move {
+ compressible::mint::bootstrap_mint_accounts(rpc_url, tracker, None)
+ .await
+ }
+ });
+
+ tokio::select! {
+ result = bootstrap_future => {
+ match result {
+ Ok(()) => tracing::info!("Mint bootstrap complete"),
+ Err(e) => tracing::error!("Mint bootstrap failed after retries: {:?}", e),
+ }
+ }
+ _ = shutdown_rx_mint_bootstrap.recv() => {
+ tracing::info!("Mint bootstrap interrupted by shutdown signal");
+ }
+ }
+ });
+
+ Some(mint_tracker)
+ };
+
+ (Some(ctoken_tracker), pda_tracker, mint_tracker)
} else {
tracing::warn!("Compressible config enabled but no shutdown receiver provided");
- None
+ (None, None, None)
}
} else {
- None
+ (None, None, None)
};
debug!("Starting Forester pipeline");
@@ -280,6 +420,8 @@ pub async fn run_pipeline(
tx_cache,
ops_cache,
compressible_tracker,
+ pda_tracker,
+ mint_tracker,
)
.await;
diff --git a/forester/src/utils.rs b/forester/src/utils.rs
index 58b6c4b657..a1f64d7ac1 100644
--- a/forester/src/utils.rs
+++ b/forester/src/utils.rs
@@ -1,4 +1,7 @@
-use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use std::{
+ future::Future,
+ time::{Duration, SystemTime, UNIX_EPOCH},
+};
use light_client::rpc::{errors::RpcError, Rpc};
use light_registry::{
@@ -7,6 +10,131 @@ use light_registry::{
};
use tracing::{debug, info, warn};
+/// Configuration for retry with exponential backoff.
+#[derive(Debug, Clone)]
+pub struct RetryConfig {
+ /// Maximum number of attempts. None means infinite retry.
+ pub max_attempts: Option,
+ /// Initial delay before first retry.
+ pub initial_delay: Duration,
+ /// Maximum delay between retries.
+ pub max_delay: Duration,
+ /// Name of the operation for logging.
+ pub operation_name: String,
+}
+
+impl RetryConfig {
+ /// Creates a new RetryConfig with the given operation name.
+ /// Defaults to infinite retry with 5s initial delay and 60s max delay.
+ pub fn new(operation_name: impl Into) -> Self {
+ Self {
+ max_attempts: None, // Infinite by default
+ initial_delay: Duration::from_secs(5),
+ max_delay: Duration::from_secs(60),
+ operation_name: operation_name.into(),
+ }
+ }
+
+ /// Sets the maximum number of attempts.
+ pub fn with_max_attempts(mut self, max_attempts: u64) -> Self {
+ self.max_attempts = Some(max_attempts);
+ self
+ }
+
+ /// Sets the initial delay.
+ pub fn with_initial_delay(mut self, delay: Duration) -> Self {
+ self.initial_delay = delay;
+ self
+ }
+
+ /// Sets the maximum delay.
+ pub fn with_max_delay(mut self, delay: Duration) -> Self {
+ self.max_delay = delay;
+ self
+ }
+}
+
+impl Default for RetryConfig {
+ fn default() -> Self {
+ Self::new("operation")
+ }
+}
+
+/// Generic retry with exponential backoff.
+///
+/// Executes the given async operation with retry logic. On failure, waits with
+/// exponential backoff before retrying.
+///
+/// # Arguments
+/// * `config` - Retry configuration (max attempts, delays, operation name)
+/// * `f` - Async closure that returns `Result`. Must be `'static` and own its captures.
+///
+/// # Returns
+/// * `Ok(T)` - On success
+/// * `Err(E)` - If max_attempts is reached (only when max_attempts is Some)
+///
+/// # Example
+/// ```ignore
+/// let config = RetryConfig::new("bootstrap")
+/// .with_max_attempts(3)
+/// .with_initial_delay(Duration::from_secs(2));
+///
+/// let rpc_url = rpc_url.clone();
+/// let tracker = tracker.clone();
+/// retry_with_backoff(config, move || {
+/// let rpc_url = rpc_url.clone();
+/// let tracker = tracker.clone();
+/// async move {
+/// bootstrap_accounts(rpc_url, tracker).await
+/// }
+/// }).await
+/// ```
+pub async fn retry_with_backoff(config: RetryConfig, mut f: F) -> Result
+where
+ F: FnMut() -> Fut,
+ Fut: Future