Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,6 @@ src/ansys/dpf/core/examples/python_plugins/gltf_plugin/requirements.txt
src/ansys/dpf/core/examples/python_plugins/gltf_plugin/texture.png
src/ansys/dpf/core/examples/python_plugins/easy_statistics.py
src/ansys/dpf/core/examples/python_plugins/gltf_plugin.xml

# Git worktrees
.worktrees/
10 changes: 10 additions & 0 deletions src/ansys/dpf/core/_model_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,22 @@ def named_selection(self, name):
return self._metadata().named_selection(name)
return None

@property
def _streams_container_direct(self):
if self._metadata():
return self._metadata()._streams_container_direct
return None

def __connect_op__(self, op, mesh_by_default=True):
"""Connect the data sources or the streams to the operator."""
if self.streams_provider is not None and hasattr(op.inputs, "streams"):
op.inputs.streams.connect(self.streams_provider.outputs)
elif self.streams_provider is not None and hasattr(op.inputs, "streams_container"):
op.inputs.streams_container.connect(self.streams_provider.outputs)
elif self._streams_container_direct is not None and hasattr(op.inputs, "streams"):
op.inputs.streams.connect(self._streams_container_direct)
elif self._streams_container_direct is not None and hasattr(op.inputs, "streams_container"):
op.inputs.streams_container.connect(self._streams_container_direct)
elif self.data_sources is not None and hasattr(op.inputs, "data_sources"):
op.inputs.data_sources.connect(self.data_sources)

Expand Down
95 changes: 72 additions & 23 deletions src/ansys/dpf/core/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@ class Model:

Parameters
----------
data_sources : str, dpf.core.DataSources, os.PathLike
Accepts either a :class:`dpf.core.DataSources` instance or the path of the
result file to open as an os.PathLike object or a str. The default is ``None``.
data_sources : str, dpf.core.DataSources, dpf.core.StreamsContainer, os.PathLike
Accepts a :class:`dpf.core.DataSources` instance, a
:class:`dpf.core.StreamsContainer` instance, or the path of the result file to
open as an os.PathLike object or a str. Providing a
:class:`dpf.core.StreamsContainer` avoids reopening already-open file streams
and can improve performance when shared across multiple models or operators.
The default is ``None``.
server : server.DPFServer, optional
Server with the channel connected to the remote or local instance. The
default is ``None``, in which case an attempt is made to use the global
Expand All @@ -66,6 +70,11 @@ class Model:
>>> transient = examples.download_transient_result()
>>> model = dpf.Model(transient)

Create a model from an existing :class:`dpf.core.StreamsContainer`:

>>> streams = dpf.Model(transient).metadata.streams_provider.outputs.streams_container()
>>> model2 = dpf.Model(streams)

"""

def __init__(self, data_sources=None, server=None):
Expand All @@ -79,6 +88,19 @@ def __init__(self, data_sources=None, server=None):
self._results = None
self._mesh_by_default = True

def __del__(self):
"""Release internally created streams on garbage collection.

Streams that were provided externally via a
:class:`~ansys.dpf.core.streams_container.StreamsContainer` argument are
**not** touched — the caller retains ownership of that object's lifecycle.
"""
try:
if self._metadata is not None and self._metadata._streams_container_direct is None:
self._metadata.release_streams()
except Exception: # pylint: disable=broad-except
pass

@property
def metadata(self):
"""Model metadata.
Expand Down Expand Up @@ -278,20 +300,22 @@ def mesh_by_default(self, value):


class Metadata:
"""Contains the metadata of a data source.
"""Contains the metadata of a data source or streams container.

Parameters
----------
data_sources : DataSources

data_sources : DataSources, StreamsContainer, str, os.PathLike
Accepts a :class:`~ansys.dpf.core.data_sources.DataSources` instance, a
:class:`~ansys.dpf.core.streams_container.StreamsContainer` instance, a file
path, or ``None``.
server : server.DPFServer
Server with the channel connected to the remote or local instance.

"""

def __init__(self, data_sources, server):
self._server = server
self._set_data_sources(data_sources)
self._streams_container_direct = None
self._meshed_region = None
self._meshes_container = None
self._result_info = None
Expand All @@ -300,7 +324,7 @@ def __init__(self, data_sources, server):
self._time_freq_support = None
self._mesh_selection_manager = None
self._mesh_provider_cached_instance = None
self._cache_streams_provider()
self._set_data_sources(data_sources)

def _cache_result_info(self):
"""Store result information."""
Expand All @@ -314,6 +338,9 @@ def _cache_mesh_info(self):

def _cache_streams_provider(self):
"""Create a stream provider and cache it."""
if self._streams_container_direct is not None:
# Streams are provided directly; no need to create a provider operator.
return
from ansys.dpf.core import operators

if hasattr(operators, "metadata") and hasattr(operators.metadata, "stream_provider"):
Expand All @@ -328,11 +355,27 @@ def _cache_streams_provider(self):
except:
self._stream_provider = None

@property
def _streams_container_connectable(self):
"""Return a connectable streams container for operator inputs.

Returns either the output pin of the cached streams provider operator, or the
:class:`~ansys.dpf.core.streams_container.StreamsContainer` passed directly at
construction time. ``None`` is returned when no streams are available.
"""
if self._streams_container_direct is not None:
return self._streams_container_direct
if self._stream_provider is not None:
return self._stream_provider.outputs.streams_container
return None

def release_streams(self):
"""Release the streams if any."""
if self.streams_provider is not None:
sc = self.streams_provider.outputs.streams_container()
sc.release_handles()
elif self._streams_container_direct is not None:
self._streams_container_direct.release_handles()

@property
@protect_source_op_not_found
Expand Down Expand Up @@ -373,10 +416,9 @@ def time_freq_support(self):
time_provider: dpf.core.operators.metadata.time_freq_provider = Operator(
name="TimeFreqSupportProvider", server=self._server
)
if self._stream_provider:
time_provider.inputs.streams_container.connect(
self._stream_provider.outputs.streams_container
)
sc = self._streams_container_connectable
if sc is not None:
time_provider.inputs.streams_container.connect(sc)
else:
time_provider.inputs.data_sources.connect(self.data_sources)
self._time_freq_support = time_provider.get_output(0, types.time_freq_support)
Expand Down Expand Up @@ -437,7 +479,12 @@ def streams_provider(self):
def _set_data_sources(self, var_inp):
from pathlib import PurePath

if isinstance(var_inp, dpf.core.DataSources):
from ansys.dpf.core.streams_container import StreamsContainer

if isinstance(var_inp, StreamsContainer):
self._streams_container_direct = var_inp
self._data_sources = var_inp.datasources
elif isinstance(var_inp, dpf.core.DataSources):
self._data_sources = var_inp
elif isinstance(var_inp, (str, PurePath)):
self._data_sources = DataSources(var_inp, server=self._server)
Expand All @@ -448,7 +495,7 @@ def _set_data_sources(self, var_inp):
def _load_result_info(self):
"""Return a result info object."""
op = Operator("ResultInfoProvider", server=self._server)
op.inputs.streams_container.connect(self._stream_provider.outputs.streams_container)
op.inputs.streams_container.connect(self._streams_container_connectable)
try:
result_info = op.get_output(0, types.result_info)
except Exception as e:
Expand All @@ -464,7 +511,11 @@ def _load_result_info(self):
def _load_mesh_info(self):
"""Return a mesh info object."""
op = Operator("mesh_info_provider", server=self._server)
op.inputs.connect(self._stream_provider.outputs)
sc = self._streams_container_connectable
if sc is not None:
op.inputs.streams_container.connect(sc)
else:
op.inputs.connect(self._stream_provider.outputs)
try:
mesh_info = op.outputs.mesh_info()
except Exception as e:
Expand Down Expand Up @@ -512,10 +563,9 @@ def mesh_provider(self) -> mesh_provider_op:
mesh_provider: dpf.core.operators.mesh.mesh_provider = Operator(
name="MeshProvider", server=self._server
)
if self._stream_provider:
mesh_provider.inputs.streams_container.connect(
self._stream_provider.outputs.streams_container
)
sc = self._streams_container_connectable
if sc is not None:
mesh_provider.inputs.streams_container.connect(sc)
else:
mesh_provider.inputs.data_sources.connect(self.data_sources)
return mesh_provider
Expand Down Expand Up @@ -589,10 +639,9 @@ def meshes_provider(self):
meshes_provider: dpf.core.operators.mesh.meshes_provider = Operator(
name="meshes_provider", server=self._server
)
if self._stream_provider:
meshes_provider.inputs.streams_container.connect(
self._stream_provider.outputs.streams_container
)
sc = self._streams_container_connectable
if sc is not None:
meshes_provider.inputs.streams_container.connect(sc)
else:
meshes_provider.inputs.data_sources.connect(self.data_sources)
return meshes_provider
Expand Down
114 changes: 114 additions & 0 deletions tests/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
# SOFTWARE.

import functools
import gc
from unittest.mock import patch

import numpy as np
import pytest
Expand Down Expand Up @@ -258,6 +260,118 @@ def test_model_meshes_provider(simple_bar):
assert meshes[0].nodes.n_nodes == 3751


# ---------------------------------------------------------------------------
# StreamsContainer-based construction
# ---------------------------------------------------------------------------


def _get_streams_container(path):
"""Helper: open a path with a plain Model and return its StreamsContainer.

Skips the calling test automatically when the active server uses a gRPC
communication protocol, because StreamsContainer is InProcess-only.
"""
if dpf.core._global_server().has_client():
pytest.skip("StreamsContainer is only available with an InProcess server")
base_model = dpf.core.Model(path)
return base_model.metadata.streams_provider.outputs.streams_container()


def test_model_from_streams_container(simple_bar):
sc = _get_streams_container(simple_bar)
model = dpf.core.Model(sc)
assert model is not None
assert "displacement" in model.metadata.result_info


def test_model_metadata_from_streams_container(simple_bar):
sc = _get_streams_container(simple_bar)
model = dpf.core.Model(sc)
assert model.metadata.result_info is not None
assert model.metadata.time_freq_support is not None
assert model.metadata.meshed_region is not None
# data_sources is populated from the StreamsContainer's own DataSources
assert model.metadata.data_sources is not None
assert len(model.metadata.data_sources.result_files) > 0


def test_model_meshed_region_from_streams_container(simple_bar):
sc = _get_streams_container(simple_bar)
model = dpf.core.Model(sc)
mesh = model.metadata.meshed_region
assert mesh.nodes.n_nodes == 3751


def test_model_results_from_streams_container(simple_bar):
sc = _get_streams_container(simple_bar)
model = dpf.core.Model(sc)
# results should be queryable and give correct data
disp_fc = model.results.displacement.eval()
assert len(disp_fc) > 0
assert disp_fc[0].data.shape[1] == 3 # 3-component displacement


def test_model_operator_from_streams_container(simple_bar):
sc = _get_streams_container(simple_bar)
model = dpf.core.Model(sc)
op = model.operator("U")
fc = op.outputs.fields_container()
assert len(fc) > 0


def test_model_streams_provider_is_none_for_direct_streams(simple_bar):
"""When a StreamsContainer is passed directly no extra streams_provider is created."""
sc = _get_streams_container(simple_bar)
model = dpf.core.Model(sc)
assert model.metadata.streams_provider is None
assert model.metadata._streams_container_direct is sc
# data_sources must be populated from the StreamsContainer, not empty
assert len(model.metadata.data_sources.result_files) > 0


def test_model_results_match_between_datasources_and_streams_container(simple_bar):
"""Results produced from a StreamsContainer must match those from a DataSources."""
if dpf.core._global_server().has_client():
pytest.skip("StreamsContainer is only available with an InProcess server")
model_ds = dpf.core.Model(dpf.core.DataSources(simple_bar))
sc = model_ds.metadata.streams_provider.outputs.streams_container()
model_sc = dpf.core.Model(sc)

field_ds = model_ds.results.displacement.eval()[0]
field_sc = model_sc.results.displacement.eval()[0]
iden = dpf.core.operators.logic.identical_fields(field_ds, field_sc)
assert iden.outputs.boolean()


def test_model_release_streams_from_streams_container(simple_bar):
"""release_streams() must not raise when using a direct StreamsContainer."""
sc = _get_streams_container(simple_bar)
model = dpf.core.Model(sc)
# Should not raise
model.metadata.release_streams()


def test_model_del_releases_internally_created_streams(simple_bar):
"""Model.__del__ must release streams created internally (from path / DataSources)."""
model = dpf.core.Model(simple_bar)
metadata = model.metadata # ensure lazy-init is done
with patch.object(metadata, "release_streams") as mock_release:
del model
gc.collect()
mock_release.assert_called_once()


def test_model_del_does_not_release_external_streams(simple_bar):
"""Model.__del__ must NOT release a StreamsContainer that was provided by the caller."""
sc = _get_streams_container(simple_bar)
model = dpf.core.Model(sc)
metadata = model.metadata # ensure lazy-init is done
with patch.object(metadata, "release_streams") as mock_release:
del model
gc.collect()
mock_release.assert_not_called()


# @pytest.mark.skipif(NO_PLOTTING, reason="Requires system to support plotting")
# def test_displacements_plot(static_model):
# from pyvista import CameraPosition
Expand Down
Loading