-
Notifications
You must be signed in to change notification settings - Fork 0
/
model.py
299 lines (250 loc) · 9.55 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
import csv
import cv2
import numpy as np
from keras.models import Sequential
from keras.layers import Flatten, Dense, Lambda, Dropout, Cropping2D
from keras.layers.convolutional import Convolution2D
from keras.layers.pooling import MaxPooling2D
import matplotlib.pyplot as plt
import random
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
def load_data(data_locations):
    """
    Load driving-log rows from every location in data_locations, rewriting
    the three image paths (center, left, right) so they point at that
    location's local IMG directory before returning the rows.
    Features: Center camera only
    Labels: Steering angle (column 3, left as a string in the row)
    :param data_locations: list of directories, each holding driving_log.csv and IMG/
    :return: list of csv rows with columns 0-2 rewritten to local image paths
    """
    lines = []
    for loc in data_locations:
        with open(loc + '/driving_log.csv', 'r') as csvf:
            reader = csv.reader(csvf)
            for l in reader:
                for col in range(3):
                    # Keep only the file basename and re-root it under this
                    # location's IMG directory.  Splitting on both separators
                    # also handles logs recorded on Windows, whose paths use
                    # backslashes (the original '/'-only split left those
                    # paths unusable).
                    base = l[col].split('/')[-1].split('\\')[-1]
                    l[col] = loc + '/IMG/' + base
                lines.append(l)
    return lines
def load_data_normalize(data_locations):
    """
    Load driving-log rows like load_data, but rebalance the steering-angle
    distribution while reading: rows at exactly 0.0 are kept with probability
    0.3 (curbs the straight-driving bias), rows at exactly +/-1.0 with
    probability 0.1 (trims the far-end outliers), and every angle strictly
    between 0 and 1 in magnitude is always kept.
    Features: Center camera only
    Labels: Steering angle (column 3)
    :param data_locations: list of directories, each holding driving_log.csv and IMG/
    :return: list of kept csv rows with columns 0-2 rewritten to local image paths
    """
    kept = []
    for loc in data_locations:
        with open(loc + '/driving_log.csv', 'r') as csvf:
            for row in csv.reader(csvf):
                # One random draw per row decides both probabilistic filters.
                draw = random.random()
                angle = float(row[3])
                keep = (
                    0.0 < abs(angle) < 1.0
                    or (angle == 0.0 and draw <= 0.3)
                    or (abs(angle) == 1.0 and draw <= 0.1)
                )
                if not keep:
                    continue
                row[0] = loc + '/IMG/' + row[0].split('/')[-1]
                row[1] = loc + '/IMG/' + row[1].split('/')[-1]
                row[2] = loc + '/IMG/' + row[2].split('/')[-1]
                kept.append(row)
    return kept
def visualize_data(lines):
    """
    Print the dataset size and display the three camera views (center, left,
    right) of one randomly chosen log row side by side.
    :param lines: non-empty list of csv rows whose first three columns are image paths
    :return:
    """
    print('size of data', len(lines))
    # Bug fix: the original used random.randint(0, len(lines)), whose upper
    # bound is inclusive, so it could pick index len(lines) and raise
    # IndexError.  randrange excludes the upper bound.
    n = random.randrange(len(lines))
    fig = plt.figure(figsize=(20, 200))
    for i in range(3):
        # cv2 loads BGR; convert to RGB so matplotlib shows true colors
        image = cv2.cvtColor(cv2.imread(lines[n][i]), cv2.COLOR_BGR2RGB)
        ax = fig.add_subplot(1, 3, i + 1)
        ax.imshow(image)
def visualize_histogram(lines, bucket_size=10):
    """
    Plot a histogram of the steering angles found in the log rows, split
    into bucket_size bins -- handy for spotting distribution bias.
    :param lines: list of csv rows with the steering angle in column 3
    :param bucket_size: number of histogram bins
    :return:
    """
    plt.figure(figsize=(10, 10))
    angles = [float(row[3]) for row in lines]
    plt.title('Histogram of steering angle')
    plt.hist(angles, bucket_size, color='green')
def get_data(data_root_loc):
    """
    Load the whole dataset under one recording directory into memory.  Every
    log row yields two examples: the center-camera image and its horizontal
    mirror with a negated steering angle.
    Features: Center camera only
    Labels: Steering angle
    :param data_root_loc: single directory holding driving_log.csv and IMG/
    :return: tuple of (images, steering angles) as numpy arrays
    """
    with open(data_root_loc + '/driving_log.csv', 'r') as csvf:
        rows = list(csv.reader(csvf))
    img_dir = data_root_loc + '/IMG/'
    images = []
    angles = []
    for row in rows:
        filename = row[0].split('/')[-1]
        angle = float(row[3])
        # cv2 reads BGR; convert to RGB so training matches drive-time input
        frame = cv2.cvtColor(cv2.imread(img_dir + filename), cv2.COLOR_BGR2RGB)
        # double the data: original frame plus its left-right flip
        images.extend([frame, np.fliplr(frame)])
        angles.extend([angle, -angle])
    return np.array(images), np.array(angles)
def generator(samples, batch_size=32):
    """
    Endless batch generator for keras fit_generator.  Each source row yields
    two examples -- the center image plus its horizontal mirror with a
    negated steering angle -- so a yielded batch holds up to 2 * batch_size
    examples.
    :param samples: list of csv rows (column 0 = center image path, column 3 = angle)
    :param batch_size: number of source rows consumed per yielded batch
    :return: generator yielding shuffled (images, angles) numpy array pairs
    """
    total = len(samples)
    while True:
        # re-shuffle the full sample list at the start of every pass
        shuffle(samples)
        for start in range(0, total, batch_size):
            chunk = samples[start:start + batch_size]
            images = []
            angles = []
            for row in chunk:
                # cv2 loads BGR; convert so training matches drive-time RGB
                frame = cv2.cvtColor(cv2.imread(row[0]), cv2.COLOR_BGR2RGB)
                angle = float(row[3])
                # augment with the left-right flipped frame
                images.extend([frame, np.fliplr(frame)])
                angles.extend([angle, -angle])
            yield shuffle(np.array(images), np.array(angles))
def simple_network(input_shape):
    """
    Build the simplest possible steering model: pixel normalization, flatten,
    and a single linear output.  Useful as an end-to-end pipeline sanity check.
    :param input_shape: shape of the raw camera frame, e.g. (160, 320, 3)
    :return: uncompiled keras Sequential model with one linear output
    """
    model = Sequential()
    # scale pixel values from [0, 255] to [-0.5, 0.5]
    model.add(Lambda(lambda x: (x / 255.0) - 0.5, input_shape=input_shape))
    model.add(Flatten())
    model.add(Dense(1))
    return model
def lenet_network(input_shape):
    """
    Build a LeNet-style regression network for steering-angle prediction:
    normalization and cropping, three 5x5 convolution stages with dropout,
    then the classic 120/84 dense head ending in one linear output.
    :param input_shape: shape of the raw camera frame, e.g. (160, 320, 3)
    :return: uncompiled keras Sequential model with one linear output
    """
    model = Sequential()
    # scale pixel values from [0, 255] to [-0.5, 0.5]
    model.add(Lambda(lambda x: (x / 255.0) - 0.5, input_shape=input_shape))
    # crop 50 rows of sky at the top and 20 rows of hood at the bottom
    model.add(Cropping2D(cropping=((50, 20), (0, 0))))
    # first conv stage has no pooling
    model.add(Convolution2D(6, 5, 5, activation='relu'))
    model.add(Dropout(0.2))
    # two pooled conv stages with growing depth
    for depth in (12, 24):
        model.add(Convolution2D(depth, 5, 5, activation='relu'))
        model.add(MaxPooling2D())
        model.add(Dropout(0.2))
    model.add(Flatten())
    model.add(Dense(120, activation='relu'))
    model.add(Dense(84, activation='relu'))
    model.add(Dense(1))
    return model
def nvidia_network(input_shape):
    """
    Build the NVIDIA end-to-end driving architecture: normalization and
    cropping, three strided 5x5 convolutions, two 3x3 convolutions (each
    followed by dropout), then a 100/50/10 dense head with one linear output.
    :param input_shape: shape of the raw camera frame, e.g. (160, 320, 3)
    :return: uncompiled keras Sequential model with one linear output
    """
    model = Sequential()
    # scale pixel values from [0, 255] to [-0.5, 0.5]
    model.add(Lambda(lambda x: (x / 255.0) - 0.5, input_shape=input_shape))
    # crop 70 rows of sky at the top and 25 rows of hood at the bottom
    model.add(Cropping2D(cropping=((70, 25), (0, 0))))
    # three strided 5x5 conv stages with growing depth
    for depth in (24, 36, 48):
        model.add(Convolution2D(depth, 5, 5, subsample=(2, 2), activation='relu'))
        model.add(Dropout(0.3))
    # two unstrided 3x3 conv stages
    for _ in range(2):
        model.add(Convolution2D(64, 3, 3, activation='relu'))
        model.add(Dropout(0.3))
    model.add(Flatten())
    # dense head narrowing to the single steering-angle output
    for units in (100, 50, 10, 1):
        model.add(Dense(units))
    return model
def train(x_train, y_train, model, saved_name='model.h5'):
    """
    Fit the model on a fully in-memory dataset and save it to disk.  Good
    for quick prototyping; avoid for large datasets and prefer
    train_with_generator, which streams data in chunks instead.
    :param x_train: numpy array of input images
    :param y_train: numpy array of steering angles
    :param model: uncompiled keras model
    :param saved_name: path the trained model is saved to
    :return: keras History of the training run
    """
    # mean squared error fits the continuous steering-angle regression target
    model.compile(loss='mse', optimizer='adam')
    history_data = model.fit(x_train, y_train,
                             validation_split=0.2,
                             shuffle=True,
                             nb_epoch=30,
                             verbose=1)
    model.save(saved_name)
    return history_data
def train_with_generator(train_generator, validation_generator, train_size, validation_size, epoch, model,
                         saved_name='model.h5'):
    """
    Fit the model from train/validation generators and save it to disk.
    Far more memory-efficient than train(), since only one batch at a time
    is held in memory instead of the whole dataset.
    :param train_generator: generator yielding training (images, angles) batches
    :param validation_generator: generator yielding validation batches
    :param train_size: number of training examples per epoch
    :param validation_size: number of validation examples per epoch
    :param epoch: number of epochs to train
    :param model: uncompiled keras model
    :param saved_name: path the trained model is saved to
    :return: keras History of the training run
    """
    # mean squared error fits the continuous steering-angle regression target
    model.compile(loss='mse', optimizer='adam')
    history_data = model.fit_generator(train_generator,
                                       samples_per_epoch=train_size,
                                       validation_data=validation_generator,
                                       nb_val_samples=validation_size,
                                       nb_epoch=epoch,
                                       verbose=1)
    model.save(saved_name)
    return history_data
def visualize_loss(history):
    """
    Plot training vs validation mean-squared-error loss per epoch.
    :param history: keras History object returned by fit/fit_generator
    :return:
    """
    # draw the training curve first so the legend order matches
    for key in ('loss', 'val_loss'):
        plt.plot(history.history[key])
    plt.title('model mean squared error loss')
    plt.ylabel('mean squared error loss')
    plt.xlabel('epoch')
    plt.legend(['training set', 'validation set'], loc='upper right')
    plt.show()
if __name__ == "__main__":
    # Load rebalanced samples from the three recording sessions: normal laps
    # (data_v3), curve-focused driving and side/recovery driving.
    data = load_data_normalize(['./data_v3', './curve_drive', './side_drive'])
    # Split data to train and validation with an 80/20 ratio.
    train_samples, validation_samples = train_test_split(data, test_size=0.2)
    # Create train and validation generators that feed model training below;
    # each consumes 16 source rows per batch (32 examples after flipping).
    train_generator = generator(train_samples, batch_size=16)
    validation_generator = generator(validation_samples, batch_size=16)
    # Model: NVIDIA architecture on raw 160x320 RGB simulator frames.
    shape_of_data = (160, 320, 3)
    train_model = nvidia_network(shape_of_data)
    # Training model.
    # The train and validation sizes are doubled because the generator adds a
    # flipped copy of every image.
    history_data = train_with_generator(train_generator, validation_generator, len(train_samples) * 2,
                                        len(validation_samples) * 2, epoch=5, model=train_model)