master
spike 7 years ago
parent 4aa38389b6
commit e4a4c5d32b

File diff suppressed because one or more lines are too long

@ -0,0 +1,165 @@
import pickle
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelBinarizer
def _load_label_names():
"""
Load the label names from file
"""
return ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
def load_cfar10_batch(cifar10_dataset_folder_path, batch_id):
"""
Load a batch of the dataset
"""
with open(cifar10_dataset_folder_path + '/data_batch_' + str(batch_id), mode='rb') as file:
batch = pickle.load(file, encoding='latin1')
features = batch['data'].reshape((len(batch['data']), 3, 32, 32)).transpose(0, 2, 3, 1)
labels = batch['labels']
return features, labels
def display_stats(cifar10_dataset_folder_path, batch_id, sample_id):
"""
Display Stats of the the dataset
"""
batch_ids = list(range(1, 6))
if batch_id not in batch_ids:
print('Batch Id out of Range. Possible Batch Ids: {}'.format(batch_ids))
return None
features, labels = load_cfar10_batch(cifar10_dataset_folder_path, batch_id)
if not (0 <= sample_id < len(features)):
print('{} samples in batch {}. {} is out of range.'.format(len(features), batch_id, sample_id))
return None
print('\nStats of batch {}:'.format(batch_id))
print('Samples: {}'.format(len(features)))
print('Label Counts: {}'.format(dict(zip(*np.unique(labels, return_counts=True)))))
print('First 20 Labels: {}'.format(labels[:20]))
sample_image = features[sample_id]
sample_label = labels[sample_id]
label_names = _load_label_names()
print('\nExample of Image {}:'.format(sample_id))
print('Image - Min Value: {} Max Value: {}'.format(sample_image.min(), sample_image.max()))
print('Image - Shape: {}'.format(sample_image.shape))
print('Label - Label Id: {} Name: {}'.format(sample_label, label_names[sample_label]))
plt.axis('off')
plt.imshow(sample_image)
def _preprocess_and_save(normalize, one_hot_encode, features, labels, filename):
"""
Preprocess data and save it to file
"""
features = normalize(features)
labels = one_hot_encode(labels)
pickle.dump((features, labels), open(filename, 'wb'))
def preprocess_and_save_data(cifar10_dataset_folder_path, normalize, one_hot_encode):
"""
Preprocess Training and Validation Data
"""
n_batches = 5
valid_features = []
valid_labels = []
for batch_i in range(1, n_batches + 1):
features, labels = load_cfar10_batch(cifar10_dataset_folder_path, batch_i)
validation_count = int(len(features) * 0.1)
# Prprocess and save a batch of training data
_preprocess_and_save(
normalize,
one_hot_encode,
features[:-validation_count],
labels[:-validation_count],
'preprocess_batch_' + str(batch_i) + '.p')
# Use a portion of training batch for validation
valid_features.extend(features[-validation_count:])
valid_labels.extend(labels[-validation_count:])
# Preprocess and Save all validation data
_preprocess_and_save(
normalize,
one_hot_encode,
np.array(valid_features),
np.array(valid_labels),
'preprocess_validation.p')
with open(cifar10_dataset_folder_path + '/test_batch', mode='rb') as file:
batch = pickle.load(file, encoding='latin1')
# load the training data
test_features = batch['data'].reshape((len(batch['data']), 3, 32, 32)).transpose(0, 2, 3, 1)
test_labels = batch['labels']
# Preprocess and Save all training data
_preprocess_and_save(
normalize,
one_hot_encode,
np.array(test_features),
np.array(test_labels),
'preprocess_training.p')
def batch_features_labels(features, labels, batch_size):
"""
Split features and labels into batches
"""
for start in range(0, len(features), batch_size):
end = min(start + batch_size, len(features))
yield features[start:end], labels[start:end]
def load_preprocess_training_batch(batch_id, batch_size):
"""
Load the Preprocessed Training data and return them in batches of <batch_size> or less
"""
filename = 'preprocess_batch_' + str(batch_id) + '.p'
features, labels = pickle.load(open(filename, mode='rb'))
# Return the training data in batches of size <batch_size> or less
return batch_features_labels(features, labels, batch_size)
def display_image_predictions(features, labels, predictions):
n_classes = 10
label_names = _load_label_names()
label_binarizer = LabelBinarizer()
label_binarizer.fit(range(n_classes))
label_ids = label_binarizer.inverse_transform(np.array(labels))
fig, axies = plt.subplots(nrows=4, ncols=2)
fig.tight_layout()
fig.suptitle('Softmax Predictions', fontsize=20, y=1.1)
n_predictions = 3
margin = 0.05
ind = np.arange(n_predictions)
width = (1. - 2. * margin) / n_predictions
for image_i, (feature, label_id, pred_indicies, pred_values) in enumerate(zip(features, label_ids, predictions.indices, predictions.values)):
pred_names = [label_names[pred_i] for pred_i in pred_indicies]
correct_name = label_names[label_id]
axies[image_i][0].imshow(feature*255)
axies[image_i][0].set_title(correct_name)
axies[image_i][0].set_axis_off()
axies[image_i][1].barh(ind + margin, pred_values[::-1], width)
axies[image_i][1].set_yticks(ind + margin)
axies[image_i][1].set_yticklabels(pred_names[::-1])
axies[image_i][1].set_xticks([0, 0.5, 1.0])

@ -0,0 +1,199 @@
import os
import numpy as np
import tensorflow as tf
import random
from unittest.mock import MagicMock
def _print_success_message():
return print('Tests Passed')
def test_folder_path(cifar10_dataset_folder_path):
assert cifar10_dataset_folder_path is not None,\
'Cifar-10 data folder not set.'
assert cifar10_dataset_folder_path[-1] != '/',\
'The "/" shouldn\'t be added to the end of the path.'
assert os.path.exists(cifar10_dataset_folder_path),\
'Path not found.'
assert os.path.isdir(cifar10_dataset_folder_path),\
'{} is not a folder.'.format(os.path.basename(cifar10_dataset_folder_path))
train_files = [cifar10_dataset_folder_path + '/data_batch_' + str(batch_id) for batch_id in range(1, 6)]
other_files = [cifar10_dataset_folder_path + '/batches.meta', cifar10_dataset_folder_path + '/test_batch']
missing_files = [path for path in train_files + other_files if not os.path.exists(path)]
assert not missing_files,\
'Missing files in directory: {}'.format(missing_files)
print('All files found!')
def test_normalize(normalize):
test_shape = (np.random.choice(range(1000)), 32, 32, 3)
test_numbers = np.random.choice(range(256), test_shape)
normalize_out = normalize(test_numbers)
assert type(normalize_out).__module__ == np.__name__,\
'Not Numpy Object'
assert normalize_out.shape == test_shape,\
'Incorrect Shape. {} shape found'.format(normalize_out.shape)
assert normalize_out.max() <= 1 and normalize_out.min() >= 0,\
'Incorect Range. {} to {} found'.format(normalize_out.min(), normalize_out.max())
_print_success_message()
def test_one_hot_encode(one_hot_encode):
test_shape = np.random.choice(range(1000))
test_numbers = np.random.choice(range(10), test_shape)
one_hot_out = one_hot_encode(test_numbers)
assert type(one_hot_out).__module__ == np.__name__,\
'Not Numpy Object'
assert one_hot_out.shape == (test_shape, 10),\
'Incorrect Shape. {} shape found'.format(one_hot_out.shape)
n_encode_tests = 5
test_pairs = list(zip(test_numbers, one_hot_out))
test_indices = np.random.choice(len(test_numbers), n_encode_tests)
labels = [test_pairs[test_i][0] for test_i in test_indices]
enc_labels = np.array([test_pairs[test_i][1] for test_i in test_indices])
new_enc_labels = one_hot_encode(labels)
assert np.array_equal(enc_labels, new_enc_labels),\
'Encodings returned different results for the same numbers.\n' \
'For the first call it returned:\n' \
'{}\n' \
'For the second call it returned\n' \
'{}\n' \
'Make sure you save the map of labels to encodings outside of the function.'.format(enc_labels, new_enc_labels)
_print_success_message()
def test_nn_image_inputs(neural_net_image_input):
image_shape = (32, 32, 3)
nn_inputs_out_x = neural_net_image_input(image_shape)
assert nn_inputs_out_x.get_shape().as_list() == [None, image_shape[0], image_shape[1], image_shape[2]],\
'Incorrect Image Shape. Found {} shape'.format(nn_inputs_out_x.get_shape().as_list())
assert nn_inputs_out_x.op.type == 'Placeholder',\
'Incorrect Image Type. Found {} type'.format(nn_inputs_out_x.op.type)
assert nn_inputs_out_x.name == 'x:0', \
'Incorrect Name. Found {}'.format(nn_inputs_out_x.name)
print('Image Input Tests Passed.')
def test_nn_label_inputs(neural_net_label_input):
n_classes = 10
nn_inputs_out_y = neural_net_label_input(n_classes)
assert nn_inputs_out_y.get_shape().as_list() == [None, n_classes],\
'Incorrect Label Shape. Found {} shape'.format(nn_inputs_out_y.get_shape().as_list())
assert nn_inputs_out_y.op.type == 'Placeholder',\
'Incorrect Label Type. Found {} type'.format(nn_inputs_out_y.op.type)
assert nn_inputs_out_y.name == 'y:0', \
'Incorrect Name. Found {}'.format(nn_inputs_out_y.name)
print('Label Input Tests Passed.')
def test_nn_keep_prob_inputs(neural_net_keep_prob_input):
nn_inputs_out_k = neural_net_keep_prob_input()
assert nn_inputs_out_k.get_shape().ndims is None,\
'Too many dimensions found for keep prob. Found {} dimensions. It should be a scalar (0-Dimension Tensor).'.format(nn_inputs_out_k.get_shape().ndims)
assert nn_inputs_out_k.op.type == 'Placeholder',\
'Incorrect keep prob Type. Found {} type'.format(nn_inputs_out_k.op.type)
assert nn_inputs_out_k.name == 'keep_prob:0', \
'Incorrect Name. Found {}'.format(nn_inputs_out_k.name)
print('Keep Prob Tests Passed.')
def test_con_pool(conv2d_maxpool):
test_x = tf.placeholder(tf.float32, [None, 32, 32, 5])
test_num_outputs = 10
test_con_k = (2, 2)
test_con_s = (4, 4)
test_pool_k = (2, 2)
test_pool_s = (2, 2)
conv2d_maxpool_out = conv2d_maxpool(test_x, test_num_outputs, test_con_k, test_con_s, test_pool_k, test_pool_s)
assert conv2d_maxpool_out.get_shape().as_list() == [None, 4, 4, 10],\
'Incorrect Shape. Found {} shape'.format(conv2d_maxpool_out.get_shape().as_list())
_print_success_message()
def test_flatten(flatten):
test_x = tf.placeholder(tf.float32, [None, 10, 30, 6])
flat_out = flatten(test_x)
assert flat_out.get_shape().as_list() == [None, 10*30*6],\
'Incorrect Shape. Found {} shape'.format(flat_out.get_shape().as_list())
_print_success_message()
def test_fully_conn(fully_conn):
test_x = tf.placeholder(tf.float32, [None, 128])
test_num_outputs = 40
fc_out = fully_conn(test_x, test_num_outputs)
assert fc_out.get_shape().as_list() == [None, 40],\
'Incorrect Shape. Found {} shape'.format(fc_out.get_shape().as_list())
_print_success_message()
def test_output(output):
test_x = tf.placeholder(tf.float32, [None, 128])
test_num_outputs = 40
output_out = output(test_x, test_num_outputs)
assert output_out.get_shape().as_list() == [None, 40],\
'Incorrect Shape. Found {} shape'.format(output_out.get_shape().as_list())
_print_success_message()
def test_conv_net(conv_net):
test_x = tf.placeholder(tf.float32, [None, 32, 32, 3])
test_k = tf.placeholder(tf.float32)
logits_out = conv_net(test_x, test_k)
assert logits_out.get_shape().as_list() == [None, 10],\
'Incorrect Model Output. Found {}'.format(logits_out.get_shape().as_list())
print('Neural Network Built!')
def test_train_nn(train_neural_network):
mock_session = tf.Session()
test_x = np.random.rand(128, 32, 32, 3)
test_y = np.random.rand(128, 10)
test_k = np.random.rand(1)
test_optimizer = tf.train.AdamOptimizer()
mock_session.run = MagicMock()
train_neural_network(mock_session, test_optimizer, test_k, test_x, test_y)
assert mock_session.run.called, 'Session not used'
_print_success_message()
Loading…
Cancel
Save