import os
from ewoks import execute_graph
from ewokscore.task import Task
class InitIntegrator(Task, input_names=["params_file"], output_names=["integrator"]):
def run(self):
self.outputs.integrator = f"Integrator({self.inputs.params_file})"
class FindImages(
Task,
input_names=["image_directory", "integrator"],
optional_input_names=["image_index"],
output_names=["image_path", "next_image_index", "finished", "integrator"],
):
def run(self):
image_index = self.get_input_value("image_index", 0)
image_directory = self.inputs.image_directory
image_paths = [
os.path.join(image_directory, f"data{i}.tiff") for i in range(100)
]
self.outputs.image_path = image_paths[image_index]
print(f"Emit {self.outputs.image_path}")
self.outputs.next_image_index = image_index + 1
self.outputs.finished = self.outputs.next_image_index == len(image_paths)
self.outputs.integrator = self.inputs.integrator
class IntegrateImage(Task, input_names=["integrator", "image_path"]):
def run(self):
print(f"Integrate {self.inputs.image_path} with {self.inputs.integrator}")
workflow = {
"nodes": [
{
"id": "init_integrator",
"task_identifier": "__main__.InitIntegrator",
"task_type": "class",
},
{
"id": "find_images",
"task_identifier": "__main__.FindImages",
"task_type": "class",
},
{
"id": "integrate_image",
"task_identifier": "__main__.IntegrateImage",
"task_type": "class",
},
],
"links": [
{
"source": "init_integrator",
"target": "find_images",
"data_mapping": [
{"source_output": "integrator", "target_input": "integrator"}
],
},
{
"source": "find_images",
"target": "find_images",
"data_mapping": [
{"source_output": "next_image_index", "target_input": "image_index"},
],
"conditions": [{"source_output": "finished", "value": False}],
},
{
"source": "find_images",
"target": "integrate_image",
"data_mapping": [
{"source_output": "image_path", "target_input": "image_path"},
{"source_output": "integrator", "target_input": "integrator"},
],
},
],
"graph": {"id": "wf_pyFAI_1D_SAXSWAXS"},
}
if __name__ == "__main__":
inputs = [
{
"id": "init_integrator",
"name": "params_file",
"value": "/path/to/calib.poni",
},
{"id": "find_images", "name": "image_directory", "value": "/path/to/data"},
]
result = execute_graph(workflow, inputs=inputs, engine="ppf", pool_type="thread")
In GitLab by @woutdenolf on Oct 16, 2025, 19:44 GMT+2:
Migrated from GitLab: https://gitlab.esrf.fr/workflow/ewoks/ewoksppf/-/issues/22