Skip to content

pragmaplatform/oxanus

Repository files navigation

Oxanus

Build Status Latest Version docs.rs

Oxanus logo

Oxanus is job processing library written in Rust doesn't suck (or at least sucks in a completely different way than other options).

Oxanus goes for simplicity and depth over breadth. It only aims to support a single backend with a simple flow.

Key Features

  • Isolated Queues: Separate job processing queues with independent configurations
  • Retrying: Automatic retry of failed jobs with configurable backoff
  • Scheduled Jobs: Schedule jobs to run at specific times or after delays
  • Dynamic Queues: Create and manage queues at runtime
  • Throttling: Control job processing rates with queue-based throttling
  • Unique Jobs: Ensure only one instance of a job runs at a time
  • Resilient Jobs: Jobs that can survive worker crashes and restarts
  • Graceful Shutdown: Clean shutdown of workers with in-progress job handling
  • Periodic Jobs: Run jobs on a schedule using cron-like expressions
  • Resumable Jobs: Jobs that can be resumed from where they left off when they are retried

Quick Start

use oxanus::{Context, Storage};
use serde::{Serialize, Deserialize};

// Define your component registry
#[derive(oxanus::Registry)]
struct ComponentRegistry(oxanus::ComponentRegistry<MyContext, MyError>);

// Define your error type
#[derive(Debug, thiserror::Error)]
enum MyError {}

// Define your context
#[derive(Debug, Clone)]
struct MyContext {}

// Define your worker using the derive macro
#[derive(Debug, Serialize, Deserialize, oxanus::Worker)]
struct MyWorker {
    data: String,
}

impl MyWorker {
    async fn process(&self, _ctx: &Context<MyContext>) -> Result<(), MyError> {
        // Process your job here
        println!("Processing: {}", self.data);
        Ok(())
    }
}

// Define your queue using the derive macro
#[derive(Serialize, oxanus::Queue)]
#[oxanus(key = "my_queue", concurrency = 2)]
struct MyQueue;

// Run your worker
#[tokio::main]
async fn main() -> Result<(), oxanus::OxanusError> {
    let ctx = Context::value(MyContext {});
    let storage = Storage::builder().build_from_env()?;
    let config = ComponentRegistry::build_config(&storage)
        .with_graceful_shutdown(tokio::signal::ctrl_c());

    // Enqueue some jobs
    storage.enqueue(MyQueue, MyWorker { data: "hello".into() }).await?;

    // Run the worker
    oxanus::run(config, ctx).await?;
    Ok(())
}

For more detailed usage examples, check out the examples directory.

Core Concepts

Workers

Workers are the units of work in Oxanus. They can be defined using the #[derive(oxanus::Worker)] macro or by implementing the [Worker] trait manually. Workers define the processing logic for jobs.

Worker attributes:

  • #[oxanus(max_retries = 3)] - Set maximum retry attempts
  • #[oxanus(retry_delay = 5)] - Set retry delay in seconds
  • #[oxanus(unique_id = "worker_{id}")] - Define unique job identifiers
  • #[oxanus(on_conflict = Skip)] - Handle job conflicts (Skip or Replace)
  • #[oxanus(cron(schedule = "*/5 * * * * *", queue = MyQueue))] - Schedule periodic jobs

Queues

Queues are the channels through which jobs flow. They can be defined using the #[derive(oxanus::Queue)] macro or by implementing the [Queue] trait manually.

Queues can be:

  • Static: Defined at compile time with a fixed key
  • Dynamic: Created at runtime with each instance being a separate queue (requires struct fields)

Queue attributes:

  • #[oxanus(key = "my_queue")] - Set static queue key
  • #[oxanus(prefix = "dynamic")] - Set prefix for dynamic queues
  • #[oxanus(concurrency = 2)] - Set concurrency limit
  • #[oxanus(throttle(window_ms = 2000, limit = 5))] - Configure throttling

Component Registry

The component registry automatically discovers and registers all workers and queues in your application. Use #[derive(oxanus::Registry)] to create a registry and ComponentRegistry::build_config() to build the configuration.

Storage

The [Storage] trait provides the interface for job persistence. It handles:

  • Job enqueueing
  • Job scheduling
  • Job state management
  • Queue monitoring

Storage is built using Storage::builder().build_from_env() which reads the REDIS_URL environment variable.

Context

The context provides shared state and utilities to workers. It can include:

  • Database connections
  • Configuration
  • Shared resources
  • Job state (for resumable jobs)

Configuration

Configuration is done through the [Config] builder, which allows you to:

  • Automatically register queues and workers via the component registry
  • Set up graceful shutdown
  • Configure exit conditions

Error Handling

Oxanus uses a custom error type [OxanusError] that covers all possible error cases in the library. Workers can define their own error type that implements std::error::Error.

Prometheus Metrics

Enable the prometheus feature to expose metrics:

let metrics = storage.metrics().await?;
let output = metrics.encode_to_string()?;
// Serve `output` on your metrics endpoint

About

Job processing in Rust backed by Redis

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 3

  •  
  •  
  •  

Languages