From 5ba596417231ac3a5cdee62cc37f998ed2149eff Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Wed, 18 Mar 2026 11:23:53 +0000 Subject: [PATCH 1/6] Update --- src/ensemble_scheduler/ensemble_scheduler.cc | 158 +++++++++---------- src/ensemble_scheduler/ensemble_scheduler.h | 18 ++- 2 files changed, 87 insertions(+), 89 deletions(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc index 2a507bccd..e34f56db8 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.cc +++ b/src/ensemble_scheduler/ensemble_scheduler.cc @@ -41,35 +41,6 @@ namespace triton { namespace core { -namespace { - -class EnsembleContext; - -using IterationCount = size_t; - -// Check if the model is configured to preserve the order of responses. -// This is critical for async execution of ResponseComplete callbacks. -inline bool -preserve_responses_order(const inference::ModelConfig& config) -{ - uint64_t total_instance_groups = 0; - for (const auto& group : config.instance_group()) { - total_instance_groups += group.count(); - } - - // Case 1: Sequence batching is enabled - // Case 2: Dynamic batching is disabled and there is only one instance group - // Case 3: Dynamic batching is enabled and preserve_ordering is true - // Case 4: Model transaction policy is decoupled (if the final response - // callback is not executed in the last step, the RequestTracker object will - // be freed prematurely and led to segmentation fault) - return config.has_sequence_batching() || - (!config.has_dynamic_batching() && total_instance_groups <= 1) || - (config.has_dynamic_batching() && - config.dynamic_batching().preserve_ordering()) || - config.model_transaction_policy().decoupled(); -} - // Request tracker is passed as 'userp' in RequestRelease function and used // to manage the lifecycle of the ensemble request class RequestTracker { @@ -152,8 +123,9 @@ class RequestTracker { triton::common::ThreadPool* const callback_pool_; }; -// Limits concurrent inflight requests for a single ensemble step. -// Tracks inflight requests count and blocks producers when limit is reached. +// Globally limits the number of concurrent in-flight requests for a single +// ensemble step. Tracks the count of in-flight requests and blocks producers +// when the limit is reached. class StepInflightRequestLimiter { public: explicit StepInflightRequestLimiter(const size_t max_inflight) @@ -161,13 +133,14 @@ class StepInflightRequestLimiter { { } - // Wait until capacity is available or request is cancelled. - // No-op if limit not configured (max_inflight_ == 0). - void WaitForCapacity( + // Blocks until a slot becomes available or the request is canceled. + // Canceled requests bypass the wait so that cancellation propagates + // through the normal step-scheduling path. + void Acquire( RequestTracker* request_tracker, const size_t step_idx, const std::string& ensemble_name) { - // No limit configured, no blocking + // No limit is configured, so requests are not blocked. if (max_inflight_ == 0) { return; } @@ -194,29 +167,25 @@ class StepInflightRequestLimiter { << kMutexTimeoutSeconds << " seconds. Proceeding to avoid deadlock."; } - } - // Increment inflight count after successfully scheduling a request. - // No-op if limit not configured (max_inflight_ == 0). - void IncrementInflightCount() - { - // No limit configured, no tracking needed - if (max_inflight_ == 0) { - return; - } - std::lock_guard lk(mutex_); + // Increment while holding the lock to prevent transient oversubscription. inflight_count_++; } - // Decrement inflight count when a request completes, and notify waiting - // producers. No-op if limit not configured (max_inflight_ == 0). - void DecrementInflightCount() + // Release one acquired slot and notify a waiting thread + void Release() { - // No limit configured, no tracking needed + // No limit is configured, so requests are not blocked. if (max_inflight_ == 0) { return; } + std::lock_guard lk(mutex_); + if (inflight_count_ == 0) { + LOG_ERROR << "[Internal Error] step inflight request limiter underflow"; + return; + } + inflight_count_--; cv_.notify_one(); } @@ -228,6 +197,35 @@ class StepInflightRequestLimiter { std::condition_variable cv_; }; +namespace { + +class EnsembleContext; + +using IterationCount = size_t; + +// Check if the model is configured to preserve the order of responses. +// This is critical for async execution of ResponseComplete callbacks. +inline bool +preserve_responses_order(const inference::ModelConfig& config) +{ + uint64_t total_instance_groups = 0; + for (const auto& group : config.instance_group()) { + total_instance_groups += group.count(); + } + + // Case 1: Sequence batching is enabled + // Case 2: Dynamic batching is disabled and there is only one instance group + // Case 3: Dynamic batching is enabled and preserve_ordering is true + // Case 4: Model transaction policy is decoupled (if the final response + // callback is not executed in the last step, the RequestTracker object will + // be freed prematurely and led to segmentation fault) + return config.has_sequence_batching() || + (!config.has_dynamic_batching() && total_instance_groups <= 1) || + (config.has_dynamic_batching() && + config.dynamic_batching().preserve_ordering()) || + config.model_transaction_policy().decoupled(); +} + // Step is used as 'userp' and keeps ensemble context alive // until no more internal requests are inflight. // Step contains metadata, and status for the @@ -448,11 +446,6 @@ class EnsembleContext { size_t inflight_step_counter_; - // Inflight request limiters for each ensemble step. - // Only allocated when max_inflight_requests_ > 0. - std::vector> - step_inflight_request_limiters_; - // pointer that either points to 'pruned_tensor_to_step_' or to // 'info_->tensor_to_step_' if all ensemble outputs are requested std::unordered_map>* tensor_to_step_; @@ -592,17 +585,6 @@ EnsembleContext::EnsembleContext( } } - // Initialize step inflight request limiters for each step. - if (info_->max_inflight_requests_ > 0) { - size_t num_steps = info_->steps_.size(); - step_inflight_request_limiters_.reserve(num_steps); - for (size_t i = 0; i < num_steps; i++) { - step_inflight_request_limiters_.emplace_back( - std::make_unique( - info_->max_inflight_requests_)); - } - } - if (ensemble_status_.IsOk()) { request_id_ = lrequest->Id(); correlation_id_ = lrequest->CorrelationId(); @@ -1016,9 +998,9 @@ EnsembleContext::UpdateEnsembleState( if (completed_step->response_flags_ & TRITONSERVER_RESPONSE_COMPLETE_FINAL) { inflight_step_counter_--; - if (!step_inflight_request_limiters_.empty()) { - step_inflight_request_limiters_[completed_step->step_idx_] - ->DecrementInflightCount(); + if (!info_->step_inflight_request_limiters_.empty()) { + info_->step_inflight_request_limiters_[completed_step->step_idx_] + ->Release(); } } RETURN_IF_ERROR(ConsumeResponse(completed_step)); @@ -1510,9 +1492,10 @@ EnsembleContext::ScheduleSteps( step->ctx_ = context; size_t this_step_idx = step->step_idx_; - // Apply step inflight request limiters if configured. - if (!context->step_inflight_request_limiters_.empty()) { - context->step_inflight_request_limiters_[this_step_idx]->WaitForCapacity( + // Acquire a slot from the global per-step limiter before scheduling the + // step. + if (!context->info_->step_inflight_request_limiters_.empty()) { + context->info_->step_inflight_request_limiters_[this_step_idx]->Acquire( context->request_tracker_, this_step_idx, context->info_->ensemble_name_); } @@ -1546,13 +1529,6 @@ EnsembleContext::ScheduleSteps( std::unique_ptr request = std::move(step->request_); auto step_status = context->is_->InferAsync(request); if (step_status.IsOk()) { - // Increment inflight counter AFTER successful scheduling. Always - // increment for ALL steps (including step 0) to ensure symmetry with - // decrement and prevent underflow when steps complete. - if (!context->step_inflight_request_limiters_.empty()) { - context->step_inflight_request_limiters_[this_step_idx] - ->IncrementInflightCount(); - } step.release(); continue; } else { @@ -1565,10 +1541,19 @@ EnsembleContext::ScheduleSteps( // Reaching here means the step is not being scheduled, update corresponding // counters and attempt to finish ensemble if it is the last step. + if (!context->info_->step_inflight_request_limiters_.empty()) { + context->info_->step_inflight_request_limiters_[this_step_idx]->Release(); + } + std::lock_guard lock(context->mutex_); - // The request is not sent to server properly, shouldn't expect its - // release function get called. - context->request_tracker_->DecrementCounter(); + // Only decrement the request tracker when IncrementCounter was previously + // called. If should_schedule is false, IncrementCounter was never + // invoked; unconditionally calling DecrementCounter here would underflow + // the counter, prematurely release the request, and potentially cause a + // use-after-free. + if (should_schedule) { + context->request_tracker_->DecrementCounter(); + } --context->inflight_step_counter_; if (context->inflight_step_counter_ == 0) { @@ -1736,7 +1721,7 @@ EnsembleScheduler::EnsembleScheduler( } callback_pool_ = is_->EnsembleCallbackPool(); - // Parse the configuration for max_inflight_requests from the protobuf field. + // Parse the max_inflight_requests configuration from the protobuf field if (config.has_ensemble_scheduling()) { info_->max_inflight_requests_ = config.ensemble_scheduling().max_inflight_requests(); @@ -1744,6 +1729,15 @@ EnsembleScheduler::EnsembleScheduler( LOG_INFO << "Ensemble model '" << config.name() << "' configured with max_inflight_requests: " << info_->max_inflight_requests_; + + // Allocate one global limiter per step so that the in-flight bound is + // shared across all concurrent ensemble requests. + info_->step_inflight_request_limiters_.reserve(info_->steps_.size()); + for (size_t i = 0; i < info_->steps_.size(); ++i) { + info_->step_inflight_request_limiters_.emplace_back( + std::make_unique( + info_->max_inflight_requests_)); + } } } } diff --git a/src/ensemble_scheduler/ensemble_scheduler.h b/src/ensemble_scheduler/ensemble_scheduler.h index 2d252445e..1f9bff2c6 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.h +++ b/src/ensemble_scheduler/ensemble_scheduler.h @@ -49,6 +49,7 @@ using cudaStream_t = void*; #endif // TRITON_ENABLE_GPU class InferenceServer; +class StepInflightRequestLimiter; struct EnsembleInfo { struct StepInfo { @@ -84,14 +85,17 @@ struct EnsembleInfo { // backward path, ensemble tensor to the step that provides its data std::unordered_map tensor_to_prev_step_; - // The maximum number of concurrent inflight requests allowed at each ensemble - // step per inference request. This limit is applied per step and per - // inference request, not globally for the entire ensemble model. This limit - // prevents unbounded memory growth when ensemble steps produce responses - // faster than downstream steps can consume them. Default value is 0, which - // indicates that no limit is enforced. Configured via 'max_inflight_requests' - // field in ensemble_scheduling. + // The maximum number of concurrent in-flight requests allowed at each + // ensemble step across all concurrent ensemble requests for this model. + // The limit is applied per step index and is global to the ensemble + // model instance. + // This limit prevents unbounded memory growth when upstream steps + // produce responses faster than downstream steps can consume them. + // A value of 0 means no limit is enforced. + // Configured via the 'max_inflight_requests' field in ensemble_scheduling. size_t max_inflight_requests_ = 0; + std::vector> + step_inflight_request_limiters_; }; // Scheduler that implements ensemble scheduling. From 32023d73ce3c813005c63270364c9df3b7ca2bf1 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Wed, 18 Mar 2026 14:06:58 +0000 Subject: [PATCH 2/6] Update --- src/ensemble_scheduler/ensemble_scheduler.cc | 222 +++++++++---------- src/ensemble_scheduler/ensemble_scheduler.h | 28 ++- 2 files changed, 130 insertions(+), 120 deletions(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc index e34f56db8..6903f3543 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.cc +++ b/src/ensemble_scheduler/ensemble_scheduler.cc @@ -28,8 +28,7 @@ #include "ensemble_scheduler.h" -#include -#include +#include #include "constants.h" #include "cuda_utils.h" @@ -41,6 +40,35 @@ namespace triton { namespace core { +namespace { + +class EnsembleContext; + +using IterationCount = size_t; + +// Check if the model is configured to preserve the order of responses. +// This is critical for async execution of ResponseComplete callbacks. +inline bool +preserve_responses_order(const inference::ModelConfig& config) +{ + uint64_t total_instance_groups = 0; + for (const auto& group : config.instance_group()) { + total_instance_groups += group.count(); + } + + // Case 1: Sequence batching is enabled + // Case 2: Dynamic batching is disabled and there is only one instance group + // Case 3: Dynamic batching is enabled and preserve_ordering is true + // Case 4: Model transaction policy is decoupled (if the final response + // callback is not executed in the last step, the RequestTracker object will + // be freed prematurely and led to segmentation fault) + return config.has_sequence_batching() || + (!config.has_dynamic_batching() && total_instance_groups <= 1) || + (config.has_dynamic_batching() && + config.dynamic_batching().preserve_ordering()) || + config.model_transaction_policy().decoupled(); +} + // Request tracker is passed as 'userp' in RequestRelease function and used // to manage the lifecycle of the ensemble request class RequestTracker { @@ -123,109 +151,6 @@ class RequestTracker { triton::common::ThreadPool* const callback_pool_; }; -// Globally limits the number of concurrent in-flight requests for a single -// ensemble step. Tracks the count of in-flight requests and blocks producers -// when the limit is reached. -class StepInflightRequestLimiter { - public: - explicit StepInflightRequestLimiter(const size_t max_inflight) - : inflight_count_(0), max_inflight_(max_inflight) - { - } - - // Blocks until a slot becomes available or the request is canceled. - // Canceled requests bypass the wait so that cancellation propagates - // through the normal step-scheduling path. - void Acquire( - RequestTracker* request_tracker, const size_t step_idx, - const std::string& ensemble_name) - { - // No limit is configured, so requests are not blocked. - if (max_inflight_ == 0) { - return; - } - - std::unique_lock lk(mutex_); - auto timeout = std::chrono::seconds(kMutexTimeoutSeconds); - - auto is_request_cancelled = [&]() { - auto& req = request_tracker->Request(); - return (req == nullptr) || req->IsCancelled(); - }; - - bool capacity_available = cv_.wait_for(lk, timeout, [&] { - return is_request_cancelled() || (inflight_count_ < max_inflight_); - }); - - // Log error if timeout occurred (not cancellation), but proceed anyway - // to avoid deadlock. Caller always continues after this call. - if (!capacity_available && !is_request_cancelled()) { - LOG_ERROR << "[Internal Error] Ensemble '" << ensemble_name - << "' unable to schedule step " << step_idx - << " (inflight: " << inflight_count_ - << " >= limit: " << max_inflight_ << ") for " - << kMutexTimeoutSeconds - << " seconds. Proceeding to avoid deadlock."; - } - - // Increment while holding the lock to prevent transient oversubscription. - inflight_count_++; - } - - // Release one acquired slot and notify a waiting thread - void Release() - { - // No limit is configured, so requests are not blocked. - if (max_inflight_ == 0) { - return; - } - - std::lock_guard lk(mutex_); - if (inflight_count_ == 0) { - LOG_ERROR << "[Internal Error] step inflight request limiter underflow"; - return; - } - - inflight_count_--; - cv_.notify_one(); - } - - private: - size_t inflight_count_; - const size_t max_inflight_; - std::mutex mutex_; - std::condition_variable cv_; -}; - -namespace { - -class EnsembleContext; - -using IterationCount = size_t; - -// Check if the model is configured to preserve the order of responses. -// This is critical for async execution of ResponseComplete callbacks. -inline bool -preserve_responses_order(const inference::ModelConfig& config) -{ - uint64_t total_instance_groups = 0; - for (const auto& group : config.instance_group()) { - total_instance_groups += group.count(); - } - - // Case 1: Sequence batching is enabled - // Case 2: Dynamic batching is disabled and there is only one instance group - // Case 3: Dynamic batching is enabled and preserve_ordering is true - // Case 4: Model transaction policy is decoupled (if the final response - // callback is not executed in the last step, the RequestTracker object will - // be freed prematurely and led to segmentation fault) - return config.has_sequence_batching() || - (!config.has_dynamic_batching() && total_instance_groups <= 1) || - (config.has_dynamic_batching() && - config.dynamic_batching().preserve_ordering()) || - config.model_transaction_policy().decoupled(); -} - // Step is used as 'userp' and keeps ensemble context alive // until no more internal requests are inflight. // Step contains metadata, and status for the @@ -1492,14 +1417,6 @@ EnsembleContext::ScheduleSteps( step->ctx_ = context; size_t this_step_idx = step->step_idx_; - // Acquire a slot from the global per-step limiter before scheduling the - // step. - if (!context->info_->step_inflight_request_limiters_.empty()) { - context->info_->step_inflight_request_limiters_[this_step_idx]->Acquire( - context->request_tracker_, this_step_idx, - context->info_->ensemble_name_); - } - bool should_schedule = false; // Must release lock before InferAsync to avoid deadlock, as the same thread // will be calling request/response callbacks on cache hits, which will @@ -1520,6 +1437,14 @@ EnsembleContext::ScheduleSteps( if (context->request_tracker_->Request()->IsCancelled()) { step->request_->Cancel(); } + // Acquire a slot from the global per-step limiter. This is done only for + // steps that will be dispatched, so that a failed ensemble does not + // block unnecessarily. + if (!context->info_->step_inflight_request_limiters_.empty()) { + context->info_->step_inflight_request_limiters_[this_step_idx]->Acquire( + context->request_tracker_->Request(), this_step_idx, + context->info_->ensemble_name_); + } // On a successful call to InferAsync(), the step will be released by // the response callback. When the response callback is invoked, the // step must not own (and release) the request as the request should be @@ -1541,16 +1466,17 @@ EnsembleContext::ScheduleSteps( // Reaching here means the step is not being scheduled, update corresponding // counters and attempt to finish ensemble if it is the last step. - if (!context->info_->step_inflight_request_limiters_.empty()) { + + + // Release the limiter slot if one was acquired, and update counters. + if (should_schedule && + !context->info_->step_inflight_request_limiters_.empty()) { context->info_->step_inflight_request_limiters_[this_step_idx]->Release(); } std::lock_guard lock(context->mutex_); - // Only decrement the request tracker when IncrementCounter was previously - // called. If should_schedule is false, IncrementCounter was never - // invoked; unconditionally calling DecrementCounter here would underflow - // the counter, prematurely release the request, and potentially cause a - // use-after-free. + // Decrement only when IncrementCounter was called. An unconditional + // decrement would underflow the counter and cause a use-after-free. if (should_schedule) { context->request_tracker_->DecrementCounter(); } @@ -1564,6 +1490,64 @@ EnsembleContext::ScheduleSteps( } // namespace +StepInflightRequestLimiter::StepInflightRequestLimiter( + const size_t max_inflight) + : inflight_count_(0), max_inflight_(max_inflight) +{ +} + +void +StepInflightRequestLimiter::Acquire( + const std::unique_ptr& request, const size_t step_idx, + const std::string& ensemble_name) +{ + // No limit is configured, so requests are not blocked. + if (max_inflight_ == 0) { + return; + } + + std::unique_lock lk(mutex_); + auto timeout = std::chrono::seconds(kMutexTimeoutSeconds); + + auto is_request_cancelled = [&]() { + return (request == nullptr) || request->IsCancelled(); + }; + + bool capacity_available = cv_.wait_for(lk, timeout, [&] { + return is_request_cancelled() || (inflight_count_ < max_inflight_); + }); + + if (!capacity_available && !is_request_cancelled()) { + LOG_ERROR << "[Internal Error] Ensemble '" << ensemble_name + << "' unable to schedule step " << step_idx + << " (inflight: " << inflight_count_ + << " >= limit: " << max_inflight_ << ") for " + << kMutexTimeoutSeconds + << " seconds. Proceeding to avoid deadlock."; + } + + // Increment while holding the lock to prevent transient oversubscription. + inflight_count_++; +} + +void +StepInflightRequestLimiter::Release() +{ + // No limit is configured, so requests are not blocked. + if (max_inflight_ == 0) { + return; + } + + std::lock_guard lk(mutex_); + if (inflight_count_ == 0) { + LOG_ERROR << "[Internal Error] step inflight request limiter underflow"; + return; + } + + inflight_count_--; + cv_.notify_one(); +} + Status EnsembleScheduler::Create( InferenceStatsAggregator* const stats_aggregator, diff --git a/src/ensemble_scheduler/ensemble_scheduler.h b/src/ensemble_scheduler/ensemble_scheduler.h index 1f9bff2c6..0ca20666f 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.h +++ b/src/ensemble_scheduler/ensemble_scheduler.h @@ -27,7 +27,9 @@ #ifdef TRITON_ENABLE_ENSEMBLE +#include #include +#include #include "metric_model_reporter.h" #include "model.h" @@ -49,7 +51,31 @@ using cudaStream_t = void*; #endif // TRITON_ENABLE_GPU class InferenceServer; -class StepInflightRequestLimiter; + +// Enforces a global per-step limit on concurrent in-flight requests shared +// across all in-progress ensemble requests. Tracks the number of in-flight +// requests and blocks producers when the limit is reached. +class StepInflightRequestLimiter { + public: + explicit StepInflightRequestLimiter(size_t max_inflight); + + // Blocks until a slot is available or the request is cancelled. Cancelled + // requests skip the wait so cancellation propagates via the normal + // step-scheduling path. The const reference prevents ownership transfer; + // only IsCancelled() is queried on the pointed-to request. + void Acquire( + const std::unique_ptr& request, size_t step_idx, + const std::string& ensemble_name); + + // Releases one acquired slot and wakes one waiting thread. + void Release(); + + private: + size_t inflight_count_; + const size_t max_inflight_; + std::mutex mutex_; + std::condition_variable cv_; +}; struct EnsembleInfo { struct StepInfo { From 0153fd6cbf9fc0e50b88dcb6e1d16622908aa55c Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Wed, 18 Mar 2026 15:44:11 +0000 Subject: [PATCH 3/6] Update --- src/ensemble_scheduler/ensemble_scheduler.cc | 2 ++ src/ensemble_scheduler/ensemble_scheduler.h | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc index 6903f3543..c095f8953 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.cc +++ b/src/ensemble_scheduler/ensemble_scheduler.cc @@ -28,7 +28,9 @@ #include "ensemble_scheduler.h" +#include #include +#include #include "constants.h" #include "cuda_utils.h" diff --git a/src/ensemble_scheduler/ensemble_scheduler.h b/src/ensemble_scheduler/ensemble_scheduler.h index 0ca20666f..c6d097aea 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.h +++ b/src/ensemble_scheduler/ensemble_scheduler.h @@ -27,9 +27,7 @@ #ifdef TRITON_ENABLE_ENSEMBLE -#include #include -#include #include "metric_model_reporter.h" #include "model.h" From ca3733a68ce743a4aa412d60553913ddab83a931 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Wed, 18 Mar 2026 15:46:18 +0000 Subject: [PATCH 4/6] Fix pre-commit --- src/ensemble_scheduler/ensemble_scheduler.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc index c095f8953..bf3c54cea 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.cc +++ b/src/ensemble_scheduler/ensemble_scheduler.cc @@ -28,8 +28,8 @@ #include "ensemble_scheduler.h" -#include #include +#include #include #include "constants.h" From 77e2440f532db064d856b0af680436ba589fa5c7 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Tue, 31 Mar 2026 11:24:44 +0530 Subject: [PATCH 5/6] Update comments --- src/ensemble_scheduler/ensemble_scheduler.cc | 11 ++++++----- src/ensemble_scheduler/ensemble_scheduler.h | 10 +++++----- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.cc b/src/ensemble_scheduler/ensemble_scheduler.cc index bf3c54cea..dd4f6b9b7 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.cc +++ b/src/ensemble_scheduler/ensemble_scheduler.cc @@ -1439,9 +1439,9 @@ EnsembleContext::ScheduleSteps( if (context->request_tracker_->Request()->IsCancelled()) { step->request_->Cancel(); } - // Acquire a slot from the global per-step limiter. This is done only for - // steps that will be dispatched, so that a failed ensemble does not - // block unnecessarily. + // Acquire a slot from the per-step shared limiter only for steps that + // will be dispatched, so that a failed ensemble does not hold capacity + // unnecessarily. if (!context->info_->step_inflight_request_limiters_.empty()) { context->info_->step_inflight_request_limiters_[this_step_idx]->Acquire( context->request_tracker_->Request(), this_step_idx, @@ -1716,8 +1716,9 @@ EnsembleScheduler::EnsembleScheduler( << "' configured with max_inflight_requests: " << info_->max_inflight_requests_; - // Allocate one global limiter per step so that the in-flight bound is - // shared across all concurrent ensemble requests. + // Allocate one limiter per step to ensure max_inflight_requests is + // enforced as a shared limit across all concurrent requests for this + // ensemble model. info_->step_inflight_request_limiters_.reserve(info_->steps_.size()); for (size_t i = 0; i < info_->steps_.size(); ++i) { info_->step_inflight_request_limiters_.emplace_back( diff --git a/src/ensemble_scheduler/ensemble_scheduler.h b/src/ensemble_scheduler/ensemble_scheduler.h index c6d097aea..452d10f3e 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.h +++ b/src/ensemble_scheduler/ensemble_scheduler.h @@ -50,9 +50,9 @@ using cudaStream_t = void*; class InferenceServer; -// Enforces a global per-step limit on concurrent in-flight requests shared -// across all in-progress ensemble requests. Tracks the number of in-flight -// requests and blocks producers when the limit is reached. +// Enforces a per-step limit on concurrent in-flight requests, shared across +// all active ensemble requests for a given ensemble model. Tracks in-flight +// request count and blocks producers when the limit is reached. class StepInflightRequestLimiter { public: explicit StepInflightRequestLimiter(size_t max_inflight); @@ -111,8 +111,8 @@ struct EnsembleInfo { // The maximum number of concurrent in-flight requests allowed at each // ensemble step across all concurrent ensemble requests for this model. - // The limit is applied per step index and is global to the ensemble - // model instance. + // The limit is applied per step index and is shared across all concurrent + // requests for this ensemble model. // This limit prevents unbounded memory growth when upstream steps // produce responses faster than downstream steps can consume them. // A value of 0 means no limit is enforced. From a3d8af1dbebea53d9113682acecda515e49c59bf Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Tue, 31 Mar 2026 15:23:33 +0530 Subject: [PATCH 6/6] Update --- src/ensemble_scheduler/ensemble_scheduler.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ensemble_scheduler/ensemble_scheduler.h b/src/ensemble_scheduler/ensemble_scheduler.h index 452d10f3e..f3cbc2480 100644 --- a/src/ensemble_scheduler/ensemble_scheduler.h +++ b/src/ensemble_scheduler/ensemble_scheduler.h @@ -1,4 +1,4 @@ -// Copyright 2019-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2019-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions