Skip to content

Commit 6a4e92e

Browse files
committed
refactor: feature-gate optional modules, remove anyhow from production deps
P1: Feature gates (minimal core + external extensions) - Add 4 feature flags: metrics, monitoring, telemetry, distributed - Gate telemetry.rs behind 'telemetry' (removes opentelemetry, opentelemetry_sdk, dashmap) - Gate distributed.rs, partition.rs, ratelimit.rs, boost.rs behind 'distributed' (removes num_cpus) - Gate metrics.rs behind 'metrics' - Gate alerts.rs, monitor.rs behind 'monitoring' - Minimal core: 8 deps (was 12), 114 tests - All features: 12 deps, 230 tests P2: Remove anyhow from production dependencies - Replace anyhow::Result with LaneError Result in manager.rs (start, stats, build) - Move anyhow to dev-dependencies (examples only) Other: - Add required-features to observability/scalability examples - Add release profile (opt-level=z, LTO, strip) - Bump version to 0.3.0 - Update lib.rs doc comment with feature flag table
1 parent e61b761 commit 6a4e92e

6 files changed

Lines changed: 141 additions & 110 deletions

File tree

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "a3s-lane"
3-
version = "0.2.3"
3+
version = "0.3.0"
44
edition = "2021"
55
authors = ["A3S Lab"]
66
license = "MIT"
@@ -15,27 +15,52 @@ categories = ["asynchronous", "concurrency"]
1515
name = "a3s_lane"
1616
path = "src/lib.rs"
1717

18+
[features]
19+
default = ["metrics", "monitoring", "telemetry", "distributed"]
20+
metrics = []
21+
monitoring = ["metrics"]
22+
telemetry = ["metrics", "dep:opentelemetry", "dep:opentelemetry_sdk", "dep:dashmap"]
23+
distributed = ["dep:num_cpus"]
24+
1825
[dependencies]
1926
tokio = { version = "1", features = ["sync", "time", "rt", "macros", "fs"] }
2027
async-trait = "0.1"
2128
tracing = "0.1"
2229
serde = { version = "1", features = ["derive"] }
2330
serde_json = "1"
24-
anyhow = "1"
2531
thiserror = "1"
2632
uuid = { version = "1", features = ["v4"] }
2733
chrono = { version = "0.4", features = ["serde"] }
28-
num_cpus = "1"
29-
opentelemetry = { version = "0.21", features = ["metrics"] }
30-
opentelemetry_sdk = { version = "0.21", features = ["rt-tokio", "metrics"] }
31-
dashmap = "6.0"
34+
35+
# Optional: distributed
36+
num_cpus = { version = "1", optional = true }
37+
38+
# Optional: telemetry
39+
opentelemetry = { version = "0.21", features = ["metrics"], optional = true }
40+
opentelemetry_sdk = { version = "0.21", features = ["rt-tokio", "metrics"], optional = true }
41+
dashmap = { version = "6.0", optional = true }
3242

3343
[dev-dependencies]
3444
tokio = { version = "1", features = ["full", "test-util"] }
3545
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
3646
tempfile = "3"
3747
criterion = { version = "0.5", features = ["async_tokio"] }
48+
anyhow = "1"
3849

3950
[[bench]]
4051
name = "queue_benchmark"
4152
harness = false
53+
54+
[[example]]
55+
name = "observability"
56+
required-features = ["monitoring", "telemetry"]
57+
58+
[[example]]
59+
name = "scalability"
60+
required-features = ["distributed"]
61+
62+
[profile.release]
63+
opt-level = "z"
64+
lto = true
65+
codegen-units = 1
66+
strip = true

src/config.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
//! .with_rate_limit(RateLimitConfig::per_second(100));
3131
//! ```
3232
33+
use crate::retry::RetryPolicy;
34+
#[cfg(feature = "distributed")]
3335
use crate::boost::PriorityBoostConfig;
36+
#[cfg(feature = "distributed")]
3437
use crate::ratelimit::RateLimitConfig;
35-
use crate::retry::RetryPolicy;
3638
use serde::{Deserialize, Serialize};
3739
use std::time::Duration;
3840

@@ -82,9 +84,11 @@ pub struct LaneConfig {
8284
#[serde(default)]
8385
pub retry_policy: RetryPolicy,
8486
/// Rate limit configuration
87+
#[cfg(feature = "distributed")]
8588
#[serde(skip)]
8689
pub rate_limit: Option<RateLimitConfig>,
8790
/// Priority boost configuration
91+
#[cfg(feature = "distributed")]
8892
#[serde(skip)]
8993
pub priority_boost: Option<PriorityBoostConfig>,
9094
}
@@ -119,7 +123,9 @@ impl Default for LaneConfig {
119123
max_concurrency: 4,
120124
default_timeout: None,
121125
retry_policy: RetryPolicy::default(),
126+
#[cfg(feature = "distributed")]
122127
rate_limit: None,
128+
#[cfg(feature = "distributed")]
123129
priority_boost: None,
124130
}
125131
}
@@ -147,7 +153,9 @@ impl LaneConfig {
147153
max_concurrency,
148154
default_timeout: None,
149155
retry_policy: RetryPolicy::default(),
156+
#[cfg(feature = "distributed")]
150157
rate_limit: None,
158+
#[cfg(feature = "distributed")]
151159
priority_boost: None,
152160
}
153161
}
@@ -201,6 +209,7 @@ impl LaneConfig {
201209
/// let config = LaneConfig::new(1, 10)
202210
/// .with_rate_limit(RateLimitConfig::per_second(100));
203211
/// ```
212+
#[cfg(feature = "distributed")]
204213
pub fn with_rate_limit(mut self, rate_limit: RateLimitConfig) -> Self {
205214
self.rate_limit = Some(rate_limit);
206215
self
@@ -220,6 +229,7 @@ impl LaneConfig {
220229
/// let config = LaneConfig::new(1, 10)
221230
/// .with_priority_boost(PriorityBoostConfig::standard(Duration::from_secs(300)));
222231
/// ```
232+
#[cfg(feature = "distributed")]
223233
pub fn with_priority_boost(mut self, priority_boost: PriorityBoostConfig) -> Self {
224234
self.priority_boost = Some(priority_boost);
225235
self
@@ -277,6 +287,7 @@ mod tests {
277287
assert_eq!(config.retry_policy, retry);
278288
}
279289

290+
#[cfg(feature = "distributed")]
280291
#[test]
281292
fn test_lane_config_with_rate_limit() {
282293
let rate_limit = RateLimitConfig::per_second(100);
@@ -285,6 +296,7 @@ mod tests {
285296
assert_eq!(config.rate_limit.unwrap().max_commands, 100);
286297
}
287298

299+
#[cfg(feature = "distributed")]
288300
#[test]
289301
fn test_lane_config_with_priority_boost() {
290302
let boost = PriorityBoostConfig::standard(Duration::from_secs(60));

src/lib.rs

Lines changed: 61 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,157 +1,123 @@
11
//! # A3S Lane
22
//!
3-
//! A high-performance, priority-based command queue for async task scheduling with comprehensive
4-
//! reliability, scalability, and observability features.
3+
//! A priority-based command queue for async task scheduling.
54
//!
6-
//! ## Overview
5+
//! ## Core (always compiled)
76
//!
8-
//! A3S Lane provides a lane-based priority command queue system designed for managing concurrent
9-
//! async operations with different priority levels. Commands are organized into lanes, each with
10-
//! configurable concurrency limits, timeouts, retry policies, and rate limiting.
7+
//! - Priority-based scheduling with per-lane concurrency control
8+
//! - Command timeout and retry policies (exponential backoff, fixed delay)
9+
//! - Dead letter queue for permanently failed commands
10+
//! - Persistent storage (pluggable `Storage` trait, `LocalStorage` included)
11+
//! - Event system for queue lifecycle notifications
12+
//! - Graceful shutdown with drain support
1113
//!
12-
//! ## Core Features
14+
//! ## Feature Flags
1315
//!
14-
//! ### Phase 1: Core Queue System
15-
//! - **Priority-based scheduling**: Commands execute based on lane priority (lower = higher priority)
16-
//! - **Concurrency control**: Per-lane min/max concurrency limits with semaphore-based coordination
17-
//! - **Built-in lanes**: 6 predefined lanes (system, control, query, session, skill, prompt)
18-
//! - **Event system**: Subscribe to queue events for real-time monitoring
19-
//! - **Health monitoring**: Track queue depth and active command counts
20-
//! - **Builder pattern**: Flexible, ergonomic queue configuration
21-
//!
22-
//! ### Phase 2: Reliability
23-
//! - **Command timeout**: Configurable timeout per lane with automatic cancellation
24-
//! - **Retry policies**: Exponential backoff, fixed delay, or custom retry strategies
25-
//! - **Dead letter queue**: Capture permanently failed commands for inspection and replay
26-
//! - **Persistent storage**: Optional pluggable storage backend (LocalStorage included)
27-
//! - **Graceful shutdown**: Drain pending commands before shutdown with timeout
28-
//!
29-
//! ### Phase 3: Scalability
30-
//! - **Multi-core parallelism**: Automatic CPU core detection and parallel processing
31-
//! - **Queue partitioning**: Distribute commands across workers (round-robin, hash-based, custom)
32-
//! - **Rate limiting**: Token bucket and sliding window rate limiters per lane
33-
//! - **Priority boosting**: Deadline-based automatic priority adjustment
34-
//! - **Distributed queue**: Pluggable interface for multi-machine processing
35-
//!
36-
//! ### Phase 4: Observability
37-
//! - **Metrics collection**: Local in-memory metrics with pluggable backend support
38-
//! - **Latency histograms**: Track command execution and wait time with percentiles (p50, p90, p95, p99)
39-
//! - **Queue depth alerts**: Configurable warning and critical thresholds
40-
//! - **Latency alerts**: Monitor and alert on command execution latency
41-
//! - **Prometheus/OpenTelemetry ready**: Implement MetricsBackend trait for external systems
16+
//! | Feature | Default | Dependencies | Description |
17+
//! |---------|---------|-------------|-------------|
18+
//! | `metrics` | ✅ | — | `MetricsBackend` trait, `LocalMetrics`, latency histograms |
19+
//! | `monitoring` | ✅ | `metrics` | `AlertManager`, `QueueMonitor` with depth/latency thresholds |
20+
//! | `telemetry` | ✅ | `opentelemetry`, `dashmap` | OpenTelemetry spans and `OtelMetricsBackend` |
21+
//! | `distributed` | ✅ | `num_cpus` | Partitioning, rate limiting, priority boosting, `DistributedQueue` |
4222
//!
4323
//! ## Quick Start
4424
//!
4525
//! ```rust,ignore
4626
//! use a3s_lane::{QueueManagerBuilder, EventEmitter, Command, Result};
4727
//! use async_trait::async_trait;
4828
//!
49-
//! // Define a command
50-
//! struct MyCommand {
51-
//! data: String,
52-
//! }
29+
//! struct MyCommand { data: String }
5330
//!
5431
//! #[async_trait]
5532
//! impl Command for MyCommand {
5633
//! async fn execute(&self) -> Result<serde_json::Value> {
5734
//! Ok(serde_json::json!({"processed": self.data}))
5835
//! }
59-
//!
60-
//! fn command_type(&self) -> &str {
61-
//! "my_command"
62-
//! }
36+
//! fn command_type(&self) -> &str { "my_command" }
6337
//! }
6438
//!
6539
//! #[tokio::main]
66-
//! async fn main() -> anyhow::Result<()> {
67-
//! // Create event emitter
68-
//! let emitter = EventEmitter::new(100);
69-
//!
70-
//! // Build queue manager with default lanes
71-
//! let manager = QueueManagerBuilder::new(emitter)
40+
//! async fn main() -> Result<()> {
41+
//! let manager = QueueManagerBuilder::new(EventEmitter::new(100))
7242
//! .with_default_lanes()
7343
//! .build()
7444
//! .await?;
7545
//!
76-
//! // Start the scheduler
7746
//! manager.start().await?;
7847
//!
79-
//! // Submit a command
80-
//! let cmd = Box::new(MyCommand { data: "hello".to_string() });
81-
//! let rx = manager.submit("query", cmd).await?;
82-
//!
83-
//! // Wait for result
48+
//! let rx = manager.submit("query", Box::new(MyCommand { data: "hello".into() })).await?;
8449
//! let result = rx.await??;
8550
//! println!("Result: {}", result);
86-
//!
8751
//! Ok(())
8852
//! }
8953
//! ```
90-
//!
91-
//! ## Lane Priority Model
92-
//!
93-
//! | Lane | Priority | Default Max Concurrency | Use Case |
94-
//! |------|----------|------------------------|----------|
95-
//! | system | 0 (highest) | 5 | System-level operations |
96-
//! | control | 1 | 3 | Control commands (pause, resume, cancel) |
97-
//! | query | 2 | 10 | Read-only queries |
98-
//! | session | 3 | 5 | Session management |
99-
//! | skill | 4 | 3 | Skill/tool execution |
100-
//! | prompt | 5 (lowest) | 2 | LLM prompt processing |
101-
//!
102-
//! ## Examples
103-
//!
104-
//! See the `examples/` directory for comprehensive examples:
105-
//! - `basic_usage.rs` - Simple command submission and result handling
106-
//! - `reliability.rs` - Timeout, retry policies, DLQ, graceful shutdown
107-
//! - `observability.rs` - Metrics collection, latency histograms, alerts
108-
//! - `scalability.rs` - Rate limiting, priority boosting, partitioning
10954
110-
pub mod alerts;
111-
pub mod boost;
55+
// Core modules (always compiled)
11256
pub mod config;
113-
pub mod distributed;
11457
pub mod dlq;
11558
pub mod error;
11659
pub mod event;
11760
pub mod manager;
118-
pub mod metrics;
119-
pub mod monitor;
120-
pub mod partition;
12161
pub mod queue;
122-
pub mod ratelimit;
12362
pub mod retry;
12463
pub mod storage;
64+
65+
// Feature-gated modules
66+
#[cfg(feature = "metrics")]
67+
pub mod metrics;
68+
#[cfg(feature = "monitoring")]
69+
pub mod alerts;
70+
#[cfg(feature = "monitoring")]
71+
pub mod monitor;
72+
#[cfg(feature = "telemetry")]
12573
pub mod telemetry;
74+
#[cfg(feature = "distributed")]
75+
pub mod boost;
76+
#[cfg(feature = "distributed")]
77+
pub mod partition;
78+
#[cfg(feature = "distributed")]
79+
pub mod ratelimit;
80+
#[cfg(feature = "distributed")]
81+
pub mod distributed;
12682

127-
// Re-export main types
128-
pub use alerts::{Alert, AlertLevel, AlertManager, LatencyAlertConfig, QueueDepthAlertConfig};
129-
pub use boost::{PriorityBoostConfig, PriorityBooster};
83+
// Core re-exports
13084
pub use config::LaneConfig;
131-
pub use distributed::{
132-
CommandEnvelope, CommandResult, DistributedQueue, LocalDistributedQueue, WorkerId, WorkerPool,
133-
};
13485
pub use dlq::{DeadLetter, DeadLetterQueue};
13586
pub use error::{LaneError, Result};
13687
pub use event::{EventEmitter, EventPayload, EventStream, LaneEvent};
13788
pub use manager::{QueueManager, QueueManagerBuilder};
89+
pub use queue::{
90+
lane_ids, priorities, Command, CommandId, CommandQueue, JsonCommand, Lane, LaneId, LaneStatus,
91+
Priority,
92+
};
93+
pub use retry::RetryPolicy;
94+
pub use storage::{LocalStorage, Storage, StoredCommand, StoredDeadLetter};
95+
96+
// Feature-gated re-exports
97+
#[cfg(feature = "metrics")]
13898
pub use metrics::{
13999
metric_names, HistogramPercentiles, HistogramStats, LocalMetrics, MetricsBackend,
140100
MetricsSnapshot, QueueMetrics,
141101
};
102+
#[cfg(feature = "monitoring")]
103+
pub use alerts::{Alert, AlertLevel, AlertManager, LatencyAlertConfig, QueueDepthAlertConfig};
104+
#[cfg(feature = "monitoring")]
142105
pub use monitor::{MonitorConfig, QueueMonitor};
106+
#[cfg(feature = "telemetry")]
107+
pub use telemetry::OtelMetricsBackend;
108+
#[cfg(feature = "distributed")]
109+
pub use boost::{PriorityBoostConfig, PriorityBooster};
110+
#[cfg(feature = "distributed")]
111+
pub use distributed::{
112+
CommandEnvelope, CommandResult, DistributedQueue, LocalDistributedQueue, WorkerId, WorkerPool,
113+
};
114+
#[cfg(feature = "distributed")]
143115
pub use partition::{
144116
CustomPartitioner, HashPartitioner, PartitionConfig, PartitionId, PartitionStrategy,
145117
Partitioner, RoundRobinPartitioner,
146118
};
147-
pub use queue::{
148-
lane_ids, priorities, Command, CommandId, CommandQueue, JsonCommand, Lane, LaneId, LaneStatus,
149-
Priority,
150-
};
119+
#[cfg(feature = "distributed")]
151120
pub use ratelimit::{RateLimitConfig, RateLimiter, SlidingWindowLimiter, TokenBucketLimiter};
152-
pub use retry::RetryPolicy;
153-
pub use storage::{LocalStorage, Storage, StoredCommand, StoredDeadLetter};
154-
pub use telemetry::OtelMetricsBackend;
155121

156122
use serde::{Deserialize, Serialize};
157123
use std::collections::HashMap;

0 commit comments

Comments
 (0)