From cfcf83c53a25cdc5d0871bc34cfeb7d3fd1929c5 Mon Sep 17 00:00:00 2001 From: Javier Gonzalez Date: Tue, 1 Mar 2022 14:40:50 -0500 Subject: [PATCH 1/7] added param option which is used to override the align file --- runasp.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/runasp.py b/runasp.py index 1c590b0..e64b2fb 100644 --- a/runasp.py +++ b/runasp.py @@ -99,6 +99,9 @@ def get_options(): help='Log file (default=pipe.log)') parser.add_option('--fdc-file', help="fdc file") + parser.add_option("--param", + action='append', + help="additional params to be pset before the run") opt, args = parser.parse_args() return opt, args @@ -399,6 +402,11 @@ def run_ai(ais): if opt.fdc_file is not None: tcsh_shell("pset asp_l1_std fdc='{}'".format(opt.fdc_file), env=ascds_env, logfile=logger_fh) + if opt.param is not None and len(opt.param): + for param in opt.param: + cmd = 'pset asp_l1_std {}'.format(param) + tcsh_shell(cmd, + env=ascds_env) for ai in ais: pipe_cmd = 'flt_run_pipe -r {root} -i {indir} -o {outdir} \ From 3147417997d5c349ad1d2b18896347b3a75025d6 Mon Sep 17 00:00:00 2001 From: Javier Gonzalez Date: Wed, 2 Mar 2022 10:03:57 -0500 Subject: [PATCH 2/7] use absolute paths from the start --- runasp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runasp.py b/runasp.py index e64b2fb..d42fb4a 100644 --- a/runasp.py +++ b/runasp.py @@ -220,7 +220,7 @@ def dir_setup(dir, istart, label=None, inplace=False, rev=1): if not os.path.exists(outdir): os.makedirs(outdir) logger.info('Making output directory {}'.format(outdir)) - return workdir, indir, outdir + return os.path.abspath(workdir), os.path.abspath(indir), os.path.abspath(outdir) def link_files(dir, indir, outdir, istart, istop, obiroot, skip_slot=None): From e09f5361705592fd0bfa8aa9ef800f4f81dc0d3e Mon Sep 17 00:00:00 2001 From: Javier Gonzalez Date: Wed, 2 Mar 2022 10:05:19 -0500 Subject: [PATCH 3/7] explicitly add SYBASE vars to environment --- runasp.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runasp.py b/runasp.py index d42fb4a..984585d 100644 --- a/runasp.py +++ b/runasp.py @@ -393,6 +393,8 @@ def run_ai(ais): shell='tcsh') for var in ['ASCDS_OCAT_UNAME', 'ASCDS_OCAT_SERVER', 'ASCDS_OCAT_PWORD']: ascds_env[var] = ocat_env[var] + for var in ['SYBASE_OCS', 'SYBASE']: + ascds_env[var] = os.environ[var] logger_fh = FilelikeLogger(logger) From 45a0be08489b4efad727ed2ae00df66e26c21601 Mon Sep 17 00:00:00 2001 From: Javier Gonzalez Date: Wed, 2 Mar 2022 10:15:07 -0500 Subject: [PATCH 4/7] remove uses ot Ska.Shell.tcsh --- runasp.py | 148 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 113 insertions(+), 35 deletions(-) diff --git a/runasp.py b/runasp.py index 984585d..719644c 100644 --- a/runasp.py +++ b/runasp.py @@ -8,9 +8,11 @@ import shutil from itertools import count import numpy as np +import subprocess +import logging import Ska.arc5gl -from Ska.Shell import getenv, bash, tcsh_shell +from Ska.Shell import getenv, bash import pyyaks.logger from astropy.io import fits from mica.starcheck import get_starcheck_catalog_at_date @@ -382,6 +384,39 @@ def close(self): pass +def communicate(process, logger=None, level='WARNING', text=False): + """ + Real-time reading of a subprocess stdout. + + Parameters + ---------- + process: + process returned by subprocess.Popen + logger: logging.Logger + a logging.Logger instance + level: str or int + the logging level + text: bool + whether the process output is text or binary + """ + if type(level) is str: + level = getattr(logging, level) + if logger is None: + logger = logging.getLogger() + + while True: + if process.poll() is not None: + break + line = process.stdout.readline() + line = line if text else line.decode() + logger.log(level, line.rstrip("\n")) + + # in case the buffer is still not empty after the process ended + for line in process.stdout.readlines(): + line = line if text else line.decode() + logger.log(level, line[:-1]) + + def run_ai(ais): """ Run aspect pipeline 'flt_run_pipe' over the aspect intervals described @@ -396,27 +431,48 @@ def run_ai(ais): for var in ['SYBASE_OCS', 'SYBASE']: ascds_env[var] = os.environ[var] - logger_fh = FilelikeLogger(logger) - - loglines = tcsh_shell("punlearn asp_l1_std", - env=ascds_env, logfile=logger_fh) + proc = subprocess.run( + ['punlearn', 'asp_l1_std'], + env=ascds_env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT + ) + for line in proc.stdout.decode().split('\n'): + logger.info(line) if opt.fdc_file is not None: - tcsh_shell("pset asp_l1_std fdc='{}'".format(opt.fdc_file), - env=ascds_env, logfile=logger_fh) + proc = subprocess.run( + ['pset', 'asp_l1_std', f"fdc={opt.fdc_file}"], + env=ascds_env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT + ) + for line in proc.stdout.decode().split('\n'): + logger.info(line) + if opt.param is not None and len(opt.param): for param in opt.param: - cmd = 'pset asp_l1_std {}'.format(param) - tcsh_shell(cmd, - env=ascds_env) + proc = subprocess.run( + ['pset', 'asp_l1_std', param], + env=ascds_env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT + ) + for line in proc.stdout.decode().split('\n'): + logger.info(line) for ai in ais: - pipe_cmd = 'flt_run_pipe -r {root} -i {indir} -o {outdir} \ --t {pipe_ped} \ --a "INTERVAL_START"={istart} \ --a "INTERVAL_STOP"={istop} \ --a obiroot={obiroot} \ --a revision=1 '.format(**ai) + pipe_cmd = [ + 'flt_run_pipe', + '-r', ai['root'], + '-i', ai['indir'], + '-o', ai['outdir'], + '-t', ai['pipe_ped'], + '-a', f'INTERVAL_START={ai["istart"]}', + '-a', f'INTERVAL_STOP={ai["istop"]}', + '-a', f'obiroot={ai["obiroot"]}', + '-a', 'revision=1' + ] start_pipe = PIPES[0] stop_pipe = None @@ -434,21 +490,35 @@ def run_ai(ais): if (PIPES.index(start_pipe) > PIPES.index('check_star_data') or ((stop_pipe is not None) and (PIPES.index(stop_pipe) < PIPES.index('check_star_data')))): - pipe_cmd = pipe_cmd + f' -s {start_pipe} ' + pipe_cmd = pipe_cmd + ['-s', start_pipe] if stop_pipe is not None: - pipe_cmd = pipe_cmd + f' -S {stop_pipe} ' - logger.info('Running pipe command {}'.format( - pipe_cmd)) - tcsh_shell(pipe_cmd, - env=ascds_env, - logfile=logger_fh) + pipe_cmd = pipe_cmd + ['-S', stop_pipe] + filename = os.path.join(ai['outdir'], 'pipe_1.log') + logger.info(f'Running pipe command {" ".join(pipe_cmd)}') + logger.info(f'Log: {filename}') + proc = subprocess.Popen( + pipe_cmd, env=ascds_env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ) + logger_2 = pyyaks.logger.get_logger( + name='pipe', level=15, filename=filename, format="%(asctime)s %(message)s" + ) + communicate(proc, logger_2, level=15) + if proc.returncode: + logger.warning('pipe 1 failed') else: - first_pipe = pipe_cmd + \ - f' -s {start_pipe} ' + " -S check_star_data" - logger.info('Running pipe command {}'.format(first_pipe)) - tcsh_shell(first_pipe, - env=ascds_env, - logfile=logger_fh) + first_pipe = pipe_cmd + ['-s', start_pipe, '-S', 'check_star_data'] + filename = os.path.join(ai['outdir'], 'pipe_1.log') + logger.info(f'Running pipe command {" ".join(first_pipe)}') + logger.info(f'Log: {filename}') + proc = subprocess.Popen( + first_pipe, env=ascds_env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ) + logger_2 = pyyaks.logger.get_logger( + name='pipe', level=15, filename=filename, format="%(asctime)s %(message)s" + ) + communicate(proc, logger_2, level=15) + if proc.returncode: + logger.warning('pipe 1 failed') star_files = glob(os.path.join(ai['outdir'], "*stars.txt")) if not len(star_files) == 1: logger.info( @@ -457,13 +527,21 @@ def run_ai(ais): if 'skip_slot' in ai: logger.info("Cutting star as requested") cut_stars(ai) - second_pipe = pipe_cmd + f' -s check_star_data ' + second_pipe = pipe_cmd + ['-s', 'check_star_data'] if stop_pipe is not None: - second_pipe = second_pipe + f' -S {stop_pipe}' - logger.info('Running pipe command {}'.format(second_pipe)) - tcsh_shell(second_pipe, - env=ascds_env, - logfile=logger_fh) + second_pipe = second_pipe + ['-S', stop_pipe] + filename = os.path.join(ai['outdir'], 'pipe_2.log') + logger.info(f'Running pipe command {" ".join(second_pipe)}') + logger.info(f'Log: {filename}') + proc = subprocess.Popen( + second_pipe, env=ascds_env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ) + logger_2 = pyyaks.logger.get_logger( + name='pipe', level=15, filename=filename, format="%(asctime)s %(message)s" + ) + communicate(proc, logger_2, level=15) + if proc.returncode: + logger.warning('pipe 2 failed') def mock_stars_file(opt, ai): From 174bc420f20942d1b3508cca56c551b234c74613 Mon Sep 17 00:00:00 2001 From: Javier Gonzalez Date: Wed, 2 Mar 2022 10:15:29 -0500 Subject: [PATCH 5/7] some logging --- runasp.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/runasp.py b/runasp.py index 719644c..b6e379a 100644 --- a/runasp.py +++ b/runasp.py @@ -422,6 +422,7 @@ def run_ai(ais): Run aspect pipeline 'flt_run_pipe' over the aspect intervals described in the list of dictionaries passed as an argument """ + logger.info('About to get ASCDS environment') ascds_env = getenv('source /home/ascds/.ascrc -r release', shell='tcsh') ocat_env = getenv( 'source /proj/sot/ska/data/aspect_authorization/set_ascds_ocat_vars.csh', @@ -431,6 +432,7 @@ def run_ai(ais): for var in ['SYBASE_OCS', 'SYBASE']: ascds_env[var] = os.environ[var] + logger.info('About to run') proc = subprocess.run( ['punlearn', 'asp_l1_std'], env=ascds_env, @@ -441,6 +443,7 @@ def run_ai(ais): logger.info(line) if opt.fdc_file is not None: + logger.info('pset asp_l1_std fdc') proc = subprocess.run( ['pset', 'asp_l1_std', f"fdc={opt.fdc_file}"], env=ascds_env, @@ -452,6 +455,7 @@ def run_ai(ais): if opt.param is not None and len(opt.param): for param in opt.param: + logger.info(f'pset asp_l1_std {param}') proc = subprocess.run( ['pset', 'asp_l1_std', param], env=ascds_env, From f65419c93e18da570926a9d80410adea3cd2c63d Mon Sep 17 00:00:00 2001 From: Javier Gonzalez Date: Wed, 2 Mar 2022 09:42:50 -0500 Subject: [PATCH 6/7] remove remaining uses of Ska.Shell --- runasp.py | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/runasp.py b/runasp.py index b6e379a..831a122 100644 --- a/runasp.py +++ b/runasp.py @@ -12,7 +12,6 @@ import logging import Ska.arc5gl -from Ska.Shell import getenv, bash import pyyaks.logger from astropy.io import fits from mica.starcheck import get_starcheck_catalog_at_date @@ -264,8 +263,7 @@ def link_files(dir, indir, outdir, istart, istop, obiroot, skip_slot=None): if not os.path.exists(os.path.join(ldir, os.path.basename(mfile))): logger.info( "ln -s {} {}".format(os.path.relpath(mfile, ldir), ldir)) - bash("ln -s %s %s" % (os.path.relpath(mfile, ldir), ldir)) - + subprocess.run(['ln', '-s', os.path.relpath(mfile, ldir), ldir]) def make_list_files(dir, indir, outdir, root): """ @@ -637,6 +635,34 @@ def mock_cai_file(opt): fh.close() +def getenv(cmd, shell, diff=True): + from Ska.Shell.shell import _parse_keyvals, _fix_paths + p = subprocess.run(['which', shell], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if p.returncode: + raise Exception(f'Failed to find "{shell}" shell: {p.stderr.decode()}') + shell = p.stdout.decode().strip() + p = subprocess.run( + f'{cmd} && env', + shell=True, + executable=shell, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + if p.returncode: + raise Exception(f'Failed to get environment: {p.stderr.decode().strip()}') + newenv = _parse_keyvals(p.stdout.decode().split('\n')) + if diff: + deltaenv = dict() + expected_diff_set = set(('PS1', 'PS2', '_', 'SHLVL')) if shell == 'bash' else set() + currenv = dict(os.environ) + _fix_paths(newenv) + for key in set(newenv) - expected_diff_set: + if key not in currenv or currenv[key] != newenv[key]: + deltaenv[key] = newenv[key] + return deltaenv + return newenv + + def main(opt): # get files if opt.obsid: @@ -684,7 +710,7 @@ def main(opt): for mfile in match: if re.match(".*\.gz", mfile): logger.verbose('Unzipping {}'.format(mfile)) - bash("gunzip -f %s" % os.path.abspath(mfile)) + subprocess.run(['gunzip', '-f', os.path.abspath(mfile)]) # reset this to get unzipped names caiprops_files = glob(os.path.join(opt.dir, "asp05", From bad49016b680afaafaa5cb95d5aaae8384108a8b Mon Sep 17 00:00:00 2001 From: Javier Gonzalez Date: Wed, 2 Mar 2022 10:57:06 -0500 Subject: [PATCH 7/7] flake --- runasp.py | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/runasp.py b/runasp.py index 831a122..a519dfb 100644 --- a/runasp.py +++ b/runasp.py @@ -246,17 +246,17 @@ def link_files(dir, indir, outdir, istart, istop, obiroot, skip_slot=None): logger.verbose( "skipping file out of timerange {}".format(mfile)) continue - aca0 = re.search('aca.*_(\d)_img0', mfile) + aca0 = re.search(r'aca.*_(\d)_img0', mfile) if skip_slot and aca0: aca_file_slot = int(aca0.group(1)) if aca_file_slot in skip_slot: logger.verbose( "skipping slot file on {}".format(mfile)) continue - obsparmatch = re.match('.*obs0a\.par(\.gz)?', mfile) + obsparmatch = re.match(r'.*obs0a\.par(\.gz)?', mfile) if obsparmatch: obimatch = re.match( - '.*axaf%s_obs0a\.par(\.gz)?' % obiroot, mfile) + r'.*axaf%s_obs0a\.par(\.gz)?' % obiroot, mfile) if not obimatch: logger.verbose("skipping obspar for different obi") continue @@ -265,6 +265,7 @@ def link_files(dir, indir, outdir, istart, istop, obiroot, skip_slot=None): "ln -s {} {}".format(os.path.relpath(mfile, ldir), ldir)) subprocess.run(['ln', '-s', os.path.relpath(mfile, ldir), ldir]) + def make_list_files(dir, indir, outdir, root): """ Create .lis files for the pipeline. @@ -315,22 +316,22 @@ def get_range_ai(ai_cmds, proc_range): if not proc_range: return ai_cmds # if a single integer, return on that aspect interval - intmatch = re.match('^(\d+)$', proc_range) + intmatch = re.match(r'^(\d+)$', proc_range) if intmatch: interv = int(intmatch.group(1)) return [ai_cmds[int(intmatch.group(1))]] # if of the form 0:1, return that range of intervals # (python form, not inclusive) - imatch = re.match('^(\d+):(\d+)$', proc_range) + imatch = re.match(r'^(\d+):(\d+)$', proc_range) if imatch: return ai_cmds[int(imatch.group(1)):int(imatch.group(2))] # if of the form 1: , return range 1 -> end - omatch = re.match('^(\d+):$', proc_range) + omatch = re.match(r'^(\d+):$', proc_range) if omatch: return ai_cmds[int(omatch.group(1)):] # if of the form 0:+3000, find a tstop corresponding # to tstart of aspect interval 0 plus 3000 seconds - tmatch = re.match('^(\d+):\+(\d+)$', proc_range) + tmatch = re.match(r'^(\d+):\+(\d+)$', proc_range) if tmatch: # get n seconds of specified interval interv = int(tmatch.group(1)) @@ -351,7 +352,7 @@ def cut_stars(ai): starlines = open(starfiles[0]).read().split("\n") for slot in ai['skip_slot']: starlines = [i for i in starlines - if not re.match("^\s+{}\s+1.*".format(slot), i)] + if not re.match(r"^\s+{}\s+1.*".format(slot), i)] logger.info('Cutting stars by updating {}'.format(starfiles[0])) with open(starfiles[0], "w") as newlist: newlist.write("\n".join(starlines)) @@ -489,9 +490,9 @@ def run_ai(ais): # if the options start after or end before the stage to remove stars, # just do what is asked - if (PIPES.index(start_pipe) > PIPES.index('check_star_data') or - ((stop_pipe is not None) and - (PIPES.index(stop_pipe) < PIPES.index('check_star_data')))): + if (PIPES.index(start_pipe) > PIPES.index('check_star_data') + or ((stop_pipe is not None) + and (PIPES.index(stop_pipe) < PIPES.index('check_star_data')))): pipe_cmd = pipe_cmd + ['-s', start_pipe] if stop_pipe is not None: pipe_cmd = pipe_cmd + ['-S', stop_pipe] @@ -552,8 +553,8 @@ def mock_stars_file(opt, ai): """ sc = get_starcheck_catalog_at_date(ai['istart']) - acqs = sc['cat'][(sc['cat']['type'] == 'ACQ') | - (sc['cat']['type'] == 'BOT')] + acqs = sc['cat'][(sc['cat']['type'] == 'ACQ') + | (sc['cat']['type'] == 'BOT')] acqs.sort('slot') acqs['soe_type'] = 0 full_table = acqs[['slot', 'soe_type', 'id', 'yang', 'zang']] @@ -563,8 +564,8 @@ def mock_stars_file(opt, ai): fids['soe_type'] = 2 for f in fids[['slot', 'soe_type', 'id', 'yang', 'zang']]: full_table.add_row(f) - gui = sc['cat'][(sc['cat']['type'] == 'BOT') | - (sc['cat']['type'] == 'GUI')] + gui = sc['cat'][(sc['cat']['type'] == 'BOT') + | (sc['cat']['type'] == 'GUI')] gui.sort('slot') gui['soe_type'] = 1 for g in gui[['slot', 'soe_type', 'id', 'yang', 'zang']]: @@ -708,7 +709,7 @@ def main(opt): raise ValueError("No files found for glob %s" % fileglob) for mfile in match: - if re.match(".*\.gz", mfile): + if re.match(r".*\.gz", mfile): logger.verbose('Unzipping {}'.format(mfile)) subprocess.run(['gunzip', '-f', os.path.abspath(mfile)]) @@ -728,7 +729,7 @@ def main(opt): and "istop_%d" % interval in cai): obi[cai['obi_num']][interval] = \ {'istart': cai['istart_%d' % interval], - 'istop': cai['istop_%d' % interval]} + 'istop': cai['istop_%d' % interval]} interval += 1 ai_cmds = [] @@ -738,7 +739,7 @@ def main(opt): for ofile in obspar_files: obspar = get_obspar(ofile) if obspar['obi_num'] == obi_num: - obsmatch = re.search('axaf(.+)_obs0a\.par', ofile) + obsmatch = re.search(r'axaf(.+)_obs0a\.par', ofile) obiroot = obsmatch.group(1) if not obiroot: raise ValueError("no obspar for obi %d" % obi_num)