Project

General

Profile

Feature #1417 » distributed_offline.py

modified version of analyze_data_online for parallel offline running - Rajaram, Durga, 05 March 2014 10:48

 
1
#  This file is part of MAUS: http://micewww.pp.rl.ac.uk:8080/projects/maus
2
#
3
#  MAUS is free software: you can redistribute it and/or modify
4
#  it under the terms of the GNU General Public License as published by
5
#  the Free Software Foundation, either version 3 of the License, or
6
#  (at your option) any later version.
7
#
8
#  MAUS is distributed in the hope that it will be useful,
9
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
10
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
#  GNU General Public License for more details.
12
#
13
#  You should have received a copy of the GNU General Public License
14
#  along with MAUS.  If not, see <http://www.gnu.org/licenses/>.
15

    
16
"""
17
Online run script to get maus to run online.
18

    
19
Contains subprocess commands to
20
  - set up celery nodes
21
  - set up input_transform
22
  - set up merger_outputs
23
  - logging
24
Also some reasonably complicated process handling.
25

    
26
Each of the subprocesses is handled by the subprocess module. Cleanup of child
27
processes is done automatically on exit.
28

    
29
Only one instance of analyze_data_online is allowed to run at any time. This is
30
controlled by a lock file. Additionally the lock file contains a list of child
31
process ids. If this script fails to clean up at the end of the run, for example
32
if it is killed by an external signal, then next time it is run we can
33
kill all child processes.
34

    
35
Any command line arguments are passed to the MAUS input-transform and all MAUS
36
merge-output processes
37
"""
38

    
39
# would be great to have some tests for this
40

    
41
import sys
42
import os
43
import signal
44
import subprocess
45
import time
46
import string # pylint: disable=W0402
47
import pymongo
48

    
49
# mongodb database used to hand spills from input-transform to mergers
MONGODB = 'maus-new' # no '.' character
# lock file guarding against two concurrent sessions; also lists child pids
LOCKFILE = os.path.join(os.environ['MAUS_ROOT_DIR'], 'tmp', '.maus_lockfile')
# OnlineProcess wrappers for all spawned child processes
PROCESSES = []
# maximum % of total memory usage before a process is restarted
# NOTE(review): no restart is actually implemented in OnlineProcess.poll()
# - confirm whether this constant is still meant to be used
MAX_MEM_USAGE = 95.
# directory where the subprocess log files are written
OUTPUT_DIR = os.environ['MAUS_ROOT_DIR']

# time between polls in seconds
POLL_TIME = 10

# list of reducers to be used in the online job
REDUCER_LIST = [
  'reduce_output.py',
]
63

    
64
class OnlineProcess:
65
    """
66
    Wrapper for a subprocess POpen object
67

    
68
    """
69

    
70
    def __init__(self, subprocess_arg_list, log_file):
71
        """
72
        Set up the log file and start the subprocess
73
        """
74
        self.arg_list = subprocess_arg_list
75
        self.log_name = log_file
76
        self.log = open(log_file, "w")
77
        self.subproc = None
78
        self._start_process()
79

    
80
    def poll(self):
81
        """
82
        Returns None if the process is running or the returncode if it finished
83

    
84
        Checks memory footprint for the subprocess and restarts the process if
85
        it exceeeds MAX_MEM_USAGE
86
        """
87
        proc = self.subproc
88
        poll_out = proc.poll()
89
        if poll_out == None:
90
            mem_usage = self.memory_usage()
91
            print str(proc.pid).rjust(6), str(mem_usage).ljust(6),
92
        else:
93
            print '\nProcess', proc.pid, 'failed'
94
        return poll_out
95

    
96
    def memory_usage(self):
97
        """
98
        Return the memory usage (%) of the associated subprocess
99
        """
100
        ps_out = subprocess.check_output(['ps', '-p', str(self.subproc.pid),
101
                                          'h', '-o%mem'])
102
        return float(ps_out.strip(' \n'))
103

    
104
    def _start_process(self):
105
        """
106
        Start the subprocess
107
        """
108
        self.subproc = subprocess.Popen(self.arg_list, \
109
                                      stdout=self.log, stderr=subprocess.STDOUT)
110
        print 'Started process with pid', self.subproc.pid, 'and log file', \
111
              self.log_name
112

    
113
def poll_processes(proc_list):
    """
    Poll every process in the process list.

    @param proc_list list of OnlineProcess objects
    @returns True if all processes are still running, False if any have
    finished.
    """
    all_ok = True
    for proc in proc_list:
        # poll each process unconditionally: the original expression
        # "all_ok and proc.poll() == None" short-circuited after the
        # first dead process, skipping the status print for the rest
        is_running = proc.poll() is None
        all_ok = all_ok and is_running
    print
    return all_ok
123

    
124
def celeryd_process(celeryd_log_file_name):
125
    """
126
    Open the celery demon process - sets up workers for MAUS to reconstruct on
127
    """
128
    print 'Starting celery... ',
129
    proc = OnlineProcess(['celeryd', '-lINFO', '--purge'],
130
                         celeryd_log_file_name)
131
    return proc
132

    
133
def maus_input_transform_process(maus_input_log, _extra_args):
134
    """
135
    Open the input transform process - runs against data and performs
136
    reconstruction, leaving reconstructed data in a database somewhere.
137
    """
138
    print 'Starting input-transform...',
139
    maus_inp = \
140
             os.path.join(os.environ['MAUS_ROOT_DIR'],
141
                          'bin//analyze_data_offline.py')
142
    proc = OnlineProcess(['python', maus_inp,
143
                          '-mongodb_database_name='+MONGODB,
144
                          '-type_of_dataflow=multi_process_input_transform',
145
				                  '-verbose_level=0',
146
						              '-DAQ_hostname=localhost']+_extra_args,
147
                          maus_input_log)
148
    return proc
149
    
150
def maus_merge_output_process(maus_output_log, reducer_name, output_name,
151
                              _extra_args):
152
    """
153
    Open the merge output process - runs against reconstructed data and collects
154
    into a bunch of histograms.
155
    """
156
    print 'Starting reducer...',
157
    maus_red = os.path.join(os.environ['MAUS_ROOT_DIR'], 'bin/examples',
158
                                                                   reducer_name)
159
    # strip trailing .py or whatever
160
    root_name = string.join(reducer_name.split('.')[0:-1], '.')
161
    if root_name == '':
162
        root_name = reducer_name
163
    root_name += '.root'
164
    # set directory in which output root file will be written
165
    # default is MAUS_WEB_MEDIA_RAW/end_of_run/
166
    output_root_dir = os.path.join(os.environ['MAUS_ROOT_DIR'], 'end_of_run')
167
    # check if end_of_run output directory exists
168
    # create it if it doesn't
169
    if not os.path.exists(output_root_dir):
170
        os.makedirs(output_root_dir)
171
    proc = OnlineProcess(['python', maus_red,
172
                          '-mongodb_database_name='+MONGODB,
173
                          '-type_of_dataflow=multi_process_merge_output',
174
                          '-output_json_file_name='+output_name,
175
                          '-reduce_plot_refresh_rate=60',
176
                          '-end_of_run_output_root_directory='+output_root_dir,
177
                          '-output_root_file_name='+root_name,
178
                          '-output_root_file_mode=end_of_run_file_per_run']+\
179
                          _extra_args,
180
                          maus_output_log)
181
    return proc
182

    
183
def monitor_mongodb(url, database_name, file_handle):
    """
    Summarise the database(s) into file_handle.

    @param url mongodb URL, e.g. "localhost:27017"
    @param database_name database name, or "ALL" to summarise every database
    @param file_handle writable file object the summary is printed to
    """
    mongo = pymongo.Connection(url)
    database_names = mongo.database_names()
    if database_name != "ALL":
        if database_name not in database_names:
            print >> file_handle, "Database %s not found" % database_name
            return
        else:
            database_names = [database_name]
    # loop with a fresh name so we don't shadow the database_name argument
    for db_name in database_names:
        print >> file_handle, "Database: %s" % db_name,
        mongodb = mongo[db_name]
        collection_names = mongodb.collection_names()
        # system.indexes is mongo bookkeeping, not one of our collections
        if "system.indexes" in collection_names:
            collection_names.remove("system.indexes")
        if len(collection_names) == 0:
            print >> file_handle, "  No collections"
            continue
        for collection_name in collection_names:
            collection = mongodb[collection_name]
            validate = mongodb.validate_collection(collection_name)
            if "datasize" in validate:
                space = validate["datasize"]
                space_kb = space / 1024
                space_mb = space_kb / 1024
                print >> file_handle, \
                    "  Collection: %s : %d documents (%d bytes %d Kb %d Mb)" \
                    % (collection_name, collection.count(), space, \
                    space_kb, space_mb)
    file_handle.flush()
218

    
219
def force_kill_celeryd():
220
    """
221
    celeryd likes to leave lurking subprocesses. This function searches the
222
    process table for celeryd child process and kills it.
223
    """
224
    ps_out =  subprocess.check_output(['ps', '-e', '-F'])
225
    pids = []
226
    for line in ps_out.split('\n')[1:]:
227
        if line.find('celeryd') > -1:
228
            words = line.split()
229
            pids.append(int(words[1]))
230
            print "Found lurking celeryd process", pids[-1]
231
    for a_pid in pids:
232
        os.kill(a_pid, signal.SIGKILL)
233
        print "Killed", a_pid
234

    
235
def remove_lockfile():
236
    """
237
    Delete the lockfile
238
    """
239
    if os.path.exists(LOCKFILE):
240
        os.remove(LOCKFILE)
241
        print 'Cleared lockfile'
242
    else:
243
        print 'Strange, I lost the lockfile...'
244

    
245
def clear_lockfile():
246
    """
247
    Clear an existing lockfile
248

    
249
    If the script fails to exit gracefully, we leave a lock file and can leave
250
    associated child processes running. In this case, this function kills all
251
    child processes.
252
    """
253
    if os.path.exists(LOCKFILE):
254
        print """
255
Found lockfile - this may mean you have an existing session running elsewhere.
256
Kill existing session? (y/N)""" 
257
        sys.stdout.flush()
258
        user_input = raw_input()
259
        if len(user_input) == 0 or user_input[0].lower() != 'y':
260
            # note this doesnt go through cleanup function - just exits
261
            os.abort()
262
        print 'Lockfile', LOCKFILE, 'found - killing processes'
263
        fin = open(LOCKFILE)
264
        for line in fin.readlines():
265
            pid = int(line.rstrip('\n'))
266
            try:
267
                os.kill(pid, signal.SIGKILL)
268
            except OSError:
269
                pass # maybe the pid didn't exist
270
            print 'Killed', pid
271
        # celeryd must die
272
        force_kill_celeryd()
273
        time.sleep(3)
274

    
275
def make_lockfile(_procs):
276
    """
277
    Make a lock file listing pid of this process and all children
278
    """
279
    print 'Making lockfile '+LOCKFILE
280
    fout = open(LOCKFILE, 'w')
281
    print >> fout, os.getpid()
282
    for proc in _procs  :
283
        print >> fout, proc.subproc.pid
284
    fout.close()
285

    
286
def cleanup(_procs):
287
    """
288
    Kill any subprocesses in _procs list of OnlineProcesses
289
    """
290
    returncode = 0
291
    for online_process in _procs:
292
        process = online_process.subproc
293
        if process.poll() == None:
294
            print 'Attempting to kill process', str(process.pid)
295
            process.send_signal(signal.SIGINT)
296
    while len(_procs) > 0:
297
        _proc_alive = []
298
        for online_process in _procs:
299
            process = online_process.subproc
300
            print 'Polling process', process.pid,
301
            if process.poll() == None:
302
                print '... process did not die - it is still working '+\
303
                      '(check the log file)'
304
                _proc_alive.append(online_process)
305
            else:
306
                print '... process', str(process.pid), \
307
                      'is dead with return code', str(process.returncode)
308
                returncode = process.returncode
309
        sys.stdout.flush()
310
        _procs = _proc_alive
311
        time.sleep(10)
312
    return returncode
313

    
314
def main():
    """
    Make a lockfile; spawn child processes; and poll subprocesses until user
    hits ctrl-c

    If the subprocesses fail, have a go at setting up rabbitmcq and mongo

    Pass any command line arguments to all MAUS processes
    """
    # everything on the command line is forwarded to every MAUS subprocess
    extra_args = sys.argv[1:]
    returncode = 0
    try:
        # clear stale celeryd processes and any lockfile left by a
        # previous session before starting a new one
        force_kill_celeryd()
        clear_lockfile()
        log_dir = OUTPUT_DIR

        celery_log = os.path.join(log_dir, 'celeryd.log')
        input_log = os.path.join(log_dir, 'maus-input-transform.log')
        debug_json = os.path.join(log_dir, 'reduce_output.json')

        # spawn the celery worker pool, the input-transform, and one
        # merge-output process per reducer in REDUCER_LIST
        PROCESSES.append(celeryd_process(celery_log))
        PROCESSES.append(maus_input_transform_process(input_log, extra_args))
        for reducer in REDUCER_LIST:
            # reducer[0:-3] strips the trailing '.py' for the log name
            reduce_log = os.path.join(log_dir, reducer[0:-3]+'.log')
            PROCESSES.append(maus_merge_output_process(reduce_log,
                                              reducer, debug_json, extra_args))
        make_lockfile(PROCESSES)
        print '\nCTRL-C to quit\n'
        mongo_log = open(os.path.join(log_dir, 'mongodb.log'), 'w')
        # poll the children and summarise the database until a child
        # dies or the user hits ctrl-c
        while poll_processes(PROCESSES):
            monitor_mongodb("localhost:27017", MONGODB, mongo_log)
            sys.stdout.flush()
            sys.stderr.flush()
            time.sleep(POLL_TIME)
    except KeyboardInterrupt:
        print "Closing"
    except Exception:
        # log the traceback but still fall through to cleanup
        sys.excepthook(*sys.exc_info())
        returncode = 1
    finally:
        # kill the children, remove the lockfile and propagate any
        # non-zero returncode to the shell
        returncode = cleanup(PROCESSES)+returncode
        remove_lockfile()
        sys.exit(returncode)
357

    
358
# script entry point
if __name__ == "__main__":
    main()
360

    
(1-1/9)