-
Notifications
You must be signed in to change notification settings - Fork 1
cluster management #298
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
cluster management #298
Changes from all commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
7dae10c
Move util scripts to apps.
cowlicks ac9d7c4
Add CLI entry point for cluster management.
cowlicks a51ed4a
Changes to ipcluster_tools for a nicer cli.
cowlicks 6aed21e
Add cli entry_point to setup.py
cowlicks c904ee1
Use dacluster in .travis.yml
cowlicks 737d774
Remove cluster management from Makefile.
cowlicks 3a3d62c
Forgot the __init__.py in apps/
cowlicks 3a0f771
Py3 compatible imports.
cowlicks 57792a4
File metadata.
cowlicks 4c9ffeb
Make functions more amenable to *args, and **kwargs for argparse.
cowlicks f00bffb
Add help text. Move module level code into main(). Add dump.
cowlicks 997ba09
Make args python 2 compatible.
cowlicks 60797ad
Remove a bad idea
cowlicks 8c3200f
Better docstrings in dacluster.
cowlicks d96de86
Merge purge_cluster.py into ipcluster_tools.py
cowlicks 3b7b057
Remove extraneous *args.
cowlicks 66bb827
Better description as suggested by @bgrant.
cowlicks 464598a
Move ipcluster_tools.py functions into dacluster.py
cowlicks e2d6c8c
git rm ipcluster_tools.py
cowlicks File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,202 @@ | ||
| #!/usr/bin/env python | ||
| # encoding: utf-8 | ||
| # --------------------------------------------------------------------------- | ||
| # Copyright (C) 2008-2014, IPython Development Team and Enthought, Inc. | ||
| # Distributed under the terms of the BSD License. See COPYING.rst. | ||
| # --------------------------------------------------------------------------- | ||
| """ | ||
| Start, stop and manage a IPython.parallel cluster. `dacluster` can take | ||
| all the commands IPython's `ipcluster` can, and a few extras that are | ||
| distarray specific. | ||
| """ | ||
|
|
||
| from __future__ import print_function | ||
|
|
||
| import argparse | ||
| import sys | ||
| from time import sleep | ||
| from subprocess import Popen, PIPE | ||
|
|
||
| from distarray.externals import six | ||
| from distarray.context import Context | ||
|
|
||
|
|
||
| if six.PY2: | ||
| ipcluster_cmd = 'ipcluster' | ||
| elif six.PY3: | ||
| ipcluster_cmd = 'ipcluster3' | ||
| else: | ||
| raise NotImplementedError("Not run with Python 2 *or* 3?") | ||
|
|
||
|
|
||
| def start(n=4, engines=None, **kwargs): | ||
| """Convenient way to start an ipcluster for testing. | ||
|
|
||
| Doesn't exit until the ipcluster prints a success message. | ||
| """ | ||
| if engines is None: | ||
| engines = "--engines=MPIEngineSetLauncher" | ||
|
|
||
| cluster = Popen([ipcluster_cmd, 'start', '-n', str(n), engines], | ||
| stdout=PIPE, stderr=PIPE) | ||
|
|
||
| started = "Engines appear to have started successfully" | ||
| running = "CRITICAL | Cluster is already running with" | ||
| while True: | ||
| line = cluster.stderr.readline().decode() | ||
| if not line: | ||
| break | ||
| print(line, end='') | ||
| if (started in line): | ||
| break | ||
| elif (running in line): | ||
| raise RuntimeError("ipcluster is already running.") | ||
|
|
||
|
|
||
| def stop(**kwargs): | ||
| """Convenient way to stop an ipcluster.""" | ||
| stopping = Popen([ipcluster_cmd, 'stop'], stdout=PIPE, stderr=PIPE) | ||
|
|
||
| stopped = "Stopping cluster" | ||
| not_running = ("CRITICAL | Could not read pid file, cluster " | ||
| "is probably not running.") | ||
| while True: | ||
| line = stopping.stderr.readline().decode() | ||
| if not line: | ||
| break | ||
| print(line, end='') | ||
| if (stopped in line) or (not_running in line): | ||
| break | ||
|
|
||
|
|
||
| def restart(n=4, engines=None, **kwargs): | ||
| """Convenient way to restart an ipcluster.""" | ||
| stop() | ||
|
|
||
| started = False | ||
| while not started: | ||
| sleep(2) | ||
| try: | ||
| start(n=n, engines=engines) | ||
| except RuntimeError: | ||
| pass | ||
| else: | ||
| started = True | ||
|
|
||
| _RESET_ENGINE_DISTARRAY = ''' | ||
| from sys import modules | ||
| orig_mods = set(modules) | ||
| for m in modules.copy(): | ||
| if m.startswith('distarray'): | ||
| del modules[m] | ||
| deleted_mods = sorted(orig_mods - set(modules)) | ||
| ''' | ||
|
|
||
|
|
||
| def clear(**kwargs): | ||
| from IPython.parallel import Client | ||
| c = Client() | ||
| dv = c[:] | ||
| dv.execute(_RESET_ENGINE_DISTARRAY, block=True) | ||
| mods = dv['deleted_mods'] | ||
| print("The following modules were removed from the engines' namespaces:") | ||
| for mod in mods[0]: | ||
| print(' ' + mod) | ||
| dv.clear() | ||
|
|
||
|
|
||
| def dump(**kwargs): | ||
| """ Print out key names that exist on the engines. """ | ||
| context = Context() | ||
| keylist = context.dump_keys(all_other_contexts=True) | ||
| num_keys = len(keylist) | ||
| print('*** %d ENGINE KEYS ***' % (num_keys)) | ||
| for key, targets in keylist: | ||
| print('%s : %r' % (key, targets)) | ||
|
|
||
|
|
||
| def purge(**kwargs): | ||
| """ Remove keys from the engine namespaces. """ | ||
| print('Purging keys from engines...') | ||
| context = Context() | ||
| context.cleanup(all_other_contexts=True) | ||
|
|
||
|
|
||
| def main(): | ||
| main_description = """ | ||
| Start, stop and manage a IPython.parallel cluster. `dacluster` can take | ||
| all the commands IPython's `ipcluster` can, and a few extras that are | ||
| distarray specific. For details on a subcommand, try `dacluster | ||
| <subcommand> --help`. | ||
| """ | ||
| parser = argparse.ArgumentParser(description=main_description) | ||
|
|
||
| # Print help if no command line args are supplied | ||
| if len(sys.argv) == 1: | ||
| parser.print_help() | ||
| sys.exit(1) | ||
|
|
||
| subparsers = parser.add_subparsers() | ||
|
|
||
| start_description = """ | ||
| Start a new IPython.parallel cluster. | ||
| """ | ||
|
|
||
| stop_description = """ | ||
| Stop a IPython.parallel cluster. | ||
| """ | ||
|
|
||
| restart_description = """ | ||
| Restart a IPython.parallel cluster. | ||
| """ | ||
|
|
||
| clear_description = """ | ||
| Clear the namespace and imports on the cluster. This should be the | ||
| same as restarting the engines, but faster. | ||
| """ | ||
|
|
||
| purge_description = """ | ||
| Clear all the DistArray objects from the engines. This sometimes | ||
| fails to delete all keys. | ||
| """ | ||
|
|
||
| dump_description = """ | ||
| Print out key names that exist on the engines. | ||
| """ | ||
|
|
||
| # subparses for all our commands | ||
| parser_start = subparsers.add_parser('start', | ||
| description=start_description) | ||
| parser_stop = subparsers.add_parser('stop', description=stop_description) | ||
| parser_restart = subparsers.add_parser('restart', | ||
| description=restart_description) | ||
| parser_clear = subparsers.add_parser('clear', | ||
| description=clear_description) | ||
| parser_purge = subparsers.add_parser('purge', | ||
| description=purge_description) | ||
| parser_dump = subparsers.add_parser('dump', description=dump_description) | ||
|
|
||
| engine_help = """ | ||
| Number of engines to start. | ||
| """ | ||
|
|
||
| # Add some optional arguments for `start` and `restart` | ||
| parser_start.add_argument('-n', '--n', type=int, nargs='?', default=4, | ||
| help=engine_help) | ||
| parser_restart.add_argument('-n', '--n', type=int, nargs='?', default=4, | ||
| help=engine_help) | ||
|
|
||
| # set the functions each command should use | ||
| parser_start.set_defaults(func=start) | ||
| parser_stop.set_defaults(func=stop) | ||
| parser_restart.set_defaults(func=restart) | ||
| parser_clear.set_defaults(func=clear) | ||
| parser_purge.set_defaults(func=purge) | ||
| parser_dump.set_defaults(func=dump) | ||
|
|
||
| # run it | ||
| args = parser.parse_args() | ||
| args.func(**vars(args)) | ||
|
|
||
| if __name__ == '__main__': | ||
| main() | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Encoding statement, copyright...