Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,7 @@ go.work.sum
# Stale branches / forks
stale

# Taskharbor binaries
th

# .gitignore
36 changes: 36 additions & 0 deletions cmd/taskharbor/internal/app/admin_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package app

import (
"context"
"fmt"

"github.com/ARJ2211/taskharbor/cmd/taskharbor/internal/backend"
drv "github.com/ARJ2211/taskharbor/taskharbor/driver"
)

type adminHandle struct {
Admin drv.Admin
Close func() error
}

func openAdmin(ctx context.Context, g GlobalFlags) (*adminHandle, error) {
h, err := backend.Open(ctx, backend.Config{
Driver: g.Driver,
PostgresDSN: g.PostgresDSN,
RedisAddr: g.RedisAddr,
})
if err != nil {
return nil, err
}

a, ok := h.Driver.(drv.Admin)
if !ok {
_ = h.Close()
return nil, fmt.Errorf("driver %q does not support admin operations", g.Driver)
}

return &adminHandle{
Admin: a,
Close: h.Close,
}, nil
}
195 changes: 195 additions & 0 deletions cmd/taskharbor/internal/app/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package app

import (
"flag"
"fmt"
"io"
"os"
"strings"

th "github.com/ARJ2211/taskharbor/taskharbor"
"github.com/ARJ2211/taskharbor/taskharbor/driver"
)

type GlobalFlags struct {
Driver string
Queue string
JSON bool
Verbose bool

PostgresDSN string
RedisAddr string
}

func envOr(key, fallback string) string {
if v := strings.TrimSpace(os.Getenv(key)); v != "" {
return v
}
return fallback
}

func Run(argv []string, stdout, stderr io.Writer) int {
var g GlobalFlags
var help bool
var h bool

fs := flag.NewFlagSet("taskharbor", flag.ContinueOnError)
fs.SetOutput(io.Discard)

driverList := ""
for _, d := range driver.ImplementedDrivers {
driverList += d + "|"
}
driverList = driverList[:len(driverList)-1]

fs.StringVar(&g.Driver, "driver", "memory", fmt.Sprintf("drivers: %s", driverList))
fs.StringVar(&g.Queue, "queue", th.DefaultQueue, "queue name")
fs.BoolVar(&g.JSON, "json", false, "output JSON")
fs.BoolVar(&g.Verbose, "verbose", false, "verbose logs")
fs.BoolVar(&help, "help", false, "show help")
fs.BoolVar(&h, "h", false, "show help")

fs.StringVar(&g.PostgresDSN, "dsn", envOr("TH_PG_DSN", envOr("TH_POSTGRES_DSN", "")), "postgres DSN (for --driver postgres)")
fs.StringVar(&g.RedisAddr, "redis-addr", envOr("TH_REDIS_ADDR", ""), "redis addr host:port (for --driver redis)")

if err := fs.Parse(argv); err != nil {
fmt.Println(stderr, "error: ", err)
printRootUsage(stderr)
return 2
}

args := fs.Args()
if help || h || len(argv) == 0 {
printRootUsage(stdout)
return 0
}

cmd := args[0]
cmdArgs := args[1:]

switch cmd {
case "worker":
return runWorker(g, cmdArgs, stdout, stderr)
case "enqueue":
return runEnqueue(g, cmdArgs, stdout, stderr)
case "list":
return runList(g, cmdArgs, stdout, stderr)
case "inspect":
return runInspect(g, cmdArgs, stdout, stderr)
case "dlq":
return runDLQ(g, cmdArgs, stdout, stderr)
case "job":
return runJob(g, cmdArgs, stdout, stderr)
case "help":
printRootUsage(stdout)
return 0
default:
fmt.Fprintln(stderr, "error: unknown command:", cmd)
printRootUsage(stderr)
return 2
}
}

func runWorker(g GlobalFlags, argv []string, stdout, stderr io.Writer) int {
var help bool
var h bool

fs := flag.NewFlagSet("taskharbor worker", flag.ContinueOnError)
fs.SetOutput(io.Discard)
fs.BoolVar(&help, "help", false, "show help")
fs.BoolVar(&h, "h", false, "show help")

if err := fs.Parse(argv); err != nil {
fmt.Fprintln(stderr, "error:", err)
printWorkerUsage(stderr)
return 2
}

args := fs.Args()
if help || h || len(args) == 0 {
printWorkerUsage(stdout)
return 0
}

sub := args[0]
subArgs := args[1:]

switch sub {
case "run":
return runWorkerRun(g, subArgs, stdout, stderr)
default:
fmt.Fprintln(stderr, "error: unknown subcommand: worker", sub)
printWorkerUsage(stderr)
return 2
}
}

func runDLQ(g GlobalFlags, argv []string, stdout, stderr io.Writer) int {
var help bool
var h bool

fs := flag.NewFlagSet("taskharbor dlq", flag.ContinueOnError)
fs.SetOutput(io.Discard)
fs.BoolVar(&help, "help", false, "show help")
fs.BoolVar(&h, "h", false, "show help")

if err := fs.Parse(argv); err != nil {
fmt.Fprintln(stderr, "error:", err)
printDLQUsage(stderr)
return 2
}

args := fs.Args()
if help || h || len(args) == 0 {
printDLQUsage(stdout)
return 0
}

sub := args[0]
subArgs := args[1:]

switch sub {
case "list":
return runDLQList(g, subArgs, stdout, stderr)
case "requeue":
return runDLQRequeue(g, subArgs, stdout, stderr)
default:
fmt.Fprintln(stderr, "error: unknown subcommand: dlq", sub)
printDLQUsage(stderr)
return 2
}
}

func runJob(g GlobalFlags, argv []string, stdout, stderr io.Writer) int {
var help bool
var h bool

fs := flag.NewFlagSet("taskharbor job", flag.ContinueOnError)
fs.SetOutput(io.Discard)
fs.BoolVar(&help, "help", false, "show help")
fs.BoolVar(&h, "h", false, "show help")

if err := fs.Parse(argv); err != nil {
fmt.Fprintln(stderr, "error:", err)
printJobUsage(stderr)
return 2
}

args := fs.Args()
if help || h || len(args) == 0 {
printJobUsage(stdout)
return 0
}

sub := args[0]
subArgs := args[1:]

switch sub {
case "retry":
return runJobRetry(g, subArgs, stdout, stderr)
default:
fmt.Fprintln(stderr, "error: unknown subcommand: job", sub)
printJobUsage(stderr)
return 2
}
}
78 changes: 78 additions & 0 deletions cmd/taskharbor/internal/app/app_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package app

import (
"bytes"
"strings"
"testing"
)

func TestRootHelp(t *testing.T) {
var out, err bytes.Buffer
code := Run([]string{"--help"}, &out, &err)
if code != 0 {
t.Fatalf("expected 0, got %d (stderr=%q)", code, err.String())
}
if !strings.Contains(out.String(), "worker run") {
t.Fatalf("expected help to mention worker run, got: %q", out.String())
}
}

func TestWorkerRunHelp(t *testing.T) {
var out, err bytes.Buffer
code := Run([]string{"worker", "run", "--help"}, &out, &err)
if code != 0 {
t.Fatalf("expected 0, got %d (stderr=%q)", code, err.String())
}
if !strings.Contains(out.String(), "worker run") {
t.Fatalf("expected worker run usage, got: %q", out.String())
}
}

func TestUnknownCommand(t *testing.T) {
var out, err bytes.Buffer
code := Run([]string{"nope"}, &out, &err)
if code == 0 {
t.Fatalf("expected non-zero, got %d", code)
}
}

func TestEnqueueMinimal(t *testing.T) {
var out, err bytes.Buffer
code := Run([]string{"enqueue", "--type", "echo", "--payload", "hi"}, &out, &err)
if code != 0 {
t.Fatalf("expected 0, got %d (stderr=%q)", code, err.String())
}
if strings.TrimSpace(out.String()) == "" {
t.Fatalf("expected a job id, got empty output")
}
}

func TestListAndInspectAfterEnqueue(t *testing.T) {
var out1, err1 bytes.Buffer
code := Run([]string{"enqueue", "--type", "echo", "--payload", "hi"}, &out1, &err1)
if code != 0 {
t.Fatalf("enqueue expected 0, got %d (stderr=%q)", code, err1.String())
}
id := strings.TrimSpace(out1.String())
if id == "" {
t.Fatalf("expected job id")
}

var out2, err2 bytes.Buffer
code = Run([]string{"list"}, &out2, &err2)
if code != 0 {
t.Fatalf("list expected 0, got %d (stderr=%q)", code, err2.String())
}
if !strings.Contains(out2.String(), id) {
t.Fatalf("expected list to contain id %q, got: %q", id, out2.String())
}

var out3, err3 bytes.Buffer
code = Run([]string{"inspect", id}, &out3, &err3)
if code != 0 {
t.Fatalf("inspect expected 0, got %d (stderr=%q)", code, err3.String())
}
if !strings.Contains(out3.String(), "id: "+id) {
t.Fatalf("expected inspect output to include id, got: %q", out3.String())
}
}
Loading