import numpy as np
from sklearn.model_selection import train_test_split
from keras.preprocessing.image import ImageDataGenerator
from tqdm import tqdm #displays progress bar within for loops
import matplotlib.pyplot as plt
from skimage import io
import pickle
from skimage.transform import resize
import glob
import os
%matplotlib inline
import warnings
warnings.filterwarnings('ignore')
from keras import regularizers, optimizers
from functions.model_functions import plot_confusion_matrix, predict_one_image_cnn
import tensorflow as tf
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
size = set()
for filename in glob.glob('data/*.jpg'):
im=io.imread(filename).shape
size.add(im)
size for all images are: {(424, 424, 3)}
#Let's look at four of our galaxies from the 61578 images in our training set.
plt.figure(1, figsize=(12,9))
plt.subplot(231)
plt.imshow(plt.imread('data/train/spiral/100023.jpg'))
plt.title('Galaxy 100023')
plt.subplot(232)
plt.imshow(plt.imread('data/train/elliptical/100078.jpg'))
plt.title('Galaxy 100078')
plt.subplot(233)
plt.imshow(plt.imread('data/train/elliptical/100123.jpg'))
plt.title('Galaxy 100123')
plt.subplot(234)
plt.imshow(plt.imread('data/train/spiral/100143.jpg'))
plt.title('Galaxy 100143')
plt.subplot(235)
plt.imshow(plt.imread('data/train/spiral/233622.jpg'))
plt.title('Galaxy 233622')
plt.subplot(236)
plt.imshow(plt.imread('data/train/spiral/177755.jpg'))
plt.title('Galaxy 177755')
plt.show()
plt.tight_layout()
From the images above, as a scientist I can already tell what morphology these galaxies are of, but now the question is will the computer be able to classify them correctly. Let's first build a baseline model then compare it to a CNN model.
import pandas as pd
import numpy as np
predictions = pd.read_csv('predictions.csv')
predictions.head()
There seems to be ambigious headings for our columns and the resources on Kaggle don't seem to help much. Essentially, the classes refer to the morphology on the galaxy.
#All of the classes in our Class1 add up to 1. These classes refer to the probaility of
#the galaxy being either the shape in Class1.1, Class1.2, or in Class1.3.
(predictions['Class1.1'] + predictions['Class1.2'] + predictions['Class1.3']).head()
#Dropping all the other columns which do not refer to the morphology.
predictions = predictions[['GalaxyID','Class1.1', 'Class1.2', 'Class1.3']]
predictions.head(10)
Based on the images printed above, we can use our physics brain to determine what the columns stand for and rename the columns.
predictions.columns = ['GalaxyID', 'Elliptical', 'Spiral', 'Irregular']
predictions.head()
If the value in the irregular was the max value for that row, we called it an irregular.
Note: The following code below no longer works because the irregular folders are deleted.
print('num in train elliptical = {}'.format(len(os.listdir('data/train/elliptical/'))))
print('num in train spiral = {}'.format(len(os.listdir('data/train/spiral/'))))
print('num in train irregular = {}'.format(len(os.listdir('data/train/irregular/'))))
print('num in test elliptical = {}'.format(len(os.listdir('data/test/elliptical/'))))
print('num in test spiral = {}'.format(len(os.listdir('data/test/spiral/'))))
print('num in test irregular = {}'.format(len(os.listdir('data/test/irregular/'))))
num in train elliptical = 13237
num in train spiral = 17526
num in train irregular = 26
num in test elliptical = 13456
num in test spiral = 17300
num in test irregular = 33
For this, we can remove the irregular images since they make such a small portion of our over all dataset.
Now, let's delete the irregular galaxies from the predictions (because we deleted their images). Then we will drop the "Irregular" column.
len(predictions[(predictions.Irregular > predictions.Spiral) & (predictions.Irregular > predictions.Elliptical)])
predictions[(predictions.Irregular > predictions.Spiral) & (predictions.Irregular > predictions.Elliptical)].head()
#Dropped all the irregular galaxies.
predictions.drop(predictions[(predictions.Irregular > predictions.Spiral) & (predictions.Irregular > predictions.Elliptical)].index,inplace=True)
predictions.drop(['Irregular'], axis=1, inplace=True)
predictions.head()
import matplotlib.pyplot as plt
import numpy as np
import os, shutil
from keras import models
from keras import layers
from keras.layers import Dropout
from sklearn.metrics import confusion_matrix, f1_score
np.random.seed(123)
from keras.models import load_model
from keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img
# get all the data in the directory split/test, and reshape them
datagen = ImageDataGenerator(rescale=1./255,
rotation_range=90,
brightness_range=(0.5,2),
)
#Take 30000 images from the training folder and resize images down to 106 x 106 px
data_tr=datagen.flow_from_directory(
'data/train',
target_size=(106, 106), #actual image size
batch_size = 30000,
class_mode='binary',
seed = 123)
#Take 20000 images from the testing folder and resize images down to 106 x 106 px
data_te = ImageDataGenerator(rescale=1./255).flow_from_directory(
'data/test',
target_size=(106, 106),
batch_size = 20000,
class_mode='binary',
seed = 123)
# Defining x_train, y_train, and x_test, y_test by grabbing the first 30,000 batch for train
# and 20,000 image batch for test
x_tr, y_tr = next(data_tr)
x_te, y_te = next(data_te)
#Displaying that our training data is binary
y_tr[:5]
#Splitting our data using sklearn. 80% of our data is X_train, y_train - what we are training
#our model on. 20% of our data is the X_val, y_val what we are testing our data on.
from sklearn.model_selection import train_test_split
X_train, X_val, y_train, y_val = train_test_split(x_tr, y_tr, test_size=0.20, random_state=123)
len(x_te)
X_val.shape
cnn = models.Sequential()
cnn.add(layers.Conv2D(64, (3, 3), kernel_regularizer=regularizers.l2(0.1), input_shape=(106, 106, 3), padding='valid',strides=(2,2)))
cnn.add(layers.Conv2D(64, (3, 3), input_shape=(106, 106, 3), padding='valid',strides=(2,2)))
cnn.add(layers.BatchNormalization())
cnn.add(layers.Activation('relu'))
cnn.add(layers.MaxPooling2D((2, 2)))
cnn.add(Dropout(0.5))
cnn.add(layers.Conv2D(64, (3, 3), kernel_regularizer=regularizers.l2(0.1)))
cnn.add(layers.BatchNormalization())
cnn.add(layers.Activation('relu'))
cnn.add(layers.MaxPooling2D((2, 2)))
cnn.add(layers.Flatten())
cnn.add(layers.Dense(128))
cnn.add(layers.BatchNormalization())
cnn.add(layers.Activation('relu'))
cnn.add(layers.Dense(32, activation='relu'))
cnn.add(layers.Dense(1, activation='sigmoid'))
# decay_rate = learning_rate / epochs
sgd = optimizers.SGD(lr=0.01, decay=0.0002, momentum=0.9, nesterov=False)
cnn.compile(loss='binary_crossentropy',
optimizer=sgd,
metrics=['acc'])
cnn.load_weights('model_weights/cnn11_only_lr.h5')
cnn1 = cnn.fit(X_train, y_train,
epochs=50,
validation_data=(X_val, y_val),
batch_size=500)
print(cnn.summary())
# Printing one random image to compare with the hidden layers below.
plt.imshow(X_train[50])
plt.show()
# layer_outputs = [layer.output for layer in cnn.layers[:8]]
layer_outputs = [layer.output for layer in cnn.layers]
cnn.layers
activation_model = models.Model(inputs=cnn.input, outputs=layer_outputs)
from keras import models
# from keras import models
from keras.preprocessing import image
activations = activation_model.predict(X_train)
fig, axes = plt.subplots(5, 4, figsize=(12,24))
for i in range(20):
row = i//4
column = i%4
ax = axes[row, column]
first_layer_activation = activations[1]
ax.matshow(first_layer_activation[50, :, :, i], cmap='viridis')
# from keras import models
from keras.preprocessing import image
activations = activation_model.predict(X_train)
fig, axes = plt.subplots(5, 4, figsize=(12,24))
for i in range(20):
row = i//4
column = i%4
ax = axes[row, column]
first_layer_activation = activations[7]
ax.matshow(first_layer_activation[50, :, :, i], cmap='viridis')
# from keras import models
from keras.preprocessing import image
activations = activation_model.predict(X_train)
fig, axes = plt.subplots(5, 4, figsize=(12,24))
for i in range(20):
row = i//4
column = i%4
ax = axes[row, column]
first_layer_activation = activations[0]
ax.matshow(first_layer_activation[50, :, :, i], cmap='viridis')
# from keras import models
from keras.preprocessing import image
activations = activation_model.predict(X_train)
fig, axes = plt.subplots(5, 4, figsize=(12,24))
for i in range(20):
row = i//4
column = i%4
ax = axes[row, column]
first_layer_activation = activations[5]
ax.matshow(first_layer_activation[50, :, :, i], cmap='viridis')
# from keras import models
from keras.preprocessing import image
activations = activation_model.predict(X_train)
fig, axes = plt.subplots(5, 4, figsize=(12,24))
for i in range(20):
row = i//4
column = i%4
ax = axes[row, column]
first_layer_activation = activations[9]
ax.matshow(first_layer_activation[50, :, :, i], cmap='viridis')
# from keras import models
from keras.preprocessing import image
activations = activation_model.predict(X_train)
fig, axes = plt.subplots(5, 4, figsize=(12,24))
for i in range(20):
row = i//4
column = i%4
ax = axes[row, column]
first_layer_activation = activations[3]
ax.matshow(first_layer_activation[50, :, :, i], cmap='viridis')
# from keras import models
from keras.preprocessing import image
activations = activation_model.predict(X_train)
fig, axes = plt.subplots(5, 4, figsize=(12,24))
for i in range(20):
row = i//4
column = i%4
ax = axes[row, column]
first_layer_activation = activations[4]
ax.matshow(first_layer_activation[50, :, :, i], cmap='viridis')
cnn.evaluate(x_te, y_te)
predictions_cnn = cnn.predict(x_te)
predictions_cnn = np.around(predictions_cnn)
f1_score(y_te, predictions_cnn, average='macro')
plt.figure()
plot_confusion_matrix(confusion_matrix(y_te, predictions_cnn), classes=['Elliptical', 'Spiral'])
plt.title('Confusion Matrix for CNN Model')
plt.savefig('CNN_ConfusionMatrix.png')
def predictoneimage_cnn(cnn, path):
img = load_img(path, target_size=(106, 106))
plt.imshow(img)
img = img_to_array(img)
img = img/255
img = np.expand_dims(img, axis=0)
predict = cnn.predict(img)
return predict
predict_one_image_cnn(cnn, 'data/train/elliptical/100078.jpg')
predict_one_image_cnn(cnn, 'data/test/elliptical/564639.jpg')
from sklearn.metrics import roc_curve
fpr_cnn, tpr_cnn, thresholds = roc_curve(y_te, cnn.predict(x_te))
plt.plot(fpr_cnn,tpr_cnn)
plt.plot([0, 1], [0, 1],'r--')
plt.xlim([0, 1])
plt.ylim([0, 1])
plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')
plt.title('ROC Curve for CNN Model')
# plt.show()
plt.savefig('CNN_ROC.png')
from sklearn.metrics import auc
roc_auc = auc(fpr_cnn, tpr_cnn)
print(roc_auc)
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score
tree_model = DecisionTreeClassifier(max_depth=10)
x_tr.shape
X_train.shape
y_train.shape
x_tr_flat = X_train.reshape((24000,X_train.shape[1]*X_train.shape[2]*X_train.shape[3]))
tree_model.fit(x_tr_flat, y_train)
x_te.shape
x_te_flat = x_te.reshape((20000,x_te.shape[1]*x_te.shape[2]*x_te.shape[3]))
y_predict = tree_model.predict(x_te_flat)
y_predict_proba = tree_model.predict_proba(x_te_flat)
y_predict_proba
accuracy_score(y_te, y_predict)
from sklearn.metrics import confusion_matrix
confusion_matrix(y_te, y_predict)
plt.figure()
plot_confusion_matrix(confusion_matrix(y_te, y_predict), classes=['Elliptical', 'Spiral'])
plt.title('Confusion Matrix for Decision Tree')
plt.savefig('DecisionTree_ConfusionMatrix.png')
y_te[:10]
# np.unique(y_predict_proba)[:5]
y_predict_proba[:10]
[x[1] for x in y_predict_proba][:10]
from sklearn.metrics import auc, roc_curve
fpr_decision, tpr_decision, thresholds = roc_curve(y_te, [x[1] for x in y_predict_proba])
plt.plot(fpr_decision,tpr_decision)
plt.plot([0, 1], [0, 1],'r--')
plt.xlim([0, 1])
plt.ylim([0, 1])
plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')
plt.title('ROC Curve for Decision Tree')
# plt.show()
plt.savefig('DecisionTree_ROC.png')
roc_auc = auc(fpr_decision, tpr_decision)
# fpr_decision
# tpr_decision
print(roc_auc)
#we can use any of the inception stuff even if they are trained on anything
from keras.applications import inception_v3
from keras.layers import Dense,GlobalAveragePooling2D
from sklearn.metrics import confusion_matrix, f1_score
from keras.models import Model
imagenet=inception_v3.InceptionV3(weights='imagenet',include_top=False)
imagenet_new=imagenet.output
new_model = models.Sequential()
new_model.add(imagenet)
new_model.add(GlobalAveragePooling2D())
new_model.add(Dense(1024,activation='relu'))
new_model.add(Dense(1024,activation='relu')) #dense layer 2
new_model.add(Dense(512,activation='relu')) #dense layer 3
new_model.add(Dense(1,activation='sigmoid')) #final layer
for i,layer in enumerate(imagenet.layers):
print(i,layer.name, layer.trainable)
#don't train the image net (or you will wait too long which is the first layer)
for layer in new_model.layers[:1]:
layer.trainable=False
new_model.compile(optimizer='Adam',loss='binary_crossentropy',metrics=['accuracy'])
# step_size_train=train_generator.n//train_generator.batch_size
new_model.fit(X_train,
y_train,
epochs=10,
batch_size=50,
validation_data=(X_val, y_val))
new_model.evaluate(x_te, y_te)
predictions_transfer = new_model.predict(x_te)
predictions_transfer = np.around(predictions_transfer)
f1_score(y_te, predictions_transfer, average='macro')
plt.figure()
plot_confusion_matrix(confusion_matrix(y_te, predictions_transfer), classes=['Elliptical', 'Spiral'])
plt.title('Confusion Matrix for Inception')
plt.savefig('Inception_ConfusionMatrix.png')
fpr_inception, tpr_inception, thresholds = roc_curve(y_te, new_model.predict(x_te))
plt.plot(fpr_inception,tpr_inception)
plt.plot([0, 1], [0, 1],'r--')
plt.xlim([0, 1])
plt.ylim([0, 1])
plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')
plt.title('ROC Curve for Inception')
plt.savefig('Inception_ROC.png')
# plt.show()
from keras.models import Sequential, Input
from keras.layers import Dense
model = Sequential()
model.add(layers.Flatten(input_shape=(106, 106, 3)))
layer_1 = model.add(Dense(units=128, activation='relu'))
layer_2 = model.add(Dense(units=1, activation='sigmoid'))
model.compile(loss='binary_crossentropy',
optimizer='sgd',
metrics=['acc'])
history = model.fit(X_train, y_train,
epochs=50,
validation_data=(X_val, y_val),
batch_size=500)
# history.history['val_acc']
predictions_mlp = model.predict(x_te)
predictions_mlp = np.around(predictions_mlp)
plt.figure()
plot_confusion_matrix(confusion_matrix(y_te, predictions_mlp), classes=['Elliptical', 'Spiral'])
plt.title('Confusion Matrix for Neural Network')
plt.savefig('Baseline_MLP_ConfusionMatrix.png')
fpr_neural, tpr_neural, thresholds = roc_curve(y_te, model.predict(x_te))
plt.plot(fpr_neural,tpr_neural)
plt.plot([0, 1], [0, 1],'r--')
plt.xlim([0, 1])
plt.ylim([0, 1])
plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')
plt.title('ROC Curve for Neural Network')
plt.savefig('BaselineMLP_ROC.png')
fpr
tpr
plt.figure()
plt.plot(range(len(history.history['val_acc'])), history.history['val_acc'])
plt.xlabel('Epoch')
plt.ylabel('Validation Accuracy')
loss_and_metrics = model.evaluate(x_te, y_te)
loss_and_metrics
model.metrics_names
model.save('simple_nn_baseline.h5')
# fpr_cnn, tpr_cnn, thresholds = roc_curve(y_te, cnn.predict(x_te)) # CNN -- already have
fpr_decision, tpr_decision, thresholds = roc_curve(y_te, [x[1] for x in y_predict_proba])
# fpr, tpr, thresholds = roc_curve(y_te, new_model.predict(x_te)) # inception
fpr_mlp, tpr_mlp, thresholds = roc_curve(y_te, model.predict(x_te)) # simple MLP
plt.plot(fpr_cnn, tpr_cnn, label='CNN')
plt.plot(fpr_inception, tpr_inception, label='Inception')
plt.plot(fpr_decision, tpr_decision, label="Decision Tree")
plt.plot(fpr_mlp, tpr_mlp, label="Baseline MLP")
plt.plot([0, 1], [0, 1],linestyle='--',c='black')
plt.xlim([0, 1])
plt.ylim([0, 1])
plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')
plt.title('ROC Curves')
plt.legend()
plt.savefig('all_models_roc.png')