diff --git a/bin/dm_link.py b/bin/dm_link.py index 756a1201..06cd4dad 100755 --- a/bin/dm_link.py +++ b/bin/dm_link.py @@ -74,6 +74,7 @@ import glob import logging import os +import re import sys from docopt import docopt @@ -245,6 +246,8 @@ def get_scanid_from_lookup_table(archive_path): """ global lookup basename = os.path.basename(os.path.normpath(archive_path)) + # Strip whitespace, because the scans.csv entries can't contain them + basename = re.sub(r'\s+', '', basename) source_name = basename[:-len(datman.utils.get_extension(basename))] lookupinfo = lookup[lookup["source_name"] == source_name] diff --git a/bin/dm_qc_report.py b/bin/dm_qc_report.py index ba71033e..78fa8e76 100755 --- a/bin/dm_qc_report.py +++ b/bin/dm_qc_report.py @@ -20,6 +20,8 @@ --log-to-server If set, all log messages will also be sent to the configured logging server. This is useful when the script is run on the queue, since it swallows logging. + --max-mem STR The maximum memory required if submitted to the + queue. Should include the units (e.g. GB) [default: 5GB] -q --quiet Only report errors -v --verbose Be chatty -d --debug Be extra chatty @@ -64,6 +66,7 @@ def main(): REMAKE = arguments["--remake"] REFRESH = arguments["--refresh"] use_server = arguments["--log-to-server"] + max_mem = arguments["--max-mem"] verbose = arguments["--verbose"] debug = arguments["--debug"] quiet = arguments["--quiet"] @@ -81,7 +84,7 @@ def main(): logger.setLevel(logging.DEBUG) if not session: - return submit_subjects(config) + return submit_subjects(config, max_mem=max_mem) if not datman.dashboard.dash_found: logger.error("Dashboard database not found, can't run.") @@ -131,11 +134,13 @@ def add_server_handler(config): logger.addHandler(server_handler) -def submit_subjects(config): +def submit_subjects(config, max_mem="5GB"): """Submit a job for each subject in the study that still needs metrics. Args: config (:obj:`datman.config.config`): A config object for the study. + max_mem (int, optional): The maximum amount of memory needed for jobs. + Default = 5GB. """ missing_cmds = check_prerequisites() if missing_cmds: @@ -157,7 +162,7 @@ def submit_subjects(config): logger.info(f"Submitting QC job for {subject}.") datman.utils.submit_job( command, job_name, "/tmp", system=config.system, - argslist="--mem=5G" + argslist=f"--mem={max_mem}" ) diff --git a/bin/dm_sftp.py b/bin/dm_sftp.py index 1442de3c..496068a2 100755 --- a/bin/dm_sftp.py +++ b/bin/dm_sftp.py @@ -8,6 +8,12 @@ : Short name of the study to process Options: + --before Only download scans added to the server before + the given date. Date format is expected to be + YYYY-MM-DD. Can be combined with --after. + --after Only download scans added to the server after + the given date. Date format is expected to be + YYYY-MM-DD. Can be combined with --before. -h --help Show this screen. -q --quiet Suppress output. -v --verbose Show more output. @@ -15,10 +21,12 @@ --dry-run """ +import fnmatch import logging import os import shutil -import fnmatch +import time +from datetime import datetime import pysftp @@ -39,6 +47,10 @@ def main(): quiet = arguments["--quiet"] study = arguments[""] + start_date, end_date = get_date_range( + arguments["--after"], arguments["--before"] + ) + # setup logging log_level = logging.WARN @@ -108,7 +120,40 @@ def main(): # process each folder in turn logger.debug("Copying from:{} to:{}" .format(valid_dir, zips_path)) - process_dir(sftp, valid_dir, zips_path) + process_dir(sftp, valid_dir, zips_path, + start=start_date, end=end_date) + + +def get_date_range(after_str=None, before_str=None): + if before_str: + end_date = parse_date(before_str) + else: + end_date = None + + if after_str: + start_date = parse_date(after_str) + else: + start_date = None + + if start_date and end_date and start_date > end_date: + raise ValueError( + "Start of date range is after the cut off date. When both " + "--after and --before are in use, --after date must precede " + "--before date" + ) + + return start_date, end_date + + +def parse_date(date_str): + try: + dt = datetime.strptime(date_str, "%Y-%m-%d") + except ValueError as e: + raise ValueError( + "Invalid date format given, expected format YYYY-MM-DD" + ) from e + parsed = time.mktime(dt.timetuple()) + return parsed def get_server_config(cfg): @@ -204,7 +249,7 @@ def get_valid_remote_dirs(connection, mrfolders): return valid_dirs -def process_dir(connection, directory, zips_path): +def process_dir(connection, directory, zips_path, start=None, end=None): """Process a directory on the ftp server, copy new files to zips_path """ @@ -218,14 +263,18 @@ def process_dir(connection, directory, zips_path): return for file_name in files: if connection.isfile(file_name): - get_file(connection, file_name, zips_path) + get_file(connection, file_name, zips_path, + start=start, end=end) else: - get_folder(connection, file_name, zips_path) + get_folder(connection, file_name, zips_path, + start=start, end=end) -def get_folder(connection, folder_name, dst_path): +def get_folder(connection, folder_name, dst_path, start=None, end=None): expected_file = os.path.join(dst_path, folder_name + ".zip") - if not download_needed(connection, folder_name, expected_file): + if not download_needed(connection, folder_name, expected_file, + start=start, end=end + ): logger.debug("File: {} already exists, skipping".format(folder_name)) return @@ -239,26 +288,34 @@ def get_folder(connection, folder_name, dst_path): new_zip)) -def get_file(connection, file_name, zips_path): +def get_file(connection, file_name, zips_path, start=None, end=None): target = os.path.join(zips_path, file_name) - if download_needed(connection, file_name, target): + if download_needed(connection, file_name, target, start=start, end=end): logger.info("Copying new remote file: {}".format(file_name)) connection.get(file_name, target, preserve_mtime=True) else: logger.debug("File: {} already exists, skipping".format(file_name)) -def download_needed(sftp, filename, target): +def download_needed(sftp, filename, target, start=None, end=None): """Check if a local copy of the file exists, If no local copy exists return True If local copy exists and is older than remote return True otherwise return false""" + + remote_mtime = sftp.stat(filename).st_mtime + + if start and remote_mtime < start: + return False + + if end and remote_mtime > end: + return False + if not os.path.isfile(target): return True # check the file modification times local_mtime = os.path.getmtime(target) - remote_mtime = sftp.stat(filename).st_mtime if local_mtime < remote_mtime: return True diff --git a/bin/dm_xnat_extract.py b/bin/dm_xnat_extract.py index 9d73f436..49eeff38 100755 --- a/bin/dm_xnat_extract.py +++ b/bin/dm_xnat_extract.py @@ -386,7 +386,12 @@ def collect_zips(config, args): return [] if args.experiment: - ident = get_identifier(config, args.experiment) + try: + ident = get_identifier(config, args.experiment) + except datman.exceptions.ParseException as e: + logger.error(f"Ignoring invalid ID {args.experiment} - {e}") + return [] + if not ident: logger.error(f"Invalid session ID {args.experiment}.") return [] @@ -400,12 +405,22 @@ def collect_zips(config, args): zip_files = [] for zip_path in glob.glob(os.path.join(zip_folder, "*.zip")): + if not os.path.exists(zip_path): + logger.debug(f"Broken symlink found. Ignoring {zip_path}") + continue + sess_name = os.path.basename(zip_path).replace(".zip", "") - ident = get_identifier(config, sess_name) + try: + ident = get_identifier(config, sess_name) + except datman.exceptions.ParseException as e: + logger.error(f"Ignoring invalid ID {sess_name} - {e}") + continue + if not ident: logger.error( f"Ignoring invalid zip file name in dicom dir: {sess_name}") continue + zip_files.append( (None, datman.importers.ZipImporter(ident, zip_path)) ) diff --git a/bin/dm_xnat_upload.py b/bin/dm_xnat_upload.py index e0f7264c..876fe973 100755 --- a/bin/dm_xnat_upload.py +++ b/bin/dm_xnat_upload.py @@ -156,6 +156,28 @@ def process_archive(file_name, dicom_dir): exper_id)) return + + # Hold off on resource uploads until scan data exists. + # We upload using xnat's direct-to-archive options, but if any data + # exists for a session (even just resources) during upload then + # everything gets punted to the prearchive and ends up requiring a manual + # merge. + if data_exists and not resource_exists: + logger.debug("Uploading resource from: {}".format(archive_file)) + try: + upload_non_dicom_data(archive_file, xnat_subject.project, scanid, + xnat) + except Exception as e: + logger.debug("An exception occurred: {}".format(e)) + pass + elif not data_exists and not resource_exists: + logger.info( + f"Skipping resource upload of {archive_file} until scans exist. " + "Resource files will upload on a later run once scan upload " + "complete." + ) + + if not data_exists: logger.info("Uploading dicoms from {}".format(archive_file)) try: @@ -166,14 +188,6 @@ def process_archive(file_name, dicom_dir): .format(archive_file, xnat_subject.project, xnat_subject.name, e)) - if not resource_exists: - logger.debug("Uploading resource from: {}".format(archive_file)) - try: - upload_non_dicom_data(archive_file, xnat_subject.project, scanid, - xnat) - except Exception as e: - logger.debug("An exception occurred: {}".format(e)) - pass def get_xnat_subject(ident, xnat): diff --git a/datman/exporters/bids.py b/datman/exporters/bids.py index d9af464f..9e59b426 100644 --- a/datman/exporters/bids.py +++ b/datman/exporters/bids.py @@ -137,7 +137,11 @@ def export(self, raw_data_dir: str, **kwargs): self.make_output_dir() try: - self.run_dcm2bids(raw_data_dir, force_dcm2niix=force_dcm2niix) + self.run_dcm2bids( + raw_data_dir, + force_dcm2niix=force_dcm2niix, + refresh=self.opts.refresh + ) except Exception as e: logger.error(f"Failed to extract to BIDs - {e}") @@ -178,7 +182,7 @@ def _get_scan_dir(self, download_dir: str, refresh: bool = False) -> str: if refresh: # Use existing tmp_dir instead of raw dcms return self.bids_tmp - return download_dir + return os.path.join(download_dir, self.dcm_dir) def make_command( self, raw_data_dir: str, force_dcm2niix: bool = False diff --git a/datman/exporters/bids_legacy.py b/datman/exporters/bids_legacy.py index 06026987..24631931 100644 --- a/datman/exporters/bids_legacy.py +++ b/datman/exporters/bids_legacy.py @@ -189,8 +189,12 @@ def check_contents(self, expected, actual): misnamed[actual_name] = expected_name continue - expected_name = expected[scan][0] actual_name = actual[scan][0] + if "blacklisted" in actual_name: + # Do not 'fix' blacklisted scans by moving them back. + continue + + expected_name = expected[scan][0] if expected_name == actual_name: continue misnamed[actual_name] = expected_name diff --git a/datman/importers.py b/datman/importers.py index 3e9d6d1d..d48f2083 100644 --- a/datman/importers.py +++ b/datman/importers.py @@ -1072,7 +1072,7 @@ def __init__(self, ident, zip_path): self.contents = self.parse_contents() self.scans = self.get_scans() self.resource_files = self.contents['resources'] - self.dcm_subdir = os.path.split(self.scans[0].series_dir)[0] + self.dcm_subdir = self.get_dcm_subdir() self.date = self.scans[0].date @property @@ -1138,6 +1138,15 @@ def dcm_subdir(self) -> str: def dcm_subdir(self, value: str): self._dcm_subdir = value + def get_dcm_subdir(self) -> str: + """Find the common parent folder for all scan dicoms. + """ + series_dirs = [scan.series_dir for scan in self.scans] + prefix = os.path.commonpath(series_dirs) + if prefix == '': + return os.path.split(self.scans[0].series_dir)[0] + return prefix + def is_shared(self) -> bool: # Can't track shared sessions with zip files. return False diff --git a/datman/metrics.py b/datman/metrics.py index e31aef26..e2b8ad8d 100644 --- a/datman/metrics.py +++ b/datman/metrics.py @@ -229,23 +229,14 @@ class DTIMetrics(MetricDTI): "_b0.png": QCOutput(2, "b0 Montage") }, "qc-dti": { - "_qascripts_dti.csv": None, "_stats.csv": None, "_directions.png": QCOutput(3, "bvec Directions") - }, - "qc-spikecount": { - "_spikecount.csv": None } } def generate(self, img_gap=2, width=1600): self.run(f"qc-dti {self.input} {self.bvec} {self.bval} " f"{self.output_root}", "qc-dti") - - self.run(f"qc-spikecount {self.input} " - f"{self.output_root + '_spikecount.csv'} {self.bval}", - "qc-spikecount") - self.make_montage(self.output_root + "_montage.png") self.make_image(self.output_root + "_b0.png", img_gap, width) @@ -267,10 +258,6 @@ class FMRIMetrics(Metric): "_scanlengths.csv": None }, "qc-fmri": { - "_fd.csv": None, - "_qascripts_bold.csv": None, - "_spectra.csv": None, - "_stats.csv": None, "_sfnr.nii.gz": None, "_corr.nii.gz": None },