Skip to content

Commit f48bd05

Browse files
committed
feat: continue adding argoworkflow jobagent
1 parent 2745258 commit f48bd05

File tree

4 files changed

+356
-1
lines changed

4 files changed

+356
-1
lines changed

apps/api/src/routes/argoworkflow/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ export const createArgoWorkflowRouter = (): Router =>
99
Router().post("/webhook", asyncHandler(handleWebhookRequest));
1010

1111
const verifyRequest = (req: Request): boolean => {
12-
const authHeader = req.headers["authorization"]?.toString();
12+
const authHeader = req.headers.authorization?.toString();
1313
if (authHeader == null) return false;
1414
const secret = env.ARGO_WORKFLOW_WEBHOOK_SECRET;
1515
return authHeader === secret;

apps/workspace-engine/svc/controllers/jobdispatch/controller.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"workspace-engine/pkg/reconcile/postgres"
1818
"workspace-engine/svc/controllers/jobdispatch/jobagents"
1919
"workspace-engine/svc/controllers/jobdispatch/jobagents/argo"
20+
argoworkflow "workspace-engine/svc/controllers/jobdispatch/jobagents/argo-workflow"
2021
"workspace-engine/svc/controllers/jobdispatch/jobagents/github"
2122
"workspace-engine/svc/controllers/jobdispatch/jobagents/terraformcloud"
2223
"workspace-engine/svc/controllers/jobdispatch/jobagents/testrunner"
@@ -117,6 +118,9 @@ func New(workerID string, pgxPool *pgxpool.Pool) *reconcile.Worker {
117118
github.New(&github.GoGitHubWorkflowDispatcher{}, pgSetter),
118119
)
119120
dispatcher.Register(terraformcloud.New(pgSetter))
121+
dispatcher.Register(
122+
argoworkflow.New(&argoworkflow.GoWorkflowSubmitter{}, pgSetter),
123+
)
120124

121125
maxConcurrency := config.GetMaxConcurrency(kind)
122126
log.Debug(
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
package argo_workflows
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"regexp"
8+
"strings"
9+
10+
"workspace-engine/pkg/oapi"
11+
"workspace-engine/pkg/templatefuncs"
12+
"workspace-engine/svc/controllers/jobdispatch/jobagents/types"
13+
14+
"github.com/goccy/go-yaml"
15+
"go.opentelemetry.io/otel"
16+
"go.opentelemetry.io/otel/trace"
17+
)
18+
19+
var tracer = otel.Tracer("workspace-engine/jobagents/argo")
20+
21+
// Workflow is a minimal representation of an Argo Workflows Workflow resource.
22+
// We define this locally to avoid importing argo-workflows/v4, which conflicts
23+
// with argo-cd/v2's transitive dependencies (docker/docker vs moby/moby rename).
24+
type Workflow struct {
25+
Kind string `yaml:"kind,omitempty" json:"kind,omitempty"`
26+
APIVersion string `yaml:"apiVersion,omitempty" json:"apiVersion,omitempty"`
27+
Metadata WorkflowMetadata `yaml:"metadata,omitempty" json:"metadata,omitempty"`
28+
Spec interface{} `yaml:"spec,omitempty" json:"spec,omitempty"`
29+
}
30+
31+
type WorkflowMetadata struct {
32+
Name string `yaml:"name,omitempty" json:"name,omitempty"`
33+
Namespace string `yaml:"namespace,omitempty" json:"namespace,omitempty"`
34+
Labels map[string]string `yaml:"labels,omitempty" json:"labels,omitempty"`
35+
}
36+
37+
var _ types.Dispatchable = &ArgoWorkflow{}
38+
39+
type Getter interface {
40+
GetWorkflow(ctx context.Context, name string) (*Workflow, error)
41+
}
42+
43+
// Setter persists job status updates.
44+
type Setter interface {
45+
UpdateJob(
46+
ctx context.Context,
47+
jobID string,
48+
status oapi.JobStatus,
49+
message string,
50+
metadata map[string]string,
51+
) error
52+
}
53+
54+
// WorkflowDeleter deletes an Argo Workflows Workflow resource.
55+
type WorkflowDeleter interface {
56+
DeleteWorkflow(ctx context.Context, serverAddr, apiKey, name string) error
57+
}
58+
59+
// WorkflowSubmitter submits an Argo Workflows Workflow to the server.
60+
type WorkflowSubmitter interface {
61+
SubmitWorkflow(ctx context.Context, serverAddr, apiKey string, wf *Workflow) error
62+
}
63+
64+
type ArgoWorkflow struct {
65+
setter Setter
66+
submitter WorkflowSubmitter
67+
}
68+
69+
func New(submitter WorkflowSubmitter, setter Setter) *ArgoWorkflow {
70+
return &ArgoWorkflow{setter: setter, submitter: submitter}
71+
}
72+
73+
func (a *ArgoWorkflow) Type() string {
74+
return "argo-workflow"
75+
}
76+
77+
func (a *ArgoWorkflow) Dispatch(ctx context.Context, job *oapi.Job) error {
78+
dispatchCtx := job.DispatchContext
79+
jobAgentConfig := dispatchCtx.JobAgentConfig
80+
serverAddr, apiKey, template, err := ParseJobAgentConfig(jobAgentConfig)
81+
if err != nil {
82+
return fmt.Errorf("failed to parse job agent config: %w", err)
83+
}
84+
85+
wf, err := TemplateApplication(dispatchCtx, template)
86+
if err != nil {
87+
return fmt.Errorf("failed to generate workflow from template: %w", err)
88+
}
89+
90+
MakeApplicationK8sCompatible(wf)
91+
92+
go func() {
93+
parentSpanCtx := trace.SpanContextFromContext(ctx)
94+
asyncCtx, span := tracer.Start(context.Background(), "ArgoWorkflow.AsyncDispatch",
95+
trace.WithLinks(trace.Link{SpanContext: parentSpanCtx}),
96+
)
97+
defer span.End()
98+
99+
if err := a.submitter.SubmitWorkflow(asyncCtx, serverAddr, apiKey, wf); err != nil {
100+
_ = a.setter.UpdateJob(asyncCtx, job.Id, oapi.JobStatusFailure,
101+
fmt.Sprintf("failed to submit workflow: %s", err.Error()), nil)
102+
return
103+
}
104+
105+
metadata := BuildArgoLinks(serverAddr, wf)
106+
_ = a.setter.UpdateJob(asyncCtx, job.Id, oapi.JobStatusInProgress, "", metadata)
107+
}()
108+
109+
return nil
110+
}
111+
112+
// ParseJobAgentConfig extracts the required fields from an agent config.
113+
func ParseJobAgentConfig(
114+
config oapi.JobAgentConfig,
115+
) (serverAddr, apiKey, template string, err error) {
116+
serverAddr, ok := config["serverUrl"].(string)
117+
if !ok {
118+
return "", "", "", fmt.Errorf("serverUrl is required")
119+
}
120+
apiKey, ok = config["apiKey"].(string)
121+
if !ok {
122+
return "", "", "", fmt.Errorf("apiKey is required")
123+
}
124+
template, ok = config["template"].(string)
125+
if !ok {
126+
return "", "", "", fmt.Errorf("template is required")
127+
}
128+
if serverAddr == "" || apiKey == "" || template == "" {
129+
return "", "", "", fmt.Errorf("missing required fields in job agent config")
130+
}
131+
return serverAddr, apiKey, template, nil
132+
}
133+
134+
// TemplateApplication renders the Argo Workflows Workflow YAML template using
135+
// the dispatch context variables.
136+
func TemplateApplication(ctx *oapi.DispatchContext, tmpl string) (*Workflow, error) {
137+
t, err := templatefuncs.Parse("argoWorkflowAgentConfig", tmpl)
138+
if err != nil {
139+
return nil, fmt.Errorf("failed to parse template: %w", err)
140+
}
141+
var buf bytes.Buffer
142+
if err := t.Execute(&buf, ctx.Map()); err != nil {
143+
return nil, fmt.Errorf("failed to execute template: %w", err)
144+
}
145+
146+
var workflow Workflow
147+
if err := yaml.Unmarshal(buf.Bytes(), &workflow); err != nil {
148+
return nil, fmt.Errorf("failed to unmarshal workflow: %w", err)
149+
}
150+
return &workflow, nil
151+
}
152+
153+
// MakeApplicationK8sCompatible sanitises the workflow name and label
154+
// values so they conform to Kubernetes naming rules.
155+
func MakeApplicationK8sCompatible(wf *Workflow) {
156+
wf.Metadata.Name = getK8sCompatibleName(wf.Metadata.Name)
157+
if wf.Metadata.Labels != nil {
158+
for key, value := range wf.Metadata.Labels {
159+
wf.Metadata.Labels[key] = getK8sCompatibleName(value)
160+
}
161+
}
162+
}
163+
164+
func getK8sCompatibleName(name string) string {
165+
cleaned := strings.ToLower(name)
166+
k8sInvalidCharsRegex := regexp.MustCompile(`[^a-z0-9-]`)
167+
cleaned = k8sInvalidCharsRegex.ReplaceAllString(cleaned, "-")
168+
169+
if len(cleaned) > 63 {
170+
cleaned = cleaned[:63]
171+
}
172+
cleaned = strings.Trim(cleaned, "-")
173+
if cleaned == "" {
174+
return "default"
175+
}
176+
177+
return cleaned
178+
}
179+
180+
// BuildArgoLinks builds the metadata map with an Argo Workflows URL.
181+
func BuildArgoLinks(serverAddr string, wf *Workflow) map[string]string {
182+
appURL := fmt.Sprintf("%s/workflows/%s/%s", serverAddr, wf.Metadata.Namespace, wf.Metadata.Name)
183+
if !strings.HasPrefix(appURL, "https://") {
184+
appURL = "https://" + appURL
185+
}
186+
return map[string]string{
187+
"ctrlplane/links": fmt.Sprintf(`{"Argo Workflow":"%s"}`, appURL),
188+
}
189+
}
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package argo_workflows
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"crypto/tls"
7+
"encoding/json"
8+
"fmt"
9+
"io"
10+
"net/http"
11+
"strings"
12+
"time"
13+
14+
"os"
15+
16+
"github.com/avast/retry-go"
17+
"github.com/charmbracelet/log"
18+
"github.com/goccy/go-yaml"
19+
20+
argoapiclient "github.com/argoproj/argo-workflows/v3/pkg/apiclient"
21+
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
22+
"sigs.k8s.io/yaml"
23+
)
24+
25+
type submitWorkflowRequest struct {
26+
Workflow *Workflow `json:"workflow"`
27+
}
28+
29+
func submitThing() {
30+
ctx := context.Background()
31+
32+
b, err := os.ReadFile("workflow.yaml")
33+
if err != nil {
34+
panic(err)
35+
}
36+
37+
var wf wfv1.Workflow
38+
if err := yaml.Unmarshal(b, &wf); err != nil {
39+
panic(err)
40+
}
41+
42+
fmt.Printf("kind=%q\n", wf.Kind)
43+
fmt.Printf("apiVersion=%q\n", wf.APIVersion)
44+
fmt.Printf("metadata.name=%q\n", wf.Name)
45+
fmt.Printf("metadata.generateName=%q\n", wf.GenerateName)
46+
fmt.Printf("metadata.namespace=%q\n", wf.Namespace)
47+
48+
ctx, apiClient, err := argoapiclient.NewClientFromOptsWithContext(ctx, argoapiclient.Opts{
49+
ArgoServerOpts: argoapiclient.ArgoServerOpts{
50+
URL: "localhost:2746", // host:port only
51+
Secure: true, // HTTPS
52+
HTTP1: true, // avoid gRPC/HTTP2 issues
53+
InsecureSkipVerify: true,
54+
},
55+
AuthSupplier: func() string {
56+
return ""
57+
},
58+
})
59+
if err != nil {
60+
panic(err)
61+
}
62+
wfClient := apiClient.NewWorkflowServiceClient(ctx)
63+
64+
}
65+
66+
// GoWorkflowSubmitter is the production implementation of WorkflowSubmitter
67+
// that calls the Argo Workflows REST API.
68+
type GoWorkflowSubmitter struct{}
69+
70+
func (s *GoWorkflowSubmitter) SubmitWorkflow(
71+
ctx context.Context,
72+
serverAddr, apiKey string,
73+
wf *Workflow,
74+
) error {
75+
namespace := wf.Metadata.Namespace
76+
if namespace == "" {
77+
namespace = "default"
78+
}
79+
80+
url := fmt.Sprintf(
81+
"%s/api/v1/workflows/%s",
82+
strings.TrimRight(serverAddr, "/"),
83+
namespace,
84+
)
85+
jsonBody, err := yaml.YAMLToJSON(template)
86+
if err != nil {
87+
panic(err)
88+
}
89+
90+
client := &http.Client{
91+
Timeout: 20 * time.Second,
92+
Transport: &http.Transport{
93+
TLSClientConfig: &tls.Config{
94+
InsecureSkipVerify: true, // local dev only
95+
},
96+
},
97+
}
98+
99+
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader())
100+
if err != nil {
101+
panic(err)
102+
}
103+
req.Header.Set("Content-Type", "application/json")
104+
if token != "" {
105+
req.Header.Set("Authorization", "Bearer "+token)
106+
}
107+
108+
resp, err := client.Do(req)
109+
if err != nil {
110+
panic(err)
111+
}
112+
defer resp.Body.Close()
113+
114+
respBody, _ := io.ReadAll(resp.Body)
115+
116+
return retry.Do(
117+
func() error {
118+
body, err := json.Marshal(submitWorkflowRequest{Workflow: wf})
119+
if err != nil {
120+
return retry.Unrecoverable(fmt.Errorf("marshal workflow: %w", err))
121+
}
122+
123+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
124+
if err != nil {
125+
return retry.Unrecoverable(fmt.Errorf("create request: %w", err))
126+
}
127+
req.Header.Set("Content-Type", "application/json")
128+
req.Header.Set("Authorization", "Bearer "+apiKey)
129+
130+
resp, err := http.DefaultClient.Do(req)
131+
if err != nil {
132+
return fmt.Errorf("submit workflow: %w", err)
133+
}
134+
defer resp.Body.Close()
135+
136+
if resp.StatusCode >= 300 {
137+
respBody, _ := io.ReadAll(resp.Body)
138+
errMsg := fmt.Sprintf("submit workflow: status %d: %s", resp.StatusCode, string(respBody))
139+
if isRetryableStatusCode(resp.StatusCode) {
140+
return fmt.Errorf("%s", errMsg)
141+
}
142+
return retry.Unrecoverable(fmt.Errorf("%s", errMsg))
143+
}
144+
145+
return nil
146+
},
147+
retry.Attempts(5),
148+
retry.Delay(1*time.Second),
149+
retry.MaxDelay(10*time.Second),
150+
retry.DelayType(retry.BackOffDelay),
151+
retry.OnRetry(func(n uint, err error) {
152+
log.Warn("Retrying Argo Workflow submission",
153+
"attempt", n+1,
154+
"error", err)
155+
}),
156+
retry.Context(ctx),
157+
)
158+
}
159+
160+
func isRetryableStatusCode(code int) bool {
161+
return code == 502 || code == 503 || code == 504
162+
}

0 commit comments

Comments
 (0)