diff --git a/.yamllint.yaml b/.yamllint.yaml index e29c4fbcb..c8ca7a182 100644 --- a/.yamllint.yaml +++ b/.yamllint.yaml @@ -4,6 +4,7 @@ ignore: - components/etcdbackup/templates/ - charts/argocd-understack/templates/ - charts/nautobot-api-tokens/templates/ + - charts/nautobot-job-queues/templates/ - charts/site-workflows/templates/ - charts/undersync/templates/ diff --git a/ansible/nautobot-post-deploy.yaml b/ansible/nautobot-post-deploy.yaml index 4fbd7331e..eaabdc21f 100644 --- a/ansible/nautobot-post-deploy.yaml +++ b/ansible/nautobot-post-deploy.yaml @@ -41,7 +41,6 @@ roles: - role: nautobot_permissions - - role: jobs - role: secrets - role: git_repos - role: nautobot_roles diff --git a/charts/argocd-understack/templates/application-nautobot-api-tokens.yaml b/charts/argocd-understack/templates/application-nautobot-api-tokens.yaml index 5caf4418a..6daf4282f 100644 --- a/charts/argocd-understack/templates/application-nautobot-api-tokens.yaml +++ b/charts/argocd-understack/templates/application-nautobot-api-tokens.yaml @@ -8,6 +8,7 @@ metadata: - resources-finalizer.argocd.argoproj.io annotations: argocd.argoproj.io/compare-options: ServerSideDiff=true,IncludeMutationWebhook=true + argocd.argoproj.io/sync-wave: "1" {{- include "understack.appLabelsBlock" $ | nindent 2 }} spec: destination: diff --git a/charts/argocd-understack/templates/application-nautobot-job-queues.yaml b/charts/argocd-understack/templates/application-nautobot-job-queues.yaml new file mode 100644 index 000000000..73b1f8430 --- /dev/null +++ b/charts/argocd-understack/templates/application-nautobot-job-queues.yaml @@ -0,0 +1,38 @@ +{{- if eq (include "understack.isEnabled" (list $.Values.global "nautobot_job_queues")) "true" }} +--- +apiVersion: argoproj.io/v1alpha1 +kind: Application +metadata: + name: {{ printf "%s-%s" $.Release.Name "nautobot-job-queues" }} + finalizers: + - resources-finalizer.argocd.argoproj.io + annotations: + argocd.argoproj.io/compare-options: ServerSideDiff=true,IncludeMutationWebhook=true + argocd.argoproj.io/sync-wave: "2" +{{- include "understack.appLabelsBlock" $ | nindent 2 }} +spec: + destination: + namespace: nautobot + server: {{ $.Values.cluster_server }} + project: understack + sources: + - repoURL: {{ include "understack.understack_url" $ }} + targetRevision: {{ include "understack.understack_ref" $ }} + path: charts/nautobot-job-queues + helm: + ignoreMissingValueFiles: true + valueFiles: + - $deploy/{{ include "understack.deploy_path" $ }}/nautobot-job-queues/values.yaml + - path: {{ include "understack.deploy_path" $ }}/nautobot-job-queues + ref: deploy + repoURL: {{ include "understack.deploy_url" $ }} + targetRevision: {{ include "understack.deploy_ref" $ }} + syncPolicy: + automated: + prune: true + selfHeal: true + syncOptions: + - ServerSideApply=true + - RespectIgnoreDifferences=true + - ApplyOutOfSyncOnly=true +{{- end }} diff --git a/charts/argocd-understack/templates/application-nautobot.yaml b/charts/argocd-understack/templates/application-nautobot.yaml index b65de9928..5d036dda9 100644 --- a/charts/argocd-understack/templates/application-nautobot.yaml +++ b/charts/argocd-understack/templates/application-nautobot.yaml @@ -8,6 +8,7 @@ metadata: - resources-finalizer.argocd.argoproj.io annotations: argocd.argoproj.io/compare-options: ServerSideDiff=true,IncludeMutationWebhook=true + argocd.argoproj.io/sync-wave: "0" {{- include "understack.appLabelsBlock" $ | nindent 2 }} spec: destination: diff --git a/charts/argocd-understack/values.yaml b/charts/argocd-understack/values.yaml index 7cd752312..2f16bbce0 100644 --- a/charts/argocd-understack/values.yaml +++ b/charts/argocd-understack/values.yaml @@ -154,6 +154,12 @@ global: # @default -- false enabled: false + # -- Nautobot Celery JobQueue bootstrap jobs + nautobot_job_queues: + # -- Enable/disable deploying + # @default -- false + enabled: false + # -- Nautobot Operator for Kubernetes nautobotop: # -- Enable/disable deploying Nautobot Operator diff --git a/charts/nautobot-job-queues/Chart.yaml b/charts/nautobot-job-queues/Chart.yaml new file mode 100644 index 000000000..27965076d --- /dev/null +++ b/charts/nautobot-job-queues/Chart.yaml @@ -0,0 +1,12 @@ +apiVersion: v2 +name: nautobot-job-queues +description: Ensure Nautobot Celery JobQueue records from Helm values + +type: application + +version: 0.1.0 +# renovate: datasource=docker depName=networktocode/nautobot versioning=semver +appVersion: "3.0.7" + +maintainers: + - name: rackerlabs diff --git a/charts/nautobot-job-queues/ci/example.yaml b/charts/nautobot-job-queues/ci/example.yaml new file mode 100644 index 000000000..d177fc1ce --- /dev/null +++ b/charts/nautobot-job-queues/ci/example.yaml @@ -0,0 +1,22 @@ +queues: + - site-dc + +jobs: + enable: + groupings: + - Rackspace + names: + - Backup Configurations + - Deploy Config Plan (Job Button Receiver) + - Deploy Config Plans + - Generate Intended Configurations + - Perform Configuration Compliance + queueAssignments: + - names: + - Backup Configurations + - Deploy Config Plan (Job Button Receiver) + - Deploy Config Plans + - Generate Intended Configurations + - Perform Configuration Compliance + queues: + - site-dc diff --git a/charts/nautobot-job-queues/templates/_helpers.tpl b/charts/nautobot-job-queues/templates/_helpers.tpl new file mode 100644 index 000000000..0d81e0b48 --- /dev/null +++ b/charts/nautobot-job-queues/templates/_helpers.tpl @@ -0,0 +1,49 @@ +{{/* Expand the name of the chart. */}} +{{- define "nautobot-job-queues.name" -}} +{{- .Chart.Name | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* Create a default fully qualified app name. */}} +{{- define "nautobot-job-queues.fullname" -}} +{{- if contains .Chart.Name .Release.Name }} +{{- .Release.Name | trunc 63 | trimSuffix "-" }} +{{- else }} +{{- printf "%s-%s" .Release.Name .Chart.Name | trunc 63 | trimSuffix "-" }} +{{- end }} +{{- end }} + +{{/* Chart label value. */}} +{{- define "nautobot-job-queues.chart" -}} +{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* Common labels. */}} +{{- define "nautobot-job-queues.labels" -}} +helm.sh/chart: {{ include "nautobot-job-queues.chart" . }} +{{ include "nautobot-job-queues.selectorLabels" . }} +{{- if .Chart.AppVersion }} +app.kubernetes.io/version: {{ .Chart.AppVersion | quote }} +{{- end }} +app.kubernetes.io/managed-by: {{ .Release.Service }} +{{- end }} + +{{/* Selector labels. */}} +{{- define "nautobot-job-queues.selectorLabels" -}} +app.kubernetes.io/name: {{ include "nautobot-job-queues.name" . }} +app.kubernetes.io/instance: {{ .Release.Name }} +{{- end }} + +{{/* Script config map name. */}} +{{- define "nautobot-job-queues.scriptConfigMapName" -}} +{{- printf "%s-script" (include "nautobot-job-queues.fullname" .) | trunc 63 | trimSuffix "-" -}} +{{- end }} + +{{/* Desired JobQueue config map name. */}} +{{- define "nautobot-job-queues.desiredConfigMapName" -}} +{{- printf "%s-desired" (include "nautobot-job-queues.fullname" .) | trunc 63 | trimSuffix "-" -}} +{{- end }} + +{{/* Ensure job name. */}} +{{- define "nautobot-job-queues.jobName" -}} +{{- printf "%s-ensure" (include "nautobot-job-queues.fullname" .) | trunc 63 | trimSuffix "-" -}} +{{- end }} diff --git a/charts/nautobot-job-queues/templates/configmap-desired-queues.yaml b/charts/nautobot-job-queues/templates/configmap-desired-queues.yaml new file mode 100644 index 000000000..aa6ed149d --- /dev/null +++ b/charts/nautobot-job-queues/templates/configmap-desired-queues.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ include "nautobot-job-queues.desiredConfigMapName" . }} + labels: + {{- include "nautobot-job-queues.labels" . | nindent 4 }} +data: + job-queues.json: | + {{ .Values.queues | toJson }} + jobs.json: | + {{ .Values.jobs | toJson }} diff --git a/charts/nautobot-job-queues/templates/configmap-script.yaml b/charts/nautobot-job-queues/templates/configmap-script.yaml new file mode 100644 index 000000000..c34658465 --- /dev/null +++ b/charts/nautobot-job-queues/templates/configmap-script.yaml @@ -0,0 +1,563 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ include "nautobot-job-queues.scriptConfigMapName" . }} + labels: + {{- include "nautobot-job-queues.labels" . | nindent 4 }} +data: + nautobot_api.py: | + import json + import os + from urllib import error + from urllib import parse + from urllib import request + + + def object_id(value): + if isinstance(value, dict): + return value.get("id") or value.get("pk") + return value + + + def object_name(value): + if isinstance(value, str): + return value + if isinstance(value, dict): + return value.get("name") + return None + + + class NautobotAPI: + def __init__(self): + self.base_url = os.environ["NAUTOBOT_URL"].rstrip("/") + self.headers = { + "Accept": "application/json", + "Authorization": f"Token {os.environ['NAUTOBOT_TOKEN']}", + "Content-Type": "application/json", + } + + def url(self, path_or_url, params=None): + if path_or_url.startswith("http://") or path_or_url.startswith("https://"): + url = path_or_url + else: + url = f"{self.base_url}{path_or_url}" + + if not params: + return url + + separator = "&" if "?" in url else "?" + return f"{url}{separator}{parse.urlencode(params)}" + + def request(self, method, path_or_url, body=None, expected=(200,)): + payload = None + if body is not None: + payload = json.dumps(body).encode("utf-8") + + req = request.Request( + self.url(path_or_url), + data=payload, + headers=self.headers, + method=method, + ) + + try: + with request.urlopen(req, timeout=30) as response: + response_body = response.read() + status = response.getcode() + except error.HTTPError as exc: + response_body = exc.read().decode("utf-8", errors="replace") + message = f"{method} {path_or_url} failed with HTTP {exc.code}: {response_body}" + raise RuntimeError(message) from exc + + if status not in expected: + decoded = response_body.decode("utf-8", errors="replace") + raise RuntimeError(f"{method} {path_or_url} returned HTTP {status}: {decoded}") + + if not response_body: + return None + return json.loads(response_body) + + def list(self, path, params=None): + results = [] + next_url = self.url(path, params) + + while next_url: + response = self.request("GET", next_url) + if isinstance(response, list): + results.extend(response) + break + + if not isinstance(response, dict) or "results" not in response: + raise RuntimeError(f"unexpected list response from {path}: {response!r}") + + results.extend(response["results"]) + next_url = response.get("next") + + return results + + def post(self, path, body): + return self.request("POST", path, body, expected=(200, 201)) + + def patch(self, path, body): + return self.request("PATCH", path, body, expected=(200,)) + + def delete(self, path): + return self.request("DELETE", path, expected=(200, 202, 204)) + + desired_state.py: | + import json + + + QUEUES_PATH = "/desired/job-queues.json" + JOBS_PATH = "/desired/jobs.json" + + + def dedupe(items): + seen = set() + result = [] + for item in items: + if item in seen: + continue + seen.add(item) + result.append(item) + return result + + + def load_json(path): + with open(path, "r", encoding="utf-8") as fp: + return json.load(fp) + + + def load_queues(path=QUEUES_PATH): + raw = load_json(path) + if not isinstance(raw, list): + raise RuntimeError(f"expected a JSON list in {path}") + + queues = [] + for index, name in enumerate(raw): + if not isinstance(name, str) or not name: + raise RuntimeError(f"queue entry {index} must be a non-empty string") + queues.append(name) + return queues + + + def load_jobs(path=JOBS_PATH): + raw = load_json(path) + if not isinstance(raw, dict): + raise RuntimeError(f"expected a JSON object in {path}") + return raw + + + class DesiredState: + def __init__(self, queues, jobs): + self.queues = queues + self.jobs = jobs + + @classmethod + def load(cls): + return cls(load_queues(), load_jobs()) + + @property + def fail_on_missing(self): + return self.jobs.get("failOnMissing", True) + + @property + def wait_config(self): + return self.jobs.get("waitFor", {}) + + @property + def enable(self): + return self.jobs.get("enable", {}) + + @property + def assignments(self): + return self.jobs.get("queueAssignments", []) + + @property + def requested_job_names(self): + names = list(self.enable.get("names", [])) + for assignment in self.assignments: + names.extend(assignment.get("names", [])) + return dedupe(names) + + @property + def required_queue_names(self): + return dedupe(self.queues) + + reconcile.py: | + import time + + from desired_state import DesiredState + from desired_state import dedupe + from nautobot_api import NautobotAPI + from nautobot_api import object_id + from nautobot_api import object_name + + + QUEUE_TYPE = "celery" + + + def job_key(job): + return job.get("id") or job.get("pk") or job.get("url") + + + def missing_job_names(all_jobs, names): + existing_names = {job.get("name") for job in all_jobs} + return [name for name in names if name not in existing_names] + + + def wait_for_named_jobs(api, names, wait_config, fail_on_missing=True): + attempts = wait_config.get("attempts", 1) + delay_seconds = wait_config.get("delaySeconds", 0) + + for attempt in range(1, attempts + 1): + all_jobs = api.list("/api/extras/jobs/") + missing = missing_job_names(all_jobs, names) + if not missing: + return all_jobs + + if attempt < attempts: + print( + "Waiting for Nautobot Jobs to be registered " + f"(attempt {attempt}/{attempts}); missing: {', '.join(missing)}" + ) + time.sleep(delay_seconds) + continue + + message = ( + "missing Nautobot Jobs after waiting: " + f"{', '.join(missing)}. This chart configures existing Nautobot Job records; " + "the job code must be installed or synced before these records can be enabled." + ) + if fail_on_missing: + raise RuntimeError(message) + + print(message) + return all_jobs + + + def ensure_queues(api, queue_names): + queue_by_name = {queue["name"]: queue for queue in api.list("/api/extras/job-queues/")} + created = 0 + updated = 0 + unchanged = 0 + + for name in queue_names: + queue = queue_by_name.get(name) + if queue is None: + queue_by_name[name] = api.post( + "/api/extras/job-queues/", + {"name": name, "queue_type": QUEUE_TYPE}, + ) + created += 1 + print(f"Created Nautobot JobQueue '{name}' ({QUEUE_TYPE})") + continue + + if queue.get("queue_type") != QUEUE_TYPE: + queue_by_name[name] = api.patch( + f"/api/extras/job-queues/{queue['id']}/", + {"queue_type": QUEUE_TYPE}, + ) + updated += 1 + print(f"Updated Nautobot JobQueue '{name}' queue_type to '{QUEUE_TYPE}'") + continue + + unchanged += 1 + print(f"Nautobot JobQueue '{name}' already exists") + + return queue_by_name, created, updated, unchanged + + + def resolve_jobs(all_jobs, names=None, groupings=None, fail_on_missing=True): + names = names or [] + groupings = groupings or [] + by_name = {job.get("name"): job for job in all_jobs} + matches = [] + missing = [] + + for name in names: + job = by_name.get(name) + if job is None: + missing.append(name) + continue + matches.append(job) + + for grouping in groupings: + grouping_matches = [job for job in all_jobs if job.get("grouping") == grouping] + if not grouping_matches: + print(f"No Nautobot Jobs found for grouping '{grouping}'") + matches.extend(grouping_matches) + + if missing and fail_on_missing: + raise RuntimeError(f"missing Nautobot Jobs: {', '.join(missing)}") + + if missing: + print(f"Skipping missing Nautobot Jobs: {', '.join(missing)}") + + jobs_by_key = {} + for job in matches: + jobs_by_key[job_key(job)] = job + return list(jobs_by_key.values()) + + + def enable_jobs(api, jobs): + enabled = 0 + unchanged = 0 + + for job in jobs: + name = job.get("name") + if job.get("enabled") is True: + unchanged += 1 + print(f"Nautobot Job '{name}' already enabled") + continue + + api.patch(f"/api/extras/jobs/{job['id']}/", {"enabled": True}) + job["enabled"] = True + enabled += 1 + print(f"Enabled Nautobot Job '{name}'") + + return enabled, unchanged + + + def get_queue_assignments(api, job_id): + return api.list("/api/extras/job-queue-assignments/", {"job": job_id}) + + + def set_queue_override(api, job, override): + if job.get("job_queues_override") == override: + return False + + updated_job = api.patch( + f"/api/extras/jobs/{job['id']}/", + {"job_queues_override": override}, + ) + job["job_queues_override"] = updated_job.get("job_queues_override", override) + return True + + + def queue_names_by_id(queue_by_name): + return { + str(object_id(queue)): name + for name, queue in queue_by_name.items() + if object_id(queue) + } + + + def queue_name_from_reference(value, queue_by_name): + name = object_name(value) + if name in queue_by_name: + return name + + ref_id = object_id(value) + if ref_id is None: + return None + + return queue_names_by_id(queue_by_name).get(str(ref_id)) + + + def queue_id_for_name(queue_name, queue_by_name): + queue = queue_by_name.get(queue_name) + if queue is None: + return None + queue_id = object_id(queue) + if queue_id is None: + return None + return str(queue_id) + + + def desired_queue_names_for_job(job, assignment, existing_queue_names, queue_by_name): + desired_queues = assignment.get("queues", []) + include_existing = assignment.get("includeExistingQueues", True) + + queue_names = list(desired_queues) + if include_existing: + queue_names = list(existing_queue_names) + desired_queues + + default_queue = queue_name_from_reference(job.get("default_job_queue"), queue_by_name) + if default_queue: + queue_names.append(default_queue) + + return dedupe(queue_names), default_queue + + + def existing_assignments_by_queue_id(assignments): + return { + str(object_id(item.get("job_queue"))): item + for item in assignments + if object_id(item.get("job_queue")) + } + + + def existing_queue_names(existing_by_queue_id, queue_by_name): + names_by_id = queue_names_by_id(queue_by_name) + names = [] + for queue_id in existing_by_queue_id: + name = names_by_id.get(queue_id) + if name: + names.append(name) + return names + + + def assert_queues_exist(job_name, queue_names, queue_by_name): + missing = [queue_name for queue_name in queue_names if queue_name not in queue_by_name] + if missing: + raise RuntimeError( + f"cannot assign Nautobot Job '{job_name}' to missing JobQueues: {', '.join(missing)}" + ) + + + def create_missing_assignments(api, job, queue_names, existing_by_queue_id, queue_by_name): + changed = False + + for queue_name in queue_names: + queue_id = queue_id_for_name(queue_name, queue_by_name) + if queue_id in existing_by_queue_id: + continue + + try: + api.post( + "/api/extras/job-queue-assignments/", + { + "job": job["id"], + "job_queue": queue_id, + }, + ) + except RuntimeError as exc: + if "must make a unique set" not in str(exc): + raise + print(f"Nautobot Job '{job.get('name')}' is already assigned to JobQueue '{queue_name}'") + continue + + changed = True + print(f"Assigned Nautobot Job '{job.get('name')}' to JobQueue '{queue_name}'") + + return changed + + + def remove_extra_assignments(api, job, existing_by_queue_id, desired_queue_names, default_queue, queue_by_name): + changed = False + desired_queue_ids = { + queue_id_for_name(queue_name, queue_by_name) + for queue_name in desired_queue_names + } + names_by_id = queue_names_by_id(queue_by_name) + + for queue_id, existing_assignment in existing_by_queue_id.items(): + queue_name = names_by_id.get(queue_id, queue_id) + if queue_id in desired_queue_ids: + continue + if queue_name == default_queue: + continue + + api.delete(f"/api/extras/job-queue-assignments/{object_id(existing_assignment)}/") + changed = True + print(f"Removed Nautobot Job '{job.get('name')}' from JobQueue '{queue_name}'") + + return changed + + + def reconcile_job_queue_assignment(api, job, assignment, queue_by_name): + job_name = job.get("name") + existing = existing_assignments_by_queue_id(get_queue_assignments(api, job["id"])) + existing_names = existing_queue_names(existing, queue_by_name) + desired_queue_names, default_queue = desired_queue_names_for_job( + job, + assignment, + existing_names, + queue_by_name, + ) + assert_queues_exist(job_name, desired_queue_names, queue_by_name) + + desired_queue_ids = { + queue_id_for_name(queue_name, queue_by_name) + for queue_name in desired_queue_names + } + existing_matches = set(existing) == desired_queue_ids + override_changed = set_queue_override(api, job, assignment.get("override", True)) + if existing_matches and not override_changed: + print(f"Nautobot Job '{job_name}' already has desired JobQueue assignment") + return False + + changed = create_missing_assignments(api, job, desired_queue_names, existing, queue_by_name) + if not assignment.get("includeExistingQueues", True): + changed = remove_extra_assignments( + api, + job, + existing, + desired_queue_names, + default_queue, + queue_by_name, + ) or changed + + if changed or override_changed: + print(f"Updated Nautobot Job '{job_name}' JobQueue assignment to {desired_queue_names}") + return True + + print(f"Nautobot Job '{job_name}' already has desired JobQueue assignment") + return False + + + def assign_queues(api, all_jobs, queue_by_name, assignments, fail_on_missing=True): + updated = 0 + unchanged = 0 + + for assignment in assignments: + jobs = resolve_jobs( + all_jobs, + names=assignment.get("names", []), + groupings=assignment.get("groupings", []), + fail_on_missing=fail_on_missing, + ) + + for job in jobs: + changed = reconcile_job_queue_assignment(api, job, assignment, queue_by_name) + if changed: + updated += 1 + else: + unchanged += 1 + + return updated, unchanged + + + def main(): + api = NautobotAPI() + desired = DesiredState.load() + + queue_by_name, queues_created, queues_updated, queues_unchanged = ensure_queues( + api, + desired.required_queue_names, + ) + + all_jobs = wait_for_named_jobs( + api, + desired.requested_job_names, + desired.wait_config, + fail_on_missing=desired.fail_on_missing, + ) + + jobs_to_enable = resolve_jobs( + all_jobs, + names=desired.enable.get("names", []), + groupings=desired.enable.get("groupings", []), + fail_on_missing=desired.fail_on_missing, + ) + jobs_enabled, jobs_unchanged = enable_jobs(api, jobs_to_enable) + + assignments_updated, assignments_unchanged = assign_queues( + api, + all_jobs, + queue_by_name, + desired.assignments, + fail_on_missing=desired.fail_on_missing, + ) + + print( + "Nautobot reconciliation complete: " + f"queues {queues_created} created, {queues_updated} updated, {queues_unchanged} unchanged; " + f"jobs {jobs_enabled} enabled, {jobs_unchanged} already enabled; " + f"assignments {assignments_updated} updated, {assignments_unchanged} unchanged" + ) + + + if __name__ == "__main__": + main() diff --git a/charts/nautobot-job-queues/templates/job.yaml b/charts/nautobot-job-queues/templates/job.yaml new file mode 100644 index 000000000..4ce243b2e --- /dev/null +++ b/charts/nautobot-job-queues/templates/job.yaml @@ -0,0 +1,53 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: {{ include "nautobot-job-queues.jobName" . }} + labels: + {{- include "nautobot-job-queues.labels" . | nindent 4 }} + annotations: + argocd.argoproj.io/hook: "PostSync" + argocd.argoproj.io/hook-delete-policy: "BeforeHookCreation" + argocd.argoproj.io/sync-wave: "10" +spec: + backoffLimit: 3 + ttlSecondsAfterFinished: 300 + template: + metadata: + labels: + {{- include "nautobot-job-queues.selectorLabels" . | nindent 8 }} + nautobot-job-queues/job: "ensure" + spec: + serviceAccountName: nautobot + restartPolicy: OnFailure + containers: + - name: ensure + image: {{ printf "ghcr.io/nautobot/nautobot:%s" .Chart.AppVersion | quote }} + imagePullPolicy: IfNotPresent + command: + - /usr/local/bin/python + args: + - /scripts/reconcile.py + env: + - name: PYTHONUNBUFFERED + value: "1" + - name: NAUTOBOT_URL + value: {{ .Values.api.url | quote }} + - name: NAUTOBOT_TOKEN + valueFrom: + secretKeyRef: + name: {{ .Values.api.tokenSecretRef.name | quote }} + key: {{ .Values.api.tokenSecretRef.key | quote }} + volumeMounts: + - name: scripts + mountPath: /scripts + readOnly: true + - name: desired + mountPath: /desired + readOnly: true + volumes: + - name: scripts + configMap: + name: {{ include "nautobot-job-queues.scriptConfigMapName" . }} + - name: desired + configMap: + name: {{ include "nautobot-job-queues.desiredConfigMapName" . }} diff --git a/charts/nautobot-job-queues/values.schema.json b/charts/nautobot-job-queues/values.schema.json new file mode 100644 index 000000000..1cebc6557 --- /dev/null +++ b/charts/nautobot-job-queues/values.schema.json @@ -0,0 +1,148 @@ +{ + "$schema": "https://json-schema.org/draft-07/schema#", + "type": "object", + "additionalProperties": false, + "properties": { + "api": { + "type": "object", + "additionalProperties": false, + "properties": { + "url": { + "type": "string", + "minLength": 1 + }, + "tokenSecretRef": { + "type": "object", + "additionalProperties": false, + "properties": { + "name": { + "type": "string", + "minLength": 1 + }, + "key": { + "type": "string", + "minLength": 1 + } + }, + "required": [ + "name", + "key" + ] + } + }, + "required": [ + "url", + "tokenSecretRef" + ] + }, + "queues": { + "type": "array", + "items": { + "type": "string", + "minLength": 1 + } + }, + "jobs": { + "type": "object", + "additionalProperties": false, + "properties": { + "failOnMissing": { + "type": "boolean" + }, + "waitFor": { + "type": "object", + "additionalProperties": false, + "properties": { + "attempts": { + "type": "integer", + "minimum": 1 + }, + "delaySeconds": { + "type": "integer", + "minimum": 0 + } + }, + "required": [ + "attempts", + "delaySeconds" + ] + }, + "enable": { + "type": "object", + "additionalProperties": false, + "properties": { + "groupings": { + "type": "array", + "items": { + "type": "string", + "minLength": 1 + } + }, + "names": { + "type": "array", + "items": { + "type": "string", + "minLength": 1 + } + } + }, + "required": [ + "groupings", + "names" + ] + }, + "queueAssignments": { + "type": "array", + "items": { + "type": "object", + "additionalProperties": false, + "properties": { + "names": { + "type": "array", + "items": { + "type": "string", + "minLength": 1 + } + }, + "groupings": { + "type": "array", + "items": { + "type": "string", + "minLength": 1 + } + }, + "queues": { + "type": "array", + "items": { + "type": "string", + "minLength": 1 + }, + "minItems": 1 + }, + "includeExistingQueues": { + "type": "boolean" + }, + "override": { + "type": "boolean" + } + }, + "required": [ + "queues" + ] + } + } + }, + "required": [ + "failOnMissing", + "waitFor", + "enable", + "queueAssignments" + ] + } + }, + "required": [ + "api", + "queues", + "jobs" + ] +} diff --git a/charts/nautobot-job-queues/values.yaml b/charts/nautobot-job-queues/values.yaml new file mode 100644 index 000000000..43af39997 --- /dev/null +++ b/charts/nautobot-job-queues/values.yaml @@ -0,0 +1,41 @@ +# -- Nautobot API connection used by the managed reconciliation job. +api: + # -- In-cluster Nautobot API base URL. + url: http://nautobot-default.nautobot.svc.cluster.local + # -- Secret reference containing a Nautobot API token. + tokenSecretRef: + name: nautobot-superuser + key: apitoken + +# -- Nautobot JobQueue names to ensure. +queues: [] +# - site-dc + +# -- Nautobot Job records to reconcile. +jobs: + # -- Fail reconciliation when a requested job name cannot be found. + failOnMissing: true + # -- Wait for requested named jobs to be registered in Nautobot before failing. + waitFor: + attempts: 12 + delaySeconds: 10 + + # -- Jobs to enable. + enable: + # -- Enable all jobs matching these Nautobot groupings. + groupings: [] + # - Rackspace + + # -- Enable jobs matching these exact Nautobot names. + names: [] + # - Backup Configurations + + # -- Optional JobQueue assignments for jobs. + queueAssignments: [] + # - names: + # - Backup Configurations + # groupings: [] + # queues: + # - site-dc + # includeExistingQueues: true + # override: true diff --git a/docs/deploy-guide/components/index.md b/docs/deploy-guide/components/index.md index fe1a02a29..981c3a6f9 100644 --- a/docs/deploy-guide/components/index.md +++ b/docs/deploy-guide/components/index.md @@ -90,6 +90,7 @@ enablement defaults, validation, and troubleshooting notes. | [monitoring](./monitoring.md) | global, site | | [nautobot](./nautobot.md) | global | | [nautobot-api-tokens](./nautobot-api-tokens.md) | global | +| [nautobot-job-queues](./nautobot-job-queues.md) | global | | [nautobotop](./nautobotop.md) | global | | [openebs](./openebs.md) | global, site | | [openstack](./openstack.md) | site | diff --git a/docs/deploy-guide/components/nautobot-job-queues.md b/docs/deploy-guide/components/nautobot-job-queues.md new file mode 100644 index 000000000..803c7cfdb --- /dev/null +++ b/docs/deploy-guide/components/nautobot-job-queues.md @@ -0,0 +1,176 @@ +--- +charts: +- charts/nautobot-job-queues +deploy_overrides: + helm: + mode: values +--- + +# nautobot-job-queues + +Global Nautobot reconciliation job for Celery JobQueue records and +selected existing Nautobot Jobs. + +This component is rendered by +`charts/argocd-understack/templates/application-nautobot-job-queues.yaml`. +It runs against the global Nautobot API and reconciles runtime state in +Nautobot; it does not deploy Celery workers. + +## Deployment Scope + +- Cluster scope: global +- Values key: `global.nautobot_job_queues` +- ArgoCD Application template: `charts/argocd-understack/templates/application-nautobot-job-queues.yaml` +- Helm chart: `charts/nautobot-job-queues` + +The ArgoCD Application is ordered after global Nautobot and +`nautobot-api-tokens` by sync wave. Nautobot must be reachable, and the +configured token Secret must exist before the reconciliation Job can +complete. + +## How ArgoCD Builds It + +{{ component_argocd_builds() }} + +## How to Enable + +Enable this component in the global deployment values file: + +```yaml title="$CLUSTER_NAME/deploy.yaml" +global: + nautobot_job_queues: + enabled: true +``` + +## Deployment Repo Content + +Required deployment repo content: + +- `nautobot-job-queues/values.yaml`: Declares the desired JobQueue + names, existing Jobs to enable, and optional JobQueue assignments. + +Example: + +```yaml title="$CLUSTER_NAME/nautobot-job-queues/values.yaml" +queues: + - site1-dc2 + +jobs: + enable: + groupings: [] + names: + - Backup Configurations + - Deploy Config Plan (Job Button Receiver) + - Deploy Config Plans + - Generate Intended Configurations + - Perform Configuration Compliance + queueAssignments: + - names: + - Backup Configurations + - Deploy Config Plan (Job Button Receiver) + - Deploy Config Plans + - Generate Intended Configurations + - Perform Configuration Compliance + queues: + - site1-dc2 +``` + +## Managed State + +This component creates or updates Nautobot `JobQueue` records declared +under top-level `queues`. + +It can also reconcile existing Nautobot `Job` records: + +- `jobs.enable.groupings`: Enable all Jobs with matching Nautobot + groupings. +- `jobs.enable.names`: Enable Jobs with exact Nautobot names. +- `jobs.queueAssignments`: Add allowed JobQueues for matching Jobs. + +This chart does not create Nautobot `Job` records. Jobs such as +`Backup Configurations` are registered by installed Nautobot plugins or +Git-backed Job code. If a requested Job is missing, fix plugin or Job +registration first. + +## JobQueue Assignments + +Each entry under `jobs.queueAssignments` selects Jobs by `names`, +`groupings`, or both, then assigns them to the listed queues: + +```yaml +jobs: + queueAssignments: + - names: + - Backup Configurations + groupings: + - Network Automation + queues: + - default + - site1-dc2 + includeExistingQueues: true + override: true +``` + +Queues listed in `jobs.queueAssignments[].queues` are assignment +targets only. They are not created unless they also appear under +top-level `queues`. This allows assignments to reference existing +Nautobot queues such as `default` without having the chart create them. + +`includeExistingQueues` defaults to `true`, which preserves queues +already assigned to the Job and appends the desired queues. Set it to +`false` when the desired list should replace non-default existing +assignments. + +`override` defaults to `true`, which sets Nautobot's +`job_queues_override` flag so the API-configured assignments are used. + +## API Access + +By default the reconciliation Job calls the in-cluster Nautobot service +with the `nautobot-superuser` API token: + +```yaml +api: + url: http://nautobot-default.nautobot.svc.cluster.local + tokenSecretRef: + name: nautobot-superuser + key: apitoken +``` + +Override these values only when the Nautobot service name or token +Secret differs for the deployment. + +## Relationship to Site Workers + +The `nautobot-worker` site component deploys Celery workers that listen +on a site-specific queue, usually the site label such as `site1-dc2`. +This global component creates the matching Nautobot `JobQueue` record +and assigns selected Jobs to that queue so Nautobot accepts runs sent to +the site worker. + +When starting a Job through the API, pass the target queue, for example: + +```json +{ + "data": {}, + "task_queue": "site1-dc2" +} +``` + +Nautobot validates that the requested queue exists, is allowed for the +Job, and has a running Celery worker. + +## Troubleshooting + +If reconciliation fails with a missing Job error, the Job has not been +registered in Nautobot yet. Confirm the plugin or Git-backed Job code is +installed and synced, then run Nautobot's registration path again, such +as `nautobot-server migrate --no-input` from a Nautobot pod. + +If running a Job returns `"is not a valid choice"` for `task_queue`, +check that `jobs.queueAssignments` includes the Job and target queue, +then rerun the reconciliation Job. + +If Nautobot rejects the run because no worker is listening, check the +site `nautobot-worker` Application and confirm its worker pod has +`CELERY_TASK_QUEUES` set to the same queue name. diff --git a/docs/deploy-guide/components/nautobot-worker.md b/docs/deploy-guide/components/nautobot-worker.md index fc70a5188..99f655f9c 100644 --- a/docs/deploy-guide/components/nautobot-worker.md +++ b/docs/deploy-guide/components/nautobot-worker.md @@ -20,10 +20,9 @@ application. The web server, Redis, and PostgreSQL all remain on the global cluster -- site workers connect back to those shared services over the network. -For details on how Celery task queues are configured per site and how to -route jobs to site-specific workers, see the -[Nautobot Celery Queues](../../operator-guide/nautobot-celery-queues.md) -operator guide. +The matching Nautobot `JobQueue` records and Job assignments are +reconciled by the +[nautobot-job-queues](./nautobot-job-queues.md) component. ## Deployment Scope diff --git a/docs/operator-guide/nautobot-celery-queues.md b/docs/operator-guide/nautobot-celery-queues.md deleted file mode 100644 index c42c26db2..000000000 --- a/docs/operator-guide/nautobot-celery-queues.md +++ /dev/null @@ -1,261 +0,0 @@ -# Nautobot Celery Queues - -This guide covers how Celery task queues work in the understack -nautobot-worker deployment, how the queue name is derived from the -site name, and how to route jobs to site-specific queues -programmatically. - -## How the Queue Name is Set - -The ArgoCD Application template for `nautobot-worker` automatically -sets the Celery queue name to match the site label -(`understack.rackspace.com/site`). The relevant section in -`application-nautobot-worker.yaml`: - -{% raw %} - -```yaml -{{- with index $.Values.appLabels "understack.rackspace.com/site" }} -values: | - workers: - default: - enabled: false - {{ . }}: - enabled: true - taskQueues: {{ . | quote }} -{{- end }} -``` - -{% endraw %} - -For a site label `site-dc`, this renders as: - -```yaml -workers: - default: - enabled: false - site-dc: - enabled: true - taskQueues: "site-dc" -``` - -This produces a Deployment named `nautobot-worker-celery-site-dc` with -the label `app.kubernetes.io/component: nautobot-celery-site-dc` and -the environment variable `CELERY_TASK_QUEUES=site-dc`. - -The queue name comes from the ArgoCD Application label -`understack.rackspace.com/site`. - -### Why workers.default must be disabled - -The upstream Nautobot Helm chart defines `workers.default.taskQueues: -"default"` in its own `values.yaml`. The chart's `nautobot.workers` -helper merges worker-specific values on top of the `celery` defaults. -If you only set `celery.taskQueues`, the chart's `workers.default` -overrides it because worker-level values take precedence. Disabling -`workers.default` and creating a new worker key avoids this conflict. - -## Nautobot JobQueue Setup - -Before any job can be dispatched to a site queue, a `JobQueue` record -must exist in Nautobot's database. Without it, the API rejects the -request with a validation error. - -### Create via the UI - -Navigate to Jobs > Job Queues > Add and create a queue with: - -- Name: `site-dc` (must match the worker's `taskQueues` value) -- Queue Type: `celery` - -### Create via the REST API - -```bash -curl -X POST \ - -H "Authorization: Token $TOKEN" \ - -H "Content-Type: application/json" \ - https://nautobot.example.com/api/extras/job-queues/ \ - --data '{"name": "site-dc", "queue_type": "celery"}' -``` - -### Create via pynautobot - -```python -import pynautobot - -nb = pynautobot.api("https://nautobot.example.com", token="your-token") -nb.extras.job_queues.create(name="site-dc", queue_type="celery") -``` - -### Automate via Ansible - -The `ansible/roles/jobs/tasks/main.yml` role enables jobs but does not -currently create JobQueues. You can extend it: - -{% raw %} - -```yaml -- name: "Ensure site JobQueue exists" - ansible.builtin.uri: - url: "{{ nautobot_url }}/api/extras/job-queues/" - method: POST - headers: - Authorization: "Token {{ nautobot_token }}" - body_format: json - body: - name: "{{ site }}" - queue_type: "celery" - status_code: [200, 201, 400] -``` - -{% endraw %} - -## Assigning Jobs to Queues - -A job must list the queue in its allowed queues before it can be -dispatched there. There are three ways to do this. - -### Option 1: In the Job class (code) - -Set `task_queues` in the Job's Meta class. This is baked into the -job's source code and applies everywhere the job is installed. - -```python -from nautobot.apps.jobs import Job - -class SyncSiteConfig(Job): - class Meta: - name = "Sync Site Config" - task_queues = ["site-dc", "default"] -``` - -### Option 2: Via the Nautobot UI - -Navigate to Jobs > Jobs, select the job, click Edit, and add the -desired JobQueue(s) under the Job Queues field. Check "Override -job queues" to use the UI-configured queues instead of the ones -defined in code. - -### Option 3: Via the REST API - -```bash -curl -X PATCH \ - -H "Authorization: Token $TOKEN" \ - -H "Content-Type: application/json" \ - https://nautobot.example.com/api/extras/jobs/$JOB_ID/ \ - --data '{ - "job_queues": [{"name": "site-dc"}, {"name": "default"}], - "job_queues_override": true - }' -``` - -## Running Jobs on a Specific Queue - -### Via pynautobot - -```python -import pynautobot - -nb = pynautobot.api("https://nautobot.example.com", token="your-token") - -job = nb.extras.jobs.get(name="my_app.jobs.SyncSiteConfig") - -# Run on the site worker -result = job.run(data={"device": "server-01"}, task_queue="site-dc") -``` - -The `task_queue` parameter (or `job_queue` -- both are accepted in -Nautobot 2.4+) tells Nautobot to dispatch the Celery task to the -specified queue. The site worker listening on that queue picks it up. - -### Via the REST API - -```bash -curl -X POST \ - -H "Authorization: Token $TOKEN" \ - -H "Content-Type: application/json" \ - https://nautobot.example.com/api/extras/jobs/$JOB_ID/run/ \ - --data '{ - "data": {"device": "server-01"}, - "task_queue": "site-dc" - }' -``` - -### Via the Nautobot UI - -When running a job from the web UI, if the job has multiple queues -configured, a dropdown appears allowing you to select the target -queue before clicking "Run Job". - -### Default behavior - -If `task_queue` is not specified, Nautobot dispatches the job to the -job's `default_job_queue`. If no default is configured, it falls back -to `CELERY_TASK_DEFAULT_QUEUE` (typically `"default"`). - -## Validation - -Nautobot validates two things before accepting a job run request: - -1. The requested queue must be in the job's allowed queues list. - If not, the API returns: - `{"task_queue": ["\"site-dc\" is not a valid choice."]}` - -2. At least one Celery worker must be actively listening on the - requested queue. If no worker is found, the API returns a - `CeleryWorkerNotRunningException`. This check uses Celery's - `inspect` to count active workers on the queue. - -## Verifying Workers are Listening - -To confirm a site worker is consuming from the correct queue: - -```bash -# Check the CELERY_TASK_QUEUES env var in the running pod -kubectl -n nautobot get deploy nautobot-worker-celery-site-dc \ - -o jsonpath='{.spec.template.spec.containers[0].env[?(@.name=="CELERY_TASK_QUEUES")].value}' - -# Check worker logs for the queue binding -kubectl logs -n nautobot \ - -l app.kubernetes.io/component=nautobot-celery-site-dc \ - --tail=20 | grep "ready" -``` - -## Multiple Sites - -Each site gets its own queue named after its site label. For example: - -| Site | Site Label | Queue Name | Deployment | -|---|---|---|---| -| DC1 Staging | dc1-staging | dc1-staging | nautobot-worker-celery-dc1-staging | -| DC1 Prod | dc1-prod | dc1-prod | nautobot-worker-celery-dc1-prod | -| DC2 Prod | dc2-prod | dc2-prod | nautobot-worker-celery-dc2-prod | -| DC3 Prod | dc3-prod | dc3-prod | nautobot-worker-celery-dc3-prod | - -Each site's worker only processes tasks from its own queue. The global -Nautobot instance dispatches jobs to the appropriate queue based on the -`task_queue` parameter in the API call. - -## Troubleshooting - -### "is not a valid choice" when running a job - -The job does not have the requested queue in its allowed queues. Either: - -- Add the queue to the job's `task_queues` in code, or -- Add the JobQueue to the job via the UI/API with `job_queues_override: true` - -### CeleryWorkerNotRunningException - -No worker is listening on the requested queue. Check: - -- The site's nautobot-worker ArgoCD Application is synced and healthy -- The worker pod is running: `kubectl get pods -n nautobot -l app.kubernetes.io/component=nautobot-celery-` -- The `CELERY_TASK_QUEUES` env var matches the queue name - -### Job runs but nothing happens - -The job was dispatched to a queue that no worker is consuming. This -can happen if `task_queue` was not specified and the job defaulted to -`"default"`, but the site worker is listening on `"site-dc"`. Always -pass `task_queue` explicitly when targeting a site worker. diff --git a/docs/operator-guide/nautobot.md b/docs/operator-guide/nautobot.md index 2666358a9..ea078cf01 100644 --- a/docs/operator-guide/nautobot.md +++ b/docs/operator-guide/nautobot.md @@ -2,8 +2,8 @@ ## Related Guides -- [Nautobot Celery Queues](nautobot-celery-queues.md) -- configuring - per-site Celery task queues and routing jobs to site-specific workers +- [nautobot-job-queues](../deploy-guide/components/nautobot-job-queues.md) -- + reconciling Nautobot JobQueue records and assigning Jobs to queues - [mTLS Certificate Renewal](nautobot-mtls-certificate-renewal.md) -- how mTLS client certificates for site workers are renewed and distributed across clusters diff --git a/mkdocs.yml b/mkdocs.yml index 5d52b5bad..fa91a2d07 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -188,6 +188,7 @@ nav: - deploy-guide/components/mariadb-operator.md - deploy-guide/components/monitoring.md - deploy-guide/components/nautobot-api-tokens.md + - deploy-guide/components/nautobot-job-queues.md - deploy-guide/components/nautobot.md - deploy-guide/components/nautobotop.md - deploy-guide/components/nautobot-worker.md @@ -240,7 +241,6 @@ nav: - operator-guide/rook-ceph.md - operator-guide/nautobot.md - operator-guide/nautobotop.md - - operator-guide/nautobot-celery-queues.md - operator-guide/nautobot-mtls-certificate-renewal.md - operator-guide/troubleshooting-osh.md - operator-guide/logging.md