Created
May 12, 2019 22:40
-
-
Save ghostbust555/5cbcabef6dc46415721f273ecccf9048 to your computer and use it in GitHub Desktop.
triplet BPR
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Standard library
import datetime
import itertools
import random
from collections import defaultdict

# Third-party
import keras
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from keras import Input, Model, regularizers
from keras.datasets import mnist
from keras.layers import Conv2D, LeakyReLU, Dropout, BatchNormalization, GlobalAveragePooling2D, Dense, K, Lambda
from keras.optimizers import Adam
from scipy import spatial
from scipy.interpolate import interpolate

# Optional TensorFlow session configuration for pinning CPU/GPU usage
# (left disabled; requires `import tensorflow as tf` to enable):
# num_cores = 4
#
# if GPU:
#     num_GPU = 1
#     num_CPU = 1
# if CPU:
#     num_CPU = 1
#     num_GPU = 0
#
# config = tf.ConfigProto(intra_op_parallelism_threads=num_cores,
#                         inter_op_parallelism_threads=num_cores,
#                         allow_soft_placement=True,
#                         device_count={'CPU': num_CPU,
#                                       'GPU': num_GPU}
#                         )
#
# session = tf.Session(config=config)
# K.set_session(session)
class TripletNetwork:
    """Triplet network trained with a BPR-style (sigmoid of score difference) loss.

    One embedding CNN is shared across the anchor/positive/negative branches;
    the full model scores anchor-positive vs anchor-negative similarity and is
    trained to drive the BPR loss output toward zero.
    """

    def __init__(self, img_shape, embedding_size=10):
        # img_shape: shape tuple of one input image, e.g. (rows, cols, channels).
        self.img_shape = img_shape
        # Base number of convolution filters; doubled at each down-sampling layer.
        self.df = 32
        self.single_model = self.build_single_model(embedding_size)
        self.full_model = self.build_full_model(self.single_model)

    def build_single_model(self, embedding_size=10):
        """Build the shared CNN that maps one image to an L2-normalized embedding."""
        def d_layer(layer_input, filters, f_size=3, normalization=True, dropout_rate=0., stride=2):
            """Conv -> LeakyReLU (-> Dropout) (-> BatchNorm) down-sampling layer."""
            d = Conv2D(filters, kernel_size=f_size, strides=stride, padding='same')(layer_input)
            d = LeakyReLU(alpha=0.2)(d)
            if dropout_rate:
                d = Dropout(dropout_rate)(d)
            if normalization:
                d = BatchNormalization()(d)
            return d

        img = Input(shape=self.img_shape)
        d1 = d_layer(img, self.df, normalization=False)
        d2 = d_layer(d1, self.df * 2, dropout_rate=.2)
        d3 = d_layer(d2, self.df * 4, dropout_rate=.2)
        d4 = d_layer(d3, self.df * 8, dropout_rate=.2)
        x = GlobalAveragePooling2D()(d4)
        x = Dense(embedding_size, activation='tanh')(x)
        # NOTE(review): K.l2_normalize is called without an axis argument;
        # depending on the backend this may normalize over all axes rather
        # than per-sample (axis=-1) — confirm before relying on unit vectors.
        embedding = Lambda(lambda tensor: K.l2_normalize(tensor))(x)
        model = Model(img, embedding)
        # Compiling here only makes the model usable stand-alone for predict;
        # training happens via the triplet model from build_full_model().
        model.compile(Adam(), loss="mae")
        model.summary()
        return model

    def build_full_model(self, single_model):
        """Wire three shared-weight branches into the BPR triplet-loss model."""
        posIn = Input(shape=self.img_shape, name="positive")
        negIn = Input(shape=self.img_shape, name="negative")
        anchorIn = Input(shape=self.img_shape, name="anchor")

        encoded_positive = single_model(posIn)
        encoded_negative = single_model(negIn)
        encoded_anchor = single_model(anchorIn)

        # Dot-product similarity for the anchor-positive pair.
        DAP = Lambda(lambda tensors: K.sum(tensors[0] * tensors[1], axis=-1, keepdims=True), name='DAP_loss')
        # Dot-product similarity for the anchor-negative pair.
        DAN = Lambda(lambda tensors: K.sum(tensors[0] * tensors[1], axis=-1, keepdims=True), name='DAN_loss')
        # BPR loss: 1 - sigmoid(sim(a, p) - sim(a, n)); approaches 0 when the
        # anchor is much more similar to the positive than to the negative.
        Triplet_loss = Lambda(lambda loss: 1 - K.sigmoid(loss[0] - loss[1]), name='BPR_Triplet_loss')

        DAP_loss = DAP([encoded_anchor, encoded_positive])
        DAN_loss = DAN([encoded_anchor, encoded_negative])
        Final_loss = Triplet_loss([DAP_loss, DAN_loss])

        model = Model(inputs=[posIn, negIn, anchorIn], outputs=Final_loss)
        # Targets are all-zero, so "mae" reduces to minimizing the loss output.
        model.compile(Adam(), loss="mae")
        model.summary()
        return model

    def chunks(self, l, n):
        """Yield successive n-sized chunks of l as numpy arrays."""
        for i in range(0, len(l), n):
            yield np.array(l[i:i + n])

    def batch_generator(self, grouped_training, num_classes, batch_size):
        """Endlessly yield ([positives, negatives, anchors], zero_targets) batches."""
        while True:
            batch = list(self.get_triplet_batch(grouped_training, num_classes, batch_size))
            # The model's output *is* the loss, so every target is zero.
            empty_out = np.zeros((len(batch), 1))
            # (batch, 3, ...) -> (3, batch, ...): one stacked array per model input.
            batch = np.swapaxes(batch, 0, 1)
            batch_list = [i for i in batch]
            yield batch_list, empty_out

    def train_on_batch(self, grouped_training, num_classes, batch_size):
        """Train the triplet model on one freshly sampled batch; return its loss."""
        batch_list, empty_out = next(self.batch_generator(grouped_training, num_classes, batch_size))
        loss = self.full_model.train_on_batch(batch_list, empty_out)
        return loss

    def train_epoch(self, grouped_training, num_classes, batch_size, steps_per_epoch, current_epoch, callbacks):
        """Run one epoch of generator-based training; return the fit history."""
        loss = self.full_model.fit_generator(
            generator=self.batch_generator(grouped_training, num_classes, batch_size),
            steps_per_epoch=steps_per_epoch,
            epochs=current_epoch + 1,
            verbose=2,
            initial_epoch=current_epoch,
            callbacks=callbacks
        )
        return loss

    def manipulate_image(self, img):
        """Augment an image with a random 0/90/180/270-degree rotation."""
        newImg = np.rot90(img, k=random.randint(0, 3))
        return newImg

    def get_triplet_batch(self, grouped_training, num_classes, batch_size):
        """Yield batch_size [positive, negative, anchor] augmented triplets.

        grouped_training maps class id -> list of images; classes are assumed
        to be 0..num_classes-1 here (matching the training-data grouping).
        """
        shufled_classes = list(range(0, num_classes))
        for i in range(batch_size):
            np.random.shuffle(shufled_classes)
            pos_class = shufled_classes[0]
            neg_class = shufled_classes[1]
            pos = random.choice(grouped_training[pos_class])
            anc = random.choice(grouped_training[pos_class])
            # Re-draw the anchor until it is a different array *object* than
            # the positive (`is` works because both come from the same list).
            while len(grouped_training[pos_class]) > 1 and pos is anc:
                anc = random.choice(grouped_training[pos_class])
            neg = random.choice(grouped_training[neg_class])
            yield [self.manipulate_image(pos), self.manipulate_image(neg), self.manipulate_image(anc)]

    def run_model_eval(self, test_set, test_sample_size=1000):
        """Report mean cosine distance of pos-anchor vs pos-negative pairs.

        Fix: derives the class list from test_set and uses self instead of
        relying on the module-level ``num_classes``/``triplet`` globals.
        Returns (mean pos-anchor dist, mean pos-negative dist, their ratio).
        """
        shufled_classes = list(test_set.keys())
        pos_anc_dist = 0
        pos_neg_dist = 0
        for i in range(test_sample_size):
            np.random.shuffle(shufled_classes)
            pos_class = shufled_classes[0]
            neg_class = shufled_classes[1]
            pos = random.choice(test_set[pos_class])
            anc = random.choice(test_set[pos_class])
            while len(test_set[pos_class]) > 1 and pos is anc:
                anc = random.choice(test_set[pos_class])
            neg = random.choice(test_set[neg_class])
            fvs = self.single_model.predict_on_batch(
                np.array([self.manipulate_image(pos), self.manipulate_image(anc), self.manipulate_image(neg)]))
            pos_fv = fvs[0]
            anc_fv = fvs[1]
            neg_fv = fvs[2]
            pos_anc_dist += spatial.distance.cosine(pos_fv, anc_fv)
            pos_neg_dist += spatial.distance.cosine(pos_fv, neg_fv)
        print("pos_anc_dis = %05f" % (pos_anc_dist / test_sample_size))
        print("pos_neg_dist = %05f" % (pos_neg_dist / test_sample_size))
        print("neg/pos dist ratio = %05f" % (pos_neg_dist / pos_anc_dist))
        return ((pos_anc_dist / test_sample_size), (pos_neg_dist / test_sample_size), (pos_neg_dist / pos_anc_dist))

    def run_model_eval_auc(self, test_set, test_sample_size=50, mini_batch_size=32, show_plot=False):
        """Sweep a cosine-distance threshold to build a ROC curve; return its AUC.

        Fixes: uses len(test_set) instead of the module-level ``num_classes``
        global; caps embedded samples per class at test_sample_size (the
        original slice could overshoot by up to one mini-batch); skips empty
        mini-batches instead of predicting on them.
        """
        computed_vectors = [[] for _ in range(len(test_set))]
        for c_key in test_set.keys():
            shuffled_class = test_set[c_key]
            np.random.shuffle(shuffled_class)
            for i in range(0, test_sample_size, mini_batch_size):
                end = min(i + mini_batch_size, test_sample_size, len(shuffled_class))
                minibatch = shuffled_class[i:end]
                if not minibatch:
                    break  # class exhausted before reaching test_sample_size
                fvs = self.single_model.predict_on_batch(np.array(minibatch))
                computed_vectors[c_key].extend(fvs)

        tprs = [0]
        fprs = [0]
        # Threshold sweep over [0.002^2, 1.8]; squaring a linspace gives a
        # denser sampling near zero, where the curve changes fastest.
        for threshold in np.linspace(.002, np.sqrt(1.8), 25) ** 2:
            fp_count = 0
            fn_count = 0
            tp_count = 0
            tn_count = 0
            for vector_list_idx in range(len(computed_vectors)):
                vector_list = computed_vectors[vector_list_idx]
                pos_fv = vector_list[0]
                # Same-class pairs: distance below threshold is a true positive.
                for anc_fv_idx in range(1, len(vector_list)):
                    anc_fv = vector_list[anc_fv_idx]
                    if spatial.distance.cosine(pos_fv, anc_fv) < threshold:
                        tp_count += 1
                    else:
                        fn_count += 1
                # Cross-class pairs: distance below threshold is a false positive.
                other_list_range = list(range(len(computed_vectors)))
                other_list_range.remove(vector_list_idx)
                for other_vector_list_idx in other_list_range:
                    for neg_fv in computed_vectors[other_vector_list_idx]:
                        if spatial.distance.cosine(pos_fv, neg_fv) < threshold:
                            fp_count += 1
                        else:
                            tn_count += 1
            tpr = tp_count / (tp_count + fn_count)
            fpr = fp_count / (tn_count + fp_count)
            tprs.append(tpr)
            fprs.append(fpr)
        tprs.append(1)
        fprs.append(1)

        # Sort points by FPR and drop duplicate x-values so interp1d is valid.
        fprs, tprs = (list(t) for t in zip(*sorted(zip(fprs, tprs))))
        new_fprs = []
        new_tprs = []
        for both in zip(fprs, tprs):
            if both[0] not in new_fprs:
                new_fprs.append(both[0])
                new_tprs.append(both[1])
        fprs = new_fprs
        tprs = new_tprs

        f_tpr = interpolate.interp1d(fprs, tprs, kind="quadratic")
        x = np.linspace(0, 1, 100)
        int_tprs = f_tpr(x)
        # Trapezoidal AUC on the raw points and on the smoothed curve.
        auc = np.trapz(tprs, x=fprs)
        int_auc = np.trapz(int_tprs, x=x)
        print("AUC = %05f" % auc)
        print("INT_AUC = %05f" % int_auc)
        if show_plot:
            plt.plot(fprs, tprs, color='blue', marker=".")
            plt.plot(x, int_tprs, color='red')
            plt.ylim([0, 1])
            plt.xlim([0, 1])
            plt.show()
        return auc
# Training configuration.
batch_size = 128
num_classes = 10
epochs = 12
channels = 1

# Input image dimensions (MNIST digits).
img_rows, img_cols = 28, 28

# MNIST arrives pre-split into train and test partitions.
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Insert the channel axis wherever the active backend expects it.
if K.image_data_format() == 'channels_first':
    x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
    x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
    input_shape = (1, img_rows, img_cols)
else:
    x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
    x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
    input_shape = (img_rows, img_cols, 1)

# Scale pixel values into [0, 1].
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255

print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# Group images by their class label: label -> list of images.
zipped_train = zip(x_train, y_train)
grouped_training = defaultdict(list)
for image, label in zipped_train:
    grouped_training[label].append(image)

zipped_test = zip(x_test, y_test)
grouped_test = defaultdict(list)
for image, label in zipped_test:
    grouped_test[label].append(image)

# Build the triplet network for single-channel MNIST-sized inputs.
triplet = TripletNetwork((img_rows, img_cols, channels))
class ModelEval(keras.callbacks.Callback):
    """Callback that records pos/anchor and pos/negative distance metrics per epoch."""

    def __init__(self):
        super(ModelEval, self).__init__()

    def on_epoch_end(self, epoch, logs=None):
        # run_model_eval returns (pos-anchor dist, pos-neg dist, neg/pos ratio).
        metrics = triplet.run_model_eval(grouped_test)
        for name, value in zip(("p_a_dis", "p_n_dis", "n_p_ratio"), metrics):
            logs[name] = value
class AUC(keras.callbacks.Callback):
    """Callback that records the test-set ROC AUC at the end of every epoch."""

    def __init__(self):
        super(AUC, self).__init__()

    def on_epoch_end(self, epoch, logs=None):
        logs["auc"] = triplet.run_model_eval_auc(grouped_test, show_plot=False)
# Timestamped run id shared by the TensorBoard logs and model checkpoints.
dt = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
tb = keras.callbacks.TensorBoard(log_dir='./logs/%s' % (dt))
# Keep only the checkpoint with the best (highest) AUC metric.
saver = keras.callbacks.ModelCheckpoint("models/%s" % (dt), monitor="auc", verbose=1, mode="max", save_best_only=True)
callbacks = [ModelEval(), AUC(), saver, tb]

steps_per_epoch = int(x_train.shape[0] / batch_size)
for epoch in range(0, 1000):
    triplet.train_epoch(grouped_training, num_classes, batch_size, steps_per_epoch, epoch, callbacks)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment