Last active
February 18, 2021 02:58
-
-
Save battlecook/8db93bc4087e495e32303242f880a8d1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from MySQLdb import _mysql | |
from tensorflow import keras | |
from tensorflow.keras import layers, losses | |
from tensorflow.keras.models import Model | |
import numpy as np | |
import tensorflow as tf | |
import random | |
mysql_host = 'host' | |
mysql_user = 'user' | |
mysql_pw = 'password' | |
mysql_db = 'db' | |
mysql_client = _mysql.connect(host=mysql_host, user=mysql_user, passwd=mysql_pw, db=mysql_db) | |
table_name = 'USER_MATRIX' | |
user_count = 100 | |
size_x = 4 | |
size_y = 4 | |
for user_id in range(user_count): | |
x = random.randrange(0, size_x) | |
y = random.randrange(0, size_y) | |
value = random.randrange(-7, 7) | |
insert_query = '''insert into {} (user_id, x, y, value) VALUES({},{},{},{})'''.format(table_name,str(user_id), str(x), str(y), value) | |
mysql_client.query(insert_query) | |
# https://www.tensorflow.org/tutorials/generative/autoencoder | |
class Denoise(Model): | |
def __init__(self, row_count, column_count): | |
super(Denoise, self).__init__() | |
self.encoder = tf.keras.Sequential([ | |
layers.Input(shape=(row_count, column_count, 1)), | |
layers.Conv2D(16, (3, 3), activation='relu', padding='same', strides=2), | |
layers.Conv2D(8, (3, 3), activation='relu', padding='same', strides=2)]) | |
self.decoder = tf.keras.Sequential([ | |
layers.Conv2DTranspose(8, kernel_size=3, strides=2, activation='relu', padding='same'), | |
layers.Conv2DTranspose(16, kernel_size=3, strides=2, activation='relu', padding='same'), | |
layers.Conv2D(1, kernel_size=(3, 3), activation='sigmoid', padding='same')]) | |
def call(self, x): | |
encoded = self.encoder(x) | |
decoded = self.decoder(encoded) | |
return decoded | |
class SqlDataGenerator(keras.utils.Sequence): | |
def __init__(self, ids, data_size, batch_size=10): | |
self.user_ids = ids | |
self.batch_size = batch_size | |
self.indexes = np.arange(len(self.user_ids)) | |
self.data_size = data_size | |
def __len__(self): | |
return int(np.floor(len(self.user_ids) / self.batch_size)) | |
def __getitem__(self, index): | |
indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size] | |
batch_user_ids = [self.user_ids[k] for k in indexes] | |
in_query = ','.join(batch_user_ids) | |
query = ''' | |
SELECT user_id, x, y, value FROM {} WHERE user_id IN ( {} ) | |
'''.format(table_name, in_query) | |
user_matrix = {} | |
mysql_client.query(query) | |
res = mysql_client.store_result() | |
for i in range(res.num_rows()): | |
row = res.fetch_row()[0] | |
user_id = str(row[0].decode("utf-8")) | |
x = int(row[1].decode("utf-8")) | |
y = int(row[2].decode("utf-8")) | |
value = int(row[3].decode("utf-8")) | |
if user_id not in user_matrix: | |
user_matrix[user_id] = np.zeros((self.data_size[0], self.data_size[1])) | |
user_matrix[user_id][x][y] = value | |
data = list() | |
for user_id, matrix in user_matrix.items(): | |
data.append(np.array(matrix)[..., tf.newaxis]) | |
data = np.array(data) | |
return data, data | |
def on_epoch_end(self): | |
self.indexes = np.arange(len(self.user_ids)) | |
user_ids = [str(i) for i in range(0, user_count)] | |
autoencoder = Denoise(size_x, size_y) | |
autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError()) | |
batch_size = 20 | |
train_generator = SqlDataGenerator(user_ids, (size_x, size_y), batch_size) | |
autoencoder.fit( | |
x=train_generator, | |
validation_data=train_generator, | |
batch_size=batch_size, | |
epochs=1, | |
) | |
autoencoder.encoder.summary() | |
autoencoder.decoder.summary() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment