-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdynamictorque.py
More file actions
executable file
·186 lines (148 loc) · 6.23 KB
/
dynamictorque.py
File metadata and controls
executable file
·186 lines (148 loc) · 6.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2013 eResearch SA, CoEPP
# You may distribute under the terms of either the GNU General Public
# License or the Apache v2 License, as specified in the README file.
## Auth: Shunde Zhang. 30/10/2013.
##
## Dynamic Torque main script
## This script starts dynamic torque and sets up all functionalities
##
## Using optparse for command line options (http://docs.python.org/library/optparse.html)
from __future__ import with_statement
import sys
import time
import logging
import logging.handlers
import signal
import ConfigParser
from optparse import OptionParser
import dynamictorque.__version__ as version
import dynamictorque.info_server as info_server
import dynamictorque.utilities as utilities
import dynamictorque.config as config
from dynamictorque.cloud_tools import ICloud
import dynamictorque.res_management as res_management
from dynamictorque.cluster_management import JobPoller
from dynamictorque.cloud_management import CloudPoller
# Module-level logger: main() sets its level and attaches handlers to it,
# and term_handler() logs through it.
log = utilities.get_logger()
def main(argv=None):
    """Start Dynamic Torque and block until it is asked to stop.

    Parses the command line, loads the configuration, sets up logging,
    starts the info server and the job/cloud polling threads, then checks
    once a second that every service thread is still alive.  On SIGTERM,
    Ctrl-C, or the death of a service thread it forces off the resources
    held by the resource center, stops and joins all threads, and exits.

    Args:
        argv: Command-line arguments *without* the program name.  When
            None, optparse falls back to ``sys.argv[1:]``.

    Raises:
        SystemExit: always, via ``sys.exit()``, once shutdown has finished.
    """
    version_str = "Dynamic Torque " + version.version
    parser = OptionParser(version=version_str)
    set_options(parser)
    # Honour the argv parameter; it used to be ignored because parse_args()
    # was called without arguments.  Positional leftovers are not used.
    cli_options, _ = parser.parse_args(argv)

    # Look for global configuration file, and initialize config
    if cli_options.config_file:
        config.setup(path=cli_options.config_file)
    else:
        config.setup()

    _setup_logging()

    # Log entry message (for timestamp in log)
    log.info("Dynamic Torque starting...")
    if config.log_level == 'VERBOSE':
        log.warning("WARNING - using VERBOSE logging will result is poor performance with more than a few hundred jobs in the queue")
    if config.log_level == 'DEBUG':
        log.warning("WARNING - using DEBUG logging can result in poor performance with more than a few thousand jobs in the queue")

    service_threads = []
    server_threads = []

    log.info("Dynamic Torque is configured with the following resources:")
    for res in config.cloud_resources:
        log.info(" %s: %s"%(res, config.cloud_resources[res]))
    log.info("locations %s" % config.location_properties)

    # Create a job pool
    res_center = res_management.ResourceCenter("Resource Center")

    # Create the Job Polling thread
    job_poller = JobPoller(res_center)
    service_threads.append(job_poller)

    # Create the Cloud Polling thread
    cloud_poller = CloudPoller(res_center)
    service_threads.append(cloud_poller)

    # Start the vm server for RPCs
    info_serv = info_server.InfoServer(res_center)
    info_serv.daemon = True
    server_threads.append(info_serv)

    # Set SIGTERM (kill) handler.  term_handler() calls sys.exit(), and the
    # resulting SystemExit is caught by the monitoring loop below.
    signal.signal(signal.SIGTERM, term_handler)

    # Set SIGUSR1 (reconfig) handler
    #reconfig_handler = make_reconfig_handler(cloud_resources)
    #signal.signal(signal.SIGUSR1, reconfig_handler)

    # Set SIGUSR2 (reload_ban) handler
    #reload_ban_handler = make_banned_job_fileload_handler(cloud_resources)
    #signal.signal(signal.SIGUSR2, reload_ban_handler)

    # Set SIGUSR2 (quick_exit) handler
    #quick_exit_handler = make_quick_exit_handler(scheduler)
    #signal.signal(signal.SIGUSR2, quick_exit_handler)

    # Start all the threads
    for thread in server_threads:
        thread.start()
    for thread in service_threads:
        thread.start()

    should_be_running = True
    # Wait for a signal / keyboard interrupt, or for a service thread to die
    try:
        die = False
        while not die:
            for thread in service_threads:
                if not thread.isAlive():
                    log.error("%s thread died!" % thread.name)
                    die = True
            time.sleep(1)
    except (SystemExit, KeyboardInterrupt):
        log.info("Caught a signal that someone wants me to quit!")
        should_be_running = False

    # Still True here means the loop ended because a service thread died,
    # not because somebody asked us to quit.
    if should_be_running:
        log.error("Whoops. Wasn't expecting to exit. Did a thread crash?")

    log.info("Dynamic Torque quitting normally. (It might take a while, don't panic!)")
    log.info("Deleting the created instances.")
    res_center.forceoff()

    # Kill all the service threads, then the info_server
    for thread in service_threads:
        thread.stop()
    for thread in service_threads:
        thread.join()
    for thread in server_threads:
        thread.stop()
    for thread in server_threads:
        thread.join()

    log.info("Dynamic Torque stopped. Bye!")
    sys.exit()


def _setup_logging():
    """Configure the module logger's level and handlers from ``config``."""
    # Switches described in the logging docs' "Optimization" section:
    # skip caller lookup and process-info collection for each record.
    logging._srcfile = None
    logging.logProcesses = 0

    log.setLevel(utilities.LEVELS[config.log_level])
    log_formatter = logging.Formatter(config.log_format)

    if config.log_stdout:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(log_formatter)
        log.addHandler(stream_handler)

    if config.log_location:
        file_handler = None
        if config.log_max_size:
            # NOTE(review): no backupCount is passed; the logging.handlers
            # docs say rollover never occurs when backupCount is zero, so
            # log_max_size may not actually cap the file -- confirm intent.
            file_handler = logging.handlers.RotatingFileHandler(
                config.log_location,
                maxBytes=config.log_max_size)
        else:
            try:
                file_handler = logging.handlers.WatchedFileHandler(
                    config.log_location,)
            except AttributeError:
                # Python 2.5 doesn't support WatchedFileHandler
                file_handler = logging.handlers.RotatingFileHandler(
                    config.log_location,)
        file_handler.setFormatter(log_formatter)
        log.addHandler(file_handler)

    # With no destination configured, attach the project's NullHandler.
    if not config.log_location and not config.log_stdout:
        null_handler = utilities.NullHandler()
        log.addHandler(null_handler)
def term_handler(signal, handler):
    """Handle SIGTERM: log the event, then exit the process.

    Installed with ``signal.signal()``, which invokes it with the signal
    number and the current stack frame; neither argument is used.  Note
    that the first parameter shadows the ``signal`` module inside this
    function.
    """
    log.info("Recieved SIGTERM signal")
    sys.exit()
def set_options(parser):
    """Register the command-line options on an optparse ``OptionParser``.

    A single option is defined, ``-f`` / ``--config-file``.  It relies on
    optparse's defaults (action "store", type "string") and stores its
    value in ``config_file``.
    """
    parser.add_option(
        "-f", "--config-file",
        dest="config_file",
        metavar="FILE",
        help="Designate a config file for VM Pool",
    )
# Script entry point: hand main() the arguments without the program name.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))