diff --git a/docs/user_guide/decoupled_models.md b/docs/user_guide/decoupled_models.md index d932b07d9c..4901ed0084 100644 --- a/docs/user_guide/decoupled_models.md +++ b/docs/user_guide/decoupled_models.md @@ -97,9 +97,9 @@ each time with a new response. You can take a look at [grpc_server.cc](https://g ### Using Decoupled Models in Ensembles -When using decoupled models within an [ensemble pipeline](ensemble_models.md), you may encounter unbounded memory growth if the decoupled model produces responses faster than downstream models can consume them. +When using decoupled models within an [ensemble pipeline](ensemble_models.md), you may experience unbounded memory growth if a decoupled model produces responses faster than downstream models can consume them. -To prevent unbounded memory growth in this scenario, consider using the `max_inflight_requests` configuration field. This field limits the maximum number of concurrent inflight requests permitted at each ensemble step for each inference request. +To prevent this, use the `max_inflight_requests` configuration field. This field limits the number of concurrent in-flight requests allowed at each ensemble step. The limit is shared across all active requests for that ensemble model, which keeps memory usage bounded. -For more details and examples, see [Managing Memory Usage in Ensemble Models](ensemble_models.md#managing-memory-usage-in-ensemble-models). +For more details and examples, see [Limit Memory Growth in Ensemble Models](ensemble_models.md#limit-memory-growth-in-ensemble-models-beta). diff --git a/docs/user_guide/ensemble_models.md b/docs/user_guide/ensemble_models.md index 3477e33d6c..a1f9286197 100644 --- a/docs/user_guide/ensemble_models.md +++ b/docs/user_guide/ensemble_models.md @@ -190,7 +190,7 @@ When crafting the ensemble steps, it is useful to note the distinction between connecting ensemble `input`/`output` to those on the composing model and between composing models. 
-## Managing Memory Usage in Ensemble Models +## Limit Memory Growth in Ensemble Models (BETA) An *inflight request* refers to an intermediate request generated by an upstream model that is queued and held in memory until it is processed by a downstream model within an ensemble pipeline. When upstream models process requests significantly faster than downstream models, these in-flight requests can accumulate and potentially lead to unbounded memory growth. This problem occurs when there is a speed mismatch between different steps in the pipeline and is particularly common in *decoupled models* that produce multiple responses per request more quickly than downstream models can consume. @@ -198,10 +198,9 @@ Consider an example ensemble model with two steps where the upstream model is 10 1. **Preprocessing model**: Produces 100 preprocessed requests/sec 2. **Inference model**: Consumes 10 requests/sec -Without backpressure, requests accumulate in the pipeline faster than they can be processed, eventually leading to out-of-memory errors. +Without backpressure, requests accumulate in the pipeline faster than they can be processed, which eventually leads to out-of-memory errors. -The `max_inflight_requests` field in the ensemble configuration sets a limit on the number of concurrent inflight requests permitted at each ensemble step for a single inference request. -When this limit is reached, faster upstream models are paused (blocked) until downstream models finish processing, effectively preventing unbounded memory growth. +The `max_inflight_requests` field in the ensemble configuration defines a limit on the number of concurrent in-flight requests allowed at each ensemble step. This limit is shared across all active requests for that ensemble model. When the limit is reached, new request scheduling for that step is paused until downstream models free up capacity. This prevents requests from accumulating indefinitely and keeps memory usage under control. 
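+Conceptually, the pausing behavior described above acts like a counting semaphore sized by `max_inflight_requests`: the producing step must claim a slot before emitting an intermediate request, and the slot is released only when the downstream step finishes. The following is an illustrative Python sketch of that pattern, not Triton's actual scheduler; all names in it are hypothetical.
+
+```python
+import threading
+import queue
+import time
+
+# Hypothetical stand-in for the scheduler's backpressure: a counting
+# semaphore sized like max_inflight_requests. The producer must acquire
+# a slot before queueing an intermediate request; the consumer releases
+# the slot when it finishes, so at most MAX_INFLIGHT items are pending.
+MAX_INFLIGHT = 4
+slots = threading.Semaphore(MAX_INFLIGHT)
+inflight = queue.Queue()
+peak = 0
+
+def producer(n):
+    for i in range(n):
+        slots.acquire()   # blocks once MAX_INFLIGHT items are pending
+        inflight.put(i)
+
+def consumer(n):
+    global peak
+    for _ in range(n):
+        item = inflight.get()
+        peak = max(peak, inflight.qsize() + 1)  # pending items incl. this one
+        time.sleep(0.001)                       # slow downstream step
+        slots.release()                         # frees a slot for the producer
+
+n = 100
+t1 = threading.Thread(target=producer, args=(n,))
+t2 = threading.Thread(target=consumer, args=(n,))
+t1.start(); t2.start(); t1.join(); t2.join()
+assert peak <= MAX_INFLIGHT  # memory for intermediates stays bounded
+```
+
+Without the semaphore (the `max_inflight_requests: 0` default), a fast producer would enqueue all `n` items immediately, which is the unbounded-growth scenario this feature prevents.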
``` ensemble_scheduling { @@ -225,9 +224,8 @@ ensemble_scheduling { ``` **Configuration:** -* **`max_inflight_requests: 16`**: For each ensemble request (not globally), at most 16 requests from `dali_preprocess` - can wait for `onnx_inference` to process. Once this per-step limit is reached, `dali_preprocess` is blocked until the downstream step completes a response. -* **Default (`0`)**: No limit - allows unlimited inflight requests (original behavior). +* **`max_inflight_requests: 16`**: Limits the number of concurrent in-flight requests at a given ensemble step to 16 (for example, requests from `dali_preprocess` waiting for `onnx_inference` to complete). This limit is shared across all active requests for that ensemble model. Once the limit is reached, scheduling new work for that step is paused until downstream capacity becomes available. +* **Default (`0`)**: No limit — allows an unlimited number of in-flight requests (original behavior). ### When to Use This Feature diff --git a/qa/L0_simple_ensemble/ensemble_backpressure_test.py b/qa/L0_simple_ensemble/ensemble_backpressure_test.py index 5521c4f00e..fdbe00d028 100755 --- a/qa/L0_simple_ensemble/ensemble_backpressure_test.py +++ b/qa/L0_simple_ensemble/ensemble_backpressure_test.py @@ -45,9 +45,9 @@ SERVER_URL = "localhost:8001" DEFAULT_RESPONSE_TIMEOUT = 60 EXPECTED_INFER_OUTPUT = 0.5 -MODEL_ENSEMBLE_DISABLED = "ensemble_disabled_max_inflight_requests" -MODEL_ENSEMBLE_LIMIT_4 = "ensemble_max_inflight_requests_limit_4" -MODEL_ENSEMBLE_LIMIT_1 = "ensemble_max_inflight_requests_limit_1" + +NUM_REQUESTS = 16 +NUM_RESPONSES_PER_REQUEST = 8 class UserData: @@ -62,11 +62,14 @@ def callback(user_data, result, error): user_data._response_queue.put(result) -def prepare_infer_args(input_value): +def prepare_infer_args(input_value, enable_batching=False): """ Create InferInput/InferRequestedOutput lists """ - input_data = np.array([input_value], dtype=np.int32) + if enable_batching: + input_data = 
np.array([[input_value]], dtype=np.int32) + else: + input_data = np.array([input_value], dtype=np.int32) infer_input = [grpcclient.InferInput("IN", input_data.shape, "INT32")] infer_input[0].set_data_from_numpy(input_data) outputs = [grpcclient.InferRequestedOutput("OUT")] @@ -87,7 +90,7 @@ def collect_responses(user_data): f"No response received within {DEFAULT_RESPONSE_TIMEOUT} seconds." ) - if type(result) == InferenceServerException: + if isinstance(result, InferenceServerException): errors.append(result) # error responses are final - stream terminates break @@ -110,85 +113,28 @@ class EnsembleBackpressureTest(tu.TestResultCollector): Tests for ensemble backpressure feature (max_inflight_requests). """ - def _run_inference(self, model_name, expected_responses_count=32): + def _run_inference( + self, model_name, expected_responses_per_request, num_concurrent_requests=1 + ): """ - Helper function to run inference and verify responses. + Send num_concurrent_requests streaming requests to model_name, each expecting + expected_responses_per_request responses. Verify all complete with correct data. 
""" - user_data = UserData() - with grpcclient.InferenceServerClient(SERVER_URL) as triton_client: - try: - inputs, outputs = prepare_infer_args(expected_responses_count) - triton_client.start_stream(callback=partial(callback, user_data)) - triton_client.async_stream_infer( - model_name=model_name, inputs=inputs, outputs=outputs - ) - - # Collect and verify responses - errors, responses = collect_responses(user_data) - self.assertEqual( - len(responses), - expected_responses_count, - f"Expected {expected_responses_count} responses, got {len(responses)}", - ) - self.assertEqual( - len(errors), - 0, - f"Expected no errors during inference, got {len(errors)} errors", - ) - - # Verify correctness of responses - for idx, resp in enumerate(responses): - output = resp.as_numpy("OUT") - self.assertAlmostEqual( - output[0], - EXPECTED_INFER_OUTPUT, - places=5, - msg=f"Response {idx} has incorrect value - {output[0]}", - ) - finally: - triton_client.stop_stream() - - def test_max_inflight_requests_limit_4(self): - """ - Test that max_inflight_requests correctly limits concurrent - responses. - """ - self._run_inference(model_name=MODEL_ENSEMBLE_LIMIT_4) - - def test_max_inflight_requests_limit_1(self): - """ - Test edge case: max_inflight_requests=1. - """ - self._run_inference(model_name=MODEL_ENSEMBLE_LIMIT_1) - - def test_max_inflight_requests_limit_disabled(self): - """ - Test that an ensemble model without max_inflight_requests parameter works correctly. - """ - self._run_inference(model_name=MODEL_ENSEMBLE_DISABLED) - - def test_max_inflight_requests_limit_concurrent_requests(self): - """ - Test that backpressure works correctly with multiple concurrent requests. - Each request should have independent backpressure state. 
- """ - num_concurrent = 8 - expected_per_request = 8 - user_datas = [UserData() for _ in range(num_concurrent)] + user_datas = [UserData() for _ in range(num_concurrent_requests)] with ExitStack() as stack: clients = [ stack.enter_context(grpcclient.InferenceServerClient(SERVER_URL)) - for _ in range(num_concurrent) + for _ in range(num_concurrent_requests) ] - inputs, outputs = prepare_infer_args(expected_per_request) + inputs, outputs = prepare_infer_args(expected_responses_per_request, True) # Start all concurrent requests - for i in range(num_concurrent): + for i in range(num_concurrent_requests): clients[i].start_stream(callback=partial(callback, user_datas[i])) clients[i].async_stream_infer( - model_name=MODEL_ENSEMBLE_LIMIT_4, inputs=inputs, outputs=outputs + model_name=model_name, inputs=inputs, outputs=outputs ) # Collect and verify responses for all requests @@ -196,45 +142,93 @@ def test_max_inflight_requests_limit_concurrent_requests(self): errors, responses = collect_responses(ud) self.assertEqual( len(responses), - expected_per_request, - f"Request {i}: expected {expected_per_request} responses, got {len(responses)}", + expected_responses_per_request, + f"Request {i}: expected {expected_responses_per_request} responses, got {len(responses)}", ) self.assertEqual( - len(errors), - 0, - f"Request {i}: Expected no errors during inference, got {len(errors)} errors", + len(errors), 0, f"Request {i}: unexpected errors: {errors}" ) # Verify correctness of responses for idx, resp in enumerate(responses): output = resp.as_numpy("OUT") + # output shape is [batch_size, 1]; extract scalar for comparison. 
+ value = float(output[0][0]) self.assertAlmostEqual( - output[0], + value, EXPECTED_INFER_OUTPUT, places=5, - msg=f"Response {idx} for request {i} has incorrect value - {output[0]}", + msg=f"Request {i} response {idx}: expected " + f"{EXPECTED_INFER_OUTPUT}, got {value}", ) # Stop all streams for client in clients: client.stop_stream() - def test_max_inflight_requests_limit_request_cancellation(self): + def test_single_request_with_different_limits(self): + """ + Single streaming request that produces 16 responses via a three-step ensemble pipeline + (decoupled_producer → consumer_high_delay → consumer_low_delay) under various + max_inflight_requests configurations. + """ + cases = [ + ("ensemble_limit_4", "max_inflight_requests=4"), + ("ensemble_limit_1", "max_inflight_requests=1"), + ("ensemble_disabled", "max_inflight_requests is disabled"), + ] + for model_name, desc in cases: + with self.subTest(limit=desc): + self._run_inference( + model_name=model_name, expected_responses_per_request=16 + ) + + def test_concurrent_requests_with_different_limits(self): + """ + NUM_REQUESTS concurrent streaming requests (NUM_RESPONSES_PER_REQUEST + responses each) exercise the max_inflight_requests limit. + Subtests cover: limit=4, limit=1, and the limit disabled. + """ + cases = [ + ("ensemble_limit_4", "max_inflight_requests=4"), + ("ensemble_limit_1", "max_inflight_requests=1"), + ("ensemble_disabled", "max_inflight_requests is disabled"), + ] + for model_name, desc in cases: + with self.subTest(limit=desc): + self._run_inference( + model_name=model_name, + expected_responses_per_request=NUM_RESPONSES_PER_REQUEST, + num_concurrent_requests=NUM_REQUESTS, + ) + + def test_sequential_requests_limiter_resets_cleanly(self): + """ + Send NUM_REQUESTS requests one after another. If the limiter + leaks a slot on any request, subsequent requests will be stuck or time out. 
+ """ + for seq_idx in range(NUM_REQUESTS): + with self.subTest(request=seq_idx): + self._run_inference( + model_name="ensemble_limit_4", + expected_responses_per_request=NUM_RESPONSES_PER_REQUEST, + ) + + def test_request_cancellation_under_backpressure(self): """ - Test that cancellation unblocks producers waiting on backpressure and that - the client receives a cancellation error. + Start a long-running request (32 responses), cancel mid-stream, + and verify the server sends a CANCELLED status and only a partial set of + responses is received. """ - # Use a large count to ensure the producer gets blocked by backpressure. - # The model is configured with max_inflight_requests = 4. input_value = 32 user_data = UserData() with grpcclient.InferenceServerClient(SERVER_URL) as triton_client: - inputs, outputs = prepare_infer_args(input_value) + inputs, outputs = prepare_infer_args(input_value, True) triton_client.start_stream(callback=partial(callback, user_data)) # Start the request triton_client.async_stream_infer( - model_name=MODEL_ENSEMBLE_LIMIT_4, inputs=inputs, outputs=outputs + model_name="ensemble_limit_4", inputs=inputs, outputs=outputs ) responses = [] @@ -248,7 +242,7 @@ def test_max_inflight_requests_limit_request_cancellation(self): except queue.Empty: self.fail("Stream did not produce any response before cancellation.") - # Cancel the stream. This should unblock any waiting producers and result in a CANCELLED error. + # Cancel the stream - this unblocks any waiting producers and triggers a CANCELLED error. 
triton_client.stop_stream(cancel_requests=True) # Allow some time for cancellation @@ -331,7 +325,7 @@ def _run_inference(self, model_name, expected_responses_count): self.assertEqual( len(errors), 1, - "Expected exactly one error when queue full terminates stream", + "Expected exactly one error when the queue is full and the stream terminates", ) # Verify correctness of successful responses diff --git a/qa/L0_simple_ensemble/test.sh b/qa/L0_simple_ensemble/test.sh index be11d0db76..95a0efa4a2 100755 --- a/qa/L0_simple_ensemble/test.sh +++ b/qa/L0_simple_ensemble/test.sh @@ -32,6 +32,7 @@ SIMPLE_TEST_PY=./ensemble_test.py CLIENT_LOG="./client.log" TEST_MODEL_DIR="`pwd`/models" +BACKPRESSURE_TEST_MODEL_DIR="`pwd`/backpressure_test_models" TEST_RESULT_FILE='test_results.txt' SERVER=/opt/tritonserver/bin/tritonserver SERVER_ARGS="--model-repository=${TEST_MODEL_DIR}" @@ -149,24 +150,28 @@ wait $SERVER_PID ######## Test max_queue_size dynamic batching parameter in ensemble steps ######## ## Ensemble model: step1-decoupled_producer -> step2-slow_consumer -MODEL_DIR="`pwd`/max_queue_size_test_models" -rm -rf ${MODEL_DIR} +MAX_QUEUE_SIZE_TEST_MODEL_DIR="`pwd`/max_queue_size_test_models" +rm -rf ${MAX_QUEUE_SIZE_TEST_MODEL_DIR} # Enable max_queue_size in the first step (decoupled_producer) -mkdir -p ${MODEL_DIR}/ensemble_step1_enabled_max_queue_size/1 ${MODEL_DIR}/decoupled_producer_enabled_max_queue_size/1 ${MODEL_DIR}/slow_consumer/1 -cp ./backpressure_test_models/ensemble_disabled_max_inflight_requests/config.pbtxt ${MODEL_DIR}/ensemble_step1_enabled_max_queue_size/ -sed -i 's/"decoupled_producer"/"decoupled_producer_enabled_max_queue_size"/g' ${MODEL_DIR}/ensemble_step1_enabled_max_queue_size/config.pbtxt - -cp ../python_models/ground_truth/model.py ${MODEL_DIR}/slow_consumer/1 -cp ../python_models/ground_truth/config.pbtxt ${MODEL_DIR}/slow_consumer/ -sed -i 's/name: "ground_truth"/name: "slow_consumer"/g' ${MODEL_DIR}/slow_consumer/config.pbtxt -sed -i 
's/max_batch_size: 64/max_batch_size: 1/g' ${MODEL_DIR}/slow_consumer/config.pbtxt - -cp ./backpressure_test_models/decoupled_producer/1/model.py ${MODEL_DIR}/decoupled_producer_enabled_max_queue_size/1 -cp ./backpressure_test_models/decoupled_producer/config.pbtxt ${MODEL_DIR}/decoupled_producer_enabled_max_queue_size/ -sed -i 's/name: "decoupled_producer"/name: "decoupled_producer_enabled_max_queue_size"/g' ${MODEL_DIR}/decoupled_producer_enabled_max_queue_size/config.pbtxt +mkdir -p ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/ensemble_step1_enabled_max_queue_size/1 \ + ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer_enabled_max_queue_size/1 \ + ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer/1 +cp ${BACKPRESSURE_TEST_MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt \ + ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/ensemble_step1_enabled_max_queue_size/ +sed -i 's/"decoupled_producer"/"decoupled_producer_enabled_max_queue_size"/g' \ + ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/ensemble_step1_enabled_max_queue_size/config.pbtxt + +cp ../python_models/ground_truth/model.py ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer/1 +cp ../python_models/ground_truth/config.pbtxt ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer/ +sed -i 's/name: "ground_truth"/name: "slow_consumer"/g' ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer/config.pbtxt +sed -i 's/max_batch_size: 64/max_batch_size: 1/g' ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer/config.pbtxt + +cp ${BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/1/model.py ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer_enabled_max_queue_size/1 +cp ${BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/config.pbtxt ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer_enabled_max_queue_size/ +sed -i 's/name: "decoupled_producer"/name: "decoupled_producer_enabled_max_queue_size"/g' ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer_enabled_max_queue_size/config.pbtxt # Add dynamic_batching with max_queue_size to decoupled_producer -cat >> 
${MODEL_DIR}/decoupled_producer_enabled_max_queue_size/config.pbtxt << 'EOF' +cat >> ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer_enabled_max_queue_size/config.pbtxt << 'EOF' dynamic_batching { preferred_batch_size: [ 1 ] @@ -177,19 +182,23 @@ dynamic_batching { EOF # Enable max_queue_size in the second step (slow_consumer) -mkdir -p ${MODEL_DIR}/ensemble_step2_enabled_max_queue_size/1 ${MODEL_DIR}/decoupled_producer/1 ${MODEL_DIR}/slow_consumer_enabled_max_queue_size/1 -cp ./backpressure_test_models/ensemble_disabled_max_inflight_requests/config.pbtxt ${MODEL_DIR}/ensemble_step2_enabled_max_queue_size/ -sed -i 's/"slow_consumer"/"slow_consumer_enabled_max_queue_size"/g' ${MODEL_DIR}/ensemble_step2_enabled_max_queue_size/config.pbtxt - -cp ./backpressure_test_models/decoupled_producer/1/model.py ${MODEL_DIR}/decoupled_producer/1 -cp ./backpressure_test_models/decoupled_producer/config.pbtxt ${MODEL_DIR}/decoupled_producer/ - -cp ../python_models/ground_truth/model.py ${MODEL_DIR}/slow_consumer_enabled_max_queue_size/1 -cp ../python_models/ground_truth/config.pbtxt ${MODEL_DIR}/slow_consumer_enabled_max_queue_size/ -sed -i 's/name: "ground_truth"/name: "slow_consumer_enabled_max_queue_size"/g' ${MODEL_DIR}/slow_consumer_enabled_max_queue_size/config.pbtxt -sed -i 's/max_batch_size: 64/max_batch_size: 1/g' ${MODEL_DIR}/slow_consumer_enabled_max_queue_size/config.pbtxt +mkdir -p ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/ensemble_step2_enabled_max_queue_size/1 \ + ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer/1 ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer_enabled_max_queue_size/1 +cp ${BACKPRESSURE_TEST_MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt \ + ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/ensemble_step2_enabled_max_queue_size/ +sed -i 's/"slow_consumer"/"slow_consumer_enabled_max_queue_size"/g' \ + ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/ensemble_step2_enabled_max_queue_size/config.pbtxt + +cp ${BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/1/model.py 
${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer/1 +cp ${BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/config.pbtxt ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer/ + +cp ../python_models/ground_truth/model.py ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer_enabled_max_queue_size/1 +cp ../python_models/ground_truth/config.pbtxt ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer_enabled_max_queue_size/ +sed -i 's/name: "ground_truth"/name: "slow_consumer_enabled_max_queue_size"/g' \ + ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer_enabled_max_queue_size/config.pbtxt +sed -i 's/max_batch_size: 64/max_batch_size: 1/g' ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer_enabled_max_queue_size/config.pbtxt # Add dynamic_batching with max_queue_size to slow_consumer -cat >> ${MODEL_DIR}/slow_consumer_enabled_max_queue_size/config.pbtxt << 'EOF' +cat >> ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer_enabled_max_queue_size/config.pbtxt << 'EOF' dynamic_batching { preferred_batch_size: [ 1 ] @@ -205,7 +214,7 @@ SERVER_LOG="./ensemble_step_max_queue_size_test_server.log" CLIENT_LOG="./ensemble_step_max_queue_size_test_client.log" rm -f $SERVER_LOG $CLIENT_LOG -SERVER_ARGS="--model-repository=${MODEL_DIR}" +SERVER_ARGS="--model-repository=${MAX_QUEUE_SIZE_TEST_MODEL_DIR}" run_server if [ "$SERVER_PID" == "0" ]; then echo -e "\n***\n*** Failed to start $SERVER\n***" @@ -217,6 +226,7 @@ set +e python $BACKPRESSURE_TEST_PY $TEST_NAME -v >> $CLIENT_LOG 2>&1 if [ $? -ne 0 ]; then RET=1 + cat $CLIENT_LOG else check_test_results $TEST_RESULT_FILE 2 if [ $? 
-ne 0 ]; then @@ -231,38 +241,104 @@ kill $SERVER_PID wait $SERVER_PID -######## Test ensemble backpressure feature (max_inflight_requests parameter) ######## -MODEL_DIR="`pwd`/backpressure_test_models" -mkdir -p ${MODEL_DIR}/ensemble_disabled_max_inflight_requests/1 - -rm -rf ${MODEL_DIR}/slow_consumer -mkdir -p ${MODEL_DIR}/slow_consumer/1 -cp ../python_models/ground_truth/model.py ${MODEL_DIR}/slow_consumer/1 -cp ../python_models/ground_truth/config.pbtxt ${MODEL_DIR}/slow_consumer/ -sed -i 's/name: "ground_truth"/name: "slow_consumer"/g' ${MODEL_DIR}/slow_consumer/config.pbtxt - -# Create ensemble with "max_inflight_requests = 4" -rm -rf ${MODEL_DIR}/ensemble_max_inflight_requests_limit_4 -mkdir -p ${MODEL_DIR}/ensemble_max_inflight_requests_limit_4/1 -cp ${MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt ${MODEL_DIR}/ensemble_max_inflight_requests_limit_4/ -sed -i 's/ensemble_scheduling {/ensemble_scheduling {\n max_inflight_requests: 4/g' \ - ${MODEL_DIR}/ensemble_max_inflight_requests_limit_4/config.pbtxt - -# Create ensemble with "max_inflight_requests = 1" -rm -rf ${MODEL_DIR}/ensemble_max_inflight_requests_limit_1 -mkdir -p ${MODEL_DIR}/ensemble_max_inflight_requests_limit_1/1 -cp ${MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt ${MODEL_DIR}/ensemble_max_inflight_requests_limit_1/ -sed -i 's/platform: "ensemble"/name: "ensemble_max_inflight_requests_limit_1"\nplatform: "ensemble"/g' \ - ${MODEL_DIR}/ensemble_max_inflight_requests_limit_1/config.pbtxt -sed -i 's/ensemble_scheduling {/ensemble_scheduling {\n max_inflight_requests: 1/g' \ - ${MODEL_DIR}/ensemble_max_inflight_requests_limit_1/config.pbtxt +######## Test backpressure feature - 'max_inflight_requests' config option ######## +ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR="`pwd`/ensemble_backpressure_test_models" +rm -rf ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR} TEST_NAME="EnsembleBackpressureTest" SERVER_LOG="./ensemble_backpressure_test_server.log" 
CLIENT_LOG="./ensemble_backpressure_test_client.log" +SERVER_ARGS="--model-repository=${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}" rm -f $SERVER_LOG $CLIENT_LOG -SERVER_ARGS="--model-repository=${MODEL_DIR}" +# Step 1 - decoupled_producer (batch size 2) +mkdir -p ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/1 +cp ${BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/1/model.py ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/1/ +cp ${BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/config.pbtxt ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/ +sed -i 's/max_batch_size: 1/max_batch_size: 2/g' ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/config.pbtxt + +generate_consumer_model() { + local name=$1 + local delay=$2 + + mkdir -p ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/${name}/1 + cat > ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/${name}/1/model.py << EOF +import time +import triton_python_backend_utils as pb_utils + +class TritonPythonModel: + def execute(self, requests): + responses = [] + for request in requests: + in_tensor = pb_utils.get_input_tensor_by_name(request, "INPUT0") + out_tensor = pb_utils.Tensor("OUTPUT0", in_tensor.as_numpy()) + responses.append(pb_utils.InferenceResponse([out_tensor])) + time.sleep(${delay}) + return responses +EOF + cat > ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/${name}/config.pbtxt << EOF +name: "${name}" +backend: "python" +max_batch_size: 2 +input [ { name: "INPUT0", data_type: TYPE_FP32, dims: [ 1 ] } ] +output [ { name: "OUTPUT0", data_type: TYPE_FP32, dims: [ 1 ] } ] +instance_group [ { count: 1, kind: KIND_CPU } ] +dynamic_batching { preferred_batch_size: [ 2 ] } +EOF +} + +generate_ensemble_model() { + local name=$1 + local limit=$2 + local batch_size=2 + + local limit_str="" + if [ "$limit" != "disabled" ]; then + limit_str="max_inflight_requests: $limit" + fi + + mkdir -p ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/${name}/1 + cat > ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/${name}/config.pbtxt 
<< EOF +name: "${name}" +platform: "ensemble" +max_batch_size: ${batch_size} +input [ { name: "IN", data_type: TYPE_INT32, dims: [ 1 ] } ] +output [ { name: "OUT", data_type: TYPE_FP32, dims: [ 1 ] } ] +ensemble_scheduling { + ${limit_str} + step [ + { + model_name: "decoupled_producer" + model_version: -1 + input_map { key: "IN", value: "IN" } + output_map { key: "OUT", value: "intermediate_1" } + }, + { + model_name: "consumer_high_delay" + model_version: -1 + input_map { key: "INPUT0", value: "intermediate_1" } + output_map { key: "OUTPUT0", value: "intermediate_2" } + }, + { + model_name: "consumer_low_delay" + model_version: -1 + input_map { key: "INPUT0", value: "intermediate_2" } + output_map { key: "OUTPUT0", value: "OUT" } + } + ] +} +EOF +} + +# Steps 2 and 3 - consumer_high_delay and consumer_low_delay (batch size 2) +generate_consumer_model "consumer_high_delay" "0.5" +generate_consumer_model "consumer_low_delay" "0.1" + +# Ensemble models with different max_inflight_requests limits (including disabled) +generate_ensemble_model "ensemble_disabled" "disabled" +generate_ensemble_model "ensemble_limit_1" 1 +generate_ensemble_model "ensemble_limit_4" 4 + run_server if [ "$SERVER_PID" == "0" ]; then echo -e "\n***\n*** Failed to start $SERVER\n***" @@ -271,11 +347,23 @@ if [ "$SERVER_PID" == "0" ]; then fi set +e + +# Verify valid config was loaded successfully +if ! grep -q "Ensemble model 'ensemble_limit_1' configured with max_inflight_requests: 1" $SERVER_LOG; then + echo -e "\n***\n*** FAILED: ensemble_limit_1 did not load\n***" + RET=1 +fi +if ! grep -q "Ensemble model 'ensemble_limit_4' configured with max_inflight_requests: 4" $SERVER_LOG; then + echo -e "\n***\n*** FAILED: ensemble_limit_4 did not load\n***" + RET=1 +fi + python $BACKPRESSURE_TEST_PY $TEST_NAME -v >> $CLIENT_LOG 2>&1 if [ $? -ne 0 ]; then RET=1 + cat $CLIENT_LOG else - check_test_results $TEST_RESULT_FILE 5 + check_test_results $TEST_RESULT_FILE 4 if [ $? 
-ne 0 ]; then cat $CLIENT_LOG echo -e "\n***\n*** Test Result Verification Failed\n***" @@ -287,16 +375,8 @@ set -e kill $SERVER_PID wait $SERVER_PID -set +e -# Verify valid config was loaded successfully -if ! grep -q "Ensemble model 'ensemble_max_inflight_requests_limit_4' configured with max_inflight_requests: 4" $SERVER_LOG; then - echo -e "\n***\n*** FAILED: Valid model did not load successfully\n***" - RET=1 -fi -set -e - -######## Test invalid value for "max_inflight_requests" +######## Test invalid values for 'max_inflight_requests' config option ######## INVALID_PARAM_MODEL_DIR="`pwd`/invalid_param_test_models" SERVER_ARGS="--model-repository=${INVALID_PARAM_MODEL_DIR}" SERVER_LOG="./invalid_max_inflight_requests_server.log" @@ -305,20 +385,21 @@ rm -rf $SERVER_LOG ${INVALID_PARAM_MODEL_DIR} mkdir -p ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_negative_limit/1 mkdir -p ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_string_limit/1 mkdir -p ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/1 -cp -r ${MODEL_DIR}/decoupled_producer ${MODEL_DIR}/slow_consumer ${INVALID_PARAM_MODEL_DIR}/ +# Reuse the decoupled_producer and slow_consumer models built in the previous test section. 
+cp -r ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer ${INVALID_PARAM_MODEL_DIR}/ # max_inflight_requests = -5 -cp ${MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_negative_limit/ +cp ${BACKPRESSURE_TEST_MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_negative_limit/ sed -i 's/ensemble_scheduling {/ensemble_scheduling {\n max_inflight_requests: -5/g' \ ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_negative_limit/config.pbtxt # max_inflight_requests = "invalid_value" -cp ${MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_string_limit/ +cp ${BACKPRESSURE_TEST_MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_string_limit/ sed -i 's/ensemble_scheduling {/ensemble_scheduling {\n max_inflight_requests: "invalid_value"/g' \ ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_string_limit/config.pbtxt # max_inflight_requests = 12345678901 -cp ${MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/ +cp ${BACKPRESSURE_TEST_MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/ sed -i 's/ensemble_scheduling {/ensemble_scheduling {\n max_inflight_requests: 12345678901/g' \ ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/config.pbtxt @@ -356,7 +437,6 @@ set -e if [ $RET -eq 0 ]; then echo -e "\n***\n*** Test Passed\n***" else - cat $CLIENT_LOG echo -e "\n***\n*** Test FAILED\n***" fi
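As an aside on the test changes above: the new `enable_batching` flag in `prepare_infer_args` only adds an explicit batch dimension to the input tensor, which is why the result extraction switches to `output[0][0]`. A quick numpy illustration (variable names here are for illustration only):

```python
import numpy as np

# Mirrors the two branches of prepare_infer_args: batched models expect
# an explicit leading batch dimension, so the same scalar is wrapped in
# one more list when enable_batching=True.
value = 16
unbatched = np.array([value], dtype=np.int32)   # shape (1,)   -> dims [ 1 ]
batched = np.array([[value]], dtype=np.int32)   # shape (1, 1) -> batch of 1

# With batching, responses come back as [batch_size, 1], so the scalar
# is recovered with output[0][0] as in the updated test.
scalar = int(batched[0][0])
```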