Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
b6350df
implemented timepix end-run workflow
Nov 9, 2023
73221d7
Update processing.py
JGoodrichBNL Nov 13, 2023
c90a3de
made Abby's changes
Nov 16, 2023
3bd96b6
Merge branch 'timepix-workflow' of https://github.com/JGoodrichBNL/NS…
Nov 16, 2023
231c1a4
typo fix
Nov 16, 2023
5b92726
Update processing.py
JGoodrichBNL Nov 17, 2023
cebc6fb
Update processing.py
JGoodrichBNL Nov 17, 2023
f33d157
Update processing.py
JGoodrichBNL Nov 17, 2023
798ac44
Added tpx3awkward utils.
JGoodrichBNL Nov 21, 2023
91b34b1
Fixing filepaths
JGoodrichBNL Nov 21, 2023
94b51b8
Fixing filepaths
JGoodrichBNL Nov 21, 2023
212b471
Update processing.py
JGoodrichBNL Nov 28, 2023
628f165
imported Tiled TableStructure
Nov 30, 2023
4482339
commented out tpx3 check
Nov 30, 2023
51a8952
modified detector check
Nov 30, 2023
e0d148d
edited run object query
Nov 30, 2023
e298a80
modified run object query
Nov 30, 2023
b1c17bf
modified run object query pt3
Nov 30, 2023
1fa5938
modified run object query pt4
Nov 30, 2023
335063d
imported tpx3utils function
Nov 30, 2023
a152d50
made extract_fpaths function take in a run object
Dec 1, 2023
8ce4ec6
typo fix
Dec 1, 2023
e3a698a
added log message
Dec 1, 2023
6279059
changed query for gathering raw filepaths
Dec 5, 2023
775fa7e
fixed syntax error
Dec 5, 2023
e596ef0
commented out sandbox tiled connection
Dec 5, 2023
a6b482d
fixed syntax error
Dec 5, 2023
5f8d3f5
removed compute() attribute for file path gathering
Dec 5, 2023
a823472
imported os
Dec 5, 2023
e3c406f
imported function from tpx3util
Dec 5, 2023
ecfd18e
fixed run object query
Dec 5, 2023
6d16c29
added sid to processed metadata
Dec 5, 2023
80260ff
split container into raw and cent
Dec 5, 2023
6e70db1
syntax fix
Dec 5, 2023
6edc55f
added multiprocessing
Dec 5, 2023
e6a6385
modified global variable node
Dec 5, 2023
27c22cc
imported multiprocessing
Dec 5, 2023
06b5faf
number of workers fix
Dec 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 61 additions & 6 deletions processing.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,60 @@
from prefect import flow, task, get_run_logger
from tiled.client import from_profile
from tiled.structures.table import TableStructure
from tpx3utils import extract_fpaths_from_sid, raw_to_sorted_df
import os
import multiprocessing

node = None

tiled_client = from_profile("nsls2")["chx"]
tiled_client_chx = tiled_client["raw"]
tiled_cilent_sandbox = tiled_client["sandbox"]
tiled_cilent_processed = tiled_client["processed"]
# tiled_client_sandbox = tiled_client["sandbox"]
tiled_client_processed = tiled_client["processed"]

def get_df_uncent(run):
raw_file_paths = run['primary']['data']['tpx3_files_raw_filepaths'][0]
for file in raw_file_paths:
if (os.path.exists(file)):
yield raw_to_sorted_df(file)

def process_file(args):
file_path = args[0]
partition_num = args[1]

if (os.path.exists(file)):
df = raw_to_sorted_df(file)
node.write_partition(df, partition_num)



def insert_to_tiled(container, run):
num_img = run['primary'].metadata['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images']
raw_file_paths = run['primary']['data']['tpx3_files_raw_filepaths'][0]

struct_df = raw_to_sorted_df(raw_file_paths[0])
structure = TableStructure.from_pandas(struct_df)
structure.npartitions = num_img
node = container.new("table", structure=structure, key=run.start['uid'], metadata={"raw_uid": run.start['uid'], "raw_sid": run.start['scan_id']})

args = []
for i in range(0, len(raw_file_paths)):
args.append([raw_file_paths[i], i])

num_cores = multiprocessing.cpu_count()
max_workers = num_cores-1

with multiprocessing.Pool(processes=max_workers) as pool:
pool.map(process_file, args)

# for partition_num, df in enumerate(get_df_uncent(run)):
# if (structure == None):
# structure = TableStructure.from_pandas(df)
# structure.npartitions = num_img
# node = container.new("table", structure=structure, key=run.start['uid'], metadata={"raw_uid": run.start['uid'], "raw_sid": run.start['scan_id']})

# node.write_partition(df, partition_num)


@task
def process_run(ref):
Expand All @@ -17,15 +66,19 @@ def process_run(ref):
ref : int, str
reference to BlueSky. It can be scan_id, uid or index
"""


print("REFERENCE: ")
print(ref)
logger = get_run_logger()
# Grab the BlueSky run
run = tiled_client_chx[ref]

# Grab the full uid for logging purposes
full_uid = run.start["uid"]
logger.info(f"{full_uid = }")
logger.info("Do something with this uid")
# Do some additional processing or call otehr python processing functions

insert_to_tiled(tiled_client_processed, run)


@flow
Expand All @@ -38,5 +91,7 @@ def processing_flow(ref):
ref : int, str
reference to BlueSky. It can be scan_id, uid or index
"""

process_run(ref)
run = tiled_client_chx[ref]

if (run.start['detectors'][0] == 'tpx3'):
process_run(ref)
Loading