diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 07af1390649..275670f9c1a 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -564,7 +564,7 @@ grpc::Status FlashService::IsAlive( return check_result; auto & tmt_context = context->getTMTContext(); - response->set_available(tmt_context.checkRunning()); + response->set_available(tmt_context.checkRunning() && tmt_context.getMPPTaskManager()->isAvailable()); response->set_mpp_version(DB::GetMppVersion()); return grpc::Status::OK; } diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 3296da53c37..bcf9c35a366 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -83,6 +84,39 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id) return ptr; } +void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & global_context) +{ + // The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down + static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown"; + // The default value of flash.graceful_wait_before_shutdown + static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN = 600; + auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64( + GRACEFUL_WIAT_BEFORE_SHUTDOWN, + DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); + LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown); + Stopwatch watch; + // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched + std::this_thread::sleep_for(std::chrono::seconds(1)); + while (true) + { + auto elapsed_ms = watch.elapsedMilliseconds(); + { + std::unique_lock lock(mu); + if (monitored_tasks.empty()) + { + LOG_INFO(log, "All MPPTasks have finished after {}ms", elapsed_ms); + break; + } + } + if (elapsed_ms >= graceful_wait_before_shutdown * 1000) + { + LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", elapsed_ms); + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } +} + MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_) : scheduler(std::move(scheduler_)) , aborted_query_gather_cache(ABORTED_MPPGATHER_CACHE_SIZE) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index f49d14600ab..35fb030f7e9 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -194,6 +194,8 @@ struct MPPTaskMonitor return monitored_tasks.find(task_unique_id) != monitored_tasks.end(); } + void waitAllMPPTasksFinish(const std::unique_ptr & global_context); + std::mutex mu; std::condition_variable cv; bool is_shutdown = false; @@ -221,6 +223,8 @@ class MPPTaskManager : private boost::noncopyable std::shared_ptr monitor; + std::atomic is_available{true}; + public: explicit MPPTaskManager(MPPTaskSchedulerPtr scheduler); @@ -273,6 +277,9 @@ class MPPTaskManager : private boost::noncopyable bool isTaskExists(const MPPTaskId & id); + void setUnavailable() { is_available = false; } + bool isAvailable() { return is_available; } + private: MPPQueryPtr addMPPQuery( const MPPQueryId & query_id, diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 3e665946980..cb4d70f5d15 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1252,6 +1252,12 @@ try LOG_INFO(log, "Start to wait for terminal signal"); waitForTerminationRequest(); + // Note: `waitAllMPPTasksFinish` must be called before stopping the proxy. + // Otherwise, read index requests may fail, which can prevent TiFlash from shutting down gracefully. + LOG_INFO(log, "Set unavailable for MPPTask"); + tmt_context.getMPPTaskManager()->setUnavailable(); + tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context); + { // Set limiters stopping and wakeup threads in waitting queue. global_context->getIORateLimiter().setStop();