Bug #1759 » single_thread.py

Dobbs, Adam, 28 September 2015 15:03

 
"""
Single-threaded dataflows module.
"""
#  This file is part of MAUS: http://micewww.pp.rl.ac.uk:8080/projects/maus
#
#  MAUS is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MAUS is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with MAUS.  If not, see <http://www.gnu.org/licenses/>.

import json
import maus_cpp.run_action_manager
import maus_cpp.converter

from ROOT import TProcessID # pylint: disable=E0611

from framework.utilities import DataflowUtilities

class PipelineSingleThreadDataflowExecutor: # pylint: disable=R0902
    """
    @class PipelineSingleThreadDataflowExecutor
    Execute MAUS dataflows as a single-threaded pipeline.
    """

    def __init__(self, inputer, transformer, merger, outputer, json_config_doc): # pylint: disable=R0913,C0301
        """
        Save references to arguments and parse the JSON configuration
        document.

        @param self Object reference.
        @param inputer Input task.
        @param transformer Transformer task.
        @param merger Merger task.
        @param outputer Output task.
        @param json_config_doc JSON configuration document.
        """
        self.inputer = inputer
        self.transformer = transformer
        self.merger = merger
        self.outputer = outputer
        self.json_config_doc = json_config_doc
        #  Parse the configuration JSON
        head_mode = json.loads(self.json_config_doc)["header_and_footer_mode"]
        self.write_headers = head_mode == "append"
        self.run_number = "first" # used to register first run
        self.end_of_run_spill = None
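    # For orientation, a minimal json_config_doc sufficient for this
    # constructor might look like the sketch below (an assumed shape: the
    # only key read here is "header_and_footer_mode", and any value other
    # than "append" switches header/footer writing off; the birth() calls
    # of the individual modules will generally expect a fuller
    # configuration):
    #
    #     config_doc = '{"header_and_footer_mode": "append"}'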

    def execute(self, job_header, job_footer):
        """
        Execute the dataflow.

        Birth the outputer and write the job header; birth the merger,
        transformer and inputer. Read events from the input, pass each
        through the transform, merge and output. Death the inputer,
        transformer and merger; write the job footer; death the outputer.

        The birth order is chosen because I want to write the JobHeader as
        early as possible and the JobFooter as late as possible.

        @param job_header JobHeader in Python (i.e. dicts etc.) format.
        @param job_footer JobFooter in Python (i.e. dicts etc.) format.
        """
        # Note on all the assert statements - new style (API-compliant)
        # modules should raise an exception on failure and return None.
        # Old style modules return True/False on success/failure of birth
        # and death.
        try:
            print("OUTPUT: Setting up outputer")
            birth = self.outputer.birth(self.json_config_doc)
            assert(birth == True or birth == None)

            print("Writing JobHeader...")
            if self.write_headers:
                self.outputer.save(json.dumps(job_header))

            print("INPUT: Setting up input")
            birth = self.inputer.birth(self.json_config_doc)
            assert(birth == True or birth == None)

            print("PIPELINE: Get event, TRANSFORM, MERGE, OUTPUT, repeat")

            emitter = self.inputer.emitter()
            # This helps us time the setup that sometimes happens in the
            # first event
            print("HINT: MAUS will process 1 event only at first...")
            map_buffer = DataflowUtilities.buffer_input(emitter, 1)

            i = 0
            while len(map_buffer) != 0:
                # Save the object count before processing and reset it
                # afterwards to stop TRefArray overspill
                object_number = TProcessID.GetObjectCount()
                for event in map_buffer:
                    self.process_event(event)
                i += len(map_buffer)
                map_buffer = DataflowUtilities.buffer_input(emitter, 1)
                TProcessID.SetObjectCount(object_number)

                # Old-style print statements (not Python 3 compatible),
                # kept for backward compatibility.
                print "TRANSFORM/MERGE/OUTPUT: ",
                print "Processed %d events so far," % i,
                print "%d events in buffer." % (len(map_buffer))
        finally:
            if self.run_number == "first":
                self.run_number = 0
            self.end_of_run(self.run_number)

            print("INPUT: Shutting down inputer")
            death = self.inputer.death()
            assert(death == True or death == None)
            if self.write_headers:
                self.outputer.save(json.dumps(job_footer))

            print("OUTPUT: Shutting down outputer")
            death = self.outputer.death()
            assert(death == True or death == None)
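    # A note on the loop above: DataflowUtilities.buffer_input(emitter, n)
    # is used here as if it pulls at most n events from the emitter and
    # returns them as a list (an assumption drawn from its usage in this
    # file, not from its documentation), so an empty buffer signals that
    # the input is exhausted and terminates the while loop.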

    
    def process_event(self, event): # pylint: disable=R0912
        """
        Process a single event.

        event is either of type MAUS::Data or string; InputCppDAQ emits
        MAUS::Data, while InputPyJSON and InputCppROOT emit strings.
        Raises a TypeError if event is in neither of these formats.

        If the event is a Spill, check for a run_number change and call
        end_of_run/start_of_run if the run_number has changed; then
        transform the event, merge it and write it out.
        """
        if event == "":
            raise StopIteration("End of event")

        event_json = None
        bad_input = False

        # print '>> evtype, class == ', type(event), event.__class__.__name__
        if event.__class__.__name__ == 'MAUS::Data':
            evtype = event.GetEventType()
            daq_errors = event.GetSpill().GetErrors()["bad_data_input"]
            # protect against run number change due to bad (empty) spills
            if "InputCppDAQOfflineData" in daq_errors:
                bad_input = True
        elif event.__class__.__name__ == 'str':
            try:
                event_json = maus_cpp.converter.json_repr(event)
                evtype = DataflowUtilities.get_event_type(event_json)
                # protect against run number change due to bad (empty) spills
                bad_input = False
                if "errors" in event_json\
                   and "bad_data_input" in event_json["errors"]:
                    bad_input = True
            except: # pylint: disable = W0702
                return
        else:
            raise TypeError("Event type %s is not supported."\
                % type(event))

        # process spills
        # check for a run number change and process an end-of-run spill if
        # the run has changed
        if evtype == "Spill":
            if event_json is None:
                current_run_number = event.GetSpill().GetRunNumber()
                if event.GetEventType() == "end_of_run":
                    self.end_of_run_spill = event
            else:
                current_run_number = DataflowUtilities.get_run_number(event_json) # pylint: disable=C0301
                if DataflowUtilities.is_end_of_run(event_json):
                    self.end_of_run_spill = event_json

            # check for bad inputs where the daq run number is not set;
            # if that is the case, do not treat it as an end of run
            if not bad_input and current_run_number != self.run_number:
                if self.run_number != "first":
                    self.end_of_run(self.run_number)
                self.start_of_run(current_run_number)
                self.run_number = current_run_number
            # now transform the event and reduce it
            event = self.transformer.process(event)
            event = self.merger.process(event)
        # done with transform-merge, now write it out
        self.outputer.save(event)
        # if we converted to a different representation, delete the old one
        try:
            maus_cpp.converter.del_data_repr(event)
        except: # pylint: disable = W0702
            pass
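    # For illustration only: a string event accepted by process_event is a
    # JSON document. Based on the keys read above and the end-of-run spill
    # built in end_of_run below, a minimal spill might look like the
    # following (an assumed shape, not a complete MAUS spill):
    #
    #     '{"maus_event_type": "Spill", "run_number": 42,
    #       "daq_event_type": "end_of_run", "spill_number": -1,
    #       "errors": {}}'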

    
    def start_of_run(self, new_run_number):
        """
        At the start of run, call start_of_run on the run_action_manager
        (which generates the run header), then birth the merger and
        transformer.

        @param new_run_number run number of the run that is starting.
        """
        run_header = maus_cpp.run_action_manager.start_of_run(new_run_number)

        print("MERGE: Setting up merger")
        birth = self.merger.birth(self.json_config_doc)
        assert(birth == True or birth == None)

        print("TRANSFORM: Setting up transformer")
        birth = self.transformer.birth(self.json_config_doc)
        assert(birth == True or birth == None)
        if self.write_headers:
            self.outputer.save(run_header)

    
    def end_of_run(self, old_run_number):
        """
        At the end of run, death the transformer and merger, then call
        end_of_run on the run_action_manager (note the reverse ordering
        relative to start_of_run, not that it should matter).

        @param old_run_number run number of the run that is ending.
        """
        if self.end_of_run_spill == None:
            print "  Missing an end_of_run spill..."
            print "  ...creating one to flush the mergers!"
            self.end_of_run_spill = {"daq_event_type":"end_of_run",
                                     "maus_event_type":"Spill",
                                     "run_number":self.run_number,
                                     "spill_number":-1}
            end_of_run_spill_str = json.dumps(self.end_of_run_spill)
            spill_data = maus_cpp.converter.data_repr(end_of_run_spill_str)
            end_of_run_spill_str = self.merger.process(spill_data)

            if self.write_headers: # write to disk only if write_headers is set
                self.outputer.save(end_of_run_spill_str)
        try:
            maus_cpp.converter.del_data_repr(self.end_of_run_spill)
        except TypeError:
            pass
        self.end_of_run_spill = None

        print("TRANSFORM: Shutting down transformer")
        death = self.transformer.death()
        assert(death == True or death == None)

        print("MERGE: Shutting down merger")
        death = self.merger.death()
        assert(death == True or death == None)

        run_footer = maus_cpp.run_action_manager.end_of_run(old_run_number)
        if self.write_headers:
            self.outputer.save(run_footer)
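    # A note on the try/except above: self.end_of_run_spill may hold either
    # a converter data representation (set from a MAUS::Data event in
    # process_event) or a plain Python dict (set from a JSON event, or
    # built here), and del_data_repr is presumably only meaningful for the
    # former - hence TypeError is swallowed rather than treated as an error.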

    
    @staticmethod
    def get_dataflow_description():
        """
        Get dataflow description.

        @return description.
        """
        description = "Run in a pipeline programming fashion with only a\n"
        description += "single thread. See Wikipedia on 'pipeline\n"
        description += "programming' for more information."
        return description
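
# A minimal sketch of how this executor might be driven (the module names
# below are illustrative placeholders, not part of this file; any modules
# implementing the birth/emitter/process/save/death API used above fit):
#
#     inputer = MyInput()           # provides birth(), emitter(), death()
#     transformer = MyTransform()   # provides birth(), process(), death()
#     merger = MyMerge()            # provides birth(), process(), death()
#     outputer = MyOutput()         # provides birth(), save(), death()
#     executor = PipelineSingleThreadDataflowExecutor(
#         inputer, transformer, merger, outputer, config_doc)
#     executor.execute(job_header, job_footer)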