diff --git a/openapi/openapiv2.json b/openapi/openapiv2.json index bd0bc4351..005a4d5d1 100644 --- a/openapi/openapiv2.json +++ b/openapi/openapiv2.json @@ -2184,6 +2184,98 @@ ] } }, + "/api/v1/namespaces/{namespace}/workers": { + "get": { + "summary": "ListWorkers is a visibility API to list worker status information in a specific namespace.", + "operationId": "ListWorkers2", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/v1ListWorkersResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "namespace", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "pageSize", + "in": "query", + "required": false, + "type": "integer", + "format": "int32" + }, + { + "name": "nextPageToken", + "in": "query", + "required": false, + "type": "string", + "format": "byte" + }, + { + "name": "query", + "description": "`query` in ListWorkers is used to filter workers based on worker status info.\nThe following worker status attributes are supported as part of the query:\n* WorkerInstanceKey\n* WorkerIdentity\n* HostName\n* TaskQueue\n* DeploymentName\n* BuildId\n* SdkName\n* SdkVersion\n* StartTime\n* LastHeartbeatTime\n* Status\nCurrently metrics are not supported as a part of ListWorkers query.", + "in": "query", + "required": false, + "type": "string" + } + ], + "tags": [ + "WorkflowService" + ] + } + }, + "/api/v1/namespaces/{namespace}/workers/heartbeat": { + "post": { + "summary": "RecordWorkerHeartbeat receives a heartbeat request from the worker.", + "operationId": "RecordWorkerHeartbeat2", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/v1RecordWorkerHeartbeatResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, +
"parameters": [ + { + "name": "namespace", + "description": "Namespace this worker belongs to.", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/WorkflowServiceRecordWorkerHeartbeatBody" + } + } + ], + "tags": [ + "WorkflowService" + ] + } + }, "/api/v1/namespaces/{namespace}/workflow-count": { "get": { "summary": "CountWorkflowExecutions is a visibility API to count of workflow executions in a specific namespace.", @@ -5693,6 +5785,98 @@ ] } }, + "/namespaces/{namespace}/workers": { + "get": { + "summary": "ListWorkers is a visibility API to list worker status information in a specific namespace.", + "operationId": "ListWorkers", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/v1ListWorkersResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "namespace", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "pageSize", + "in": "query", + "required": false, + "type": "integer", + "format": "int32" + }, + { + "name": "nextPageToken", + "in": "query", + "required": false, + "type": "string", + "format": "byte" + }, + { + "name": "query", + "description": "`query` in ListWorkers is used to filter workers based on worker status info.\nThe following worker status attributes are supported as part of the query:\n* WorkerInstanceKey\n* WorkerIdentity\n* HostName\n* TaskQueue\n* DeploymentName\n* BuildId\n* SdkName\n* SdkVersion\n* StartTime\n* LastHeartbeatTime\n* Status\nCurrently metrics are not supported as a part of ListWorkers query.", + "in": "query", + "required": false, + "type": "string" + } + ], + "tags": [ + "WorkflowService" + ] + } + }, + "/namespaces/{namespace}/workers/heartbeat": { + "post": { + "summary": "RecordWorkerHeartbeat receives a
heartbeat request from the worker.", + "operationId": "RecordWorkerHeartbeat", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/v1RecordWorkerHeartbeatResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "namespace", + "description": "Namespace this worker belongs to.", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/WorkflowServiceRecordWorkerHeartbeatBody" + } + } + ], + "tags": [ + "WorkflowService" + ] + } + }, "/namespaces/{namespace}/workflow-count": { "get": { "summary": "CountWorkflowExecutions is a visibility API to count of workflow executions in a specific namespace.", @@ -7385,6 +7569,18 @@ } } }, + "WorkflowServiceRecordWorkerHeartbeatBody": { + "type": "object", + "properties": { + "identity": { + "type": "string", + "description": "The identity of the client who initiated this request." 
+ }, + "workerHeartbeat": { + "$ref": "#/definitions/v1WorkerHeartbeat" + } + } + }, "WorkflowServiceRequestCancelWorkflowExecutionBody": { "type": "object", "properties": { @@ -11174,6 +11370,23 @@ } } }, + "v1ListWorkersResponse": { + "type": "object", + "properties": { + "workersInfo": { + "type": "array", + "items": { + "type": "object", + "$ref": "#/definitions/v1WorkerInfo" + } + }, + "nextPageToken": { + "type": "string", + "format": "byte", + "title": "Next page token" + } + } + }, "v1ListWorkflowExecutionsResponse": { "type": "object", "properties": { @@ -12507,6 +12720,9 @@ } } }, + "v1RecordWorkerHeartbeatResponse": { + "type": "object" + }, "v1RegisterNamespaceRequest": { "type": "object", "properties": { @@ -15053,6 +15269,193 @@ "default": "WORKER_DEPLOYMENT_VERSION_STATUS_UNSPECIFIED", "description": "Specify the status of a Worker Deployment Version.\nExperimental. Worker Deployments are experimental and might significantly change in the future.\n\n - WORKER_DEPLOYMENT_VERSION_STATUS_INACTIVE: The Worker Deployment Version has been created inside the Worker Deployment but is not used by any\nworkflow executions. These Versions can still have workflows if they have an explicit Versioning Override targeting\nthis Version. Such Versioning Override could be set at workflow start time, or at a later time via `UpdateWorkflowExecutionOptions`.\n - WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT: The Worker Deployment Version is the current version of the Worker Deployment. All new workflow executions \nand tasks of existing unversioned or AutoUpgrade workflows are routed to this version.\n - WORKER_DEPLOYMENT_VERSION_STATUS_RAMPING: The Worker Deployment Version is the ramping version of the Worker Deployment. A subset of new Pinned workflow executions are \nrouted to this version. 
Moreover, a portion of existing unversioned or AutoUpgrade workflow executions are also routed to this version.\n - WORKER_DEPLOYMENT_VERSION_STATUS_DRAINING: The Worker Deployment Version is not used by new workflows but is still used by\nopen pinned workflows. The version cannot be decommissioned safely.\n - WORKER_DEPLOYMENT_VERSION_STATUS_DRAINED: The Worker Deployment Version is not used by new or open workflows, but might be still needed by\nQueries sent to closed workflows. The version can be decommissioned safely if user does\nnot query closed workflows. If the user does query closed workflows for some time x after\nworkflows are closed, they should decommission the version after it has been drained for that duration." }, + "v1WorkerHeartbeat": { + "type": "object", + "properties": { + "workerInstanceKey": { + "type": "string", + "description": "Worker identifier, should be unique for the namespace.\nIt is distinct from worker identity, which is not necessarily namespace-unique." + }, + "workerIdentity": { + "type": "string", + "description": "Worker identity, set by the client, may not be unique.\nUsually host_name+(user group name)+process_id, but can be overwritten by the user." + }, + "hostInfo": { + "$ref": "#/definitions/v1WorkerHostInfo", + "description": "Worker host information." + }, + "taskQueue": { + "type": "string", + "description": "Task queue this worker is polling for tasks." + }, + "deploymentVersion": { + "$ref": "#/definitions/v1WorkerDeploymentVersion" + }, + "sdkName": { + "type": "string" + }, + "sdkVersion": { + "type": "string" + }, + "status": { + "$ref": "#/definitions/v1WorkerStatus", + "description": "Worker status. Defined by SDK." + }, + "startTime": { + "type": "string", + "format": "date-time", + "title": "Worker start time.\nIt can be used to determine worker uptime. 
(current time - start time)" + }, + "heartbeatTime": { + "type": "string", + "format": "date-time", + "description": "Timestamp of this heartbeat, coming from the worker. Worker should set it to \"now\".\nNote that this timestamp comes directly from the worker and is subject to workers' clock skew." + }, + "elapsedSinceLastHeartbeat": { + "type": "string", + "description": "Elapsed time since the last heartbeat from the worker." + }, + "workflowTaskSlotsInfo": { + "$ref": "#/definitions/v1WorkerSlotsInfo" + }, + "activityTaskSlotsInfo": { + "$ref": "#/definitions/v1WorkerSlotsInfo" + }, + "nexusTaskSlotsInfo": { + "$ref": "#/definitions/v1WorkerSlotsInfo" + }, + "localActivitySlotsInfo": { + "$ref": "#/definitions/v1WorkerSlotsInfo" + }, + "workflowPollerInfo": { + "$ref": "#/definitions/v1WorkerPollerInfo" + }, + "workflowStickyPollerInfo": { + "$ref": "#/definitions/v1WorkerPollerInfo" + }, + "activityPollerInfo": { + "$ref": "#/definitions/v1WorkerPollerInfo" + }, + "nexusPollerInfo": { + "$ref": "#/definitions/v1WorkerPollerInfo" + }, + "totalStickyCacheHit": { + "type": "integer", + "format": "int32", + "description": "A Workflow Task found a cached Workflow Execution to run against." + }, + "totalStickyCacheMiss": { + "type": "integer", + "format": "int32", + "description": "A Workflow Task did not find a cached Workflow execution to run against." + }, + "currentStickyCacheSize": { + "type": "integer", + "format": "int32", + "description": "Current cache size, expressed in number of Workflow Executions." + } + }, + "description": "Worker info message, contains information about the worker and its current state.\nAll information is provided by the worker itself." + }, + "v1WorkerHostInfo": { + "type": "object", + "properties": { + "hostName": { + "type": "string", + "description": "Worker host identifier." + }, + "processId": { + "type": "string", + "description": "Worker process identifier, should be unique for the host." 
+ }, + "currentHostCpuUsage": { + "type": "number", + "format": "float", + "description": "System used CPU as a float in the range [0.0, 1.0] where 1.0 is defined as all\ncores on the host pegged." + }, + "currentHostMemUsage": { + "type": "number", + "format": "float", + "description": "System used memory as a float in the range [0.0, 1.0] where 1.0 is defined as\nall available memory on the host is used." + } + }, + "title": "Holds everything needed to identify the worker host/process context" + }, + "v1WorkerInfo": { + "type": "object", + "properties": { + "workerHeartbeat": { + "$ref": "#/definitions/v1WorkerHeartbeat" + } + } + }, + "v1WorkerPollerInfo": { + "type": "object", + "properties": { + "currentPollers": { + "type": "integer", + "format": "int32", + "description": "Number of polling RPCs that are currently in flight." + }, + "lastSuccessfulPollTime": { + "type": "string", + "format": "date-time" + }, + "isAutoscaling": { + "type": "boolean", + "title": "Set true if the number of concurrent pollers is auto-scaled" + } + } + }, + "v1WorkerSlotsInfo": { + "type": "object", + "properties": { + "currentAvailableSlots": { + "type": "integer", + "format": "int32", + "description": "Number of slots available for the worker to specific tasks.\nMay be -1 if the upper bound is not known." + }, + "currentUsedSlots": { + "type": "integer", + "format": "int32", + "description": "Number of slots used by the worker for specific tasks." + }, + "slotSupplierKind": { + "type": "string", + "title": "Kind of the slot supplier, which is used to determine how the slots are allocated.\nPossible values: \"Fixed | ResourceBased | Custom String\"" + }, + "totalProcessedTasks": { + "type": "integer", + "format": "int32", + "description": "Total number of tasks processed (completed both successfully and unsuccessfully, or any other way)\nby the worker since the worker started. This is a cumulative counter."
+ }, + "totalFailedTasks": { + "type": "integer", + "format": "int32", + "description": "Total number of failed tasks processed by the worker so far." + }, + "lastIntervalProcessedTasks": { + "type": "integer", + "format": "int32", + "description": "Number of tasks processed since the last heartbeat from the worker.\nThis is a cumulative counter, and it is reset to 0 each time the worker sends a heartbeat.\nContains both successful and failed tasks." + }, + "lastIntervalFailureTasks": { + "type": "integer", + "format": "int32", + "description": "Number of failed tasks processed since the last heartbeat from the worker." + } + } + }, + "v1WorkerStatus": { + "type": "string", + "enum": [ + "WORKER_STATUS_UNSPECIFIED", + "WORKER_STATUS_RUNNING", + "WORKER_STATUS_SHUTTING_DOWN", + "WORKER_STATUS_SHUTDOWN" + ], + "default": "WORKER_STATUS_UNSPECIFIED" + }, "v1WorkerVersionCapabilities": { "type": "object", "properties": { diff --git a/openapi/openapiv3.yaml b/openapi/openapiv3.yaml index a0dfa04d3..1b454569e 100644 --- a/openapi/openapiv3.yaml +++ b/openapi/openapiv3.yaml @@ -1982,6 +1982,92 @@ paths: application/json: schema: $ref: '#/components/schemas/Status' + /api/v1/namespaces/{namespace}/workers: + get: + tags: + - WorkflowService + description: ListWorkers is a visibility API to list worker status information in a specific namespace. + operationId: ListWorkers + parameters: + - name: namespace + in: path + required: true + schema: + type: string + - name: pageSize + in: query + schema: + type: integer + format: int32 + - name: nextPageToken + in: query + schema: + type: string + format: bytes + - name: query + in: query + description: |- + `query` in ListWorkers is used to filter workers based on worker status info.
+ The following worker status attributes are supported as part of the query: + * WorkerInstanceKey + * WorkerIdentity + * HostName + * TaskQueue + * DeploymentName + * BuildId + * SdkName + * SdkVersion + * StartTime + * LastHeartbeatTime + * Status + Currently metrics are not supported as a part of ListWorkers query. + schema: + type: string + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ListWorkersResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' + /api/v1/namespaces/{namespace}/workers/heartbeat: + post: + tags: + - WorkflowService + description: RecordWorkerHeartbeat receives a heartbeat request from the worker. + operationId: RecordWorkerHeartbeat + parameters: + - name: namespace + in: path + description: Namespace this worker belongs to. + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/RecordWorkerHeartbeatRequest' + required: true + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/RecordWorkerHeartbeatResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' /api/v1/namespaces/{namespace}/workflow-count: get: tags: @@ -5129,6 +5215,92 @@ paths: application/json: schema: $ref: '#/components/schemas/Status' + /namespaces/{namespace}/workers: + get: + tags: + - WorkflowService + description: ListWorkers is a visibility API to list worker status information in a specific namespace.
+ operationId: ListWorkers + parameters: + - name: namespace + in: path + required: true + schema: + type: string + - name: pageSize + in: query + schema: + type: integer + format: int32 + - name: nextPageToken + in: query + schema: + type: string + format: bytes + - name: query + in: query + description: |- + `query` in ListWorkers is used to filter workers based on worker status info. + The following worker status attributes are supported as part of the query: + * WorkerInstanceKey + * WorkerIdentity + * HostName + * TaskQueue + * DeploymentName + * BuildId + * SdkName + * SdkVersion + * StartTime + * LastHeartbeatTime + * Status + Currently metrics are not supported as a part of ListWorkers query. + schema: + type: string + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ListWorkersResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' + /namespaces/{namespace}/workers/heartbeat: + post: + tags: + - WorkflowService + description: RecordWorkerHeartbeat receives a heartbeat request from the worker. + operationId: RecordWorkerHeartbeat + parameters: + - name: namespace + in: path + description: Namespace this worker belongs to. + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/RecordWorkerHeartbeatRequest' + required: true + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/RecordWorkerHeartbeatResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' /namespaces/{namespace}/workflow-count: get: tags: @@ -8288,6 +8460,17 @@ components: - $ref: '#/components/schemas/WorkerDeploymentInfo_WorkerDeploymentVersionSummary' description: Summary of the ramping version of the Worker Deployment.
description: A subset of WorkerDeploymentInfo + ListWorkersResponse: + type: object + properties: + workersInfo: + type: array + items: + $ref: '#/components/schemas/WorkerInfo' + nextPageToken: + type: string + description: Next page token + format: bytes ListWorkflowExecutionsResponse: type: object properties: @@ -9498,6 +9681,20 @@ components: description: |- Will be set to true if the activity was reset. Applies only to the current run. + RecordWorkerHeartbeatRequest: + type: object + properties: + namespace: + type: string + description: Namespace this worker belongs to. + identity: + type: string + description: The identity of the client who initiated this request. + workerHeartbeat: + $ref: '#/components/schemas/WorkerHeartbeat' + RecordWorkerHeartbeatResponse: + type: object + properties: {} RegisterNamespaceRequest: type: object properties: @@ -12467,6 +12664,169 @@ components: - TASK_QUEUE_TYPE_NEXUS type: string format: enum + WorkerHeartbeat: + type: object + properties: + workerInstanceKey: + type: string + description: |- + Worker identifier, should be unique for the namespace. + It is distinct from worker identity, which is not necessarily namespace-unique. + workerIdentity: + type: string + description: |- + Worker identity, set by the client, may not be unique. + Usually host_name+(user group name)+process_id, but can be overwritten by the user. + hostInfo: + allOf: + - $ref: '#/components/schemas/WorkerHostInfo' + description: Worker host information. + taskQueue: + type: string + description: Task queue this worker is polling for tasks. + deploymentVersion: + $ref: '#/components/schemas/WorkerDeploymentVersion' + sdkName: + type: string + sdkVersion: + type: string + status: + enum: + - WORKER_STATUS_UNSPECIFIED + - WORKER_STATUS_RUNNING + - WORKER_STATUS_SHUTTING_DOWN + - WORKER_STATUS_SHUTDOWN + type: string + description: Worker status. Defined by SDK. + format: enum + startTime: + type: string + description: |- + Worker start time. 
+ It can be used to determine worker uptime. (current time - start time) + format: date-time + heartbeatTime: + type: string + description: |- + Timestamp of this heartbeat, coming from the worker. Worker should set it to "now". + Note that this timestamp comes directly from the worker and is subject to workers' clock skew. + format: date-time + elapsedSinceLastHeartbeat: + pattern: ^-?(?:0|[1-9][0-9]{0,11})(?:\.[0-9]{1,9})?s$ + type: string + description: Elapsed time since the last heartbeat from the worker. + workflowTaskSlotsInfo: + $ref: '#/components/schemas/WorkerSlotsInfo' + activityTaskSlotsInfo: + $ref: '#/components/schemas/WorkerSlotsInfo' + nexusTaskSlotsInfo: + $ref: '#/components/schemas/WorkerSlotsInfo' + localActivitySlotsInfo: + $ref: '#/components/schemas/WorkerSlotsInfo' + workflowPollerInfo: + $ref: '#/components/schemas/WorkerPollerInfo' + workflowStickyPollerInfo: + $ref: '#/components/schemas/WorkerPollerInfo' + activityPollerInfo: + $ref: '#/components/schemas/WorkerPollerInfo' + nexusPollerInfo: + $ref: '#/components/schemas/WorkerPollerInfo' + totalStickyCacheHit: + type: integer + description: A Workflow Task found a cached Workflow Execution to run against. + format: int32 + totalStickyCacheMiss: + type: integer + description: A Workflow Task did not find a cached Workflow execution to run against. + format: int32 + currentStickyCacheSize: + type: integer + description: Current cache size, expressed in number of Workflow Executions. + format: int32 + description: |- + Worker info message, contains information about the worker and its current state. + All information is provided by the worker itself. + (-- api-linter: core::0140::prepositions=disabled + aip.dev/not-precedent: Removing those words make names less clear. --) + WorkerHostInfo: + type: object + properties: + hostName: + type: string + description: Worker host identifier. + processId: + type: string + description: Worker process identifier, should be unique for the host. 
+ currentHostCpuUsage: + type: number + description: |- + System used CPU as a float in the range [0.0, 1.0] where 1.0 is defined as all + cores on the host pegged. + format: float + currentHostMemUsage: + type: number + description: |- + System used memory as a float in the range [0.0, 1.0] where 1.0 is defined as + all available memory on the host is used. + format: float + description: Holds everything needed to identify the worker host/process context + WorkerInfo: + type: object + properties: + workerHeartbeat: + $ref: '#/components/schemas/WorkerHeartbeat' + WorkerPollerInfo: + type: object + properties: + currentPollers: + type: integer + description: Number of polling RPCs that are currently in flight. + format: int32 + lastSuccessfulPollTime: + type: string + format: date-time + isAutoscaling: + type: boolean + description: Set true if the number of concurrent pollers is auto-scaled + WorkerSlotsInfo: + type: object + properties: + currentAvailableSlots: + type: integer + description: |- + Number of slots available for the worker to specific tasks. + May be -1 if the upper bound is not known. + format: int32 + currentUsedSlots: + type: integer + description: Number of slots used by the worker for specific tasks. + format: int32 + slotSupplierKind: + type: string + description: |- + Kind of the slot supplier, which is used to determine how the slots are allocated. + Possible values: "Fixed | ResourceBased | Custom String" + totalProcessedTasks: + type: integer + description: |- + Total number of tasks processed (completed both successfully and unsuccessfully, or any other way) + by the worker since the worker started. This is a cumulative counter. + format: int32 + totalFailedTasks: + type: integer + description: Total number of failed tasks processed by the worker so far. + format: int32 + lastIntervalProcessedTasks: + type: integer + description: |- + Number of tasks processed since the last heartbeat from the worker.
+ This is a cumulative counter, and it is reset to 0 each time the worker sends a heartbeat. + Contains both successful and failed tasks. + format: int32 + lastIntervalFailureTasks: + type: integer + description: Number of failed tasks processed since the last heartbeat from the worker. + format: int32 WorkerVersionCapabilities: type: object properties: diff --git a/temporal/api/enums/v1/common.proto b/temporal/api/enums/v1/common.proto index c45174b77..192c1d75b 100644 --- a/temporal/api/enums/v1/common.proto +++ b/temporal/api/enums/v1/common.proto @@ -96,4 +96,13 @@ enum ApplicationErrorCategory { APPLICATION_ERROR_CATEGORY_UNSPECIFIED = 0; // Expected application error with little/no severity. APPLICATION_ERROR_CATEGORY_BENIGN = 1; -} \ No newline at end of file +} + +// (-- api-linter: core::0216::synonyms=disabled +// aip.dev/not-precedent: It seems we have both state and status, and status is a better fit for workers. --) +enum WorkerStatus { + WORKER_STATUS_UNSPECIFIED = 0; + WORKER_STATUS_RUNNING = 1; + WORKER_STATUS_SHUTTING_DOWN = 2; + WORKER_STATUS_SHUTDOWN = 3; +} diff --git a/temporal/api/worker/v1/message.proto b/temporal/api/worker/v1/message.proto new file mode 100644 index 000000000..024357ce5 --- /dev/null +++ b/temporal/api/worker/v1/message.proto @@ -0,0 +1,126 @@ +syntax = "proto3"; + +package temporal.api.worker.v1; + +option go_package = "go.temporal.io/api/worker/v1;worker"; +option java_package = "io.temporal.api.worker.v1"; +option java_multiple_files = true; +option java_outer_classname = "MessageProto"; +option ruby_package = "Temporalio::Api::Worker::V1"; +option csharp_namespace = "Temporalio.Api.Worker.V1"; + +import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; +import "temporal/api/deployment/v1/message.proto"; +import "temporal/api/enums/v1/common.proto"; + +message WorkerPollerInfo { + // Number of polling RPCs that are currently in flight. 
+ int32 current_pollers = 1; + + google.protobuf.Timestamp last_successful_poll_time = 2; + + // Set true if the number of concurrent pollers is auto-scaled + bool is_autoscaling = 3; +} + +message WorkerSlotsInfo { + // Number of slots available for the worker to specific tasks. + // May be -1 if the upper bound is not known. + int32 current_available_slots = 1; + // Number of slots used by the worker for specific tasks. + int32 current_used_slots = 2; + + // Kind of the slot supplier, which is used to determine how the slots are allocated. + // Possible values: "Fixed | ResourceBased | Custom String" + string slot_supplier_kind = 3; + + // Total number of tasks processed (completed both successfully and unsuccessfully, or any other way) + // by the worker since the worker started. This is a cumulative counter. + int32 total_processed_tasks = 4; + // Total number of failed tasks processed by the worker so far. + int32 total_failed_tasks = 5; + + // Number of tasks processed since the last heartbeat from the worker. + // This is a cumulative counter, and it is reset to 0 each time the worker sends a heartbeat. + // Contains both successful and failed tasks. + int32 last_interval_processed_tasks = 6; + // Number of failed tasks processed since the last heartbeat from the worker. + int32 last_interval_failure_tasks = 7; +} + +// Holds everything needed to identify the worker host/process context +message WorkerHostInfo { + // Worker host identifier. + string host_name = 1; + + // Worker process identifier, should be unique for the host. + string process_id = 2; + + // System used CPU as a float in the range [0.0, 1.0] where 1.0 is defined as all + // cores on the host pegged. + float current_host_cpu_usage = 3; + // System used memory as a float in the range [0.0, 1.0] where 1.0 is defined as + // all available memory on the host is used. + float current_host_mem_usage = 4; +} + +// Worker info message, contains information about the worker and its current state.
+// All information is provided by the worker itself. +// (-- api-linter: core::0140::prepositions=disabled +// aip.dev/not-precedent: Removing those words make names less clear. --) +message WorkerHeartbeat { + // Worker identifier, should be unique for the namespace. + // It is distinct from worker identity, which is not necessarily namespace-unique. + string worker_instance_key = 1; + + // Worker identity, set by the client, may not be unique. + // Usually host_name+(user group name)+process_id, but can be overwritten by the user. + string worker_identity = 2; + + + // Worker host information. + WorkerHostInfo host_info = 3; + + // Task queue this worker is polling for tasks. + string task_queue = 4; + + temporal.api.deployment.v1.WorkerDeploymentVersion deployment_version = 5; + + string sdk_name = 6; + string sdk_version = 7; + + // Worker status. Defined by SDK. + temporal.api.enums.v1.WorkerStatus status = 8; + + // Worker start time. + // It can be used to determine worker uptime. (current time - start time) + google.protobuf.Timestamp start_time = 9; + + // Timestamp of this heartbeat, coming from the worker. Worker should set it to "now". + // Note that this timestamp comes directly from the worker and is subject to workers' clock skew. + google.protobuf.Timestamp heartbeat_time = 10; + // Elapsed time since the last heartbeat from the worker. + google.protobuf.Duration elapsed_since_last_heartbeat = 11; + + WorkerSlotsInfo workflow_task_slots_info = 12; + WorkerSlotsInfo activity_task_slots_info = 13; + WorkerSlotsInfo nexus_task_slots_info = 14; + WorkerSlotsInfo local_activity_slots_info = 15; + + WorkerPollerInfo workflow_poller_info = 16; + WorkerPollerInfo workflow_sticky_poller_info = 17; + WorkerPollerInfo activity_poller_info = 18; + WorkerPollerInfo nexus_poller_info = 19; + + // A Workflow Task found a cached Workflow Execution to run against. 
+ int32 total_sticky_cache_hit = 20; + // A Workflow Task did not find a cached Workflow execution to run against. + int32 total_sticky_cache_miss = 21; + // Current cache size, expressed in number of Workflow Executions. + int32 current_sticky_cache_size = 22; +} + +message WorkerInfo { + WorkerHeartbeat worker_heartbeat = 1; +} \ No newline at end of file diff --git a/temporal/api/workflowservice/v1/request_response.proto b/temporal/api/workflowservice/v1/request_response.proto index 544499b77..652cc257c 100644 --- a/temporal/api/workflowservice/v1/request_response.proto +++ b/temporal/api/workflowservice/v1/request_response.proto @@ -40,6 +40,7 @@ import "temporal/api/batch/v1/message.proto"; import "temporal/api/sdk/v1/task_complete_metadata.proto"; import "temporal/api/sdk/v1/user_metadata.proto"; import "temporal/api/nexus/v1/message.proto"; +import "temporal/api/worker/v1/message.proto"; import "google/protobuf/duration.proto"; import "google/protobuf/field_mask.proto"; @@ -265,6 +266,9 @@ message PollWorkflowTaskQueueRequest { // Worker deployment options that user has set in the worker. // Experimental. Worker Deployments are experimental and might significantly change in the future. temporal.api.deployment.v1.WorkerDeploymentOptions deployment_options = 6; + + // Worker info to be sent to the server. + temporal.api.worker.v1.WorkerHeartbeat worker_heartbeat = 7; } message PollWorkflowTaskQueueResponse { @@ -436,6 +440,10 @@ message PollActivityTaskQueueRequest { temporal.api.common.v1.WorkerVersionCapabilities worker_version_capabilities = 5 [deprecated = true]; // Worker deployment options that user has set in the worker. temporal.api.deployment.v1.WorkerDeploymentOptions deployment_options = 6; + + // Worker info to be sent to the server. 
+ temporal.api.worker.v1.WorkerHeartbeat worker_heartbeat = 7; + } message PollActivityTaskQueueResponse { @@ -993,6 +1001,8 @@ message ShutdownWorkerRequest { string sticky_task_queue = 2; string identity = 3; string reason = 4; + + temporal.api.worker.v1.WorkerHeartbeat worker_heartbeat = 5; } message ShutdownWorkerResponse { @@ -1170,6 +1180,7 @@ message GetSystemInfoResponse { // True if the server supports Nexus operations. // This flag is dependent both on server version and for Nexus to be enabled via server configuration. bool nexus = 11; + } } @@ -1754,6 +1765,9 @@ message PollNexusTaskQueueRequest { temporal.api.common.v1.WorkerVersionCapabilities worker_version_capabilities = 4 [deprecated = true]; // Worker deployment options that user has set in the worker. temporal.api.deployment.v1.WorkerDeploymentOptions deployment_options = 6; + + // Worker info to be sent to the server. + temporal.api.worker.v1.WorkerHeartbeat worker_heartbeat = 7; } message PollNexusTaskQueueResponse { @@ -2346,3 +2360,45 @@ message TriggerWorkflowRuleResponse { // True is the rule was applied, based on the rule conditions (predicate/visibility_query). bool applied = 1; } +message RecordWorkerHeartbeatRequest { + // Namespace this worker belongs to. + string namespace = 1; + + // The identity of the client who initiated this request. + string identity = 2; + + temporal.api.worker.v1.WorkerHeartbeat worker_heartbeat = 3; +} + +message RecordWorkerHeartbeatResponse { + +} + +message ListWorkersRequest { + string namespace = 1; + int32 page_size = 2; + bytes next_page_token = 3; + + // `query` in ListWorkers is used to filter workers based on worker status info. 
+ // The following worker status attributes are supported as part of the query: + //* WorkerInstanceKey + //* WorkerIdentity + //* HostName + //* TaskQueue + //* DeploymentName + //* BuildId + //* SdkName + //* SdkVersion + //* StartTime + //* LastHeartbeatTime + //* Status + // Currently metrics are not supported as a part of ListWorkers query. + string query = 4; +} + +message ListWorkersResponse { + repeated temporal.api.worker.v1.WorkerInfo workers_info = 1; + + // Next page token + bytes next_page_token = 2; +} diff --git a/temporal/api/workflowservice/v1/service.proto b/temporal/api/workflowservice/v1/service.proto index 865386506..bfa622dcd 100644 --- a/temporal/api/workflowservice/v1/service.proto +++ b/temporal/api/workflowservice/v1/service.proto @@ -1173,4 +1173,25 @@ service WorkflowService { }; } + // RecordWorkerHeartbeat receives a heartbeat request from the worker. + rpc RecordWorkerHeartbeat (RecordWorkerHeartbeatRequest) returns (RecordWorkerHeartbeatResponse) { + option (google.api.http) = { + post: "/namespaces/{namespace}/workers/heartbeat" + body: "*" + additional_bindings { + post: "/api/v1/namespaces/{namespace}/workers/heartbeat" + body: "*" + } + }; + }; + + // ListWorkers is a visibility API to list worker status information in a specific namespace. + rpc ListWorkers (ListWorkersRequest) returns (ListWorkersResponse) { + option (google.api.http) = { + get: "/namespaces/{namespace}/workers" + additional_bindings { + get: "/api/v1/namespaces/{namespace}/workers" + } + }; + } }