Note: The queue API is experimental and subject to change, but it's ready to use if you like living on the edge!
Long-running deep learning models and batch processing jobs are best architected as a queue. Cog has a built-in queue worker that can process predictions from a Redis queue and return the output back to the queue.
The request queue is implemented with Redis streams.
See github.com/replicate/cog-redis-example, which contains a Docker Compose setup for running a Cog model with the built-in Redis worker.
The entrypoint to run a Cog model against a queue is `cog.server.redis_queue`. You need to provide the following positional arguments:
- `redis_host`: the host your Redis server is running on.
- `redis_port`: the port your Redis server is listening on.
- `input_queue`: the queue the Cog model will listen to for prediction requests. This queue should already exist.
- `upload_url`: the endpoint Cog will `PUT` output files to. (Note: this will change in the near future. See this pull request for more details.)
- `consumer_id`: the name the Cog model will use to identify itself in the Redis group (also called "consumer name" by Redis).
- `model_id`: a unique ID for the Cog model, used to label setup logs.
- `log_queue`: the queue the Cog model should send setup logs to (prediction logs are sent as part of prediction responses).
Note: logging is changing as part of 0.3.0, so the `model_id` and `log_queue` arguments are likely to change soon.
You can optionally provide the following positional argument:
- `predict_timeout`: the maximum time in seconds a prediction will be allowed to run for before it is terminated.
For example:
```shell
docker run python -m cog.server.redis_queue \
    redis 6379 my-predict-queue \
    https://example.com/ab48b7ff-1589-4360-a54b-47f9d8d3f6b7/ \
    worker-1 \
    widget-classifier logs-queue \
    120
```
After starting, the `setup()` method of the predictor is called. When setup is finished, the model will start polling the input queue for prediction request messages.
The message body should be a JSON object with the following fields:
- `input`: a JSON object with the same keys as the arguments to the `predict()` function. Any `File` or `Path` inputs are passed as URLs.
- `response_queue`: the Redis queue Cog will send responses to.
You can enqueue the request using the `XADD` command:

```
redis:6379> XADD my-predict-queue * value {"input":{"tolerance":0.05},"response_queue":"my-response-queue"}
```
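The same request can be enqueued programmatically. This is a minimal sketch using the redis-py client; the queue name and `value` field follow the `XADD` example above, while the helper names (`build_request`, `enqueue_prediction`) are illustrative and not part of Cog.

```python
import json


def build_request(input_fields, response_queue):
    """Build the JSON body Cog expects in the stream message's "value" field."""
    return json.dumps({"input": input_fields, "response_queue": response_queue})


def enqueue_prediction(redis_client, queue, input_fields, response_queue):
    # Redis streams hold field/value pairs; the worker reads the "value" field.
    body = build_request(input_fields, response_queue)
    return redis_client.xadd(queue, {"value": body})


# Usage (assumes a Redis server is reachable):
# import redis
# r = redis.Redis(host="localhost", port=6379)
# enqueue_prediction(r, "my-predict-queue", {"tolerance": 0.05}, "my-response-queue")
```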
The model will send a message to the queue every time something happens:
- when the model generates some logs
- when the model returns some output
- when the model finishes running
The message body is a JSON object with the following fields:
- `status`: `processing`, `succeeded`, or `failed`.
- `output`: the return value of the `predict()` function.
- `logs`: a list of any logs sent to stdout or stderr during the prediction.
- `error`: if `status` is `failed`, the error message.
For example, a message early in the prediction might look like:
```json
{
  "status": "processing",
  "output": null,
  "logs": [
    "Creating model and diffusion.",
    "Done creating model and diffusion."
  ]
}
```
If the model yields progressive output then a mid-prediction message might look like:
```json
{
  "status": "processing",
  "output": [
    "https://example.com/ab48b7ff-1589-4360-a54b-47f9d8d3f6b7/0.jpg",
    "https://example.com/ab48b7ff-1589-4360-a54b-47f9d8d3f6b7/20.jpg",
    "https://example.com/ab48b7ff-1589-4360-a54b-47f9d8d3f6b7/40.jpg"
  ],
  "logs": [
    "Creating model and diffusion.",
    "Done creating model and diffusion.",
    "Iteration: 0, loss: -0.767578125",
    "Iteration: 20, loss: -1.2333984375",
    "Iteration: 40, loss: -1.380859375"
  ]
}
```
The response is written to a string key using `SET`. Because each message is a complete snapshot of the current state, the previous snapshots are not needed. You can read the values using the `GET` command:

```
redis:6379> GET my-response-queue
```
To get notified of updates to the value, you can `SUBSCRIBE` to keyspace notifications for the key:

```
redis:6379> SUBSCRIBE __keyspace@0__:my-response-queue
```
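On the consumer side, the snapshot semantics mean you only ever need the latest value. The sketch below parses a snapshot according to the fields described above; the polling helper `wait_for_result` is an illustration (keyspace notifications, as shown above, are the push-based alternative), not part of Cog's API.

```python
import json
import time


def parse_response(raw):
    """Decode one response snapshot into (status, output, error)."""
    msg = json.loads(raw)
    return msg["status"], msg.get("output"), msg.get("error")


def wait_for_result(redis_client, key, poll_interval=0.5, timeout=600):
    """Poll the response key until the prediction reaches a terminal status."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        raw = redis_client.get(key)
        if raw is not None:
            # Each snapshot is complete, so earlier snapshots can be ignored.
            status, output, error = parse_response(raw)
            if status == "succeeded":
                return output
            if status == "failed":
                raise RuntimeError(error)
        time.sleep(poll_interval)
    raise TimeoutError(f"no terminal response on {key!r}")
```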
The response may also include experimental properties, prefixed with `x-experimental-`. Experimental properties may change or be removed in any version, not just major versions that would ordinarily be used to indicate a breaking change. Any such change will be documented in release notes.

Current experimental properties are:

- `x-experimental-timestamps`: the time the prediction started and finished.
Cog's queue worker is instrumented using OpenTelemetry. For setup it sends:
- a span when the queue worker starts
- an event when it spawns the predictor subprocess
- a span when the predictor subprocess starts
- a span wrapping your `setup()` method
For each prediction it sends:
- a span when the request is received
- a span wrapping your `predict()` method
- an event when the first output is received from your `predict()` method
- for progressive output, an event when the final output is received from your `predict()` method
If the runner encounters an error during the prediction it will record it and set the span's status to error.
Telemetry is enabled when the `OTEL_SERVICE_NAME` environment variable is set. The OTLP exporter also needs to be configured via environment variables.
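For example, a minimal configuration might look like the following. `OTEL_EXPORTER_OTLP_ENDPOINT` is the standard OpenTelemetry SDK setting for the exporter; the service name and endpoint here are placeholders, not values Cog requires.

```shell
# Standard OpenTelemetry SDK environment variables (values are placeholders).
export OTEL_SERVICE_NAME=widget-classifier
export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
```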
If a `traceparent` parameter is provided with the prediction request, Cog will use that value as the parent for the prediction spans. This allows spans from Cog to show up in distributed traces. The parameter should be in the W3C format, e.g.:

```
00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
```
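If you are not already propagating a trace context, a valid `traceparent` can be constructed by hand. The helper below is illustrative and not part of Cog; the shape (`version-traceid-spanid-flags`) follows the W3C Trace Context format shown above, with random IDs standing in for real trace context.

```python
import os
import re

# Shape of a version-00 W3C traceparent: 00-<32 hex trace id>-<16 hex span id>-<2 hex flags>.
TRACEPARENT_RE = re.compile(r"^00-[0-9a-f]{32}-[0-9a-f]{16}-[0-9a-f]{2}$")


def make_traceparent(sampled=True):
    """Build a traceparent string with random trace and span IDs."""
    trace_id = os.urandom(16).hex()  # 16 random bytes -> 32 hex chars
    span_id = os.urandom(8).hex()    # 8 random bytes -> 16 hex chars
    flags = "01" if sampled else "00"
    return f"00-{trace_id}-{span_id}-{flags}"
```

In a real deployment you would instead take these IDs from the span that issued the request, so the prediction spans join the caller's trace.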