diff --git a/python/ucxx/_lib_async/application_context.py b/python/ucxx/_lib_async/application_context.py index 3303c23e7..7a42ad8f0 100644 --- a/python/ucxx/_lib_async/application_context.py +++ b/python/ucxx/_lib_async/application_context.py @@ -4,6 +4,7 @@ import logging import os import threading +import warnings import weakref from queue import Queue @@ -50,6 +51,12 @@ def __init__( enable_python_future, self.progress_mode ) + logger.info( + f"Progress mode: {self.progress_mode}, " + f"delayed submission: {enable_delayed_submission}, " + f"Python future: {enable_python_future}" + ) + self.exchange_peer_info_timeout = exchange_peer_info_timeout # For now, a application context only has one worker @@ -71,11 +78,10 @@ def __init__( @staticmethod def _check_progress_mode(progress_mode): - if progress_mode is None: - if "UCXPY_PROGRESS_MODE" in os.environ: - progress_mode = os.environ["UCXPY_PROGRESS_MODE"] - else: - progress_mode = "thread" + if "UCXPY_PROGRESS_MODE" in os.environ: + progress_mode = os.environ["UCXPY_PROGRESS_MODE"] + elif progress_mode is None: + progress_mode = "thread" valid_progress_modes = ["polling", "thread", "thread-polling"] if not isinstance(progress_mode, str) or not any( @@ -90,15 +96,12 @@ def _check_progress_mode(progress_mode): @staticmethod def _check_enable_delayed_submission(enable_delayed_submission, progress_mode): - if enable_delayed_submission is None: - if "UCXPY_ENABLE_DELAYED_SUBMISSION" in os.environ: - explicit_enable_delayed_submission = ( - False - if os.environ["UCXPY_ENABLE_DELAYED_SUBMISSION"] == "0" - else True - ) - else: - explicit_enable_delayed_submission = progress_mode.startswith("thread") + if "UCXPY_ENABLE_DELAYED_SUBMISSION" in os.environ: + explicit_enable_delayed_submission = ( + False if os.environ["UCXPY_ENABLE_DELAYED_SUBMISSION"] == "0" else True + ) + elif enable_delayed_submission is None: + explicit_enable_delayed_submission = progress_mode.startswith("thread") else: explicit_enable_delayed_submission = enable_delayed_submission @@ -115,18 +118,17 @@ def _check_enable_delayed_submission(enable_delayed_submission, progress_mode): @staticmethod def _check_enable_python_future(enable_python_future, progress_mode): - if enable_python_future is None: - if "UCXPY_ENABLE_PYTHON_FUTURE" in os.environ: - explicit_enable_python_future = ( - os.environ["UCXPY_ENABLE_PYTHON_FUTURE"] != "0" - ) - else: - explicit_enable_python_future = False + if "UCXPY_ENABLE_PYTHON_FUTURE" in os.environ: + explicit_enable_python_future = ( + os.environ["UCXPY_ENABLE_PYTHON_FUTURE"] != "0" + ) + elif enable_python_future is None: + explicit_enable_python_future = False else: explicit_enable_python_future = enable_python_future if not progress_mode.startswith("thread") and explicit_enable_python_future: - logger.warning( + warnings.warn( f"Notifier thread requested, but {progress_mode} does not " "support it, using Python wait_yield()." ) diff --git a/python/ucxx/_lib_async/tests/test_config.py b/python/ucxx/_lib_async/tests/test_config.py index e5974f87c..02aca04ae 100644 --- a/python/ucxx/_lib_async/tests/test_config.py +++ b/python/ucxx/_lib_async/tests/test_config.py @@ -1,6 +1,7 @@ # SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: BSD-3-Clause +import contextlib import os from unittest.mock import patch @@ -91,3 +92,73 @@ def test_logging(): ucxx.init(options) assert len(foreign_log.getvalue()) == 0 + + +@pytest.mark.parametrize( + "progress_mode", ["blocking", "polling", "thread", "thread-polling"] +) +@pytest.mark.asyncio +async def test_python_future_warnings_env(progress_mode): + with patch.dict( + os.environ, + {"UCXPY_PROGRESS_MODE": progress_mode, "UCXPY_ENABLE_PYTHON_FUTURE": "1"}, + ): + ctx = ( + contextlib.nullcontext() + if progress_mode.startswith("thread") + else pytest.warns( + UserWarning, + match=f"Notifier thread requested, but {progress_mode} does " + "not support it, using Python wait_yield().", + ) + ) + with ctx: + ucxx.init() + + +@pytest.mark.parametrize( + "progress_mode", ["blocking", "polling", "thread", "thread-polling"] +) +@pytest.mark.asyncio +async def test_python_future_warnings_init_options(progress_mode): + ctx = ( + contextlib.nullcontext() + if progress_mode.startswith("thread") + else pytest.warns( + UserWarning, + match=f"Notifier thread requested, but {progress_mode} does " + "not support it, using Python wait_yield().", + ) + ) + with ctx: + ucxx.init(progress_mode=progress_mode, enable_python_future=True) + + +@pytest.mark.parametrize( + "progress_mode", ["blocking", "polling", "thread", "thread-polling"] +) +@pytest.mark.asyncio +async def test_python_future_warnings_init_options_and_env(progress_mode): + # Environment variables have higher priority. To test that we're getting the + # proper warnings (or not getting them), we use reverse conditions here as to + # what should trigger them with environment variables. + kwargs = { + "progress_mode": "blocking" if progress_mode.startswith("thread") else "thread", + "enable_python_future": False, + } + + with patch.dict( + os.environ, + {"UCXPY_PROGRESS_MODE": progress_mode, "UCXPY_ENABLE_PYTHON_FUTURE": "1"}, + ): + ctx = ( + contextlib.nullcontext() + if progress_mode.startswith("thread") + else pytest.warns( + UserWarning, + match=f"Notifier thread requested, but {progress_mode} does " + "not support it, using Python wait_yield().", + ) + ) + with ctx: + ucxx.init(**kwargs)