From dc1c9e65983f8397a8d923a5d3ba2492a8d4e0aa Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Wed, 18 Mar 2026 12:10:50 +0000 Subject: [PATCH 1/7] Update --- .../ensemble_backpressure_test.py | 164 +++++++------ qa/L0_simple_ensemble/test.sh | 216 ++++++++++++------ 2 files changed, 226 insertions(+), 154 deletions(-) diff --git a/qa/L0_simple_ensemble/ensemble_backpressure_test.py b/qa/L0_simple_ensemble/ensemble_backpressure_test.py index 5521c4f00e..1648691df5 100755 --- a/qa/L0_simple_ensemble/ensemble_backpressure_test.py +++ b/qa/L0_simple_ensemble/ensemble_backpressure_test.py @@ -45,9 +45,9 @@ SERVER_URL = "localhost:8001" DEFAULT_RESPONSE_TIMEOUT = 60 EXPECTED_INFER_OUTPUT = 0.5 -MODEL_ENSEMBLE_DISABLED = "ensemble_disabled_max_inflight_requests" -MODEL_ENSEMBLE_LIMIT_4 = "ensemble_max_inflight_requests_limit_4" -MODEL_ENSEMBLE_LIMIT_1 = "ensemble_max_inflight_requests_limit_1" + +NUM_REQUESTS = 16 +NUM_RESPONSES_PER_REQUEST = 8 class UserData: @@ -62,11 +62,14 @@ def callback(user_data, result, error): user_data._response_queue.put(result) -def prepare_infer_args(input_value): +def prepare_infer_args(input_value, enable_batching=False): """ Create InferInput/InferRequestedOutput lists """ - input_data = np.array([input_value], dtype=np.int32) + if enable_batching: + input_data = np.array([[input_value]], dtype=np.int32) + else: + input_data = np.array([input_value], dtype=np.int32) infer_input = [grpcclient.InferInput("IN", input_data.shape, "INT32")] infer_input[0].set_data_from_numpy(input_data) outputs = [grpcclient.InferRequestedOutput("OUT")] @@ -87,7 +90,7 @@ def collect_responses(user_data): f"No response received within {DEFAULT_RESPONSE_TIMEOUT} seconds." ) - if type(result) == InferenceServerException: + if isinstance(result, InferenceServerException): errors.append(result) # error responses are final - stream terminates break @@ -110,85 +113,28 @@ class EnsembleBackpressureTest(tu.TestResultCollector): Tests for ensemble backpressure feature (max_inflight_requests). """ - def _run_inference(self, model_name, expected_responses_count=32): - """ - Helper function to run inference and verify responses. - """ - user_data = UserData() - with grpcclient.InferenceServerClient(SERVER_URL) as triton_client: - try: - inputs, outputs = prepare_infer_args(expected_responses_count) - triton_client.start_stream(callback=partial(callback, user_data)) - triton_client.async_stream_infer( - model_name=model_name, inputs=inputs, outputs=outputs - ) - - # Collect and verify responses - errors, responses = collect_responses(user_data) - self.assertEqual( - len(responses), - expected_responses_count, - f"Expected {expected_responses_count} responses, got {len(responses)}", - ) - self.assertEqual( - len(errors), - 0, - f"Expected no errors during inference, got {len(errors)} errors", - ) - - # Verify correctness of responses - for idx, resp in enumerate(responses): - output = resp.as_numpy("OUT") - self.assertAlmostEqual( - output[0], - EXPECTED_INFER_OUTPUT, - places=5, - msg=f"Response {idx} has incorrect value - {output[0]}", - ) - finally: - triton_client.stop_stream() - - def test_max_inflight_requests_limit_4(self): - """ - Test that max_inflight_requests correctly limits concurrent - responses. - """ - self._run_inference(model_name=MODEL_ENSEMBLE_LIMIT_4) - - def test_max_inflight_requests_limit_1(self): - """ - Test edge case: max_inflight_requests=1. 
- """ - self._run_inference(model_name=MODEL_ENSEMBLE_LIMIT_1) - - def test_max_inflight_requests_limit_disabled(self): - """ - Test that an ensemble model without max_inflight_requests parameter works correctly. - """ - self._run_inference(model_name=MODEL_ENSEMBLE_DISABLED) - - def test_max_inflight_requests_limit_concurrent_requests(self): + def _run_inference( + self, model_name, expected_responses_per_request, num_concurrent_requests=1 + ): """ - Test that backpressure works correctly with multiple concurrent requests. - Each request should have independent backpressure state. + Send num_concurrent_requests streaming requests to model_name, each expecting + expected_responses_per_request responses. Verify all complete with correct data. """ - num_concurrent = 8 - expected_per_request = 8 - user_datas = [UserData() for _ in range(num_concurrent)] + user_datas = [UserData() for _ in range(num_concurrent_requests)] with ExitStack() as stack: clients = [ stack.enter_context(grpcclient.InferenceServerClient(SERVER_URL)) - for _ in range(num_concurrent) + for _ in range(num_concurrent_requests) ] - inputs, outputs = prepare_infer_args(expected_per_request) + inputs, outputs = prepare_infer_args(expected_responses_per_request, True) # Start all concurrent requests - for i in range(num_concurrent): + for i in range(num_concurrent_requests): clients[i].start_stream(callback=partial(callback, user_datas[i])) clients[i].async_stream_infer( - model_name=MODEL_ENSEMBLE_LIMIT_4, inputs=inputs, outputs=outputs + model_name=model_name, inputs=inputs, outputs=outputs ) # Collect and verify responses for all requests @@ -196,13 +142,11 @@ def test_max_inflight_requests_limit_concurrent_requests(self): errors, responses = collect_responses(ud) self.assertEqual( len(responses), - expected_per_request, - f"Request {i}: expected {expected_per_request} responses, got {len(responses)}", + expected_responses_per_request, + f"Request {i}: expected {expected_responses_per_request} responses, got {len(responses)}", ) self.assertEqual( - len(errors), - 0, - f"Request {i}: Expected no errors during inference, got {len(errors)} errors", + len(errors), 0, f"Request {i}: unexpected errors: {errors}" ) # Verify correctness of responses for idx, resp in enumerate(responses): @@ -211,30 +155,78 @@ def test_max_inflight_requests_limit_concurrent_requests(self): output[0], EXPECTED_INFER_OUTPUT, places=5, - msg=f"Response {idx} for request {i} has incorrect value - {output[0]}", + msg=f"Request {i} response {idx}: expected " + f"{EXPECTED_INFER_OUTPUT}, got {output[0]}", ) # Stop all streams for client in clients: client.stop_stream() - def test_max_inflight_requests_limit_request_cancellation(self): + def test_single_request_with_different_limits(self): + """ + Single streaming request producing 16 responses through a 3-step + ensemble (decoupled_producer -> consumer_high_delay -> consumer_low_delay) + under various max_inflight_requests settings. + """ + cases = [ + ("ensemble_limit_4", "max_inflight_requests=4"), + ("ensemble_limit_1", "max_inflight_requests=1"), + ("ensemble_disabled", "max_inflight_requests is disabled"), + ] + for model_name, desc in cases: + with self.subTest(limit=desc): + self._run_inference( + model_name=model_name, expected_responses_per_request=16 + ) + + def test_concurrent_requests_across_topologies(self): + """ + NUM_REQUESTS concurrent streaming requests (NUM_RESPONSES_PER_REQUEST + responses each) exercise the global max_inflight_requests limit. 
+ Subtests cover: limit=4, limit=1, and the limit disabled. + """ + cases = [ + ("ensemble_limit_4", "max_inflight_requests=4"), + ("ensemble_limit_1", "max_inflight_requests=1"), + ("ensemble_disabled", "max_inflight_requests is disabled"), + ] + for model_name, desc in cases: + with self.subTest(topology=desc): + self._run_inference( + model_name=model_name, + expected_responses_per_request=NUM_RESPONSES_PER_REQUEST, + num_concurrent_requests=NUM_REQUESTS, + ) + + def test_sequential_requests_limiter_resets_cleanly(self): + """ + Send NUM_REQUESTS sequential requests one after another. If the limiter + leaks a slot on any request, subsequent requests will deadlock or time out. + """ + for seq_idx in range(NUM_REQUESTS): + with self.subTest(request=seq_idx): + self._run_inference( + model_name="ensemble_limit_4", + expected_responses_per_request=NUM_RESPONSES_PER_REQUEST, + ) + + def test_request_cancellation_under_backpressure(self): """ - Test that cancellation unblocks producers waiting on backpressure and that - the client receives a cancellation error. + Start a long-running request (32 responses), cancel mid-stream, + and verify the server sends a CANCELLED status and only a partial set of + responses is received. """ - # Use a large count to ensure the producer gets blocked by backpressure. - # The model is configured with max_inflight_requests = 4. input_value = 32 user_data = UserData() with grpcclient.InferenceServerClient(SERVER_URL) as triton_client: - inputs, outputs = prepare_infer_args(input_value) + inputs, outputs = prepare_infer_args(input_value, True) triton_client.start_stream(callback=partial(callback, user_data)) # Start the request triton_client.async_stream_infer( - model_name=MODEL_ENSEMBLE_LIMIT_4, inputs=inputs, outputs=outputs + model_name="ensemble_limit_4", inputs=inputs, outputs=outputs ) responses = [] diff --git a/qa/L0_simple_ensemble/test.sh b/qa/L0_simple_ensemble/test.sh index be11d0db76..0623634a9c 100755 --- a/qa/L0_simple_ensemble/test.sh +++ b/qa/L0_simple_ensemble/test.sh @@ -32,6 +32,7 @@ SIMPLE_TEST_PY=./ensemble_test.py CLIENT_LOG="./client.log" TEST_MODEL_DIR="`pwd`/models" +BACKPRESSURE_TEST_MODEL_DIR="`pwd`/backpressure_test_models" TEST_RESULT_FILE='test_results.txt' SERVER=/opt/tritonserver/bin/tritonserver SERVER_ARGS="--model-repository=${TEST_MODEL_DIR}" @@ -149,24 +150,28 @@ wait $SERVER_PID ######## Test max_queue_size dynamic batching parameter in ensemble steps ######## ## Ensemble model: step1-decoupled_producer -> step2-slow_consumer -MODEL_DIR="`pwd`/max_queue_size_test_models" -rm -rf ${MODEL_DIR} +MAX_QUEUE_SIZE_TEST_MODEL_DIR="`pwd`/max_queue_size_test_models" +rm -rf ${MAX_QUEUE_SIZE_TEST_MODEL_DIR} # Enable max_queue_size in the first step (decoupled_producer) -mkdir -p ${MODEL_DIR}/ensemble_step1_enabled_max_queue_size/1 ${MODEL_DIR}/decoupled_producer_enabled_max_queue_size/1 ${MODEL_DIR}/slow_consumer/1 -cp ./backpressure_test_models/ensemble_disabled_max_inflight_requests/config.pbtxt ${MODEL_DIR}/ensemble_step1_enabled_max_queue_size/ -sed -i 's/"decoupled_producer"/"decoupled_producer_enabled_max_queue_size"/g' ${MODEL_DIR}/ensemble_step1_enabled_max_queue_size/config.pbtxt - -cp ../python_models/ground_truth/model.py ${MODEL_DIR}/slow_consumer/1 -cp ../python_models/ground_truth/config.pbtxt ${MODEL_DIR}/slow_consumer/ -sed -i 's/name: "ground_truth"/name: "slow_consumer"/g' ${MODEL_DIR}/slow_consumer/config.pbtxt -sed -i 's/max_batch_size: 64/max_batch_size: 1/g' ${MODEL_DIR}/slow_consumer/config.pbtxt - -cp 
./backpressure_test_models/decoupled_producer/1/model.py ${MODEL_DIR}/decoupled_producer_enabled_max_queue_size/1 -cp ./backpressure_test_models/decoupled_producer/config.pbtxt ${MODEL_DIR}/decoupled_producer_enabled_max_queue_size/ -sed -i 's/name: "decoupled_producer"/name: "decoupled_producer_enabled_max_queue_size"/g' ${MODEL_DIR}/decoupled_producer_enabled_max_queue_size/config.pbtxt +mkdir -p ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/ensemble_step1_enabled_max_queue_size/1 \ + ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer_enabled_max_queue_size/1 \ + ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer/1 +cp ${BACKPRESSURE_TEST_MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt \ + ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/ensemble_step1_enabled_max_queue_size/ +sed -i 's/"decoupled_producer"/"decoupled_producer_enabled_max_queue_size"/g' \ + ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/ensemble_step1_enabled_max_queue_size/config.pbtxt + +cp ../python_models/ground_truth/model.py ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer/1 +cp ../python_models/ground_truth/config.pbtxt ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer/ +sed -i 's/name: "ground_truth"/name: "slow_consumer"/g' ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer/config.pbtxt +sed -i 's/max_batch_size: 64/max_batch_size: 1/g' ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer/config.pbtxt + +cp ${BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/1/model.py ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer_enabled_max_queue_size/1 +cp ${BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/config.pbtxt ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer_enabled_max_queue_size/ +sed -i 's/name: "decoupled_producer"/name: "decoupled_producer_enabled_max_queue_size"/g' ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer_enabled_max_queue_size/config.pbtxt # Add dynamic_batching with max_queue_size to decoupled_producer -cat >> ${MODEL_DIR}/decoupled_producer_enabled_max_queue_size/config.pbtxt << 'EOF' +cat >> ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer_enabled_max_queue_size/config.pbtxt << 'EOF' dynamic_batching { preferred_batch_size: [ 1 ] @@ -177,19 +182,23 @@ dynamic_batching { EOF # Enable max_queue_size in the second step (slow_consumer) -mkdir -p ${MODEL_DIR}/ensemble_step2_enabled_max_queue_size/1 ${MODEL_DIR}/decoupled_producer/1 ${MODEL_DIR}/slow_consumer_enabled_max_queue_size/1 -cp ./backpressure_test_models/ensemble_disabled_max_inflight_requests/config.pbtxt ${MODEL_DIR}/ensemble_step2_enabled_max_queue_size/ -sed -i 's/"slow_consumer"/"slow_consumer_enabled_max_queue_size"/g' ${MODEL_DIR}/ensemble_step2_enabled_max_queue_size/config.pbtxt - -cp ./backpressure_test_models/decoupled_producer/1/model.py ${MODEL_DIR}/decoupled_producer/1 -cp ./backpressure_test_models/decoupled_producer/config.pbtxt ${MODEL_DIR}/decoupled_producer/ - -cp ../python_models/ground_truth/model.py ${MODEL_DIR}/slow_consumer_enabled_max_queue_size/1 -cp ../python_models/ground_truth/config.pbtxt ${MODEL_DIR}/slow_consumer_enabled_max_queue_size/ -sed -i 's/name: "ground_truth"/name: "slow_consumer_enabled_max_queue_size"/g' ${MODEL_DIR}/slow_consumer_enabled_max_queue_size/config.pbtxt -sed -i 's/max_batch_size: 64/max_batch_size: 1/g' ${MODEL_DIR}/slow_consumer_enabled_max_queue_size/config.pbtxt +mkdir -p ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/ensemble_step2_enabled_max_queue_size/1 \ + ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer/1 ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer_enabled_max_queue_size/1 +cp 
${BACKPRESSURE_TEST_MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt \ + ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/ensemble_step2_enabled_max_queue_size/ +sed -i 's/"slow_consumer"/"slow_consumer_enabled_max_queue_size"/g' \ + ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/ensemble_step2_enabled_max_queue_size/config.pbtxt + +cp ${BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/1/model.py ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer/1 +cp ${BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/config.pbtxt ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer/ + +cp ../python_models/ground_truth/model.py ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer_enabled_max_queue_size/1 +cp ../python_models/ground_truth/config.pbtxt ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer_enabled_max_queue_size/ +sed -i 's/name: "ground_truth"/name: "slow_consumer_enabled_max_queue_size"/g' \ + ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer_enabled_max_queue_size/config.pbtxt +sed -i 's/max_batch_size: 64/max_batch_size: 1/g' ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer_enabled_max_queue_size/config.pbtxt # Add dynamic_batching with max_queue_size to slow_consumer -cat >> ${MODEL_DIR}/slow_consumer_enabled_max_queue_size/config.pbtxt << 'EOF' +cat >> ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer_enabled_max_queue_size/config.pbtxt << 'EOF' dynamic_batching { preferred_batch_size: [ 1 ] @@ -205,7 +214,7 @@ SERVER_LOG="./ensemble_step_max_queue_size_test_server.log" CLIENT_LOG="./ensemble_step_max_queue_size_test_client.log" rm -f $SERVER_LOG $CLIENT_LOG -SERVER_ARGS="--model-repository=${MODEL_DIR}" +SERVER_ARGS="--model-repository=${MAX_QUEUE_SIZE_TEST_MODEL_DIR}" run_server if [ "$SERVER_PID" == "0" ]; then echo -e "\n***\n*** Failed to start $SERVER\n***" @@ -217,6 +226,7 @@ set +e python $BACKPRESSURE_TEST_PY $TEST_NAME -v >> $CLIENT_LOG 2>&1 if [ $? -ne 0 ]; then RET=1 + cat $CLIENT_LOG else check_test_results $TEST_RESULT_FILE 2 if [ $? 
-ne 0 ]; then @@ -231,38 +241,104 @@ kill $SERVER_PID wait $SERVER_PID -######## Test ensemble backpressure feature (max_inflight_requests parameter) ######## -MODEL_DIR="`pwd`/backpressure_test_models" -mkdir -p ${MODEL_DIR}/ensemble_disabled_max_inflight_requests/1 - -rm -rf ${MODEL_DIR}/slow_consumer -mkdir -p ${MODEL_DIR}/slow_consumer/1 -cp ../python_models/ground_truth/model.py ${MODEL_DIR}/slow_consumer/1 -cp ../python_models/ground_truth/config.pbtxt ${MODEL_DIR}/slow_consumer/ -sed -i 's/name: "ground_truth"/name: "slow_consumer"/g' ${MODEL_DIR}/slow_consumer/config.pbtxt - -# Create ensemble with "max_inflight_requests = 4" -rm -rf ${MODEL_DIR}/ensemble_max_inflight_requests_limit_4 -mkdir -p ${MODEL_DIR}/ensemble_max_inflight_requests_limit_4/1 -cp ${MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt ${MODEL_DIR}/ensemble_max_inflight_requests_limit_4/ -sed -i 's/ensemble_scheduling {/ensemble_scheduling {\n max_inflight_requests: 4/g' \ - ${MODEL_DIR}/ensemble_max_inflight_requests_limit_4/config.pbtxt - -# Create ensemble with "max_inflight_requests = 1" -rm -rf ${MODEL_DIR}/ensemble_max_inflight_requests_limit_1 -mkdir -p ${MODEL_DIR}/ensemble_max_inflight_requests_limit_1/1 -cp ${MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt ${MODEL_DIR}/ensemble_max_inflight_requests_limit_1/ -sed -i 's/platform: "ensemble"/name: "ensemble_max_inflight_requests_limit_1"\nplatform: "ensemble"/g' \ - ${MODEL_DIR}/ensemble_max_inflight_requests_limit_1/config.pbtxt -sed -i 's/ensemble_scheduling {/ensemble_scheduling {\n max_inflight_requests: 1/g' \ - ${MODEL_DIR}/ensemble_max_inflight_requests_limit_1/config.pbtxt +######## Test backpressure feature - 'max_inflight_requests' config option ######## +ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR="`pwd`/ensemble_backpressure_test_models" +rm -rf ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR} TEST_NAME="EnsembleBackpressureTest" SERVER_LOG="./ensemble_backpressure_test_server.log" CLIENT_LOG="./ensemble_backpressure_test_client.log" +SERVER_ARGS="--model-repository=${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}" rm -f $SERVER_LOG $CLIENT_LOG -SERVER_ARGS="--model-repository=${MODEL_DIR}" +# Step 1 - decoupled_producer (batch size 2) +mkdir -p ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/1 +cp ${BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/1/model.py ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/1/ +cp ${BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/config.pbtxt ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/ +sed -i 's/max_batch_size: 1/max_batch_size: 2/g' ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/decoupled_producer/config.pbtxt + +generate_consumer_model() { + local name=$1 + local delay=$2 + + mkdir -p ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/${name}/1 + cat > ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/${name}/1/model.py << EOF +import time +import triton_python_backend_utils as pb_utils + +class TritonPythonModel: + def execute(self, requests): + responses = [] + for request in requests: + in_tensor = pb_utils.get_input_tensor_by_name(request, "INPUT0") + out_tensor = pb_utils.Tensor("OUTPUT0", in_tensor.as_numpy()) + responses.append(pb_utils.InferenceResponse([out_tensor])) + time.sleep(${delay}) + return responses +EOF + cat > ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/${name}/config.pbtxt << EOF +name: "${name}" +backend: "python" +max_batch_size: 2 +input [ { name: "INPUT0", data_type: TYPE_FP32, dims: [ 1 ] } ] +output [ { name: "OUTPUT0", data_type: TYPE_FP32, dims: [ 1 ] } ] +instance_group 
[ { count: 1, kind: KIND_CPU } ] +dynamic_batching { preferred_batch_size: [ 2 ] } +EOF +} + +generate_ensemble_model() { + local name=$1 + local limit=$2 + local batch_size=2 + + local limit_str="" + if [ "$limit" != "disabled" ]; then + limit_str="max_inflight_requests: $limit" + fi + + mkdir -p ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/${name}/1 + cat > ${ENSEMBLE_BACKPRESSURE_TEST_MODEL_DIR}/${name}/config.pbtxt << EOF +name: "${name}" +platform: "ensemble" +max_batch_size: ${batch_size} +input [ { name: "IN", data_type: TYPE_INT32, dims: [ 1 ] } ] +output [ { name: "OUT", data_type: TYPE_FP32, dims: [ 1 ] } ] +ensemble_scheduling { + ${limit_str} + step [ + { + model_name: "decoupled_producer" + model_version: -1 + input_map { key: "IN", value: "IN" } + output_map { key: "OUT", value: "intermediate_1" } + }, + { + model_name: "consumer_high_delay" + model_version: -1 + input_map { key: "INPUT0", value: "intermediate_1" } + output_map { key: "OUTPUT0", value: "intermediate_2" } + }, + { + model_name: "consumer_low_delay" + model_version: -1 + input_map { key: "INPUT0", value: "intermediate_2" } + output_map { key: "OUTPUT0", value: "OUT" } + } + ] +} +EOF +} + +# Step 2, 3 - consumer_high_delay and consumer_low_delay (batch size 2) +generate_consumer_model "consumer_high_delay" "0.5" +generate_consumer_model "consumer_low_delay" "0.1" + +# Ensemble models with different max_inflight_requests limits (including disabled) +generate_ensemble_model "ensemble_disabled" "disabled" +generate_ensemble_model "ensemble_limit_1" 1 +generate_ensemble_model "ensemble_limit_4" 4 + run_server if [ "$SERVER_PID" == "0" ]; then echo -e "\n***\n*** Failed to start $SERVER\n***" @@ -271,11 +347,23 @@ if [ "$SERVER_PID" == "0" ]; then fi set +e + +# Verify valid config was loaded successfully +if ! grep -q "Ensemble model 'ensemble_limit_1' configured with max_inflight_requests: 1" $SERVER_LOG; then + echo -e "\n***\n*** FAILED: ensemble_limit_1 did not load\n***" + RET=1 +fi +if ! grep -q "Ensemble model 'ensemble_limit_4' configured with max_inflight_requests: 4" $SERVER_LOG; then + echo -e "\n***\n*** FAILED: ensemble_limit_4 did not load\n***" + RET=1 +fi + python $BACKPRESSURE_TEST_PY $TEST_NAME -v >> $CLIENT_LOG 2>&1 if [ $? -ne 0 ]; then RET=1 + cat $CLIENT_LOG else - check_test_results $TEST_RESULT_FILE 5 + check_test_results $TEST_RESULT_FILE 4 if [ $? -ne 0 ]; then cat $CLIENT_LOG echo -e "\n***\n*** Test Result Verification Failed\n***" @@ -287,14 +375,6 @@ set -e kill $SERVER_PID wait $SERVER_PID -set +e -# Verify valid config was loaded successfully -if ! grep -q "Ensemble model 'ensemble_max_inflight_requests_limit_4' configured with max_inflight_requests: 4" $SERVER_LOG; then - echo -e "\n***\n*** FAILED: Valid model did not load successfully\n***" - RET=1 -fi -set -e - ######## Test invalid value for "max_inflight_requests" INVALID_PARAM_MODEL_DIR="`pwd`/invalid_param_test_models" @@ -305,20 +385,21 @@ rm -rf $SERVER_LOG ${INVALID_PARAM_MODEL_DIR} mkdir -p ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_negative_limit/1 mkdir -p ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_string_limit/1 mkdir -p ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/1 -cp -r ${MODEL_DIR}/decoupled_producer ${MODEL_DIR}/slow_consumer ${INVALID_PARAM_MODEL_DIR}/ +# Copy the same decoupled_producer and slow_consumer models from the previous tests for all three ensemble models. 
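+# The composing models themselves are valid; only the three ensemble configs below
+# are given invalid max_inflight_requests values.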
+cp -r ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer ${INVALID_PARAM_MODEL_DIR}/ # max_inflight_requests = -5 -cp ${MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_negative_limit/ +cp ${BACKPRESSURE_TEST_MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_negative_limit/ sed -i 's/ensemble_scheduling {/ensemble_scheduling {\n max_inflight_requests: -5/g' \ ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_negative_limit/config.pbtxt # max_inflight_requests = "invalid_value" -cp ${MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_string_limit/ +cp ${BACKPRESSURE_TEST_MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_string_limit/ sed -i 's/ensemble_scheduling {/ensemble_scheduling {\n max_inflight_requests: "invalid_value"/g' \ ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_string_limit/config.pbtxt # max_inflight_requests = 12345678901 -cp ${MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/ +cp ${BACKPRESSURE_TEST_MODEL_DIR}/ensemble_disabled_max_inflight_requests/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/ sed -i 's/ensemble_scheduling {/ensemble_scheduling {\n max_inflight_requests: 12345678901/g' \ ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/config.pbtxt @@ -356,7 +437,6 @@ set -e if [ $RET -eq 0 ]; then echo -e "\n***\n*** Test Passed\n***" else - cat $CLIENT_LOG echo -e "\n***\n*** Test FAILED\n***" fi From 35dfed2d243d05f2144581fbee578a03e08f4734 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Wed, 18 Mar 2026 12:12:16 +0000 Subject: [PATCH 2/7] Update --- .../ensemble_backpressure_test.py | 20 +++++++++---------- qa/L0_simple_ensemble/test.sh | 6 +++--- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/qa/L0_simple_ensemble/ensemble_backpressure_test.py b/qa/L0_simple_ensemble/ensemble_backpressure_test.py index 1648691df5..8927d450fc 100755 --- a/qa/L0_simple_ensemble/ensemble_backpressure_test.py +++ b/qa/L0_simple_ensemble/ensemble_backpressure_test.py @@ -165,9 +165,9 @@ def _run_inference( def test_single_request_with_different_limits(self): """ - Single streaming request producing 16 responses through a 3-step - ensemble (decoupled_producer -> consumer_high_delay -> consumer_low_delay) - under various max_inflight_requests settings. + Single streaming request that produces 16 responses via a three-step ensemble pipeline + (decoupled_producer → consumer_high_delay → consumer_low_delay) under various + max_inflight_requests configurations. """ cases = [ ("ensemble_limit_4", "max_inflight_requests=4"), @@ -180,10 +180,10 @@ def test_single_request_with_different_limits(self): model_name=model_name, expected_responses_per_request=16 ) - def test_concurrent_requests_across_topologies(self): + def test_concurrent_requests_with_different_limits(self): """ NUM_REQUESTS concurrent streaming requests (NUM_RESPONSES_PER_REQUEST - responses each) exercise the global max_inflight_requests limit. + responses each) exercise the max_inflight_requests limit. Subtests cover: limit=4, limit=1, and the limit disabled. 
""" cases = [ @@ -192,7 +192,7 @@ def test_concurrent_requests_across_topologies(self): ("ensemble_disabled", "max_inflight_requests is disabled"), ] for model_name, desc in cases: - with self.subTest(topology=desc): + with self.subTest(limit=desc): self._run_inference( model_name=model_name, expected_responses_per_request=NUM_RESPONSES_PER_REQUEST, @@ -201,8 +201,8 @@ def test_concurrent_requests_across_topologies(self): def test_sequential_requests_limiter_resets_cleanly(self): """ - Send NUM_REQUESTS sequential requests one after another. If the limiter - leaks a slot on any request, subsequent requests will deadlock or time out. + Send NUM_REQUESTS requests one after another. If the limiter + leaks a slot on any request, subsequent requests will be stuck or time out. """ for seq_idx in range(NUM_REQUESTS): with self.subTest(request=seq_idx): @@ -240,7 +240,7 @@ def test_request_cancellation_under_backpressure(self): except queue.Empty: self.fail("Stream did not produce any response before cancellation.") - # Cancel the stream. This should unblock any waiting producers and result in a CANCELLED error. + # Cancel the stream - this unblocks any waiting producers and triggers a CANCELLED error. triton_client.stop_stream(cancel_requests=True) # Allow some time for cancellation @@ -323,7 +323,7 @@ def _run_inference(self, model_name, expected_responses_count): self.assertEqual( len(errors), 1, - "Expected exactly one error when queue full terminates stream", + "Expected exactly one error when the queue is full and the stream terminates", ) # Verify correctness of successful responses diff --git a/qa/L0_simple_ensemble/test.sh b/qa/L0_simple_ensemble/test.sh index 0623634a9c..a12c17e27b 100755 --- a/qa/L0_simple_ensemble/test.sh +++ b/qa/L0_simple_ensemble/test.sh @@ -330,7 +330,7 @@ ensemble_scheduling { EOF } -# Step 2, 3 - consumer_high_delay and consumer_low_delay (batch size 2) +# Steps 2 and 3 - consumer_high_delay and consumer_low_delay (batch size 2) generate_consumer_model "consumer_high_delay" "0.5" generate_consumer_model "consumer_low_delay" "0.1" @@ -376,7 +376,7 @@ kill $SERVER_PID wait $SERVER_PID -######## Test invalid value for "max_inflight_requests" +######## Test invalid values for 'max_inflight_requests' config option ######## INVALID_PARAM_MODEL_DIR="`pwd`/invalid_param_test_models" SERVER_ARGS="--model-repository=${INVALID_PARAM_MODEL_DIR}" SERVER_LOG="./invalid_max_inflight_requests_server.log" @@ -385,7 +385,7 @@ rm -rf $SERVER_LOG ${INVALID_PARAM_MODEL_DIR} mkdir -p ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_negative_limit/1 mkdir -p ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_string_limit/1 mkdir -p ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/1 -# Copy the same decoupled_producer and slow_consumer models from the previous tests for all three ensemble models. +# Reuse the decoupled_producer and slow_consumer models built in the previous test section. 
cp -r ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/decoupled_producer ${MAX_QUEUE_SIZE_TEST_MODEL_DIR}/slow_consumer ${INVALID_PARAM_MODEL_DIR}/ # max_inflight_requests = -5 From 71a973d0545b4f3197d72a6b2efdc9d53faff701 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Wed, 18 Mar 2026 13:30:39 +0000 Subject: [PATCH 3/7] Update --- qa/L0_simple_ensemble/test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qa/L0_simple_ensemble/test.sh b/qa/L0_simple_ensemble/test.sh index a12c17e27b..95a0efa4a2 100755 --- a/qa/L0_simple_ensemble/test.sh +++ b/qa/L0_simple_ensemble/test.sh @@ -291,7 +291,7 @@ generate_ensemble_model() { local name=$1 local limit=$2 local batch_size=2 - + local limit_str="" if [ "$limit" != "disabled" ]; then limit_str="max_inflight_requests: $limit" From 731a4ce4b1f77d0c6d92ada5499699091aa9f52f Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Wed, 18 Mar 2026 15:39:08 +0000 Subject: [PATCH 4/7] Update --- qa/L0_simple_ensemble/ensemble_backpressure_test.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/qa/L0_simple_ensemble/ensemble_backpressure_test.py b/qa/L0_simple_ensemble/ensemble_backpressure_test.py index 8927d450fc..fdbe00d028 100755 --- a/qa/L0_simple_ensemble/ensemble_backpressure_test.py +++ b/qa/L0_simple_ensemble/ensemble_backpressure_test.py @@ -151,12 +151,14 @@ def _run_inference( # Verify correctness of responses for idx, resp in enumerate(responses): output = resp.as_numpy("OUT") + # output shape is [batch_size, 1]; extract scalar for comparison. + value = float(output[0][0]) self.assertAlmostEqual( - output[0], + value, EXPECTED_INFER_OUTPUT, places=5, msg=f"Request {i} response {idx}: expected " - f"{EXPECTED_INFER_OUTPUT}, got {output[0]}", + f"{EXPECTED_INFER_OUTPUT}, got {value}", ) # Stop all streams From c18d9b3f4495b5d242731d7e1182d53d9da47af0 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Sat, 21 Mar 2026 23:04:38 +0530 Subject: [PATCH 5/7] Update docs --- docs/user_guide/decoupled_models.md | 4 ++-- docs/user_guide/ensemble_models.md | 8 +++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/docs/user_guide/decoupled_models.md b/docs/user_guide/decoupled_models.md index d932b07d9c..14dcce8355 100644 --- a/docs/user_guide/decoupled_models.md +++ b/docs/user_guide/decoupled_models.md @@ -97,9 +97,9 @@ each time with a new response. You can take a look at [grpc_server.cc](https://g ### Using Decoupled Models in Ensembles -When using decoupled models within an [ensemble pipeline](ensemble_models.md), you may encounter unbounded memory growth if the decoupled model produces responses faster than downstream models can consume them. +When using decoupled models within an [ensemble pipeline](ensemble_models.md), you may experience unbounded memory growth if a decoupled model produces responses faster than downstream models can consume them. -To prevent unbounded memory growth in this scenario, consider using the `max_inflight_requests` configuration field. This field limits the maximum number of concurrent inflight requests permitted at each ensemble step for each inference request. +To avoid this, consider using the `max_inflight_requests` configuration field. This field sets a global limit on the number of concurrent in-flight requests allowed at each ensemble step across all active ensemble requests, helping to prevent unbounded memory growth. 
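+
+As a minimal sketch (the limit value and the elided steps here are illustrative), the field is set inside `ensemble_scheduling` in the ensemble's `config.pbtxt`:
+
+```
+ensemble_scheduling {
+  max_inflight_requests: 8
+  step [
+    ...
+  ]
+}
+```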
For more details and examples, see [Managing Memory Usage in Ensemble Models](ensemble_models.md#managing-memory-usage-in-ensemble-models). diff --git a/docs/user_guide/ensemble_models.md b/docs/user_guide/ensemble_models.md index 3477e33d6c..59ab79444b 100644 --- a/docs/user_guide/ensemble_models.md +++ b/docs/user_guide/ensemble_models.md @@ -200,8 +200,7 @@ Consider an example ensemble model with two steps where the upstream model is 10 Without backpressure, requests accumulate in the pipeline faster than they can be processed, eventually leading to out-of-memory errors. -The `max_inflight_requests` field in the ensemble configuration sets a limit on the number of concurrent inflight requests permitted at each ensemble step for a single inference request. -When this limit is reached, faster upstream models are paused (blocked) until downstream models finish processing, effectively preventing unbounded memory growth. +The `max_inflight_requests` field in the ensemble configuration defines a global limit on the number of concurrent in-flight requests allowed at each ensemble step, across all active ensemble requests. Once this limit is reached, scheduling for that step is paused (blocked) until downstream capacity becomes available, effectively preventing unbounded memory growth. ``` ensemble_scheduling { @@ -225,9 +224,8 @@ ensemble_scheduling { ``` **Configuration:** -* **`max_inflight_requests: 16`**: For each ensemble request (not globally), at most 16 requests from `dali_preprocess` - can wait for `onnx_inference` to process. Once this per-step limit is reached, `dali_preprocess` is blocked until the downstream step completes a response. -* **Default (`0`)**: No limit - allows unlimited inflight requests (original behavior). +* **`max_inflight_requests: 16`**: Allows up to 16 in-flight requests for a given step (for example, requests from `dali_preprocess` waiting for `onnx_inference` to complete) across all active ensemble requests. Once this global, per-step limit is reached, scheduling additional work for that step is blocked until capacity becomes available. +* **Default (`0`)**: No limit — allows an unlimited number of in-flight requests (original behavior). ### When to Use This Feature From c196d2214cc8a4678394e4afe1da1285d3e7f28e Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Tue, 31 Mar 2026 11:16:26 +0530 Subject: [PATCH 6/7] Update --- docs/user_guide/decoupled_models.md | 2 +- docs/user_guide/ensemble_models.md | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/user_guide/decoupled_models.md b/docs/user_guide/decoupled_models.md index 14dcce8355..4901ed0084 100644 --- a/docs/user_guide/decoupled_models.md +++ b/docs/user_guide/decoupled_models.md @@ -99,7 +99,7 @@ each time with a new response. You can take a look at [grpc_server.cc](https://g When using decoupled models within an [ensemble pipeline](ensemble_models.md), you may experience unbounded memory growth if a decoupled model produces responses faster than downstream models can consume them. -To avoid this, consider using the `max_inflight_requests` configuration field. This field sets a global limit on the number of concurrent in-flight requests allowed at each ensemble step across all active ensemble requests, helping to prevent unbounded memory growth. +To prevent this, use the `max_inflight_requests` configuration field. This field sets a limit on the maximum number of concurrent requests allowed at each ensemble step. 
The limit is shared across all active requests for that ensemble model, which helps control memory usage and prevents it from growing without bound. For more details and examples, see [Managing Memory Usage in Ensemble Models](ensemble_models.md#managing-memory-usage-in-ensemble-models). diff --git a/docs/user_guide/ensemble_models.md b/docs/user_guide/ensemble_models.md index 59ab79444b..c2e61ea787 100644 --- a/docs/user_guide/ensemble_models.md +++ b/docs/user_guide/ensemble_models.md @@ -198,9 +198,9 @@ Consider an example ensemble model with two steps where the upstream model is 10 1. **Preprocessing model**: Produces 100 preprocessed requests/sec 2. **Inference model**: Consumes 10 requests/sec -Without backpressure, requests accumulate in the pipeline faster than they can be processed, eventually leading to out-of-memory errors. +Without backpressure, requests accumulate in the pipeline faster than they can be processed, which eventually leads to out-of-memory errors. -The `max_inflight_requests` field in the ensemble configuration defines a global limit on the number of concurrent in-flight requests allowed at each ensemble step, across all active ensemble requests. Once this limit is reached, scheduling for that step is paused (blocked) until downstream capacity becomes available, effectively preventing unbounded memory growth. +The `max_inflight_requests` field in the ensemble configuration defines a limit on the number of concurrent in-flight requests allowed at each ensemble step. This limit is shared across all active requests for that ensemble model. When the limit is reached, new request scheduling for that step is paused until downstream models free up capacity. This prevents requests from accumulating indefinitely and keeps memory usage under control. ``` ensemble_scheduling { @@ -224,7 +224,7 @@ ensemble_scheduling { ``` **Configuration:** -* **`max_inflight_requests: 16`**: Allows up to 16 in-flight requests for a given step (for example, requests from `dali_preprocess` waiting for `onnx_inference` to complete) across all active ensemble requests. Once this global, per-step limit is reached, scheduling additional work for that step is blocked until capacity becomes available. +* **`max_inflight_requests: 16`**: Limits the number of concurrent in-flight requests at a given ensemble step to 16 (for example, requests from `dali_preprocess` waiting for `onnx_inference` to complete). This limit is shared across all active requests for that ensemble model. Once the limit is reached, scheduling new work for that step is paused until downstream capacity becomes available. * **Default (`0`)**: No limit — allows an unlimited number of in-flight requests (original behavior). ### When to Use This Feature From 4606855d8c9afd9f3c50daf48a557d37f89e9d81 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Tue, 31 Mar 2026 16:16:14 +0530 Subject: [PATCH 7/7] Update docs/user_guide/ensemble_models.md Co-authored-by: Yingge He <157551214+yinggeh@users.noreply.github.com> --- docs/user_guide/ensemble_models.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/user_guide/ensemble_models.md b/docs/user_guide/ensemble_models.md index c2e61ea787..a1f9286197 100644 --- a/docs/user_guide/ensemble_models.md +++ b/docs/user_guide/ensemble_models.md @@ -190,7 +190,7 @@ When crafting the ensemble steps, it is useful to note the distinction between connecting ensemble `input`/`output` to those on the composing model and between composing models. 
-## Managing Memory Usage in Ensemble Models +## Limit Memory Growth in Ensemble Models (BETA) An *inflight request* refers to an intermediate request generated by an upstream model that is queued and held in memory until it is processed by a downstream model within an ensemble pipeline. When upstream models process requests significantly faster than downstream models, these in-flight requests can accumulate and potentially lead to unbounded memory growth. This problem occurs when there is a speed mismatch between different steps in the pipeline and is particularly common in *decoupled models* that produce multiple responses per request more quickly than downstream models can consume.