Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 173 additions & 54 deletions runasp.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
import shutil
from itertools import count
import numpy as np
import subprocess
import logging

import Ska.arc5gl
from Ska.Shell import getenv, bash, tcsh_shell
import pyyaks.logger
from astropy.io import fits
from mica.starcheck import get_starcheck_catalog_at_date
Expand Down Expand Up @@ -99,6 +100,9 @@ def get_options():
help='Log file (default=pipe.log)')
parser.add_option('--fdc-file',
help="fdc file")
parser.add_option("--param",
action='append',
help="additional params to be pset before the run")
opt, args = parser.parse_args()
return opt, args

Expand Down Expand Up @@ -217,7 +221,7 @@ def dir_setup(dir, istart, label=None, inplace=False, rev=1):
if not os.path.exists(outdir):
os.makedirs(outdir)
logger.info('Making output directory {}'.format(outdir))
return workdir, indir, outdir
return os.path.abspath(workdir), os.path.abspath(indir), os.path.abspath(outdir)


def link_files(dir, indir, outdir, istart, istop, obiroot, skip_slot=None):
Expand All @@ -242,24 +246,24 @@ def link_files(dir, indir, outdir, istart, istop, obiroot, skip_slot=None):
logger.verbose(
"skipping file out of timerange {}".format(mfile))
continue
aca0 = re.search('aca.*_(\d)_img0', mfile)
aca0 = re.search(r'aca.*_(\d)_img0', mfile)
if skip_slot and aca0:
aca_file_slot = int(aca0.group(1))
if aca_file_slot in skip_slot:
logger.verbose(
"skipping slot file on {}".format(mfile))
continue
obsparmatch = re.match('.*obs0a\.par(\.gz)?', mfile)
obsparmatch = re.match(r'.*obs0a\.par(\.gz)?', mfile)
if obsparmatch:
obimatch = re.match(
'.*axaf%s_obs0a\.par(\.gz)?' % obiroot, mfile)
r'.*axaf%s_obs0a\.par(\.gz)?' % obiroot, mfile)
if not obimatch:
logger.verbose("skipping obspar for different obi")
continue
if not os.path.exists(os.path.join(ldir, os.path.basename(mfile))):
logger.info(
"ln -s {} {}".format(os.path.relpath(mfile, ldir), ldir))
bash("ln -s %s %s" % (os.path.relpath(mfile, ldir), ldir))
subprocess.run(['ln', '-s', os.path.relpath(mfile, ldir), ldir])


def make_list_files(dir, indir, outdir, root):
Expand Down Expand Up @@ -312,22 +316,22 @@ def get_range_ai(ai_cmds, proc_range):
if not proc_range:
return ai_cmds
# if a single integer, return on that aspect interval
intmatch = re.match('^(\d+)$', proc_range)
intmatch = re.match(r'^(\d+)$', proc_range)
if intmatch:
interv = int(intmatch.group(1))
return [ai_cmds[int(intmatch.group(1))]]
# if of the form 0:1, return that range of intervals
# (python form, not inclusive)
imatch = re.match('^(\d+):(\d+)$', proc_range)
imatch = re.match(r'^(\d+):(\d+)$', proc_range)
if imatch:
return ai_cmds[int(imatch.group(1)):int(imatch.group(2))]
# if of the form 1: , return range 1 -> end
omatch = re.match('^(\d+):$', proc_range)
omatch = re.match(r'^(\d+):$', proc_range)
if omatch:
return ai_cmds[int(omatch.group(1)):]
# if of the form 0:+3000, find a tstop corresponding
# to tstart of aspect interval 0 plus 3000 seconds
tmatch = re.match('^(\d+):\+(\d+)$', proc_range)
tmatch = re.match(r'^(\d+):\+(\d+)$', proc_range)
if tmatch:
# get n seconds of specified interval
interv = int(tmatch.group(1))
Expand All @@ -348,7 +352,7 @@ def cut_stars(ai):
starlines = open(starfiles[0]).read().split("\n")
for slot in ai['skip_slot']:
starlines = [i for i in starlines
if not re.match("^\s+{}\s+1.*".format(slot), i)]
if not re.match(r"^\s+{}\s+1.*".format(slot), i)]
logger.info('Cutting stars by updating {}'.format(starfiles[0]))
with open(starfiles[0], "w") as newlist:
newlist.write("\n".join(starlines))
Expand Down Expand Up @@ -379,34 +383,99 @@ def close(self):
pass


def communicate(process, logger=None, level='WARNING', text=False):
"""
Real-time reading of a subprocess stdout.

Parameters
----------
process:
process returned by subprocess.Popen
logger: logging.Logger
a logging.Logger instance
level: str or int
the logging level
text: bool
whether the process output is text or binary
"""
if type(level) is str:
level = getattr(logging, level)
if logger is None:
logger = logging.getLogger()

while True:
if process.poll() is not None:
break
line = process.stdout.readline()
line = line if text else line.decode()
logger.log(level, line.rstrip("\n"))

# in case the buffer is still not empty after the process ended
for line in process.stdout.readlines():
line = line if text else line.decode()
logger.log(level, line[:-1])


def run_ai(ais):
"""
Run aspect pipeline 'flt_run_pipe' over the aspect intervals described
in the list of dictionaries passed as an argument
"""
logger.info('About to get ASCDS environment')
ascds_env = getenv('source /home/ascds/.ascrc -r release', shell='tcsh')
ocat_env = getenv(
'source /proj/sot/ska/data/aspect_authorization/set_ascds_ocat_vars.csh',
shell='tcsh')
for var in ['ASCDS_OCAT_UNAME', 'ASCDS_OCAT_SERVER', 'ASCDS_OCAT_PWORD']:
ascds_env[var] = ocat_env[var]

logger_fh = FilelikeLogger(logger)

loglines = tcsh_shell("punlearn asp_l1_std",
env=ascds_env, logfile=logger_fh)
for var in ['SYBASE_OCS', 'SYBASE']:
ascds_env[var] = os.environ[var]

logger.info('About to run')
proc = subprocess.run(
['punlearn', 'asp_l1_std'],
env=ascds_env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)
for line in proc.stdout.decode().split('\n'):
logger.info(line)

if opt.fdc_file is not None:
tcsh_shell("pset asp_l1_std fdc='{}'".format(opt.fdc_file),
env=ascds_env, logfile=logger_fh)
logger.info('pset asp_l1_std fdc')
proc = subprocess.run(
['pset', 'asp_l1_std', f"fdc={opt.fdc_file}"],
env=ascds_env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)
for line in proc.stdout.decode().split('\n'):
logger.info(line)

if opt.param is not None and len(opt.param):
for param in opt.param:
logger.info(f'pset asp_l1_std {param}')
proc = subprocess.run(
['pset', 'asp_l1_std', param],
env=ascds_env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)
for line in proc.stdout.decode().split('\n'):
logger.info(line)

for ai in ais:
pipe_cmd = 'flt_run_pipe -r {root} -i {indir} -o {outdir} \
-t {pipe_ped} \
-a "INTERVAL_START"={istart} \
-a "INTERVAL_STOP"={istop} \
-a obiroot={obiroot} \
-a revision=1 '.format(**ai)
pipe_cmd = [
'flt_run_pipe',
'-r', ai['root'],
'-i', ai['indir'],
'-o', ai['outdir'],
'-t', ai['pipe_ped'],
'-a', f'INTERVAL_START={ai["istart"]}',
'-a', f'INTERVAL_STOP={ai["istop"]}',
'-a', f'obiroot={ai["obiroot"]}',
'-a', 'revision=1'
]

start_pipe = PIPES[0]
stop_pipe = None
Expand All @@ -421,24 +490,38 @@ def run_ai(ais):

# if the options start after or end before the stage to remove stars,
# just do what is asked
if (PIPES.index(start_pipe) > PIPES.index('check_star_data') or
((stop_pipe is not None) and
(PIPES.index(stop_pipe) < PIPES.index('check_star_data')))):
pipe_cmd = pipe_cmd + f' -s {start_pipe} '
if (PIPES.index(start_pipe) > PIPES.index('check_star_data')
or ((stop_pipe is not None)
and (PIPES.index(stop_pipe) < PIPES.index('check_star_data')))):
pipe_cmd = pipe_cmd + ['-s', start_pipe]
if stop_pipe is not None:
pipe_cmd = pipe_cmd + f' -S {stop_pipe} '
logger.info('Running pipe command {}'.format(
pipe_cmd))
tcsh_shell(pipe_cmd,
env=ascds_env,
logfile=logger_fh)
pipe_cmd = pipe_cmd + ['-S', stop_pipe]
filename = os.path.join(ai['outdir'], 'pipe_1.log')
logger.info(f'Running pipe command {" ".join(pipe_cmd)}')
logger.info(f'Log: {filename}')
proc = subprocess.Popen(
pipe_cmd, env=ascds_env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
logger_2 = pyyaks.logger.get_logger(
name='pipe', level=15, filename=filename, format="%(asctime)s %(message)s"
)
communicate(proc, logger_2, level=15)
if proc.returncode:
logger.warning('pipe 1 failed')
else:
first_pipe = pipe_cmd + \
f' -s {start_pipe} ' + " -S check_star_data"
logger.info('Running pipe command {}'.format(first_pipe))
tcsh_shell(first_pipe,
env=ascds_env,
logfile=logger_fh)
first_pipe = pipe_cmd + ['-s', start_pipe, '-S', 'check_star_data']
filename = os.path.join(ai['outdir'], 'pipe_1.log')
logger.info(f'Running pipe command {" ".join(first_pipe)}')
logger.info(f'Log: {filename}')
proc = subprocess.Popen(
first_pipe, env=ascds_env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
logger_2 = pyyaks.logger.get_logger(
name='pipe', level=15, filename=filename, format="%(asctime)s %(message)s"
)
communicate(proc, logger_2, level=15)
if proc.returncode:
logger.warning('pipe 1 failed')
star_files = glob(os.path.join(ai['outdir'], "*stars.txt"))
if not len(star_files) == 1:
logger.info(
Expand All @@ -447,13 +530,21 @@ def run_ai(ais):
if 'skip_slot' in ai:
logger.info("Cutting star as requested")
cut_stars(ai)
second_pipe = pipe_cmd + f' -s check_star_data '
second_pipe = pipe_cmd + ['-s', 'check_star_data']
if stop_pipe is not None:
second_pipe = second_pipe + f' -S {stop_pipe}'
logger.info('Running pipe command {}'.format(second_pipe))
tcsh_shell(second_pipe,
env=ascds_env,
logfile=logger_fh)
second_pipe = second_pipe + ['-S', stop_pipe]
filename = os.path.join(ai['outdir'], 'pipe_2.log')
logger.info(f'Running pipe command {" ".join(second_pipe)}')
logger.info(f'Log: {filename}')
proc = subprocess.Popen(
second_pipe, env=ascds_env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
logger_2 = pyyaks.logger.get_logger(
name='pipe', level=15, filename=filename, format="%(asctime)s %(message)s"
)
communicate(proc, logger_2, level=15)
if proc.returncode:
logger.warning('pipe 2 failed')


def mock_stars_file(opt, ai):
Expand All @@ -462,8 +553,8 @@ def mock_stars_file(opt, ai):
"""
sc = get_starcheck_catalog_at_date(ai['istart'])

acqs = sc['cat'][(sc['cat']['type'] == 'ACQ') |
(sc['cat']['type'] == 'BOT')]
acqs = sc['cat'][(sc['cat']['type'] == 'ACQ')
| (sc['cat']['type'] == 'BOT')]
acqs.sort('slot')
acqs['soe_type'] = 0
full_table = acqs[['slot', 'soe_type', 'id', 'yang', 'zang']]
Expand All @@ -473,8 +564,8 @@ def mock_stars_file(opt, ai):
fids['soe_type'] = 2
for f in fids[['slot', 'soe_type', 'id', 'yang', 'zang']]:
full_table.add_row(f)
gui = sc['cat'][(sc['cat']['type'] == 'BOT') |
(sc['cat']['type'] == 'GUI')]
gui = sc['cat'][(sc['cat']['type'] == 'BOT')
| (sc['cat']['type'] == 'GUI')]
gui.sort('slot')
gui['soe_type'] = 1
for g in gui[['slot', 'soe_type', 'id', 'yang', 'zang']]:
Expand Down Expand Up @@ -545,6 +636,34 @@ def mock_cai_file(opt):
fh.close()


def getenv(cmd, shell, diff=True):
from Ska.Shell.shell import _parse_keyvals, _fix_paths
p = subprocess.run(['which', shell], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if p.returncode:
raise Exception(f'Failed to find "{shell}" shell: {p.stderr.decode()}')
shell = p.stdout.decode().strip()
p = subprocess.run(
f'{cmd} && env',
shell=True,
executable=shell,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
if p.returncode:
raise Exception(f'Failed to get environment: {p.stderr.decode().strip()}')
newenv = _parse_keyvals(p.stdout.decode().split('\n'))
if diff:
deltaenv = dict()
expected_diff_set = set(('PS1', 'PS2', '_', 'SHLVL')) if shell == 'bash' else set()
currenv = dict(os.environ)
_fix_paths(newenv)
for key in set(newenv) - expected_diff_set:
if key not in currenv or currenv[key] != newenv[key]:
deltaenv[key] = newenv[key]
return deltaenv
return newenv


def main(opt):
# get files
if opt.obsid:
Expand Down Expand Up @@ -590,9 +709,9 @@ def main(opt):
raise ValueError("No files found for glob %s"
% fileglob)
for mfile in match:
if re.match(".*\.gz", mfile):
if re.match(r".*\.gz", mfile):
logger.verbose('Unzipping {}'.format(mfile))
bash("gunzip -f %s" % os.path.abspath(mfile))
subprocess.run(['gunzip', '-f', os.path.abspath(mfile)])

# reset this to get unzipped names
caiprops_files = glob(os.path.join(opt.dir, "asp05",
Expand All @@ -610,7 +729,7 @@ def main(opt):
and "istop_%d" % interval in cai):
obi[cai['obi_num']][interval] = \
{'istart': cai['istart_%d' % interval],
'istop': cai['istop_%d' % interval]}
'istop': cai['istop_%d' % interval]}
interval += 1

ai_cmds = []
Expand All @@ -620,7 +739,7 @@ def main(opt):
for ofile in obspar_files:
obspar = get_obspar(ofile)
if obspar['obi_num'] == obi_num:
obsmatch = re.search('axaf(.+)_obs0a\.par', ofile)
obsmatch = re.search(r'axaf(.+)_obs0a\.par', ofile)
obiroot = obsmatch.group(1)
if not obiroot:
raise ValueError("no obspar for obi %d" % obi_num)
Expand Down