Skip to content

xgboost.dask broken for larger clusters #11461

@jrbourbeau

Description

@jrbourbeau

In distributed=2025.4.0, client.scheduler_info() started truncating the worker info it returns by default (see dask/distributed#9045), because returning the info for every worker scaled very poorly for large clusters.

Today while running this example on a large Dask cluster on Coiled with xgboost=3.0.1 I got this error:

KeyError                                  Traceback (most recent call last)
Cell In[5], line 20
     17 d_train = xgboost.dask.DaskDMatrix(None, X_train, y_train, enable_categorical=True)
     19 print("Training ...")
---> 20 model = xgboost.dask.train(
     21     None,
     22     {"tree_method": "hist"},
     23     d_train,
     24     num_boost_round=4,
     25     evals=[(d_train, "train")],
     26 )
     28 print("Scoring ...")
     29 predictions = xgboost.dask.predict(None, model, X_test)

File [~/miniforge3/envs/examples/lib/python3.10/site-packages/xgboost/core.py:729](http://localhost:8888/~/miniforge3/envs/examples/lib/python3.10/site-packages/xgboost/core.py#line=728), in require_keyword_args.<locals>.throw_if.<locals>.inner_f(*args, **kwargs)
    727 for k, arg in zip(sig.parameters, args):
    728     kwargs[k] = arg
--> 729 return func(**kwargs)

File [~/miniforge3/envs/examples/lib/python3.10/site-packages/xgboost/dask/__init__.py:927](http://localhost:8888/~/miniforge3/envs/examples/lib/python3.10/site-packages/xgboost/dask/__init__.py#line=926), in train(client, params, dtrain, num_boost_round, evals, obj, early_stopping_rounds, xgb_model, verbose_eval, callbacks, custom_metric, coll_cfg)
    925 client = _get_client(client)
    926 args = locals()
--> 927 return client.sync(
    928     _train_async,
    929     global_config=config.get_config(),
    930     dconfig=_get_dask_config(),
    931     **args,
    932 )

File [~/miniforge3/envs/examples/lib/python3.10/site-packages/distributed/utils.py:376](http://localhost:8888/~/miniforge3/envs/examples/lib/python3.10/site-packages/distributed/utils.py#line=375), in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    374     return future
    375 else:
--> 376     return sync(
    377         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    378     )

File [~/miniforge3/envs/examples/lib/python3.10/site-packages/distributed/utils.py:452](http://localhost:8888/~/miniforge3/envs/examples/lib/python3.10/site-packages/distributed/utils.py#line=451), in sync(loop, func, callback_timeout, *args, **kwargs)
    449         wait(10)
    451 if error is not None:
--> 452     raise error
    453 else:
    454     return result

File [~/miniforge3/envs/examples/lib/python3.10/site-packages/distributed/utils.py:426](http://localhost:8888/~/miniforge3/envs/examples/lib/python3.10/site-packages/distributed/utils.py#line=425), in sync.<locals>.f()
    424         awaitable = wait_for(awaitable, timeout)
    425     future = asyncio.ensure_future(awaitable)
--> 426     result = yield future
    427 except Exception as exception:
    428     error = exception

File [~/miniforge3/envs/examples/lib/python3.10/site-packages/tornado/gen.py:766](http://localhost:8888/~/miniforge3/envs/examples/lib/python3.10/site-packages/tornado/gen.py#line=765), in Runner.run(self)
    764 try:
    765     try:
--> 766         value = future.result()
    767     except Exception as e:
    768         # Save the exception for later. It's important that
    769         # gen.throw() not be called inside this try/except block
    770         # because that makes sys.exc_info behave unexpectedly.
    771         exc: Optional[Exception] = e

File [~/miniforge3/envs/examples/lib/python3.10/site-packages/xgboost/dask/__init__.py:860](http://localhost:8888/~/miniforge3/envs/examples/lib/python3.10/site-packages/xgboost/dask/__init__.py#line=859), in _train_async(client, global_config, dconfig, params, dtrain, num_boost_round, evals, obj, early_stopping_rounds, verbose_eval, xgb_model, callbacks, custom_metric, coll_cfg)
    857     evals_name = []
    858     evals_id = []
--> 860 result = await map_worker_partitions(
    861     client,
    862     do_train,
    863     # extra function parameters
    864     params,
    865     coll_args,
    866     id(dtrain),
    867     evals_name,
    868     evals_id,
    869     *([dtrain] + evals_data),
    870     # workers to be used for training
    871     workers=workers,
    872 )
    873 return result

File [~/miniforge3/envs/examples/lib/python3.10/site-packages/xgboost/dask/__init__.py:594](http://localhost:8888/~/miniforge3/envs/examples/lib/python3.10/site-packages/xgboost/dask/__init__.py#line=593), in map_worker_partitions(client, func, workers, *refs)
    592 bag = db.from_delayed(futures)
    593 fut = await bag.reduction(first_valid, first_valid)
--> 594 result = await client.compute(fut).result()
    596 return result

File [~/miniforge3/envs/examples/lib/python3.10/site-packages/distributed/client.py:400](http://localhost:8888/~/miniforge3/envs/examples/lib/python3.10/site-packages/distributed/client.py#line=399), in Future._result(self, raiseit)
    398 if raiseit:
    399     typ, exc, tb = exc
--> 400     raise exc.with_traceback(tb)
    401 else:
    402     return exc

File [~/miniforge3/envs/examples/lib/python3.10/site-packages/xgboost/dask/__init__.py:558](http://localhost:8888/~/miniforge3/envs/examples/lib/python3.10/site-packages/xgboost/dask/__init__.py#line=557), in fn()
    552     raise ValueError(
    553         f"Invalid worker address: {worker.address}, expecting {_address}. "
    554         "This is likely caused by one of the workers died and Dask "
    555         "re-scheduled a different one. Resilience is not yet supported."
    556     )
    557 # Turn result into a list for bag construction
--> 558 return [func(*args, **kwargs)]

File [~/miniforge3/envs/examples/lib/python3.10/site-packages/xgboost/dask/__init__.py:815](http://localhost:8888/~/miniforge3/envs/examples/lib/python3.10/site-packages/xgboost/dask/__init__.py#line=814), in do_train()
    812 local_history: TrainingCallback.EvalsLog = {}
    813 global_config.update({"nthread": n_threads})
--> 815 with CommunicatorContext(**coll_args), config.config_context(**global_config):
    816     Xy, evals = _get_dmatrices(
    817         train_ref,
    818         train_id,
   (...)
    822         n_threads=n_threads,
    823     )
    825     booster = worker_train(
    826         params=local_param,
    827         dtrain=Xy,
   (...)
    836         callbacks=callbacks,
    837     )

File [/opt/coiled/env/lib/python3.10/site-packages/xgboost/dask/__init__.py:259](http://localhost:8888/opt/coiled/env/lib/python3.10/site-packages/xgboost/dask/__init__.py#line=258), in __init__()

KeyError: 'tls://10.0.3.248:42285'

which I think is coming from here

worker = distributed.get_worker()
with distributed.worker_client() as client:
    info = client.scheduler_info()
    w = info["workers"][worker.address]
wid = w["id"]

due to the smaller default number of workers returned by scheduler_info().

A quick fix would be to set n_workers=-1 in that scheduler_info() call.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions