-
Notifications
You must be signed in to change notification settings - Fork 9
/
pipeline.py
249 lines (190 loc) · 9.14 KB
/
pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
from deepdetector import DeepDetector
from simpledetector import SimpleDetector
from facerecognizer import FaceRecognizer
from annotatedphotowriter import AnnotatedPhotoWriter
from annotatedframewriter import AnnotatedFrameWriter
from annotatedvideowriter import AnnotatedVideoWriter
from jsonreportwriter import JSONReportWriter
import imageio
import cv2
import multiprocessing
import os
import os.path
import yaml
import sys
import traceback
class MultiPipelineExecutor(object):
    '''
    Fan out pipeline execution across one worker process per CPU core.

    Every worker builds its own full Pipeline (detectors, recognizer and
    outputter components - including the deep object detector networks).
    Duplicating the networks per process looks wasteful, but darkflow keeps
    per-input-file state and is not thread safe, so process isolation is
    required.
    '''

    def execute(self, pipeline_file, input_directory, output_directory):
        '''
        Enqueue every file under input_directory and process the queue with
        one PipelineProcessor per CPU core, blocking until all are done.
        '''
        # Work queue shared by all worker processes.
        work_items = multiprocessing.JoinableQueue()

        worker_count = multiprocessing.cpu_count()
        print('Creating %d pipelines' % worker_count)
        workers = [
            PipelineProcessor(pipeline_file, input_directory, output_directory, work_items)
            for _ in range(worker_count)
        ]

        # Walk the input tree and enqueue every file found.
        for parent, _dirs, filenames in os.walk(input_directory):
            for name in filenames:
                path = os.path.join(parent, name)
                print("put in queue:", path)
                work_items.put(path)

        for worker in workers:
            worker.start()

        # One None sentinel per worker tells each one to shut down.
        for _ in range(worker_count):
            work_items.put(None)

        # Blocks until task_done() has been called for every queued item.
        work_items.join()
        print("Completed")
class PipelineProcessor(multiprocessing.Process):
    '''
    A worker process that owns a single Pipeline instance and consumes
    file paths from a shared JoinableQueue until it receives a None
    sentinel.
    '''

    def __init__(self, pipeline_file, input_directory, output_directory, file_queue):
        '''
        Store configuration; the Pipeline itself is created lazily in run()
        so it is constructed inside the child process.
        '''
        multiprocessing.Process.__init__(self)
        self.file_queue = file_queue
        self.pipeline_file = pipeline_file
        self.input_directory = input_directory
        self.output_directory = output_directory

    def run(self):
        '''
        Build this worker's Pipeline, then loop: pull a file path from the
        queue, execute the pipeline on it, and mark the task done. A None
        item is the shutdown signal.
        '''
        self.pipeline = Pipeline(self.pipeline_file, self.input_directory, self.output_directory)
        proc_name = self.name
        while True:
            next_file = self.file_queue.get()
            if next_file is None:
                # None means shutdown this process.
                print('%s: Exiting' % proc_name)
                self.file_queue.task_done()
                break
            print('%s: Executing %s' % (proc_name, next_file))
            # An uncaught exception would kill this process without calling
            # task_done(), leaving the main process blocked forever in
            # queue.join() - so catch, log and keep consuming. Narrowed from
            # a bare 'except:' so KeyboardInterrupt / SystemExit can still
            # terminate the worker.
            try:
                self.pipeline.execute(next_file)
                print('%s: Executed %s' % (proc_name, next_file))
            except Exception:
                print("*****************\nException while executing " + next_file)
                traceback.print_exc()
            finally:
                self.file_queue.task_done()
        return
class Pipeline(object):
    '''
    A Pipeline consists of a series of detectors, recognizers and outputters
    through which a photo or video is passed in a sequence.
    '''

    # Maps the 'type' field of a pipeline config entry to its component class.
    COMPONENTS = {
        'deepdetector' : DeepDetector,
        'simpledetector' : SimpleDetector,
        'photowriter' : AnnotatedPhotoWriter,
        'framewriter' : AnnotatedFrameWriter,
        'videowriter' : AnnotatedVideoWriter,
        'recognizer' : FaceRecognizer,
        'jsonreportwriter' : JSONReportWriter
    }

    def __init__(self, pipeline_file, input_directory, output_directory):
        '''
        Load the YAML pipeline definition from pipeline_file and instantiate
        the components listed under its 'pipeline' key.
        '''
        with open(pipeline_file, 'r') as f:
            # safe_load: the config should contain only plain YAML data.
            # yaml.load without an explicit Loader is deprecated and can
            # construct arbitrary Python objects from the file.
            self.cfg = yaml.safe_load(f)
        self.cfg = self.cfg['pipeline']
        self.output_directory = output_directory
        self.input_directory = input_directory
        self.create_components()

    def create_components(self):
        '''Instantiate one component per config entry; unknown types are skipped.'''
        self.components = []
        for comp_cfg in self.cfg:
            comp_type = Pipeline.COMPONENTS.get(comp_cfg['type'])
            if comp_type:
                comp = comp_type(comp_cfg)
                self.components.append(comp)

    def execute(self, input_file):
        '''
        Run the pipeline on a single file. Photos are sent through the
        components once; videos are sent through frame by frame. Files that
        are neither are ignored.
        '''
        # If the input file is a photo, read it and run the pipeline once on
        # the image. The 'isphoto'/'isvideo' flags travel with the data so
        # components can distinguish the two cases.
        img = None
        isphoto = False
        try:
            img = imageio.imread(input_file)
            print("Image read")
            isphoto = True
        except Exception:
            print("Not a photo. Error while attempting to load:", sys.exc_info())
        if isphoto:
            input_data = {
                'file' : input_file,
                'img' : img,
                'isphoto' : True,
                'isvideo' : False
            }
            self._execute_pipeline_on_image(input_data)
            self.completed(input_data)
            return
        # Not a photo: try to open it as a video and run the pipeline on
        # each frame, tagging each frame's data with its frame number.
        video = None
        try:
            video = imageio.get_reader(input_file, 'ffmpeg')
            print("Video opened")
        except Exception:
            print("Not a video. Error while attempting to open:", sys.exc_info())
            if video is not None:
                video.close()
            print("Ignoring file: ", input_file)
            return
        try:
            input_data = None
            for frame_num, img in enumerate(video):
                input_data = {
                    'file' : input_file,
                    'img' : img,
                    'isphoto' : False,
                    'isvideo' : True,
                    'frame' : frame_num
                }
                self._execute_pipeline_on_image(input_data)
            # Notify components such as video writers that need to know when
            # the input stream has completed so they can do their own cleanup.
            # Guarded: a zero-frame video leaves input_data as None.
            if input_data is not None:
                self.completed(input_data)
        finally:
            # The reader was previously leaked on the success path.
            video.close()

    def _execute_pipeline_on_image(self, input_data):
        '''
        Normalize input_data['img'] to 3-channel RGB, add a grayscale copy
        under 'gray', then run every component in order, storing each
        component's outputs in input_data under the component's name so
        downstream components can read them.
        '''
        if input_data['img'].ndim == 3:
            # It *appears* imageio imread returns RGB or RGBA, not BGR...confirmed using a blue
            # filled rectangle that imageio is indeed RGB which is opposite of OpenCV's default BGR.
            # Use RGB consistently everywhere.
            if input_data['img'].shape[-1] == 4:
                input_data['gray'] = cv2.cvtColor(input_data['img'], cv2.COLOR_RGBA2GRAY)
                print("Input image seems to be 4-channel RGBA. Creating 3-channel RGB version")
                input_data['img'] = cv2.cvtColor(input_data['img'], cv2.COLOR_RGBA2RGB)
            else:
                input_data['gray'] = cv2.cvtColor(input_data['img'], cv2.COLOR_RGB2GRAY)
        elif input_data['img'].ndim == 2:
            # If input is a grayscale image, it'll have just 2 dimensions,
            # but Darkflow code expects 3 dimensions. So always keep 'img' a 3 dimension
            # image no matter what.
            print("Input image is grayscale. Creating RGB version")
            input_data['gray'] = input_data['img'].copy()
            input_data['img'] = cv2.cvtColor(input_data['img'], cv2.COLOR_GRAY2RGB)
        else:
            # Raising a string is a TypeError in Python 3; raise a real
            # exception with the offending shape instead.
            raise ValueError("Unknown image format %s" % (input_data['img'].shape,))
        print("Input image:", input_data['img'].shape)
        print("Grayscale image:", input_data['gray'].shape)
        for comp in self.components:
            print("Executing %s on %s frame %d" % (comp.name, input_data['file'], input_data.get('frame', 0)))
            comp_outputs = comp.execute(input_data, self.input_directory, self.output_directory)
            # At each stage of the pipeline, collect the component's outputs
            # and add them to the input data so that they're available for
            # downstream components.
            input_data[comp.name] = comp_outputs
        # Release the image arrays.
        input_data['img'] = None
        input_data['gray'] = None

    def completed(self, input_data):
        '''Tell every component the input stream has ended so it can clean up.'''
        for comp in self.components:
            comp.completed(input_data, self.input_directory, self.output_directory)