A sample word2vec implementation of the Continuous Bag of Words model in Numpy
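The script below builds a toy CBOW word2vec model from scratch: it tokenizes a small corpus, builds word-to-index and index-to-word mappings, extracts (context, centre-word) pairs with a symmetric window, and trains a two-layer network in plain NumPy, optionally using negative sampling instead of the full softmax. The core forward pass averages the embeddings of the context words, projects that average back onto the vocabulary, and softmaxes the result to score the missing centre word. Here is a minimal sketch of that idea, with made-up toy sizes and variable names (the real implementation below additionally applies a tanh nonlinearity, frequency-weighted initialization, and negative sampling):

import numpy as np

# Toy dimensions, for illustration only
vocab_size, embedding_size = 10, 4

embedding_matrix = np.random.randn(vocab_size, embedding_size) * 0.01  # input embeddings
W = np.random.randn(embedding_size, vocab_size) * 0.01                 # output projection
bias = np.zeros((1, vocab_size))

context_ids = np.array([[1, 3, 5, 7, 8, 9]])                  # 2 * CONTEXT_SIZE surrounding word indices
word_vec = np.mean(embedding_matrix[context_ids, :], axis=1)  # (1, embedding_size) context average
scores = word_vec @ W + bias                                  # (1, vocab_size) scores
probs = np.exp(scores - scores.max()) / np.exp(scores - scores.max()).sum()
predicted_centre_word = int(np.argmax(probs))                 # most likely centre-word index

The full implementation follows.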
import numpy as np
import re
from nltk.corpus import brown
import matplotlib.pyplot as plt
import time
def tokenize(text):
    # Extract word tokens (letters, optionally with digits/apostrophes), lower-cased
    pattern = re.compile(r"[A-Za-z]+[\w^\']*|[\w^\']*[A-Za-z]+[\w^\']*")
    try:
        return pattern.findall(text.lower())
    except AttributeError:
        # text has no .lower() (not a str); match it as-is
        return pattern.findall(text)
def generate_mapping(tokens):
    # Generates the word_to_idx and idx_to_word mappings
    word_to_idx, idx_to_word = {}, {}
    for idx, token in enumerate(set(tokens)):
        word_to_idx[token] = idx
        idx_to_word[idx] = token
    return word_to_idx, idx_to_word
def get_training_data(tokens, word_to_idx, context_size):
    # X holds the centre-word indices, Y holds the surrounding context-word indices
    # (2 * context_size context words per centre word)
    X, Y = [], []
    for i in range(context_size, len(tokens) - context_size):
        for j in range(i - context_size, i + context_size + 1):
            if j == i:
                continue
            Y.append(word_to_idx[tokens[j]])
        X.append(word_to_idx[tokens[i]])
    X, Y = np.array(X), np.array(Y)
    X, Y = X[np.newaxis, :], Y[np.newaxis, :]
    assert 2 * context_size * X.shape[1] == Y.shape[1]
    return X, Y
def load_corpus(filename):
    corpus = ""
    with open(filename, 'r', encoding="utf8") as rf:
        for line in rf:
            # Keep a space between lines so words do not run together
            corpus += line.strip() + " "
    return corpus
class CBOW:
    def __init__(self, contexts, targets, vocab_size, frequency_weights, embedding_size,
                 learning_rate, context_size=3, do_negative_sampling=True, sampling_distribution=None):
        # Initialize model parameters
        self.embedding_size = embedding_size
        self.context_size = context_size
        # Input embeddings, scaled by the per-word frequency weights
        self.embedding_matrix = np.random.randn(vocab_size, embedding_size) * 0.01 * frequency_weights.T
        self.num_classes = vocab_size
        self.sampling_distribution = None if sampling_distribution is None else sampling_distribution.flatten()
        self.frequency_weights = frequency_weights
        # Output projection back onto the vocabulary
        self.W = np.random.randn(embedding_size, self.num_classes) * 0.01 * frequency_weights
        self.bias = np.zeros((1, self.W.shape[1]))
        self.word_vec = None
        self.gradients = dict()
        self.cache_contexts = None  # For backprop
        self.cache_Z = None  # For backprop
        self.learning_rate = learning_rate
        self.contexts = contexts
        self.targets = targets
        self.context_width = 2 * context_size
        self.negative_samples = None
        self.do_negative_sampling = do_negative_sampling
    def Embedding(self, contexts):
        # Average the embeddings of the context words into a single (1, embedding_size) row vector
        word_vec = np.mean(self.embedding_matrix[contexts, :], axis=1)
        assert word_vec.shape == (1, self.embedding_matrix.shape[1])
        self.word_vec = word_vec

    def tanh(self, inp):
        return np.tanh(inp)

    def tanh_delta(self, inp):
        # Derivative of tanh, expressed in terms of the tanh output: 1 - tanh(x)^2
        return 1 - inp * inp
    def Dense(self, word_vec, do_negative_sampling):
        # Project word_vec onto the output vocabulary (or only the sampled columns)
        if do_negative_sampling:
            op = np.dot(word_vec, self.W[:, self.negative_samples]) + self.bias[:, self.negative_samples]
        else:
            op = np.dot(word_vec, self.W) + self.bias
        # Normalize before passing to tanh
        v_max, v_min = np.max(word_vec), np.min(word_vec)
        op = (op - v_min) / (v_max - v_min + 1e-5)
        op = self.tanh(op)
        if do_negative_sampling:
            assert op.shape == (1, self.negative_samples.shape[0])
        else:
            assert op.shape == (1, self.W.shape[1])
        return op
    def softmax(self, inp, do_negative_sampling):
        if do_negative_sampling:
            assert inp.shape == (1, self.negative_samples.shape[0])
        else:
            assert inp.shape == (1, self.num_classes)
        # Numerically stable softmax
        max_val = np.max(inp, axis=1)
        softmax_out = np.exp(inp - max_val) / (np.sum(np.exp(inp - max_val), keepdims=True) + 1e-3)
        return softmax_out
    def forward(self, contexts, target, do_negative_sampling):
        if do_negative_sampling:
            def normalize_prob(p):
                return p * (1. / p.sum())

            k = 20
            # Zero out the target's probability so it cannot be drawn as a negative sample
            target_frequency = self.sampling_distribution[target]
            self.sampling_distribution[target] = 0
            samples = np.arange(0, self.num_classes, dtype=int)
            self.negative_samples = np.random.choice(samples, k, p=normalize_prob(self.sampling_distribution), replace=False)
            self.sampling_distribution[target] = target_frequency
            # Last index is the positive sample
            self.negative_samples = np.hstack((self.negative_samples, target))
        self.Embedding(contexts)
        Z = self.Dense(self.word_vec, do_negative_sampling)
        self.cache_Z = Z
        softmax_out = self.softmax(Z, do_negative_sampling)
        self.cache_contexts = contexts
        return softmax_out
    def cross_entropy(self, softmax_out, Y):
        # One-hot target: with negative sampling the positive sample is the last column,
        # otherwise it is the target word index itself
        target = np.zeros(softmax_out.shape)
        if self.do_negative_sampling:
            target[:, -1] = 1
        else:
            target[:, np.squeeze(Y)] = 1
        loss = -(target * np.log(softmax_out + 1e-3))
        return np.max(loss)
    def backward(self, Y, softmax_out, do_negative_sampling):
        # Backprop through: O = W*word_vec + bias, Z = tanh(O), softmax_out = softmax(Z)
        # dL/dZ = softmax_out - Y
        if do_negative_sampling:
            Z = self.cache_Z
            target = np.zeros(softmax_out.shape)
            target[:, -1] = 1  # last column is the positive sample
            dL_dZ = softmax_out - target
            assert dL_dZ.shape == (1, self.negative_samples.shape[0])
            self.gradients['dL_dZ'] = dL_dZ
            # dZ/dO = 1 - tanh^2(O) = 1 - Z*Z
            dZ_dO = self.tanh_delta(Z)
            assert dZ_dO.shape == (1, self.negative_samples.shape[0])
            # dL/dO = dL/dZ * dZ/dO
            dL_dO = dL_dZ * dZ_dO
            assert dL_dO.shape == (1, self.negative_samples.shape[0])
            self.gradients['dL_dO'] = dL_dO
            # dL/dW = dO/dW.T @ dL/dO, where dO/dW = word_vec
            dL_dW = np.dot(self.word_vec.T, dL_dO)
            assert dL_dW.shape == (self.W.shape[0], self.negative_samples.shape[0])
            self.gradients['dL_dW'] = dL_dW
            # dL/d(word_vec) = dL/dO @ W.T (restricted to the sampled columns)
            dL_dword_vec = np.dot(dL_dO, self.W[:, self.negative_samples].T)
            assert dL_dword_vec.shape == self.word_vec.shape
            self.gradients['dL_dword_vec'] = dL_dword_vec
            # dL/d(bias) = dL/dO
            dL_dbias = dL_dO
            assert dL_dbias.shape == (1, self.negative_samples.shape[0])
            self.gradients['dL_dbias'] = dL_dbias
            return
        Z = self.cache_Z
        target = np.zeros(softmax_out.shape)
        target[:, np.squeeze(Y)] = 1
        dL_dZ = softmax_out - target
        assert dL_dZ.shape == (1, self.num_classes)
        self.gradients['dL_dZ'] = dL_dZ
        # dZ/dO = 1 - tanh^2(O) = 1 - Z*Z
        dZ_dO = self.tanh_delta(Z)
        assert dZ_dO.shape == (1, self.W.shape[1])
        # dL/dO = dL/dZ * dZ/dO
        dL_dO = dL_dZ * dZ_dO
        assert dL_dO.shape == (1, self.W.shape[1])
        self.gradients['dL_dO'] = dL_dO
        # dL/dW = dO/dW.T @ dL/dO, where dO/dW = word_vec
        dL_dW = np.dot(self.word_vec.T, dL_dO)
        assert dL_dW.shape == self.W.shape
        self.gradients['dL_dW'] = dL_dW
        # dL/d(word_vec) = dL/dO @ W.T
        dL_dword_vec = np.dot(dL_dO, self.W.T)
        assert dL_dword_vec.shape == self.word_vec.shape
        self.gradients['dL_dword_vec'] = dL_dword_vec
        # dL/d(bias) = dL/dO
        dL_dbias = dL_dO
        assert dL_dbias.shape == self.bias.shape
        self.gradients['dL_dbias'] = dL_dbias
    def negative_sampling(self, k):
        """
        Performs negative sampling to reduce training time.
        Approximates the full softmax by updating only k randomly selected negative samples.
        (Unused in this script: forward() draws its own samples from the unigram distribution.)
        """
        self.negative_samples = np.random.choice(self.num_classes, k, replace=False)
    def update(self, do_negative_sampling):
        # SGD step on the embeddings of the cached context words and on the output layer
        contexts = self.cache_contexts
        dL_dword_vec = self.gradients['dL_dword_vec']
        self.embedding_matrix[contexts] -= self.learning_rate * dL_dword_vec
        if do_negative_sampling:
            # Only the sampled output columns are updated
            self.W[:, self.negative_samples] -= self.learning_rate * self.gradients['dL_dW']
            self.bias[:, self.negative_samples] -= self.learning_rate * self.gradients['dL_dbias']
        else:
            self.W -= self.learning_rate * self.gradients['dL_dW']
            self.bias -= self.learning_rate * self.gradients['dL_dbias']

    def batch_update(self, X_batch, Y_batch, do_negative_sampling):
        # Placeholder for mini-batch updates; not implemented
        pass
    def train(self, epochs):
        losses = []
        X = self.contexts   # shape (1, 2 * context_size * num_examples)
        Y = self.targets    # shape (1, num_examples)
        num_examples = Y.shape[1]
        factor = self.context_width  # number of context words per training example
        for epoch in range(epochs):
            epoch_loss = 0
            inds = list(range(num_examples))
            np.random.shuffle(inds)
            for i in inds:
                X_item = X[:, i * factor:(i + 1) * factor]  # context word indices
                Y_item = Y[:, i:i + 1]                      # target (centre) word index
                softmax_out = self.forward(X_item, Y_item[0], self.do_negative_sampling)
                self.backward(Y_item, softmax_out, self.do_negative_sampling)
                self.update(self.do_negative_sampling)
                loss = self.cross_entropy(softmax_out, Y_item)
                epoch_loss += np.squeeze(loss)
            losses.append(epoch_loss)
            if epoch:
                print(f"Loss after epoch #{epoch}: {epoch_loss}")
        plt.plot(np.arange(epochs), losses)
        plt.xlabel('# of epochs')
        plt.ylabel('cost')
        plt.show()
def load_model(filename):
    import pickle
    with open(filename, 'rb') as f:
        return pickle.load(f)

def save_model(model, filename):
    import pickle
    with open(filename, 'wb') as f:
        pickle.dump(model, f, pickle.HIGHEST_PROTOCOL)
def get_max_idx(inp):
    # Return the most frequent argmax index across rows
    c = np.array(np.unique(np.argmax(inp, axis=1), return_counts=True))
    idx = np.argmax(c[1, :])
    return c[0, idx]

def get_index_of_max(inp):
    return np.argmax(inp, axis=1)

def get_max_prob_result(inp, ix_to_word):
    return ix_to_word[get_max_idx(inp)]

def make_context_vector(context, word_to_idx):
    return np.array([[word_to_idx[word] for word in tokenize(context)]])

def generate_model_probs(model, context_vector):
    # Full softmax over the vocabulary (no negative sampling at inference time)
    softmax_out = model.forward(context_vector, None, do_negative_sampling=False)
    return softmax_out
def test(model, corpus, word_to_idx, idx_to_word):
    corpus_tokens = tokenize(corpus)
    corpus_size = len(corpus_tokens)
    for i in range(CONTEXT_SIZE, corpus_size - CONTEXT_SIZE):
        context = " ".join([corpus_tokens[j] for j in range(i - CONTEXT_SIZE, i + CONTEXT_SIZE + 1) if j != i])
        print("Context: " + context)
        context_vector = make_context_vector(context, word_to_idx)
        probs = generate_model_probs(model, context_vector)
        print(f"Prediction: {get_max_prob_result(probs, idx_to_word)} (actual: {corpus_tokens[i]})")
def get_term_frequency(corpus_tokens, word_to_idx):
    # Count occurrences of each word index; also track the most frequent word
    from collections import defaultdict
    freq_dict = defaultdict(int)
    max_freq, max_idx = -1, -1
    for token in corpus_tokens:
        freq_dict[word_to_idx[token]] += 1
        if freq_dict[word_to_idx[token]] > max_freq:
            max_freq = freq_dict[word_to_idx[token]]
            max_idx = word_to_idx[token]
    return freq_dict, max_freq, max_idx

def get_frequency_distribution(freq_dict, total_frequency):
    # Unigram distribution over the vocabulary, used for drawing negative samples
    unigram_distribution = np.zeros((1, vocab_size), dtype='float32')
    for token, freq in freq_dict.items():
        unigram_distribution[:, token] = freq / total_frequency
    return unigram_distribution
def weigh_rare_tokens(freq_dict, max_freq, max_idx):
    # Heuristic per-token weights used to scale the random initialization,
    # normalized so the most frequent token gets weight 1
    import math
    frequency_weights = np.zeros((1, vocab_size), dtype='float32')
    for token, freq in freq_dict.items():
        frequency_weights[:, token] = freq * (max_freq - freq + 1) / math.sqrt(freq)
    frequency_weights = frequency_weights / frequency_weights[:, max_idx]
    return frequency_weights
WINDOW_SIZE = 3
CONTEXT_SIZE = 3

corpus = """We are about to study the idea of a computational process. We are about to study the idea of a computational process.
Computational processes are abstract beings that inhabit computers.
As they evolve, processes manipulate other abstract things called data.
The evolution of a process is directed by a pattern of rules
called a program. People create programs to direct processes. In effect,
we conjure the spirits of the computer with our spells. Other than that the computers simply perform what we instruct them to do. Other than that the computers simply perform what we instruct them to do."""
# corpus = load_corpus('sample.txt')

corpus_tokens = tokenize(corpus)
word_to_idx, idx_to_word = generate_mapping(corpus_tokens)
targets, contexts = get_training_data(corpus_tokens, word_to_idx, WINDOW_SIZE)
vocab_size = len(set(corpus_tokens))

freq_dict, max_freq, max_idx = get_term_frequency(corpus_tokens, word_to_idx)
frequency_weights = weigh_rare_tokens(freq_dict, max_freq, max_idx)
total_frequency = sum(freq_dict.values())
unigram_distribution = get_frequency_distribution(freq_dict, total_frequency)

try:
    # Resume from a previously saved model if one exists
    model = load_model("model.pkl")
except FileNotFoundError:
    model = CBOW(contexts, targets, vocab_size, frequency_weights=frequency_weights,
                 embedding_size=300, learning_rate=0.5, do_negative_sampling=True,
                 sampling_distribution=unigram_distribution)
model.train(50)
save_model(model, "model.pkl")
test(model, corpus, word_to_idx, idx_to_word)
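# Example query (a sketch, assuming the model trained above): the context string is
# hypothetical but uses only words from the training corpus; with CONTEXT_SIZE = 3 it
# supplies the six words surrounding a missing centre word ("to" in the original sentence).
example_context = "we are about study the idea"
example_vector = make_context_vector(example_context, word_to_idx)
example_probs = generate_model_probs(model, example_vector)
print(f"Most likely centre word: {get_max_prob_result(example_probs, idx_to_word)}")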