zoobz-io/flume


Flume

A dynamic pipeline factory for pipz that enables schema-driven pipeline construction with hot-reloading capabilities.

Logic in Code, Structure in Schema

// 1. Register components
factory := flume.New[Order]()
factory.Add(
    pipz.Apply("validate", func(ctx context.Context, o Order) (Order, error) {
        if o.Total <= 0 {
            return o, fmt.Errorf("invalid total")
        }
        return o, nil
    }),
    pipz.Apply("apply-discount", func(ctx context.Context, o Order) (Order, error) {
        o.Total *= 0.9 // 10% discount
        return o, nil
    }),
)
factory.AddPredicate(flume.Predicate[Order]{
    Name:      "is-premium",
    Predicate: func(ctx context.Context, o Order) bool { return o.Tier == "premium" },
})

// 2. Build from schema
pipeline, _ := factory.BuildFromYAML(`
type: sequence
children:
  - ref: validate
  - type: filter
    predicate: is-premium
    then:
      ref: apply-discount
`)

// 3. Process data
result, err := pipeline.Process(ctx, order)

Installation

Requires Go 1.24+

go get github.com/zoobz-io/flume

Quick Start

package main

import (
    "context"
    "fmt"
    "github.com/zoobz-io/flume"
    "github.com/zoobz-io/pipz"
)

type Order struct {
    ID    string
    Total float64
}

func (o Order) Clone() Order { return Order{ID: o.ID, Total: o.Total} }

func main() {
    factory := flume.New[Order]()

    // Register processors
    factory.Add(
        pipz.Apply("validate", func(ctx context.Context, o Order) (Order, error) {
            if o.Total <= 0 {
                return o, fmt.Errorf("invalid total")
            }
            return o, nil
        }),
        pipz.Transform("enrich", func(ctx context.Context, o Order) Order {
            o.ID = "ORD-" + o.ID
            return o
        }),
    )

    // Build pipeline from YAML
    pipeline, err := factory.BuildFromYAML(`
type: sequence
children:
  - ref: validate
  - ref: enrich
`)
    if err != nil {
        panic(err)
    }

    // Process
    result, err := pipeline.Process(context.Background(), Order{ID: "123", Total: 99.99})
    fmt.Printf("Result: %+v, Error: %v\n", result, err)
}

Capabilities

Feature                    | Description                                                | Docs
Schema-Driven Construction | Define pipelines in YAML/JSON, build at runtime            | Schema Format
Hot Reloading              | Update pipeline behavior without restarts                  | Hot Reloading
Lock-Free Bindings         | Concurrent-safe pipeline access with auto-sync             | API Reference
Comprehensive Validation   | Schema validation with detailed error messages             | Schema Design
14 Connector Types         | Sequence, filter, retry, timeout, circuit breaker, and more | Connector Types
Event Emission             | Built-in observability signals                             | Observability

Why Flume?

  • Schema-driven: Define pipelines in YAML/JSON, not code
  • Hot-reloadable: Update pipeline behavior without restarts
  • Type-safe: Full generics support with compile-time safety
  • Composable: Build complex flows from simple, tested components
  • Observable: Built-in capitan event emission
  • Validated: Comprehensive schema validation with detailed error messages
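
To make the "Validated" point concrete, here is a minimal sketch of validation that reports every problem with its location in the schema, rather than stopping at the first one. It uses only the standard library and does not reproduce Flume's own validator: the `Node` type, the `Validate` function, the path format, and the error wording are all assumptions made for illustration.

```go
package main

import "fmt"

// Node is a hypothetical schema node (not Flume's type). It covers only the
// three shapes used in this README: ref, sequence, and filter.
type Node struct {
	Type      string
	Ref       string
	Predicate string
	Then      *Node
	Children  []Node
}

// Validate walks the tree and returns every problem found, each prefixed
// with the path of the offending node. procs and preds hold the names of
// registered processors and predicates.
func Validate(n Node, path string, procs, preds map[string]bool) []string {
	var errs []string
	switch {
	case n.Ref != "":
		if !procs[n.Ref] {
			errs = append(errs, fmt.Sprintf("%s: unknown processor %q", path, n.Ref))
		}
	case n.Type == "sequence":
		if len(n.Children) == 0 {
			errs = append(errs, path+": sequence has no children")
		}
		for i, c := range n.Children {
			child := fmt.Sprintf("%s.children[%d]", path, i)
			errs = append(errs, Validate(c, child, procs, preds)...)
		}
	case n.Type == "filter":
		if !preds[n.Predicate] {
			errs = append(errs, fmt.Sprintf("%s: unknown predicate %q", path, n.Predicate))
		}
		if n.Then == nil {
			errs = append(errs, path+": filter requires a then branch")
		} else {
			errs = append(errs, Validate(*n.Then, path+".then", procs, preds)...)
		}
	default:
		errs = append(errs, fmt.Sprintf("%s: unknown node type %q", path, n.Type))
	}
	return errs
}

func main() {
	// Only "validate" is registered, so the filter below has two problems.
	procs := map[string]bool{"validate": true}
	preds := map[string]bool{}
	schema := Node{Type: "sequence", Children: []Node{
		{Ref: "validate"},
		{Type: "filter", Predicate: "is-premium", Then: &Node{Ref: "apply-discount"}},
	}}
	for _, e := range Validate(schema, "root", procs, preds) {
		fmt.Println(e)
	}
}
```

Collecting all errors in one pass means a schema author can fix a broken file in a single edit instead of rebuilding once per mistake.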

Configuration as Code

Flume enables a pattern: define once, reconfigure anywhere.

Register your processing components at startup. Define pipeline structure in configuration files. Update behavior at runtime without redeployment.

// Components registered once
factory.Add(validate, enrich, notify, discount)

// Structure defined in config
factory.SetSchema("checkout", loadYAML("pipelines/checkout.yaml"))

// Behavior changes without restart
factory.SetSchema("checkout", loadYAML("pipelines/checkout-v2.yaml"))

The code defines what processors do. Configuration defines how they combine.
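
The split between code and configuration can be sketched without Flume at all. The example below is a simplified, standard-library-only illustration, not Flume's implementation: `Step`, `Node`, `Build`, `BuildFromJSON`, and `newRegistry` are hypothetical names, and it parses JSON rather than YAML to avoid a third-party dependency. It shows the core idea: named steps live in a registry, and a schema tree decides how they are combined.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Order is the value flowing through the pipeline.
type Order struct {
	ID    string
	Total float64
}

// Step is a hypothetical stand-in for a registered processor.
type Step func(Order) (Order, error)

// Node is a hypothetical schema node: a reference to a registered step,
// or a sequence of child nodes.
type Node struct {
	Type     string `json:"type,omitempty"`
	Ref      string `json:"ref,omitempty"`
	Children []Node `json:"children,omitempty"`
}

// Build resolves a schema node against the registry into a single Step.
func Build(reg map[string]Step, n Node) (Step, error) {
	switch {
	case n.Ref != "":
		s, ok := reg[n.Ref]
		if !ok {
			return nil, fmt.Errorf("unknown ref %q", n.Ref)
		}
		return s, nil
	case n.Type == "sequence":
		steps := make([]Step, 0, len(n.Children))
		for _, c := range n.Children {
			s, err := Build(reg, c)
			if err != nil {
				return nil, err
			}
			steps = append(steps, s)
		}
		// Run children in order, stopping at the first error.
		return func(o Order) (Order, error) {
			var err error
			for _, s := range steps {
				if o, err = s(o); err != nil {
					return o, err
				}
			}
			return o, nil
		}, nil
	default:
		return nil, fmt.Errorf("unsupported node type %q", n.Type)
	}
}

// BuildFromJSON parses a JSON schema and builds it against the registry.
func BuildFromJSON(reg map[string]Step, src string) (Step, error) {
	var n Node
	if err := json.Unmarshal([]byte(src), &n); err != nil {
		return nil, err
	}
	return Build(reg, n)
}

// newRegistry registers the logic once; schemas only refer to it by name.
func newRegistry() map[string]Step {
	return map[string]Step{
		"validate": func(o Order) (Order, error) {
			if o.Total <= 0 {
				return o, errors.New("invalid total")
			}
			return o, nil
		},
		"discount": func(o Order) (Order, error) {
			o.Total -= 10 // flat discount, chosen to keep the arithmetic exact
			return o, nil
		},
	}
}

func main() {
	reg := newRegistry()
	v1 := `{"type":"sequence","children":[{"ref":"validate"}]}`
	v2 := `{"type":"sequence","children":[{"ref":"validate"},{"ref":"discount"}]}`
	for _, schema := range []string{v1, v2} {
		pipeline, err := BuildFromJSON(reg, schema)
		if err != nil {
			panic(err)
		}
		out, err := pipeline(Order{ID: "123", Total: 100})
		fmt.Printf("%+v %v\n", out, err)
	}
}
```

Switching from `v1` to `v2` changes what happens to an order without touching any registered function, which is the property the section describes.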

Hot Reloading

Update pipelines at runtime without restarts:

// Register a named schema
factory.SetSchema("order-pipeline", schema)

// Create a binding with auto-sync enabled
binding, _ := factory.Bind(pipelineID, "order-pipeline", flume.WithAutoSync())

// Process requests (lock-free)
result, _ := binding.Process(ctx, order)

// Update schema - all auto-sync bindings rebuild automatically
factory.SetSchema("order-pipeline", newSchema)
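
One common way to get lock-free reads with hot swapping is to keep the current pipeline behind an atomic pointer. The sketch below illustrates that general technique with the standard library only. Whether Flume's bindings are implemented this way is an assumption, and `Binding`, `NewBinding`, `Process`, and `Swap` here are hypothetical names, not Flume's API.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Order is the value flowing through the pipeline.
type Order struct {
	ID    string
	Total float64
}

// Pipeline is a hypothetical stand-in for a built pipeline.
type Pipeline func(Order) Order

// Binding holds the current pipeline behind an atomic pointer, so readers
// never take a lock and a writer can replace the pipeline at any time.
type Binding struct {
	current atomic.Pointer[Pipeline]
}

// NewBinding creates a binding that starts with the given pipeline.
func NewBinding(p Pipeline) *Binding {
	b := &Binding{}
	b.current.Store(&p)
	return b
}

// Process runs the order through whichever pipeline is current at load time.
func (b *Binding) Process(o Order) Order {
	p := b.current.Load()
	return (*p)(o)
}

// Swap atomically replaces the pipeline. Calls already in flight finish on
// the old pipeline; later calls see the new one.
func (b *Binding) Swap(p Pipeline) {
	b.current.Store(&p)
}

func main() {
	b := NewBinding(func(o Order) Order { return o })

	// Concurrent readers process orders without any mutex.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			b.Process(Order{ID: "123", Total: 100})
		}()
	}

	// A writer swaps in new behavior while readers are running.
	b.Swap(func(o Order) Order {
		o.Total -= 10
		return o
	})
	wg.Wait()

	fmt.Printf("%+v\n", b.Process(Order{ID: "123", Total: 100}))
}
```

Each call loads the pointer once, so a single request always runs against one consistent pipeline version even if a swap happens midway.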

Documentation

Full documentation is available in the docs/ directory, organized into four sections: Learn, Guides, Cookbook, and Reference.

Contributing

See CONTRIBUTING.md for development guidelines.

License

MIT License - see LICENSE file.
