diff --git a/.travis.yml b/.travis.yml index 89d189c7..6c36fd80 100644 --- a/.travis.yml +++ b/.travis.yml @@ -23,11 +23,11 @@ install: - python setup.py install - (cd $TRAVIS_BUILD_DIR && cd docs && make html) before_script: - - (cd $TRAVIS_BUILD_DIR && make setup_cluster) + - (cd $TRAVIS_BUILD_DIR && dacluster start) script: - (cd $TRAVIS_BUILD_DIR && make test_with_coverage) after_script: - - (cd $TRAVIS_BUILD_DIR && make teardown_cluster) + - (cd $TRAVIS_BUILD_DIR && dacluster stop) after_success: - coverage combine - coveralls diff --git a/Makefile b/Makefile index 36963a73..ebe47a57 100644 --- a/Makefile +++ b/Makefile @@ -29,9 +29,6 @@ MPI_EXEC_CMD = (${MPIEXEC} ${MPIEXEC_ARGS} ${PARALLEL_TEST} ; OUT=$$? ; \ for f in ${MPI_OUT_PREFIX}* ; do echo "====> " $$f ; cat $$f ; done ; \ exit $$OUT) -IPCLUSTER_SCRIPT := utils/ipcluster.py -PURGE_SCRIPT := utils/purge_cluster.py - # default number of engines to use. NENGINES := 4 @@ -87,34 +84,6 @@ coverage_report: ${COVERAGE} html .PHONY: coverage_report -# ---------------------------------------------------------------------------- -# Cluster management -# ---------------------------------------------------------------------------- - -setup_cluster: - ${PYTHON} ${IPCLUSTER_SCRIPT} start ${NENGINES} -.PHONY: setup_cluster - -restart_cluster: - ${PYTHON} ${IPCLUSTER_SCRIPT} restart ${NENGINES} -.PHONY: restart_cluster - -teardown_cluster: - ${PYTHON} ${IPCLUSTER_SCRIPT} stop -.PHONY: teardown_cluster - -clear_distarray: - ${PYTHON} ${IPCLUSTER_SCRIPT} reset -.PHONY: clear_distarray - -purge_cluster: - ${PYTHON} ${PURGE_SCRIPT} purge -.PHONY: purge_cluster - -dump_cluster: - ${PYTHON} ${PURGE_SCRIPT} dump -.PHONY: dump_cluster - # ---------------------------------------------------------------------------- # Cleanup. # ---------------------------------------------------------------------------- diff --git a/distarray/apps/__init__.py b/distarray/apps/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/distarray/apps/dacluster.py b/distarray/apps/dacluster.py new file mode 100755 index 00000000..204cdef5 --- /dev/null +++ b/distarray/apps/dacluster.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python +# encoding: utf-8 +# --------------------------------------------------------------------------- +# Copyright (C) 2008-2014, IPython Development Team and Enthought, Inc. +# Distributed under the terms of the BSD License. See COPYING.rst. +# --------------------------------------------------------------------------- +""" +Start, stop and manage a IPython.parallel cluster. `dacluster` can take +all the commands IPython's `ipcluster` can, and a few extras that are +distarray specific. +""" + +from __future__ import print_function + +import argparse +import sys +from time import sleep +from subprocess import Popen, PIPE + +from distarray.externals import six +from distarray.context import Context + + +if six.PY2: + ipcluster_cmd = 'ipcluster' +elif six.PY3: + ipcluster_cmd = 'ipcluster3' +else: + raise NotImplementedError("Not run with Python 2 *or* 3?") + + +def start(n=4, engines=None, **kwargs): + """Convenient way to start an ipcluster for testing. + + Doesn't exit until the ipcluster prints a success message. + """ + if engines is None: + engines = "--engines=MPIEngineSetLauncher" + + cluster = Popen([ipcluster_cmd, 'start', '-n', str(n), engines], + stdout=PIPE, stderr=PIPE) + + started = "Engines appear to have started successfully" + running = "CRITICAL | Cluster is already running with" + while True: + line = cluster.stderr.readline().decode() + if not line: + break + print(line, end='') + if (started in line): + break + elif (running in line): + raise RuntimeError("ipcluster is already running.") + + +def stop(**kwargs): + """Convenient way to stop an ipcluster.""" + stopping = Popen([ipcluster_cmd, 'stop'], stdout=PIPE, stderr=PIPE) + + stopped = "Stopping cluster" + not_running = ("CRITICAL | Could not read pid file, cluster " + "is probably not running.") + while True: + line = stopping.stderr.readline().decode() + if not line: + break + print(line, end='') + if (stopped in line) or (not_running in line): + break + + +def restart(n=4, engines=None, **kwargs): + """Convenient way to restart an ipcluster.""" + stop() + + started = False + while not started: + sleep(2) + try: + start(n=n, engines=engines) + except RuntimeError: + pass + else: + started = True + +_RESET_ENGINE_DISTARRAY = ''' +from sys import modules +orig_mods = set(modules) +for m in modules.copy(): + if m.startswith('distarray'): + del modules[m] +deleted_mods = sorted(orig_mods - set(modules)) +''' + + +def clear(**kwargs): + from IPython.parallel import Client + c = Client() + dv = c[:] + dv.execute(_RESET_ENGINE_DISTARRAY, block=True) + mods = dv['deleted_mods'] + print("The following modules were removed from the engines' namespaces:") + for mod in mods[0]: + print(' ' + mod) + dv.clear() + + +def dump(**kwargs): + """ Print out key names that exist on the engines. """ + context = Context() + keylist = context.dump_keys(all_other_contexts=True) + num_keys = len(keylist) + print('*** %d ENGINE KEYS ***' % (num_keys)) + for key, targets in keylist: + print('%s : %r' % (key, targets)) + + +def purge(**kwargs): + """ Remove keys from the engine namespaces. """ + print('Purging keys from engines...') + context = Context() + context.cleanup(all_other_contexts=True) + + +def main(): + main_description = """ + Start, stop and manage a IPython.parallel cluster. `dacluster` can take + all the commands IPython's `ipcluster` can, and a few extras that are + distarray specific. For details on a subcommand, try `dacluster + --help`. + """ + parser = argparse.ArgumentParser(description=main_description) + + # Print help if no command line args are supplied + if len(sys.argv) == 1: + parser.print_help() + sys.exit(1) + + subparsers = parser.add_subparsers() + + start_description = """ + Start a new IPython.parallel cluster. + """ + + stop_description = """ + Stop a IPython.parallel cluster. + """ + + restart_description = """ + Restart a IPython.parallel cluster. + """ + + clear_description = """ + Clear the namespace and imports on the cluster. This should be the + same as restarting the engines, but faster. + """ + + purge_description = """ + Clear all the DistArray objects from the engines. This sometimes + fails to delete all keys. + """ + + dump_description = """ + Print out key names that exist on the engines. + """ + + # subparses for all our commands + parser_start = subparsers.add_parser('start', + description=start_description) + parser_stop = subparsers.add_parser('stop', description=stop_description) + parser_restart = subparsers.add_parser('restart', + description=restart_description) + parser_clear = subparsers.add_parser('clear', + description=clear_description) + parser_purge = subparsers.add_parser('purge', + description=purge_description) + parser_dump = subparsers.add_parser('dump', description=dump_description) + + engine_help = """ + Number of engines to start. + """ + + # Add some optional arguments for `start` and `restart` + parser_start.add_argument('-n', '--n', type=int, nargs='?', default=4, + help=engine_help) + parser_restart.add_argument('-n', '--n', type=int, nargs='?', default=4, + help=engine_help) + + # set the functions each command should use + parser_start.set_defaults(func=start) + parser_stop.set_defaults(func=stop) + parser_restart.set_defaults(func=restart) + parser_clear.set_defaults(func=clear) + parser_purge.set_defaults(func=purge) + parser_dump.set_defaults(func=dump) + + # run it + args = parser.parse_args() + args.func(**vars(args)) + +if __name__ == '__main__': + main() diff --git a/setup.py b/setup.py index b8de61bc..1a8fcc35 100644 --- a/setup.py +++ b/setup.py @@ -27,6 +27,8 @@ 'url': 'https://github.com/enthought/distarray', 'license': 'BSD', 'platforms': ["Linux", "Mac OS-X"], + 'entry_points': {'console_scripts': ['dacluster = ' + 'distarray.apps.dacluster:main']}, 'classifiers': [c.strip() for c in """\ Development Status :: 2 - Pre-Alpha Intended Audience :: Developers diff --git a/utils/ipcluster.py b/utils/ipcluster.py deleted file mode 100644 index bb315122..00000000 --- a/utils/ipcluster.py +++ /dev/null @@ -1,120 +0,0 @@ -# encoding: utf-8 -# --------------------------------------------------------------------------- -# Copyright (C) 2008-2014, IPython Development Team and Enthought, Inc. -# Distributed under the terms of the BSD License. See COPYING.rst. -# --------------------------------------------------------------------------- - -""" -Functions for starting and stopping ipclusters. -""" - -from __future__ import print_function - -from distarray.externals import six -from time import sleep -from subprocess import Popen, PIPE - - -if six.PY2: - ipcluster_cmd = 'ipcluster' -elif six.PY3: - ipcluster_cmd = 'ipcluster3' -else: - raise NotImplementedError("Not run with Python 2 *or* 3?") - - -def start(args): - """Convenient way to start an ipcluster for testing. - - Doesn't exit until the ipcluster prints a success message. - """ - nengines = args.nengines - engines = "--engines=MPIEngineSetLauncher" - cluster = Popen([ipcluster_cmd, 'start', '-n', str(nengines), engines], - stdout=PIPE, stderr=PIPE) - - started = "Engines appear to have started successfully" - running = "CRITICAL | Cluster is already running with" - while True: - line = cluster.stderr.readline().decode() - if not line: - break - print(line, end='') - if (started in line): - break - elif (running in line): - raise RuntimeError("ipcluster is already running.") - - -def stop(args): - """Convenient way to stop an ipcluster.""" - stopping = Popen([ipcluster_cmd, 'stop'], stdout=PIPE, stderr=PIPE) - - stopped = "Stopping cluster" - not_running = ("CRITICAL | Could not read pid file, cluster " - "is probably not running.") - while True: - line = stopping.stderr.readline().decode() - if not line: - break - print(line, end='') - if (stopped in line) or (not_running in line): - break - - -def restart(args): - """Convenient way to restart an ipcluster.""" - stop(args) - - started = False - while not started: - sleep(2) - try: - start(args) - except RuntimeError: - pass - else: - started = True - - -_RESET_ENGINE_DISTARRAY = ''' -from sys import modules -orig_mods = set(modules) -for m in modules.copy(): - if m.startswith('distarray'): - del modules[m] -deleted_mods = sorted(orig_mods - set(modules)) -''' - - -def reset(args): - from IPython.parallel import Client - c = Client() - dv = c[:] - dv.execute(_RESET_ENGINE_DISTARRAY, block=True) - mods = dv['deleted_mods'] - print("The following modules were removed from the engines' namespaces:") - for mod in mods[0]: - print(' ' + mod) - dv.clear() - - -if __name__ == '__main__': - import argparse - - parser = argparse.ArgumentParser() - subparsers = parser.add_subparsers() - - parser_start = subparsers.add_parser('start') - parser_start.add_argument('nengines', type=int) - parser_start.set_defaults(func=start) - - parser_restart = subparsers.add_parser('restart') - parser_restart.add_argument('nengines', type=int) - parser_restart.set_defaults(func=restart) - - subparsers.add_parser('stop').set_defaults(func=stop) - subparsers.add_parser('reset').set_defaults(func=reset) - - args = parser.parse_args() - args.func(args) diff --git a/utils/purge_cluster.py b/utils/purge_cluster.py deleted file mode 100644 index 7b39f1c4..00000000 --- a/utils/purge_cluster.py +++ /dev/null @@ -1,40 +0,0 @@ -# encoding: utf-8 -# --------------------------------------------------------------------------- -# Copyright (C) 2008-2014, IPython Development Team and Enthought, Inc. -# Distributed under the terms of the BSD License. See COPYING.rst. -# --------------------------------------------------------------------------- - -""" Simple utility to clean out existing namespaces on engines. """ - -from __future__ import print_function - -import sys - -from distarray.context import Context - - -def dump(): - """ Print out key names that exist on the engines. """ - context = Context() - keylist = context.dump_keys(all_other_contexts=True) - num_keys = len(keylist) - print('*** %d ENGINE KEYS ***' % (num_keys)) - for key, targets in keylist: - print('%s : %r' % (key, targets)) - - -def purge(): - """ Remove keys from the engine namespaces. """ - print('Purging keys from engines...') - context = Context() - context.cleanup(all_other_contexts=True) - - -if __name__ == '__main__': - cmd = sys.argv[1] - if cmd == 'dump': - dump() - elif cmd == 'purge': - purge() - else: - raise ValueError("%s command not found" % (cmd,))