From 7dae10cdb7760cb1b167ffae9bf4a836a4a42651 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 9 Apr 2014 12:39:20 -0500 Subject: [PATCH 01/19] Move util scripts to apps. --- utils/ipcluster.py => distarray/apps/ipcluster_tools.py | 0 {utils => distarray/apps}/purge_cluster.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename utils/ipcluster.py => distarray/apps/ipcluster_tools.py (100%) rename {utils => distarray/apps}/purge_cluster.py (100%) diff --git a/utils/ipcluster.py b/distarray/apps/ipcluster_tools.py similarity index 100% rename from utils/ipcluster.py rename to distarray/apps/ipcluster_tools.py diff --git a/utils/purge_cluster.py b/distarray/apps/purge_cluster.py similarity index 100% rename from utils/purge_cluster.py rename to distarray/apps/purge_cluster.py From ac9d7c4af5221aac94b0d599e77abd8738b63611 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 9 Apr 2014 12:41:56 -0500 Subject: [PATCH 02/19] Add CLI entry point for cluster management. --- distarray/apps/dacluster.py | 49 +++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100755 distarray/apps/dacluster.py diff --git a/distarray/apps/dacluster.py b/distarray/apps/dacluster.py new file mode 100755 index 00000000..36a2e994 --- /dev/null +++ b/distarray/apps/dacluster.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python + +import argparse +import sys + +import ipcluster_tools +import purge_cluster + + +class ArgumentParser(argparse.ArgumentParser): + def error(self, message): + # We failed parsing the args, pass them directly to ipcluster + # to see if it can handle them. + ipcluster_tools.run_ipcluster(sys.argv[1:]) + + +description = """ +Start, stop and manage a IPython.parallel cluster. `dacluster` can take +all the commands IPython's `ipcluster` can, and a few extras that are +distarray specific. +""" +parser = ArgumentParser(description=description) + +# Print help if no command line args are supplied +if len(sys.argv) == 1: + parser.print_help() + sys.exit(1) + + +subparsers = parser.add_subparsers(help='subparsers') + +# subparses for all our commands +parser_start = subparsers.add_parser('start') +parser_stop = subparsers.add_parser('stop') +parser_restart = subparsers.add_parser('restart') +parser_clear = subparsers.add_parser('clear') +parser_purge = subparsers.add_parser('purge') + +# set the functions each command should use +parser_start.set_defaults(func=ipcluster_tools.start) +parser_stop.set_defaults(func=ipcluster_tools.stop) +parser_restart.set_defaults(func=ipcluster_tools.restart) +parser_clear.set_defaults(func=ipcluster_tools.clear) +parser_purge.set_defaults(func=purge_cluster.purge) + + +def main(): + args = parser.parse_args() + args.func() From a51ed4aa5117eb219ac4d8b900406c565a0ddb25 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 9 Apr 2014 12:47:28 -0500 Subject: [PATCH 03/19] Changes to ipcluster_tools for a nicer cli. --- distarray/apps/ipcluster_tools.py | 51 ++++++++++++++----------------- 1 file changed, 23 insertions(+), 28 deletions(-) diff --git a/distarray/apps/ipcluster_tools.py b/distarray/apps/ipcluster_tools.py index bb315122..5ebe1bd8 100644 --- a/distarray/apps/ipcluster_tools.py +++ b/distarray/apps/ipcluster_tools.py @@ -10,6 +10,7 @@ from __future__ import print_function +import sys from distarray.externals import six from time import sleep from subprocess import Popen, PIPE @@ -23,14 +24,23 @@ raise NotImplementedError("Not run with Python 2 *or* 3?") -def start(args): +def run_ipcluster(args): + """Takes a list of arguments to pass to ipcluster, then tries to + open it. + """ + command = [ipcluster_cmd].extend(args) + Popen(command, stdout=PIPE, stderr=PIPE) + + +def start(n=4, engines=None): """Convenient way to start an ipcluster for testing. Doesn't exit until the ipcluster prints a success message. """ - nengines = args.nengines - engines = "--engines=MPIEngineSetLauncher" - cluster = Popen([ipcluster_cmd, 'start', '-n', str(nengines), engines], + if engines is None: + engines = "--engines=MPIEngineSetLauncher" + + cluster = Popen([ipcluster_cmd, 'start', '-n', str(n), engines], stdout=PIPE, stderr=PIPE) started = "Engines appear to have started successfully" @@ -46,7 +56,7 @@ def start(args): raise RuntimeError("ipcluster is already running.") -def stop(args): +def stop(): """Convenient way to stop an ipcluster.""" stopping = Popen([ipcluster_cmd, 'stop'], stdout=PIPE, stderr=PIPE) @@ -62,21 +72,20 @@ def stop(args): break -def restart(args): +def restart(): """Convenient way to restart an ipcluster.""" - stop(args) + stop() started = False while not started: sleep(2) try: - start(args) + start() except RuntimeError: pass else: started = True - _RESET_ENGINE_DISTARRAY = ''' from sys import modules orig_mods = set(modules) @@ -87,7 +96,7 @@ def restart(args): ''' -def reset(args): +def clear(): from IPython.parallel import Client c = Client() dv = c[:] @@ -100,21 +109,7 @@ def reset(args): if __name__ == '__main__': - import argparse - - parser = argparse.ArgumentParser() - subparsers = parser.add_subparsers() - - parser_start = subparsers.add_parser('start') - parser_start.add_argument('nengines', type=int) - parser_start.set_defaults(func=start) - - parser_restart = subparsers.add_parser('restart') - parser_restart.add_argument('nengines', type=int) - parser_restart.set_defaults(func=restart) - - subparsers.add_parser('stop').set_defaults(func=stop) - subparsers.add_parser('reset').set_defaults(func=reset) - - args = parser.parse_args() - args.func(args) + cmd = sys.argv[1] + if cmd not in 'start stop restart reset'.split(): + sys.exit("Error: %r not a valid command." % cmd) + globals()[cmd]() From 6aed21ef3ee8a5e76755928abb0f42080ece8f4f Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 9 Apr 2014 12:47:55 -0500 Subject: [PATCH 04/19] Add cli entry_point to setup.py --- setup.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/setup.py b/setup.py index b8de61bc..1a8fcc35 100644 --- a/setup.py +++ b/setup.py @@ -27,6 +27,8 @@ 'url': 'https://github.com/enthought/distarray', 'license': 'BSD', 'platforms': ["Linux", "Mac OS-X"], + 'entry_points': {'console_scripts': ['dacluster = ' + 'distarray.apps.dacluster:main']}, 'classifiers': [c.strip() for c in """\ Development Status :: 2 - Pre-Alpha Intended Audience :: Developers From c904ee1c717d279d0743fdee81fde406c72238f7 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 9 Apr 2014 14:03:06 -0500 Subject: [PATCH 05/19] Use dacluster in .travis.yml --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 89d189c7..6c36fd80 100644 --- a/.travis.yml +++ b/.travis.yml @@ -23,11 +23,11 @@ install: - python setup.py install - (cd $TRAVIS_BUILD_DIR && cd docs && make html) before_script: - - (cd $TRAVIS_BUILD_DIR && make setup_cluster) + - (cd $TRAVIS_BUILD_DIR && dacluster start) script: - (cd $TRAVIS_BUILD_DIR && make test_with_coverage) after_script: - - (cd $TRAVIS_BUILD_DIR && make teardown_cluster) + - (cd $TRAVIS_BUILD_DIR && dacluster stop) after_success: - coverage combine - coveralls From 737d774ef16f4013266178d6930b7ec332e73f7e Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 9 Apr 2014 14:03:29 -0500 Subject: [PATCH 06/19] Remove cluster management from Makefile. --- Makefile | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/Makefile b/Makefile index 36963a73..ebe47a57 100644 --- a/Makefile +++ b/Makefile @@ -29,9 +29,6 @@ MPI_EXEC_CMD = (${MPIEXEC} ${MPIEXEC_ARGS} ${PARALLEL_TEST} ; OUT=$$? ; \ for f in ${MPI_OUT_PREFIX}* ; do echo "====> " $$f ; cat $$f ; done ; \ exit $$OUT) -IPCLUSTER_SCRIPT := utils/ipcluster.py -PURGE_SCRIPT := utils/purge_cluster.py - # default number of engines to use. NENGINES := 4 @@ -87,34 +84,6 @@ coverage_report: ${COVERAGE} html .PHONY: coverage_report -# ---------------------------------------------------------------------------- -# Cluster management -# ---------------------------------------------------------------------------- - -setup_cluster: - ${PYTHON} ${IPCLUSTER_SCRIPT} start ${NENGINES} -.PHONY: setup_cluster - -restart_cluster: - ${PYTHON} ${IPCLUSTER_SCRIPT} restart ${NENGINES} -.PHONY: restart_cluster - -teardown_cluster: - ${PYTHON} ${IPCLUSTER_SCRIPT} stop -.PHONY: teardown_cluster - -clear_distarray: - ${PYTHON} ${IPCLUSTER_SCRIPT} reset -.PHONY: clear_distarray - -purge_cluster: - ${PYTHON} ${PURGE_SCRIPT} purge -.PHONY: purge_cluster - -dump_cluster: - ${PYTHON} ${PURGE_SCRIPT} dump -.PHONY: dump_cluster - # ---------------------------------------------------------------------------- # Cleanup. # ---------------------------------------------------------------------------- From 3a3d62c56b98996574ea8adaecb7e9d94372282f Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 9 Apr 2014 14:42:59 -0500 Subject: [PATCH 07/19] Forgot the __init__.py in apps/ --- distarray/apps/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 distarray/apps/__init__.py diff --git a/distarray/apps/__init__.py b/distarray/apps/__init__.py new file mode 100644 index 00000000..e69de29b From 3a0f771497404317badcbd3410f1e2fb8f088dff Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Wed, 9 Apr 2014 18:11:27 -0500 Subject: [PATCH 08/19] Py3 compatible imports. --- distarray/apps/dacluster.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distarray/apps/dacluster.py b/distarray/apps/dacluster.py index 36a2e994..39e240a1 100755 --- a/distarray/apps/dacluster.py +++ b/distarray/apps/dacluster.py @@ -3,8 +3,8 @@ import argparse import sys -import ipcluster_tools -import purge_cluster +from . import ipcluster_tools +from . import purge_cluster class ArgumentParser(argparse.ArgumentParser): From 57792a4fc8a425097592bf22b3a73d9536ad2139 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Fri, 11 Apr 2014 11:14:33 -0500 Subject: [PATCH 09/19] File metadata. --- distarray/apps/dacluster.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/distarray/apps/dacluster.py b/distarray/apps/dacluster.py index 39e240a1..cac7529d 100755 --- a/distarray/apps/dacluster.py +++ b/distarray/apps/dacluster.py @@ -1,4 +1,14 @@ #!/usr/bin/env python +# encoding: utf-8 +# --------------------------------------------------------------------------- +# Copyright (C) 2008-2014, IPython Development Team and Enthought, Inc. +# Distributed under the terms of the BSD License. See COPYING.rst. +# --------------------------------------------------------------------------- +""" +Start, stop and manage a IPython.parallel cluster. `dacluster` can take +all the commands IPython's `ipcluster` can, and a few extras that are +distarray specific. +""" import argparse import sys From 4c9ffeb188c322e723cfccb5822aa8f04ab29f4d Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Fri, 11 Apr 2014 11:17:04 -0500 Subject: [PATCH 10/19] Make functions more amenable to *args, and **kwargs for argparse. --- distarray/apps/ipcluster_tools.py | 12 ++++++------ distarray/apps/purge_cluster.py | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/distarray/apps/ipcluster_tools.py b/distarray/apps/ipcluster_tools.py index 5ebe1bd8..b577858e 100644 --- a/distarray/apps/ipcluster_tools.py +++ b/distarray/apps/ipcluster_tools.py @@ -24,7 +24,7 @@ raise NotImplementedError("Not run with Python 2 *or* 3?") -def run_ipcluster(args): +def run_ipcluster(*args, **kwargs): """Takes a list of arguments to pass to ipcluster, then tries to open it. """ @@ -32,7 +32,7 @@ def run_ipcluster(args): Popen(command, stdout=PIPE, stderr=PIPE) -def start(n=4, engines=None): +def start(*args, n=4, engines=None, **kwargs): """Convenient way to start an ipcluster for testing. Doesn't exit until the ipcluster prints a success message. @@ -56,7 +56,7 @@ def start(n=4, engines=None): raise RuntimeError("ipcluster is already running.") -def stop(): +def stop(*args, **kwargs): """Convenient way to stop an ipcluster.""" stopping = Popen([ipcluster_cmd, 'stop'], stdout=PIPE, stderr=PIPE) @@ -72,7 +72,7 @@ def stop(): break -def restart(): +def restart(*args, n=4, engines=None, **kwargs): """Convenient way to restart an ipcluster.""" stop() @@ -80,7 +80,7 @@ def restart(): while not started: sleep(2) try: - start() + start(n=n, engines=engines) except RuntimeError: pass else: @@ -96,7 +96,7 @@ def restart(): ''' -def clear(): +def clear(*args, **kwargs): from IPython.parallel import Client c = Client() dv = c[:] diff --git a/distarray/apps/purge_cluster.py b/distarray/apps/purge_cluster.py index 7b39f1c4..7be1c247 100644 --- a/distarray/apps/purge_cluster.py +++ b/distarray/apps/purge_cluster.py @@ -13,7 +13,7 @@ from distarray.context import Context -def dump(): +def dump(*args, **kwargs): """ Print out key names that exist on the engines. """ context = Context() keylist = context.dump_keys(all_other_contexts=True) @@ -23,7 +23,7 @@ def dump(): print('%s : %r' % (key, targets)) -def purge(): +def purge(*args, **kwargs): """ Remove keys from the engine namespaces. """ print('Purging keys from engines...') context = Context() From f00bffb9215e76c6c4618229b9ac4ad55240d5c4 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Fri, 11 Apr 2014 11:19:05 -0500 Subject: [PATCH 11/19] Add help text. Move module level code into main(). Add dump. --- distarray/apps/dacluster.py | 94 +++++++++++++++++++++++++++---------- 1 file changed, 69 insertions(+), 25 deletions(-) diff --git a/distarray/apps/dacluster.py b/distarray/apps/dacluster.py index cac7529d..30312228 100755 --- a/distarray/apps/dacluster.py +++ b/distarray/apps/dacluster.py @@ -24,36 +24,80 @@ def error(self, message): ipcluster_tools.run_ipcluster(sys.argv[1:]) -description = """ -Start, stop and manage a IPython.parallel cluster. `dacluster` can take -all the commands IPython's `ipcluster` can, and a few extras that are -distarray specific. -""" -parser = ArgumentParser(description=description) +def main(): + main_description = """ + Start, stop and manage a IPython.parallel cluster. `dacluster` can take + all the commands IPython's `ipcluster` can, and a few extras that are + distarray specific. + """ + parser = ArgumentParser(description=main_description) -# Print help if no command line args are supplied -if len(sys.argv) == 1: - parser.print_help() - sys.exit(1) + # Print help if no command line args are supplied + if len(sys.argv) == 1: + parser.print_help() + sys.exit(1) + subparsers = parser.add_subparsers() -subparsers = parser.add_subparsers(help='subparsers') + start_description = """ + Start a new IPython.parallel cluster. + """ -# subparses for all our commands -parser_start = subparsers.add_parser('start') -parser_stop = subparsers.add_parser('stop') -parser_restart = subparsers.add_parser('restart') -parser_clear = subparsers.add_parser('clear') -parser_purge = subparsers.add_parser('purge') + stop_description = """ + Stop a IPython.parallel cluster. + """ -# set the functions each command should use -parser_start.set_defaults(func=ipcluster_tools.start) -parser_stop.set_defaults(func=ipcluster_tools.stop) -parser_restart.set_defaults(func=ipcluster_tools.restart) -parser_clear.set_defaults(func=ipcluster_tools.clear) -parser_purge.set_defaults(func=purge_cluster.purge) + restart_description = """ + Restart a IPython.parallel cluster. + """ + clear_description = """ + Clear the namespace and imports on the cluster. This should be the + same as restarting the engines, but faster. + """ -def main(): + purge_description = """ + Clear all the DistArray objects from the engines. This has a few + leaks. + """ + + dump_description = """ + Print out key names that exist on the engines. + """ + + # subparses for all our commands + parser_start = subparsers.add_parser('start', + description=start_description) + parser_stop = subparsers.add_parser('stop', description=stop_description) + parser_restart = subparsers.add_parser('restart', + description=restart_description) + parser_clear = subparsers.add_parser('clear', + description=clear_description) + parser_purge = subparsers.add_parser('purge', + description=purge_description) + parser_dump = subparsers.add_parser('dump', description=dump_description) + + engine_help = """ + Number of engines to start. + """ + + # Add some optional arguments for `start` and `restart` + parser_start.add_argument('-n', '--n', type=int, nargs='?', default=4, + help=engine_help) + parser_restart.add_argument('-n', '--n', type=int, nargs='?', default=4, + help=engine_help) + + # set the functions each command should use + parser_start.set_defaults(func=ipcluster_tools.start) + parser_stop.set_defaults(func=ipcluster_tools.stop) + parser_restart.set_defaults(func=ipcluster_tools.restart) + parser_clear.set_defaults(func=ipcluster_tools.clear) + parser_purge.set_defaults(func=purge_cluster.purge) + parser_dump.set_defaults(func=purge_cluster.dump) + + # run it args = parser.parse_args() - args.func() + args.func(**vars(args)) + +if __name__ == '__main__': + main() From 997ba09da1e2fbc55f252555e67ea23d8184d42c Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Fri, 11 Apr 2014 14:52:30 -0500 Subject: [PATCH 12/19] Make args python 2 compatible. --- distarray/apps/ipcluster_tools.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distarray/apps/ipcluster_tools.py b/distarray/apps/ipcluster_tools.py index b577858e..c40e4093 100644 --- a/distarray/apps/ipcluster_tools.py +++ b/distarray/apps/ipcluster_tools.py @@ -32,7 +32,7 @@ def run_ipcluster(*args, **kwargs): Popen(command, stdout=PIPE, stderr=PIPE) -def start(*args, n=4, engines=None, **kwargs): +def start(n=4, engines=None, *args, **kwargs): """Convenient way to start an ipcluster for testing. Doesn't exit until the ipcluster prints a success message. @@ -72,7 +72,7 @@ def stop(*args, **kwargs): break -def restart(*args, n=4, engines=None, **kwargs): +def restart(n=4, engines=None, *args, **kwargs): """Convenient way to restart an ipcluster.""" stop() From 60797ad259340f321d354880e9c628ab6a9bb7c8 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 14 Apr 2014 20:24:46 -0500 Subject: [PATCH 13/19] Remove a bad idea --- distarray/apps/dacluster.py | 9 +-------- distarray/apps/ipcluster_tools.py | 8 -------- 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/distarray/apps/dacluster.py b/distarray/apps/dacluster.py index 30312228..c1c9df05 100755 --- a/distarray/apps/dacluster.py +++ b/distarray/apps/dacluster.py @@ -17,20 +17,13 @@ from . import purge_cluster -class ArgumentParser(argparse.ArgumentParser): - def error(self, message): - # We failed parsing the args, pass them directly to ipcluster - # to see if it can handle them. - ipcluster_tools.run_ipcluster(sys.argv[1:]) - - def main(): main_description = """ Start, stop and manage a IPython.parallel cluster. `dacluster` can take all the commands IPython's `ipcluster` can, and a few extras that are distarray specific. """ - parser = ArgumentParser(description=main_description) + parser = argparse.ArgumentParser(description=main_description) # Print help if no command line args are supplied if len(sys.argv) == 1: diff --git a/distarray/apps/ipcluster_tools.py b/distarray/apps/ipcluster_tools.py index c40e4093..3ae2b18b 100644 --- a/distarray/apps/ipcluster_tools.py +++ b/distarray/apps/ipcluster_tools.py @@ -24,14 +24,6 @@ raise NotImplementedError("Not run with Python 2 *or* 3?") -def run_ipcluster(*args, **kwargs): - """Takes a list of arguments to pass to ipcluster, then tries to - open it. - """ - command = [ipcluster_cmd].extend(args) - Popen(command, stdout=PIPE, stderr=PIPE) - - def start(n=4, engines=None, *args, **kwargs): """Convenient way to start an ipcluster for testing. From 8c3200feb493af0b56d19460eada3bd318123b9b Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 14 Apr 2014 20:25:17 -0500 Subject: [PATCH 14/19] Better docstrings in dacluster. --- distarray/apps/dacluster.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distarray/apps/dacluster.py b/distarray/apps/dacluster.py index c1c9df05..05aa8114 100755 --- a/distarray/apps/dacluster.py +++ b/distarray/apps/dacluster.py @@ -50,8 +50,8 @@ def main(): """ purge_description = """ - Clear all the DistArray objects from the engines. This has a few - leaks. + Clear all the DistArray objects from the engines. This sometimes + fails to delete all keys. """ dump_description = """ From d96de8644ed48c7a818d4c1db98f3171f0b82744 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 14 Apr 2014 20:36:58 -0500 Subject: [PATCH 15/19] Merge purge_cluster.py into ipcluster_tools.py --- distarray/apps/dacluster.py | 5 ++-- distarray/apps/ipcluster_tools.py | 17 +++++++++++++ distarray/apps/purge_cluster.py | 40 ------------------------------- 3 files changed, 19 insertions(+), 43 deletions(-) delete mode 100644 distarray/apps/purge_cluster.py diff --git a/distarray/apps/dacluster.py b/distarray/apps/dacluster.py index 05aa8114..49fa2ee4 100755 --- a/distarray/apps/dacluster.py +++ b/distarray/apps/dacluster.py @@ -14,7 +14,6 @@ import sys from . import ipcluster_tools -from . import purge_cluster def main(): @@ -85,8 +84,8 @@ def main(): parser_stop.set_defaults(func=ipcluster_tools.stop) parser_restart.set_defaults(func=ipcluster_tools.restart) parser_clear.set_defaults(func=ipcluster_tools.clear) - parser_purge.set_defaults(func=purge_cluster.purge) - parser_dump.set_defaults(func=purge_cluster.dump) + parser_purge.set_defaults(func=ipcluster_tools.purge) + parser_dump.set_defaults(func=ipcluster_tools.dump) # run it args = parser.parse_args() diff --git a/distarray/apps/ipcluster_tools.py b/distarray/apps/ipcluster_tools.py index 3ae2b18b..05a41cda 100644 --- a/distarray/apps/ipcluster_tools.py +++ b/distarray/apps/ipcluster_tools.py @@ -12,6 +12,7 @@ import sys from distarray.externals import six +from distarray.context import Context from time import sleep from subprocess import Popen, PIPE @@ -100,6 +101,22 @@ def clear(*args, **kwargs): dv.clear() +def dump(*args, **kwargs): + """ Print out key names that exist on the engines. """ + context = Context() + keylist = context.dump_keys(all_other_contexts=True) + num_keys = len(keylist) + print('*** %d ENGINE KEYS ***' % (num_keys)) + for key, targets in keylist: + print('%s : %r' % (key, targets)) + + +def purge(*args, **kwargs): + """ Remove keys from the engine namespaces. """ + print('Purging keys from engines...') + context = Context() + context.cleanup(all_other_contexts=True) + if __name__ == '__main__': cmd = sys.argv[1] if cmd not in 'start stop restart reset'.split(): diff --git a/distarray/apps/purge_cluster.py b/distarray/apps/purge_cluster.py deleted file mode 100644 index 7be1c247..00000000 --- a/distarray/apps/purge_cluster.py +++ /dev/null @@ -1,40 +0,0 @@ -# encoding: utf-8 -# --------------------------------------------------------------------------- -# Copyright (C) 2008-2014, IPython Development Team and Enthought, Inc. -# Distributed under the terms of the BSD License. See COPYING.rst. -# --------------------------------------------------------------------------- - -""" Simple utility to clean out existing namespaces on engines. """ - -from __future__ import print_function - -import sys - -from distarray.context import Context - - -def dump(*args, **kwargs): - """ Print out key names that exist on the engines. """ - context = Context() - keylist = context.dump_keys(all_other_contexts=True) - num_keys = len(keylist) - print('*** %d ENGINE KEYS ***' % (num_keys)) - for key, targets in keylist: - print('%s : %r' % (key, targets)) - - -def purge(*args, **kwargs): - """ Remove keys from the engine namespaces. """ - print('Purging keys from engines...') - context = Context() - context.cleanup(all_other_contexts=True) - - -if __name__ == '__main__': - cmd = sys.argv[1] - if cmd == 'dump': - dump() - elif cmd == 'purge': - purge() - else: - raise ValueError("%s command not found" % (cmd,)) From 3b7b0572511f4ef31615cac9fbb3c094d8e8e9ca Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 14 Apr 2014 20:47:39 -0500 Subject: [PATCH 16/19] Remove extraneous *args. --- distarray/apps/ipcluster_tools.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/distarray/apps/ipcluster_tools.py b/distarray/apps/ipcluster_tools.py index 05a41cda..21172ef0 100644 --- a/distarray/apps/ipcluster_tools.py +++ b/distarray/apps/ipcluster_tools.py @@ -25,7 +25,7 @@ raise NotImplementedError("Not run with Python 2 *or* 3?") -def start(n=4, engines=None, *args, **kwargs): +def start(n=4, engines=None, **kwargs): """Convenient way to start an ipcluster for testing. Doesn't exit until the ipcluster prints a success message. @@ -49,7 +49,7 @@ def start(n=4, engines=None, *args, **kwargs): raise RuntimeError("ipcluster is already running.") -def stop(*args, **kwargs): +def stop(**kwargs): """Convenient way to stop an ipcluster.""" stopping = Popen([ipcluster_cmd, 'stop'], stdout=PIPE, stderr=PIPE) @@ -65,7 +65,7 @@ def stop(*args, **kwargs): break -def restart(n=4, engines=None, *args, **kwargs): +def restart(n=4, engines=None, **kwargs): """Convenient way to restart an ipcluster.""" stop() @@ -89,7 +89,7 @@ def restart(n=4, engines=None, *args, **kwargs): ''' -def clear(*args, **kwargs): +def clear(**kwargs): from IPython.parallel import Client c = Client() dv = c[:] @@ -101,7 +101,7 @@ def clear(*args, **kwargs): dv.clear() -def dump(*args, **kwargs): +def dump(**kwargs): """ Print out key names that exist on the engines. """ context = Context() keylist = context.dump_keys(all_other_contexts=True) @@ -111,7 +111,7 @@ def dump(*args, **kwargs): print('%s : %r' % (key, targets)) -def purge(*args, **kwargs): +def purge(**kwargs): """ Remove keys from the engine namespaces. """ print('Purging keys from engines...') context = Context() From 66bb827801cea515b7f367123b14901471b1d357 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 14 Apr 2014 20:50:37 -0500 Subject: [PATCH 17/19] Better description as suggested by @bgrant. --- distarray/apps/dacluster.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distarray/apps/dacluster.py b/distarray/apps/dacluster.py index 49fa2ee4..ff05d330 100755 --- a/distarray/apps/dacluster.py +++ b/distarray/apps/dacluster.py @@ -20,7 +20,8 @@ def main(): main_description = """ Start, stop and manage a IPython.parallel cluster. `dacluster` can take all the commands IPython's `ipcluster` can, and a few extras that are - distarray specific. + distarray specific. For details on a subcommand, try `dacluster + --help`. """ parser = argparse.ArgumentParser(description=main_description) From 464598a4308a16d9b3c52657b3f17f4880d86130 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 15 Apr 2014 12:32:01 -0500 Subject: [PATCH 18/19] Move ipcluster_tools.py functions into dacluster.py --- distarray/apps/dacluster.py | 120 +++++++++++++++++++++++++++++++++--- 1 file changed, 113 insertions(+), 7 deletions(-) diff --git a/distarray/apps/dacluster.py b/distarray/apps/dacluster.py index ff05d330..204cdef5 100755 --- a/distarray/apps/dacluster.py +++ b/distarray/apps/dacluster.py @@ -10,10 +10,116 @@ distarray specific. """ +from __future__ import print_function + import argparse import sys +from time import sleep +from subprocess import Popen, PIPE + +from distarray.externals import six +from distarray.context import Context + + +if six.PY2: + ipcluster_cmd = 'ipcluster' +elif six.PY3: + ipcluster_cmd = 'ipcluster3' +else: + raise NotImplementedError("Not run with Python 2 *or* 3?") -from . import ipcluster_tools + +def start(n=4, engines=None, **kwargs): + """Convenient way to start an ipcluster for testing. + + Doesn't exit until the ipcluster prints a success message. + """ + if engines is None: + engines = "--engines=MPIEngineSetLauncher" + + cluster = Popen([ipcluster_cmd, 'start', '-n', str(n), engines], + stdout=PIPE, stderr=PIPE) + + started = "Engines appear to have started successfully" + running = "CRITICAL | Cluster is already running with" + while True: + line = cluster.stderr.readline().decode() + if not line: + break + print(line, end='') + if (started in line): + break + elif (running in line): + raise RuntimeError("ipcluster is already running.") + + +def stop(**kwargs): + """Convenient way to stop an ipcluster.""" + stopping = Popen([ipcluster_cmd, 'stop'], stdout=PIPE, stderr=PIPE) + + stopped = "Stopping cluster" + not_running = ("CRITICAL | Could not read pid file, cluster " + "is probably not running.") + while True: + line = stopping.stderr.readline().decode() + if not line: + break + print(line, end='') + if (stopped in line) or (not_running in line): + break + + +def restart(n=4, engines=None, **kwargs): + """Convenient way to restart an ipcluster.""" + stop() + + started = False + while not started: + sleep(2) + try: + start(n=n, engines=engines) + except RuntimeError: + pass + else: + started = True + +_RESET_ENGINE_DISTARRAY = ''' +from sys import modules +orig_mods = set(modules) +for m in modules.copy(): + if m.startswith('distarray'): + del modules[m] +deleted_mods = sorted(orig_mods - set(modules)) +''' + + +def clear(**kwargs): + from IPython.parallel import Client + c = Client() + dv = c[:] + dv.execute(_RESET_ENGINE_DISTARRAY, block=True) + mods = dv['deleted_mods'] + print("The following modules were removed from the engines' namespaces:") + for mod in mods[0]: + print(' ' + mod) + dv.clear() + + +def dump(**kwargs): + """ Print out key names that exist on the engines. """ + context = Context() + keylist = context.dump_keys(all_other_contexts=True) + num_keys = len(keylist) + print('*** %d ENGINE KEYS ***' % (num_keys)) + for key, targets in keylist: + print('%s : %r' % (key, targets)) + + +def purge(**kwargs): + """ Remove keys from the engine namespaces. """ + print('Purging keys from engines...') + context = Context() + context.cleanup(all_other_contexts=True) def main(): @@ -81,12 +187,12 @@ def main(): help=engine_help) # set the functions each command should use - parser_start.set_defaults(func=ipcluster_tools.start) - parser_stop.set_defaults(func=ipcluster_tools.stop) - parser_restart.set_defaults(func=ipcluster_tools.restart) - parser_clear.set_defaults(func=ipcluster_tools.clear) - parser_purge.set_defaults(func=ipcluster_tools.purge) - parser_dump.set_defaults(func=ipcluster_tools.dump) + parser_start.set_defaults(func=start) + parser_stop.set_defaults(func=stop) + parser_restart.set_defaults(func=restart) + parser_clear.set_defaults(func=clear) + parser_purge.set_defaults(func=purge) + parser_dump.set_defaults(func=dump) # run it args = parser.parse_args() From e2d6c8c4d3f8d3cf9d5aff8a2ea1c441f3056c85 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 15 Apr 2014 12:32:42 -0500 Subject: [PATCH 19/19] git rm ipcluster_tools.py --- distarray/apps/ipcluster_tools.py | 124 ------------------------------ 1 file changed, 124 deletions(-) delete mode 100644 distarray/apps/ipcluster_tools.py diff --git a/distarray/apps/ipcluster_tools.py b/distarray/apps/ipcluster_tools.py deleted file mode 100644 index 21172ef0..00000000 --- a/distarray/apps/ipcluster_tools.py +++ /dev/null @@ -1,124 +0,0 @@ -# encoding: utf-8 -# --------------------------------------------------------------------------- -# Copyright (C) 2008-2014, IPython Development Team and Enthought, Inc. -# Distributed under the terms of the BSD License. See COPYING.rst. -# --------------------------------------------------------------------------- - -""" -Functions for starting and stopping ipclusters. -""" - -from __future__ import print_function - -import sys -from distarray.externals import six -from distarray.context import Context -from time import sleep -from subprocess import Popen, PIPE - - -if six.PY2: - ipcluster_cmd = 'ipcluster' -elif six.PY3: - ipcluster_cmd = 'ipcluster3' -else: - raise NotImplementedError("Not run with Python 2 *or* 3?") - - -def start(n=4, engines=None, **kwargs): - """Convenient way to start an ipcluster for testing. - - Doesn't exit until the ipcluster prints a success message. - """ - if engines is None: - engines = "--engines=MPIEngineSetLauncher" - - cluster = Popen([ipcluster_cmd, 'start', '-n', str(n), engines], - stdout=PIPE, stderr=PIPE) - - started = "Engines appear to have started successfully" - running = "CRITICAL | Cluster is already running with" - while True: - line = cluster.stderr.readline().decode() - if not line: - break - print(line, end='') - if (started in line): - break - elif (running in line): - raise RuntimeError("ipcluster is already running.") - - -def stop(**kwargs): - """Convenient way to stop an ipcluster.""" - stopping = Popen([ipcluster_cmd, 'stop'], stdout=PIPE, stderr=PIPE) - - stopped = "Stopping cluster" - not_running = ("CRITICAL | Could not read pid file, cluster " - "is probably not running.") - while True: - line = stopping.stderr.readline().decode() - if not line: - break - print(line, end='') - if (stopped in line) or (not_running in line): - break - - -def restart(n=4, engines=None, **kwargs): - """Convenient way to restart an ipcluster.""" - stop() - - started = False - while not started: - sleep(2) - try: - start(n=n, engines=engines) - except RuntimeError: - pass - else: - started = True - -_RESET_ENGINE_DISTARRAY = ''' -from sys import modules -orig_mods = set(modules) -for m in modules.copy(): - if m.startswith('distarray'): - del modules[m] -deleted_mods = sorted(orig_mods - set(modules)) -''' - - -def clear(**kwargs): - from IPython.parallel import Client - c = Client() - dv = c[:] - dv.execute(_RESET_ENGINE_DISTARRAY, block=True) - mods = dv['deleted_mods'] - print("The following modules were removed from the engines' namespaces:") - for mod in mods[0]: - print(' ' + mod) - dv.clear() - - -def dump(**kwargs): - """ Print out key names that exist on the engines. """ - context = Context() - keylist = context.dump_keys(all_other_contexts=True) - num_keys = len(keylist) - print('*** %d ENGINE KEYS ***' % (num_keys)) - for key, targets in keylist: - print('%s : %r' % (key, targets)) - - -def purge(**kwargs): - """ Remove keys from the engine namespaces. """ - print('Purging keys from engines...') - context = Context() - context.cleanup(all_other_contexts=True) - -if __name__ == '__main__': - cmd = sys.argv[1] - if cmd not in 'start stop restart reset'.split(): - sys.exit("Error: %r not a valid command." % cmd) - globals()[cmd]()