Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions cli/src/commands/test-validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,18 @@ class SetupCommand extends Command {
default: 8784,
exclusive: ["skip-indexer"],
}),
"prover-port": Flags.integer({
description: "Enable Light Prover server on this port.",
required: false,
default: 3001,
exclusive: ["skip-prover"],
}),
"grpc-port": Flags.integer({
description: "Enable Photon indexer gRPC on this port.",
required: false,
default: 50051,
exclusive: ["skip-indexer"],
}),
"prover-port": Flags.integer({
description: "Enable Light Prover server on this port.",
required: false,
default: 3001,
exclusive: ["skip-prover"],
}),
"limit-ledger-size": Flags.integer({
description: "Keep this amount of shreds in root slots.",
required: false,
Expand Down
3 changes: 1 addition & 2 deletions cli/src/utils/initTestEnv.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { airdropSol } from "@lightprotocol/stateless.js";
import { getConfig, getPayer, setAnchorProvider, setConfig } from "./utils";
import {
BASE_PATH,
Expand Down Expand Up @@ -131,9 +130,9 @@ export async function initTestEnv({
await startIndexer(
`http://127.0.0.1:${rpcPort}`,
indexerPort,
grpcPort,
checkPhotonVersion,
photonDatabaseUrl,
grpcPort,
);
}

Expand Down
4 changes: 4 additions & 0 deletions cli/src/utils/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ export function spawnBinary(command: string, args: string[] = []) {
stdio: ["ignore", out, err],
shell: false,
detached: true,
env: {
...process.env,
RUST_LOG: process.env.RUST_LOG || "debug",
},
});

spawnedProcess.on("close", async (code) => {
Expand Down
2 changes: 1 addition & 1 deletion cli/src/utils/processPhotonIndexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ function getPhotonInstallMessage(): string {
export async function startIndexer(
rpcUrl: string,
indexerPort: number,
grpcPort: number = 50051,
checkPhotonVersion: boolean = true,
photonDatabaseUrl?: string,
grpcPort: number = 50051,
) {
await killIndexer();
const resolvedOrNull = which.sync("photon", { nothrow: true });
Expand Down
11 changes: 5 additions & 6 deletions forester-utils/src/instructions/state_batch_append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use light_batched_merkle_tree::{
use light_client::{indexer::Indexer, rpc::Rpc};
use light_compressed_account::instruction_data::compressed_proof::CompressedProof;
use light_hasher::bigint::bigint_to_be_bytes_array;
use light_merkle_tree_metadata::QueueType;
use light_prover_client::{
proof_client::ProofClient,
proof_types::batch_append::{get_batch_append_inputs, BatchAppendsCircuitInputs},
Expand Down Expand Up @@ -120,18 +119,19 @@ pub async fn get_append_instruction_stream<'a, R: Rpc>(
indexer
.get_queue_elements(
merkle_tree_pubkey.to_bytes(),
QueueType::OutputStateV2,
zkp_batch_size,
next_queue_index,
Some(zkp_batch_size),
None,
None,
None,
)
.await
};

let (batch_elements, batch_first_queue_idx) = match queue_elements_result {
Ok(res) => {
let items = res.value.elements;
let first_idx = res.value.first_value_queue_index;
let items = res.value.output_queue_elements.unwrap_or_default();
let first_idx = res.value.output_queue_index;
if items.len() != zkp_batch_size as usize {
warn!(
"Got {} elements but expected {}, stopping",
Expand Down Expand Up @@ -238,6 +238,5 @@ pub async fn get_append_instruction_stream<'a, R: Rpc>(
yield Ok(proofs_buffer);
}
};

Ok((Box::pin(stream), zkp_batch_size))
}
10 changes: 5 additions & 5 deletions forester-utils/src/instructions/state_batch_nullify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use light_batched_merkle_tree::{
use light_client::{indexer::Indexer, rpc::Rpc};
use light_compressed_account::instruction_data::compressed_proof::CompressedProof;
use light_hasher::bigint::bigint_to_be_bytes_array;
use light_merkle_tree_metadata::QueueType;
use light_prover_client::{
proof_client::ProofClient,
proof_types::batch_update::{get_batch_update_inputs, BatchUpdateCircuitInputs},
Expand Down Expand Up @@ -113,18 +112,19 @@ pub async fn get_nullify_instruction_stream<'a, R: Rpc>(
let indexer = connection.indexer_mut()?;
indexer.get_queue_elements(
merkle_tree_pubkey.to_bytes(),
QueueType::InputStateV2,
zkp_batch_size,
None,
None,
next_queue_index,
Some(zkp_batch_size),
None,
)
.await
};

let (batch_elements, batch_first_queue_idx) = match queue_elements_result {
Ok(res) => {
let items = res.value.elements;
let first_idx = res.value.first_value_queue_index;
let items = res.value.input_queue_elements.unwrap_or_default();
let first_idx = res.value.input_queue_index;
if items.len() != zkp_batch_size as usize {
warn!(
"Got {} elements but expected {}, stopping",
Expand Down
4 changes: 3 additions & 1 deletion forester/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ light-system-program-anchor = { workspace = true, features = ["cpi"] }
light-hash-set = { workspace = true, features = ["solana"] }
light-hasher = { workspace = true, features = ["poseidon"] }
light-merkle-tree-reference = { workspace = true }
light-prover-client = { workspace = true }
light-registry = { workspace = true }
photon-api = { workspace = true }
forester-utils = { workspace = true }
light-client = { workspace = true, features = ["v2"] }
light-merkle-tree-metadata = { workspace = true }
light-sparse-merkle-tree = { workspace = true }
light-sdk = { workspace = true, features = ["anchor"] }
light-program-test = { workspace = true }
solana-transaction-status = { workspace = true }
Expand Down Expand Up @@ -53,12 +55,12 @@ scopeguard = "1.2.0"
itertools = "0.14.0"
num-bigint = { workspace = true }

# gRPC client for Photon queue subscriptions (match Photon versions)
tonic = "0.14.2"
prost = "0.14.1"
prost-types = "0.14.1"
tonic-prost = "0.14.2"
tokio-stream = { version = "0.1", features = ["sync"] }
once_cell = "1.21.3"

[build-dependencies]
tonic-prost-build = "0.14.2"
Expand Down
2 changes: 1 addition & 1 deletion forester/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"license": "GPL-3.0",
"scripts": {
"build": "cargo build",
"test": "redis-start && RUST_LOG=forester=debug,forester_utils=debug cargo test --package forester e2e_test -- --nocapture",
"test": "redis-start && TEST_MODE=local TEST_V1_STATE=true TEST_V2_STATE=true TEST_V1_ADDRESS=true TEST_V2_ADDRESS=true RUST_LOG=forester=debug,forester_utils=debug cargo test --package forester e2e_test -- --nocapture",
"docker:build": "docker build --tag forester -f Dockerfile .."
},
"devDependencies": {
Expand Down
64 changes: 32 additions & 32 deletions forester/proto/photon.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,67 +4,67 @@ package photon;

// Queue information service
service QueueService {
// Get current queue information for all or specific trees
rpc GetQueueInfo(GetQueueInfoRequest) returns (GetQueueInfoResponse);
// Get current queue information for all or specific trees
rpc GetQueueInfo(GetQueueInfoRequest) returns (GetQueueInfoResponse);

// Subscribe to queue updates
rpc SubscribeQueueUpdates(SubscribeQueueUpdatesRequest) returns (stream QueueUpdate);
// Subscribe to queue updates
rpc SubscribeQueueUpdates(SubscribeQueueUpdatesRequest) returns (stream QueueUpdate);
}

// Request message for GetQueueInfo
message GetQueueInfoRequest {
// Optional list of tree pubkeys to filter by (base58 encoded)
// If empty, returns info for all trees
repeated string trees = 1;
// Optional list of tree pubkeys to filter by (base58 encoded)
// If empty, returns info for all trees
repeated string trees = 1;
}

// Response message for GetQueueInfo
message GetQueueInfoResponse {
repeated QueueInfo queues = 1;
uint64 slot = 2;
repeated QueueInfo queues = 1;
uint64 slot = 2;
}

// Information about a single queue
message QueueInfo {
// Tree public key (base58 encoded)
string tree = 1;
// Tree public key (base58 encoded)
string tree = 1;

// Queue public key (base58 encoded)
string queue = 2;
// Queue public key (base58 encoded)
string queue = 2;

// Queue type: 3 = InputStateV2, 4 = AddressV2, 5 = OutputStateV2
uint32 queue_type = 3;
// Queue type: 3 = InputStateV2, 4 = AddressV2, 5 = OutputStateV2
uint32 queue_type = 3;

// Current number of items in the queue
uint64 queue_size = 4;
// Current number of items in the queue
uint64 queue_size = 4;
}

// Request message for SubscribeQueueUpdates
message SubscribeQueueUpdatesRequest {
// Optional list of tree pubkeys to subscribe to (base58 encoded)
// If empty, subscribes to all trees
repeated string trees = 1;
// Optional list of tree pubkeys to subscribe to (base58 encoded)
// If empty, subscribes to all trees
repeated string trees = 1;

// Whether to send initial state before streaming updates
bool send_initial_state = 2;
// Whether to send initial state before streaming updates
bool send_initial_state = 2;
}

// Streamed queue update message
message QueueUpdate {
// The queue that was updated
QueueInfo queue_info = 1;
// The queue that was updated
QueueInfo queue_info = 1;

// Slot at which the update occurred
uint64 slot = 2;
// Slot at which the update occurred
uint64 slot = 2;

// Type of update
UpdateType update_type = 3;
// Type of update
UpdateType update_type = 3;
}

// Type of queue update
enum UpdateType {
UPDATE_TYPE_UNSPECIFIED = 0;
UPDATE_TYPE_INITIAL = 1; // Initial state sent at subscription
UPDATE_TYPE_ITEM_ADDED = 2; // Item added to queue
UPDATE_TYPE_ITEM_REMOVED = 3; // Item removed from queue
UPDATE_TYPE_UNSPECIFIED = 0;
UPDATE_TYPE_INITIAL = 1; // Initial state sent at subscription
UPDATE_TYPE_ITEM_ADDED = 2; // Item added to queue
UPDATE_TYPE_ITEM_REMOVED = 3; // Item removed from queue
}
12 changes: 12 additions & 0 deletions forester/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ pub struct GeneralConfig {
pub skip_v2_state_trees: bool,
pub skip_v2_address_trees: bool,
pub tree_id: Option<Pubkey>,
pub sleep_after_processing_ms: u64,
pub sleep_when_idle_ms: u64,
}

impl Default for GeneralConfig {
Expand All @@ -96,6 +98,8 @@ impl Default for GeneralConfig {
skip_v2_state_trees: false,
skip_v2_address_trees: false,
tree_id: None,
sleep_after_processing_ms: 10_000,
sleep_when_idle_ms: 45_000,
}
}
}
Expand All @@ -111,6 +115,8 @@ impl GeneralConfig {
skip_v2_state_trees: true,
skip_v2_address_trees: false,
tree_id: None,
sleep_after_processing_ms: 50,
sleep_when_idle_ms: 100,
}
}

Expand All @@ -124,6 +130,8 @@ impl GeneralConfig {
skip_v2_state_trees: false,
skip_v2_address_trees: true,
tree_id: None,
sleep_after_processing_ms: 50,
sleep_when_idle_ms: 100,
}
}
}
Expand Down Expand Up @@ -276,6 +284,8 @@ impl ForesterConfig {
.tree_id
.as_ref()
.and_then(|id| Pubkey::from_str(id).ok()),
sleep_after_processing_ms: 10_000,
sleep_when_idle_ms: 45_000,
},
rpc_pool_config: RpcPoolConfig {
max_size: args.rpc_pool_size,
Expand Down Expand Up @@ -332,6 +342,8 @@ impl ForesterConfig {
skip_v1_address_trees: false,
skip_v2_address_trees: false,
tree_id: None,
sleep_after_processing_ms: 10_000,
sleep_when_idle_ms: 45_000,
},
rpc_pool_config: RpcPoolConfig {
max_size: 10,
Expand Down
Loading
Loading