Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 24 additions & 22 deletions python/ucxx/_lib_async/application_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import os
import threading
import warnings
import weakref
from queue import Queue

Expand Down Expand Up @@ -50,6 +51,12 @@ def __init__(
enable_python_future, self.progress_mode
)

logger.info(
f"Progress mode: {self.progress_mode}, "
f"delayed submission: {enable_delayed_submission}, "
f"Python future: {enable_python_future}"
)

self.exchange_peer_info_timeout = exchange_peer_info_timeout

# For now, a application context only has one worker
Expand All @@ -71,11 +78,10 @@ def __init__(

@staticmethod
def _check_progress_mode(progress_mode):
if progress_mode is None:
if "UCXPY_PROGRESS_MODE" in os.environ:
progress_mode = os.environ["UCXPY_PROGRESS_MODE"]
else:
progress_mode = "thread"
if "UCXPY_PROGRESS_MODE" in os.environ:
progress_mode = os.environ["UCXPY_PROGRESS_MODE"]
elif progress_mode is None:
progress_mode = "thread"

valid_progress_modes = ["polling", "thread", "thread-polling"]
if not isinstance(progress_mode, str) or not any(
Expand All @@ -90,15 +96,12 @@ def _check_progress_mode(progress_mode):

@staticmethod
def _check_enable_delayed_submission(enable_delayed_submission, progress_mode):
if enable_delayed_submission is None:
if "UCXPY_ENABLE_DELAYED_SUBMISSION" in os.environ:
explicit_enable_delayed_submission = (
False
if os.environ["UCXPY_ENABLE_DELAYED_SUBMISSION"] == "0"
else True
)
else:
explicit_enable_delayed_submission = progress_mode.startswith("thread")
if "UCXPY_ENABLE_DELAYED_SUBMISSION" in os.environ:
explicit_enable_delayed_submission = (
False if os.environ["UCXPY_ENABLE_DELAYED_SUBMISSION"] == "0" else True
)
elif enable_delayed_submission is None:
explicit_enable_delayed_submission = progress_mode.startswith("thread")
else:
explicit_enable_delayed_submission = enable_delayed_submission

Expand All @@ -115,18 +118,17 @@ def _check_enable_delayed_submission(enable_delayed_submission, progress_mode):

@staticmethod
def _check_enable_python_future(enable_python_future, progress_mode):
if enable_python_future is None:
if "UCXPY_ENABLE_PYTHON_FUTURE" in os.environ:
explicit_enable_python_future = (
os.environ["UCXPY_ENABLE_PYTHON_FUTURE"] != "0"
)
else:
explicit_enable_python_future = False
if "UCXPY_ENABLE_PYTHON_FUTURE" in os.environ:
explicit_enable_python_future = (
os.environ["UCXPY_ENABLE_PYTHON_FUTURE"] != "0"
)
elif enable_python_future is None:
explicit_enable_python_future = False
else:
explicit_enable_python_future = enable_python_future

if not progress_mode.startswith("thread") and explicit_enable_python_future:
logger.warning(
warnings.warn(
f"Notifier thread requested, but {progress_mode} does not "
"support it, using Python wait_yield()."
)
Expand Down
71 changes: 71 additions & 0 deletions python/ucxx/_lib_async/tests/test_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: BSD-3-Clause

import contextlib
import os
from unittest.mock import patch

Expand Down Expand Up @@ -91,3 +92,73 @@ def test_logging():
ucxx.init(options)

assert len(foreign_log.getvalue()) == 0


@pytest.mark.parametrize(
"progress_mode", ["blocking", "polling", "thread", "thread-polling"]
)
@pytest.mark.asyncio
async def test_python_future_warnings_env(progress_mode):
with patch.dict(
os.environ,
{"UCXPY_PROGRESS_MODE": progress_mode, "UCXPY_ENABLE_PYTHON_FUTURE": "1"},
):
ctx = (
contextlib.nullcontext()
if progress_mode.startswith("thread")
else pytest.warns(
UserWarning,
match=f"Notifier thread requested, but {progress_mode} does "
"not support it, using Python wait_yield().",
)
)
with ctx:
ucxx.init()


@pytest.mark.parametrize(
"progress_mode", ["blocking", "polling", "thread", "thread-polling"]
)
@pytest.mark.asyncio
async def test_python_future_warnings_init_options(progress_mode):
ctx = (
contextlib.nullcontext()
if progress_mode.startswith("thread")
else pytest.warns(
UserWarning,
match=f"Notifier thread requested, but {progress_mode} does "
"not support it, using Python wait_yield().",
)
)
with ctx:
ucxx.init(progress_mode=progress_mode, enable_python_future=True)


@pytest.mark.parametrize(
"progress_mode", ["blocking", "polling", "thread", "thread-polling"]
)
@pytest.mark.asyncio
async def test_python_future_warnings_init_options_and_env(progress_mode):
# Environment variables have higher priority. To test that we're getting the
# proper warnings (or not getting them), we use reverse conditions here as to
# what should trigger them with environment variables.
kwargs = {
"progress_mode": "blocking" if progress_mode.startswith("thread") else "thread",
"enable_python_future": False,
}

with patch.dict(
os.environ,
{"UCXPY_PROGRESS_MODE": progress_mode, "UCXPY_ENABLE_PYTHON_FUTURE": "1"},
):
ctx = (
contextlib.nullcontext()
if progress_mode.startswith("thread")
else pytest.warns(
UserWarning,
match=f"Notifier thread requested, but {progress_mode} does "
"not support it, using Python wait_yield().",
)
)
with ctx:
ucxx.init(**kwargs)