
ojs-go-sdk

The official Go SDK for Open Job Spec (OJS) -- a vendor-neutral, language-agnostic specification for background job processing.

🚀 Try it now: Open in Playground · Run on CodeSandbox · Docker Quickstart

Features

  • Client: Enqueue jobs, batch operations, workflow management, queue control
  • Worker: Process jobs with configurable concurrency, middleware, and graceful shutdown
  • Workflows: Chain (sequential), Group (parallel), Batch (parallel with callbacks)
  • Middleware: Composable middleware chain for cross-cutting concerns
  • Structured errors: Full errors.Is/errors.As support with OJS error codes
  • Zero dependencies: Only the Go standard library

Architecture

Client → Server Flow

sequenceDiagram
    participant App as Application
    participant Client as ojs.Client
    participant Transport as HTTP Transport
    participant Server as OJS Server

    App->>Client: Enqueue(ctx, "email.send", args)
    Client->>Client: Validate job type & args
    Client->>Transport: POST /ojs/v1/jobs
    Transport->>Server: HTTP request (application/openjobspec+json)
    Server-->>Transport: 201 Created {job}
    Transport-->>Client: *Job
    Client-->>App: *Job, nil
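
The wire-level half of this flow can be sketched with the standard library alone. This is not the SDK's transport code: the `enqueueRequest` body shape, the `{"id":"job_1"}` response, and the `newFakeServer` helper are assumptions made for illustration. Only the path and the content type come from the diagram.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// contentType is the media type named in the diagram.
const contentType = "application/openjobspec+json"

// enqueueRequest is a hypothetical request body; the real OJS envelope may differ.
type enqueueRequest struct {
	Type string         `json:"type"`
	Args map[string]any `json:"args"`
}

// postJob sends one enqueue request and returns the HTTP status code.
func postJob(baseURL string, req enqueueRequest) (int, error) {
	body, err := json.Marshal(req)
	if err != nil {
		return 0, err
	}
	resp, err := http.Post(baseURL+"/ojs/v1/jobs", contentType, bytes.NewReader(body))
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

// newFakeServer stands in for an OJS server: it accepts a well-formed
// POST /ojs/v1/jobs and answers 201 Created with a made-up job body.
func newFakeServer() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost || r.URL.Path != "/ojs/v1/jobs" ||
			r.Header.Get("Content-Type") != contentType {
			http.Error(w, "bad request", http.StatusBadRequest)
			return
		}
		w.Header().Set("Content-Type", contentType)
		w.WriteHeader(http.StatusCreated)
		fmt.Fprint(w, `{"id":"job_1"}`)
	}))
}

func main() {
	srv := newFakeServer()
	defer srv.Close()

	status, err := postJob(srv.URL, enqueueRequest{
		Type: "email.send",
		Args: map[string]any{"to": "user@example.com"},
	})
	fmt.Println(status, err)
}
```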

Worker Lifecycle

stateDiagram-v2
    [*] --> Running: Start(ctx)
    Running --> Running: Fetch → Process → ACK/NACK
    Running --> Quiet: Server directive
    Quiet --> Running: Server directive
    Quiet --> Terminate: Server directive / ctx.Done()
    Running --> Terminate: ctx.Done()
    Terminate --> [*]: Grace period expires or all jobs complete
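
The transitions in this diagram can be read as a small state machine. The sketch below only encodes which moves the diagram allows. The type and function names are illustrative, not the SDK's internals.

```go
package main

import "fmt"

// State mirrors the three lifecycle states in the diagram.
type State string

const (
	Running   State = "running"
	Quiet     State = "quiet"
	Terminate State = "terminate"
)

// allowed lists the transitions drawn in the diagram. Terminate has no
// outgoing edges, so it is final.
var allowed = map[State][]State{
	Running: {Quiet, Terminate},
	Quiet:   {Running, Terminate},
}

// next returns the target state when the move is allowed; otherwise it
// returns the current state and false.
func next(from, to State) (State, bool) {
	for _, s := range allowed[from] {
		if s == to {
			return to, true
		}
	}
	return from, false
}

func main() {
	s := Running
	for _, target := range []State{Quiet, Running, Terminate, Running} {
		var ok bool
		s, ok = next(s, target)
		fmt.Println(target, ok, s)
	}
}
```

The last requested move (terminate to running) is rejected, which matches the diagram: once a worker starts terminating it never resumes fetching.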

Middleware Chain (Onion Model)

flowchart LR
    Request([Job Fetched]) --> MW1[Middleware 1\nbefore]
    MW1 --> MW2[Middleware 2\nbefore]
    MW2 --> Handler[Handler]
    Handler --> MW2A[Middleware 2\nafter]
    MW2A --> MW1A[Middleware 1\nafter]
    MW1A --> Result([ACK / NACK])
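
A minimal sketch of the onion model, using only the standard library. The `Handler` and `Middleware` types here are simplified stand-ins (a plain string replaces the job context) and `chain` is a hypothetical helper, so treat this as an illustration of the ordering rather than the SDK's code.

```go
package main

import "fmt"

// Handler processes one job; the string stands in for a job context.
type Handler func(job string) error

// Middleware wraps a handler call and decides when to call next.
type Middleware func(job string, next Handler) error

// chain wraps h so that mws[0] becomes the outermost layer.
func chain(h Handler, mws ...Middleware) Handler {
	for i := len(mws) - 1; i >= 0; i-- {
		mw, inner := mws[i], h
		h = func(job string) error { return mw(job, inner) }
	}
	return h
}

// tracer returns a middleware that records a marker before and after next.
func tracer(name string, trace *[]string) Middleware {
	return func(job string, next Handler) error {
		*trace = append(*trace, name+" before")
		err := next(job)
		*trace = append(*trace, name+" after")
		return err
	}
}

// run pushes one job through two middleware and returns the call order.
func run() []string {
	var trace []string
	h := chain(func(job string) error {
		trace = append(trace, "handler")
		return nil
	}, tracer("mw1", &trace), tracer("mw2", &trace))
	_ = h("email.send")
	return trace
}

func main() {
	fmt.Println(run())
	// Prints: [mw1 before mw2 before handler mw2 after mw1 after]
}
```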

Installation

go get github.com/openjobspec/ojs-go-sdk

Quick Start

Enqueue a Job

package main

import (
    "context"
    "log"
    "time"

    ojs "github.com/openjobspec/ojs-go-sdk"
)

func main() {
    client, err := ojs.NewClient("http://localhost:8080")
    if err != nil {
        log.Fatal(err)
    }

    job, err := client.Enqueue(context.Background(), "email.send",
        ojs.Args{"to": "user@example.com"},
        ojs.WithQueue("email"),
        ojs.WithRetry(ojs.RetryPolicy{MaxAttempts: 5}),
        ojs.WithDelay(5 * time.Minute),
    )
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Enqueued: %s", job.ID)
}

Process Jobs

package main

import (
    "context"
    "log"
    "os/signal"
    "syscall"
    "time"

    ojs "github.com/openjobspec/ojs-go-sdk"
)

func main() {
    worker := ojs.NewWorker("http://localhost:8080",
        ojs.WithQueues("default", "email"),
        ojs.WithConcurrency(10),
    )

    worker.Register("email.send", func(ctx ojs.JobContext) error {
        // Checked assertion: a missing or non-string "to" yields "" instead of a panic.
        to, _ := ctx.Job.Args["to"].(string)
        log.Printf("Sending email to %s", to)
        ctx.SetResult(map[string]any{"sent": true})
        return nil
    })

    // Add middleware.
    worker.Use(func(ctx ojs.JobContext, next ojs.HandlerFunc) error {
        start := time.Now()
        err := next(ctx)
        log.Printf("%s took %s", ctx.Job.Type, time.Since(start))
        return err
    })

    // Graceful shutdown on Ctrl-C (SIGINT) or SIGTERM.
    ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    if err := worker.Start(ctx); err != nil {
        log.Fatal(err)
    }
}

Typed Handlers (Generics)

Use RegisterTyped to get auto-parsed, type-safe arguments — no more manual type assertions:

type EmailArgs struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
}

ojs.RegisterTyped(worker, "email.send", func(ctx ojs.JobContext, args EmailArgs) error {
    log.Printf("Sending to %s: %s", args.To, args.Subject)
    return nil
})

If the args can't be parsed into the struct, the job is automatically NACKed as non-retryable.
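
One plausible way to turn untyped args into a struct is a JSON round trip. The README does not say how RegisterTyped parses arguments, so `decodeArgs` below is a hypothetical helper that only shows the idea and the kind of failure that would lead to a non-retryable NACK.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EmailArgs matches the struct used in the example above.
type EmailArgs struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// decodeArgs converts untyped args into T by marshalling to JSON and
// unmarshalling into the target type. A type mismatch surfaces as an error.
func decodeArgs[T any](raw map[string]any) (T, error) {
	var out T
	data, err := json.Marshal(raw)
	if err != nil {
		return out, err
	}
	err = json.Unmarshal(data, &out)
	return out, err
}

func main() {
	args, err := decodeArgs[EmailArgs](map[string]any{
		"to":      "user@example.com",
		"subject": "Hi",
	})
	fmt.Println(args.To, args.Subject, err)

	// "to" is a number here, so decoding into a string field fails.
	_, err = decodeArgs[EmailArgs](map[string]any{"to": 42})
	fmt.Println(err != nil)
}
```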

Pre-built Middleware

The middleware package provides ready-to-use middleware for common cross-cutting concerns:

import "github.com/openjobspec/ojs-go-sdk/middleware"

// Structured logging via log/slog.
worker.Use(middleware.Logging(slog.Default()))

// Panic recovery — converts panics to errors.
worker.Use(middleware.Recovery(slog.Default()))

// Metrics — implement the MetricsRecorder interface for your backend.
worker.Use(middleware.Metrics(myRecorder))

The MetricsRecorder interface is backend-agnostic:

type MetricsRecorder interface {
    JobStarted(jobType, queue string)
    JobCompleted(jobType, queue string, duration time.Duration)
    JobFailed(jobType, queue string, duration time.Duration)
}
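
As an illustration, a small in-memory recorder can satisfy this interface. The interface is copied here so the sketch compiles on its own. `memRecorder` and its `queue/jobType` key format are hypothetical, and a real backend (Prometheus, StatsD, and so on) would replace the maps.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// MetricsRecorder is copied from the README so this sketch is self-contained.
type MetricsRecorder interface {
	JobStarted(jobType, queue string)
	JobCompleted(jobType, queue string, duration time.Duration)
	JobFailed(jobType, queue string, duration time.Duration)
}

// memRecorder is a hypothetical recorder that keeps counters in memory,
// keyed by "queue/jobType". The mutex makes it safe for concurrent workers.
type memRecorder struct {
	mu        sync.Mutex
	started   map[string]int
	completed map[string]int
	failed    map[string]int
	total     map[string]time.Duration
}

func newMemRecorder() *memRecorder {
	return &memRecorder{
		started:   map[string]int{},
		completed: map[string]int{},
		failed:    map[string]int{},
		total:     map[string]time.Duration{},
	}
}

func (r *memRecorder) JobStarted(jobType, queue string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.started[queue+"/"+jobType]++
}

func (r *memRecorder) JobCompleted(jobType, queue string, d time.Duration) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.completed[queue+"/"+jobType]++
	r.total[queue+"/"+jobType] += d
}

func (r *memRecorder) JobFailed(jobType, queue string, d time.Duration) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.failed[queue+"/"+jobType]++
	r.total[queue+"/"+jobType] += d
}

// Compile-time check that memRecorder implements the interface.
var _ MetricsRecorder = (*memRecorder)(nil)

func main() {
	rec := newMemRecorder()
	rec.JobStarted("email.send", "email")
	rec.JobCompleted("email.send", "email", 120*time.Millisecond)
	key := "email/email.send"
	fmt.Println(rec.started[key], rec.completed[key], rec.total[key])
}
```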

OpenTelemetry

The middleware/otel package provides native OpenTelemetry tracing and metrics (separate Go module to keep the core dependency-free):

go get github.com/openjobspec/ojs-go-sdk/middleware/otel

import ojsotel "github.com/openjobspec/ojs-go-sdk/middleware/otel"

// Distributed tracing — one span per job with type, ID, queue, attempt.
worker.UseNamed("tracing", ojsotel.Tracing(
    ojsotel.WithTracerProvider(tp),
))

// Metrics — counters (started/completed/failed) and duration histogram.
worker.UseNamed("metrics", ojsotel.Metrics(
    ojsotel.WithMeterProvider(mp),
))

Batch Enqueue

jobs, err := client.EnqueueBatch(ctx, []ojs.JobRequest{
    {Type: "email.send", Args: ojs.Args{"to": "a@example.com"}},
    {Type: "email.send", Args: ojs.Args{"to": "b@example.com"}},
})

Workflows

OJS provides three workflow primitives for composing multi-step job pipelines:

Chain — sequential steps, each starts after the previous completes:

graph LR
    A[Step 1: Fetch] --> B[Step 2: Transform] --> C[Step 3: Load]

Group — parallel fan-out/fan-in, all jobs run simultaneously:

graph TD
    S[Start] --> A[Export CSV]
    S --> B[Export PDF]
    A --> J[All Complete]
    B --> J

Batch — parallel execution with completion callbacks:

graph TD
    S[Batch] --> A[Job 1]
    S --> B[Job 2]
    S --> C[Job N]
    A --> CB{All Done?}
    B --> CB
    C --> CB
    CB -->|on_complete| D[Report]
    CB -->|on_failure| E[Alert]

In Go, each primitive is built with its constructor and passed to client.CreateWorkflow:

// Chain: sequential steps.
wf, err := client.CreateWorkflow(ctx, ojs.Chain(
    ojs.Step{Type: "data.fetch", Args: ojs.Args{"url": "..."}},
    ojs.Step{Type: "data.transform", Args: ojs.Args{"format": "csv"}},
))

// Group: parallel jobs.
wf, err = client.CreateWorkflow(ctx, ojs.Group(
    ojs.Step{Type: "export.csv", Args: ojs.Args{"id": "rpt_1"}},
    ojs.Step{Type: "export.pdf", Args: ojs.Args{"id": "rpt_1"}},
))

// Batch: parallel with callbacks.
wf, err = client.CreateWorkflow(ctx, ojs.Batch(
    ojs.BatchCallbacks{
        OnComplete: &ojs.Step{Type: "batch.report", Args: ojs.Args{}},
        OnFailure:  &ojs.Step{Type: "batch.alert", Args: ojs.Args{}},
    },
    ojs.Step{Type: "email.send", Args: ojs.Args{"to": "user1@example.com"}},
    ojs.Step{Type: "email.send", Args: ojs.Args{"to": "user2@example.com"}},
))

Error Handling

The SDK provides structured errors with errors.Is/errors.As support:

job, err := client.Enqueue(ctx, "email.send", ojs.Args{})

if errors.Is(err, ojs.ErrDuplicate) {
    log.Println("Job already exists")
}

if errors.Is(err, ojs.ErrRateLimited) {
    log.Println("Rate limited, retry later")
}

if errors.Is(err, ojs.ErrNotFound) {
    log.Println("Resource not found")
}

// Extract full error details.
var ojsErr *ojs.Error
if errors.As(err, &ojsErr) {
    log.Printf("Code: %s, Message: %s, Retryable: %v",
        ojsErr.Code, ojsErr.Message, ojsErr.Retryable)
}

// Check if an error is retryable.
if ojs.IsRetryable(err) {
    // Retry the operation.
}
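
The standard-library mechanism behind this pattern can be sketched as follows. The `apiError` type, the `rate_limited` code string, and the helper names are assumptions for illustration. The README confirms only that ojs.Error exposes Code, Message, and Retryable, not how matching is implemented.

```go
package main

import (
	"errors"
	"fmt"
)

// apiError is a hypothetical structured error whose fields echo ojs.Error.
type apiError struct {
	Code      string
	Message   string
	Retryable bool
}

func (e *apiError) Error() string { return e.Code + ": " + e.Message }

// Is lets errors.Is match on Code alone, so a sentinel with the same code
// matches any concrete error carrying that code.
func (e *apiError) Is(target error) bool {
	t, ok := target.(*apiError)
	return ok && t.Code == e.Code
}

// errRateLimited is a hypothetical sentinel; the code string is made up.
var errRateLimited = &apiError{Code: "rate_limited"}

// wrap adds call-site context while keeping the original error in the chain.
func wrap(err error) error { return fmt.Errorf("enqueue: %w", err) }

// isRateLimited reports whether the chain contains a rate-limit error.
func isRateLimited(err error) bool { return errors.Is(err, errRateLimited) }

// isRetryable reports whether the chain contains an apiError flagged retryable.
func isRetryable(err error) bool {
	var e *apiError
	return errors.As(err, &e) && e.Retryable
}

func main() {
	err := wrap(&apiError{Code: "rate_limited", Message: "slow down", Retryable: true})
	fmt.Println(isRateLimited(err), isRetryable(err))
	// Prints: true true
}
```

Because both checks walk the wrap chain, callers can add context with `%w` without breaking the sentinel comparisons.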

Non-Retryable Handler Errors

By default, handler errors are retryable. Wrap an error with NonRetryable to tell the server the job should be discarded instead of re-queued:

worker.Register("email.send", func(ctx ojs.JobContext) error {
    to, _ := ctx.Job.Args["to"].(string)
    if !isValidEmail(to) {
        return ojs.NonRetryable(fmt.Errorf("invalid email: %s", to))
    }
    return sendEmail(to)
})
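
A wrapper like NonRetryable can be modelled as a marker type detected with errors.As. The sketch below is an assumption about the general technique, not the SDK's source, and `nonRetryableError`, `shouldRetry`, and the sample errors are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// nonRetryableError marks the wrapped error as permanent.
type nonRetryableError struct{ err error }

func (e *nonRetryableError) Error() string { return e.err.Error() }

// Unwrap keeps the original error reachable for errors.Is and errors.As.
func (e *nonRetryableError) Unwrap() error { return e.err }

// nonRetryable wraps err with the marker type.
func nonRetryable(err error) error { return &nonRetryableError{err: err} }

// shouldRetry treats every non-nil error as retryable unless the marker
// appears somewhere in its chain.
func shouldRetry(err error) bool {
	if err == nil {
		return false
	}
	var nr *nonRetryableError
	return !errors.As(err, &nr)
}

var (
	errTransient    = errors.New("smtp timeout")
	errInvalidEmail = errors.New("invalid email")
)

func main() {
	fmt.Println(shouldRetry(errTransient))                                // true
	fmt.Println(shouldRetry(nonRetryable(errInvalidEmail)))               // false
	fmt.Println(errors.Is(nonRetryable(errInvalidEmail), errInvalidEmail)) // true
}
```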

Configuration

Client Options

Option             Description
WithHTTPClient(c)  Custom *http.Client
WithAuthToken(t)   Bearer token authentication
WithHeader(k, v)   Custom request header

Enqueue Options

Option              Description
WithQueue(q)        Target queue (default: "default")
WithPriority(p)     Job priority
WithTimeout(d)      Execution timeout
WithDelay(d)        Delay before execution
WithScheduledAt(t)  Execute at specific time
WithRetry(p)        Custom retry policy
WithUnique(p)       Deduplication policy
WithTags(t...)      Tags for filtering
WithMeta(m)         Metadata key-value pairs

Worker Options

Option                    Description
WithQueues(q...)          Queue subscriptions (priority order)
WithConcurrency(n)        Max parallel jobs (default: 10)
WithGracePeriod(d)        Shutdown grace period (default: 25s)
WithHeartbeatInterval(d)  Heartbeat interval (default: 5s)
WithLabels(l...)          Worker labels
WithPollInterval(d)       Fetch poll interval (default: 1s)
WithLogger(l)             Structured logger (*slog.Logger) for operational events
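
These options follow Go's functional-options pattern. The sketch below shows how defaults and overrides could combine, using the default values from the table. The `workerConfig` struct, the lowercase option names, and the default queue list are assumptions, not the SDK's definitions.

```go
package main

import (
	"fmt"
	"time"
)

// workerConfig is a hypothetical settings struct for illustration.
type workerConfig struct {
	queues            []string
	concurrency       int
	gracePeriod       time.Duration
	heartbeatInterval time.Duration
	pollInterval      time.Duration
}

// option mutates a config; each With-style helper returns one.
type option func(*workerConfig)

func withQueues(q ...string) option {
	return func(c *workerConfig) { c.queues = q }
}

func withConcurrency(n int) option {
	return func(c *workerConfig) { c.concurrency = n }
}

func withGracePeriod(d time.Duration) option {
	return func(c *workerConfig) { c.gracePeriod = d }
}

// newConfig starts from the documented defaults and applies options in order,
// so later options win over earlier ones.
func newConfig(opts ...option) workerConfig {
	cfg := workerConfig{
		queues:            []string{"default"}, // assumed default queue list
		concurrency:       10,
		gracePeriod:       25 * time.Second,
		heartbeatInterval: 5 * time.Second,
		pollInterval:      time.Second,
	}
	for _, apply := range opts {
		apply(&cfg)
	}
	return cfg
}

func main() {
	cfg := newConfig(withQueues("default", "email"), withConcurrency(4))
	fmt.Println(cfg.queues, cfg.concurrency, cfg.gracePeriod, cfg.pollInterval)
}
```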

Real-Time Subscriptions

Subscribe to job state changes via Server-Sent Events (SSE):

// Subscribe to all events
events, cancel := client.Subscribe(ctx)
defer cancel()

for event := range events {
    fmt.Printf("Job %s: %s → %s\n", event.JobID, event.From, event.To)
}

// Subscribe to a specific job
events, cancel := client.SubscribeJob(ctx, jobID)

// Subscribe to a queue
events, cancel := client.SubscribeQueue(ctx, "emails")
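
Underneath, an SSE stream is plain text: `data:` lines accumulate until a blank line ends the event. The sketch below parses such a stream with the standard library. The JSON keys (`job_id`, `from`, `to`) and the state names in the sample are assumptions; only the Go field names echo the example above.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// event is a hypothetical payload; the JSON keys are assumed.
type event struct {
	JobID string `json:"job_id"`
	From  string `json:"from"`
	To    string `json:"to"`
}

// readEvents collects "data:" lines and decodes one event per blank line.
// Other SSE fields (event:, id:, retry:) and comment lines are ignored, and
// an event that is not terminated by a blank line is dropped.
func readEvents(r io.Reader) ([]event, error) {
	var (
		events []event
		data   []string
	)
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			if len(data) == 0 {
				continue
			}
			var ev event
			if err := json.Unmarshal([]byte(strings.Join(data, "\n")), &ev); err != nil {
				return nil, err
			}
			events = append(events, ev)
			data = nil
		case strings.HasPrefix(line, "data:"):
			// Strip the field name and one optional leading space.
			value := strings.TrimPrefix(strings.TrimPrefix(line, "data:"), " ")
			data = append(data, value)
		}
	}
	return events, sc.Err()
}

// parse is a convenience wrapper for in-memory streams.
func parse(stream string) ([]event, error) {
	return readEvents(strings.NewReader(stream))
}

func main() {
	stream := "data: {\"job_id\":\"j1\",\"from\":\"active\",\"to\":\"completed\"}\n\n"
	events, err := parse(stream)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for _, ev := range events {
		fmt.Printf("Job %s: %s -> %s\n", ev.JobID, ev.From, ev.To)
	}
}
```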

OJS Spec Conformance

This SDK implements the OJS v1.0 specification:

  • Layer 1 (Core): Job envelope, lifecycle states, retry policies, unique jobs, workflows, middleware
  • Layer 2 (Wire Format): JSON encoding with application/openjobspec+json content type
  • Layer 3 (HTTP Binding): Full HTTP REST protocol binding (PUSH, FETCH, ACK, FAIL, BEAT, CANCEL, INFO)
  • Worker Protocol: Three-state lifecycle (running/quiet/terminate), heartbeat, graceful shutdown

License

Apache 2.0 -- see LICENSE.
