Demonstrating an auto-regressive model (motivating a full generative model) as a trained convolutional layer
""" | |
--------------------------------------------------- | |
Output: | |
epoch loss: 78.85499735287158 | |
epoch loss: 0.0008048483715437094 | |
epoch loss: 7.917497569703835e-06 | |
epoch loss: 7.784523854692527e-08 | |
epoch loss: 1.082900831506084e-09 | |
epoch loss: 3.153994704296892e-10 | |
epoch loss: 3.153994704296892e-10 | |
epoch loss: 3.153994704296892e-10 | |
epoch loss: 3.153994704296892e-10 | |
epoch loss: 3.153994704296892e-10 | |
Parameter containing: | |
tensor([[[ 0.5000, -0.4000]]], requires_grad=True) | |
success: trained weights match model. | |
testing trained model predictions ... | |
success: predictions agreed. | |
-------------------------------------------------- | |
This script demonstrates the essential ideas behind training a full
generative model (for example, WaveNet) for time series
by way of convolutional layers. The trained weights are shown to agree
with the coefficients of the generating process.
Assuming a finite past time dependence and linear dependence,
the process becomes an auto-regressive model. This can be trained with
convolutional layers, but the time series has to be appropriately
lined up at the input and output layers.
The general rules (assuming left-padding by k - 1):
# 1) shift output left one index (so an index isn't tied to itself)
# 2) ignore first k - 1 indices
# 3) ignore last index in both input and output
# aka output is the range k-1:-1,
# input is the range k:
(A toy check of this alignment, demo_alignment, is sketched at the end of
the script.)
Note this means the kernel size is actually one less than one might expect,
since an index isn't tied to itself.
What this looks like in this example with k = 2: the first k - 1 indices
(= 1 for k = 2 here) and the last index are ignored when calculating the loss.
(ig)                (ig)
 x1   x2   x3   x4   x5
  *    *    *    *    *
 /|   /|   /|   /|   /|
  *    *    *    *    *
 x0   x1   x2   x3   x4
(ig)                (ig)
Example auto-regressive model:
AR(2) process x_t = a x_{t-1} + b x_{t-2} + noise,
stationary when |b| < 1, a + b < 1, and b - a < 1
(satisfied here by a = -0.4, b = 0.5; a numerical check is sketched
after gen_ar below).
The trained model then allows prediction, outputting the next timestep
value x5. Iterate to generate a sequence of predictions (sketched after
test_prediction below).
"""
import random

import torch
import torch.nn.functional as F

# AR(2) coefficients: x_t = A * x_{t-1} + B * x_{t-2} + noise
A, B = -0.4, 0.5
def gen_ar():
    """Generate samples from the AR(2) process above, with small Gaussian noise."""
    n_samples = 100
    x0, x1 = 50., 60.
    x2 = B * x0 + A * x1
    for i in range(n_samples):
        x0 = x1
        x1 = x2
        x2 = B * x0 + A * x1 + random.gauss(0, 10 ** -5)
        yield x2
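
# A minimal sketch checking the docstring's stationarity claim numerically
# (check_stationarity is an illustrative helper; assumes numpy is available):
# the AR(2) process x_t = a x_{t-1} + b x_{t-2} + noise is stationary when
# all roots of 1 - a z - b z^2 lie outside the unit circle.
def check_stationarity(a=A, b=B):
    import numpy as np
    # np.roots takes coefficients from the highest degree down: -b z^2 - a z + 1
    roots = np.roots([-b, -a, 1])
    return all(abs(r) > 1 for r in roots)  # True for a = -0.4, b = 0.5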
def train_convolutional():
    # to do: train convolutional model directly with analytic gradient
    # descent (one possible sketch follows below).
    pass
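
# One possible sketch of the to-do above (illustrative, not a definitive
# implementation; train_ar2_analytic_gd is a hypothetical helper and the
# hyperparameters are untested): since the AR(2) predictor
# a * x_{t-1} + b * x_{t-2} is linear, the MSE gradients can be written out
# by hand and descended directly, without autograd. Assumes numpy is available.
def train_ar2_analytic_gd(series, lr=1e-3, num_epochs=10**4):
    import numpy as np
    x = np.asarray(series)
    x_prev1, x_prev2, target = x[1:-1], x[:-2], x[2:]
    a, b = 0., 0.
    for _ in range(num_epochs):
        err = a * x_prev1 + b * x_prev2 - target
        # d/da of mean(err**2) is 2 * mean(err * x_{t-1}); similarly for b
        a -= lr * 2 * np.mean(err * x_prev1)
        b -= lr * 2 * np.mean(err * x_prev2)
    return a, b  # should approach (A, B) = (-0.4, 0.5)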
def test_prediction(model):
    print("testing trained model predictions ...")
    window = []
    window_sz = 5 + 1
    seq_end = 10**3
    i = 0
    for e in gen_ar():
        window.append(e)
        if len(window) > window_sz:
            del window[0]
            # feed the first 5 values of the window; the conv output at
            # index 3 is the model's prediction of the 6th value, x5
            torch_window = torch.FloatTensor(window[:window_sz - 1]).reshape(1, 1, window_sz - 1)
            prediction = model(torch_window)
            x5_pred = prediction[0][0][3].data.item()
            x5_actual = window[-1]
            rel_error = abs(x5_pred - x5_actual) / abs(x5_pred)
            # print(f'rel. error: {rel_error}')
            assert rel_error < 1e-3, 'prediction error!'
        if i > seq_end:
            break
        i += 1
    print("success: predictions agreed.")
def train_conv_pytorch():
    # arbitrarily choose to send in chunks of length 5;
    # any length greater than k is fine
    n = 5
    # dependence on 2 prior terms;
    # the output will be shifted left one index
    # so that an index is not tied to itself
    k = 2
    def left_pad_k(x, m):
        return F.pad(x,
                     pad=[m, 0],
                     mode='constant',
                     value=0)

    def ar_loss_criterion(output, input, k):
        # 1) shift output left one index
        # 2) ignore first k - 1 indices
        # 3) ignore last index in both input and output
        # aka output is the range k-1:-1,
        # input is the range k:
        modified_out = output[:, :, k-1:-1]
        modified_in = input[:, :, k:]
        return torch.nn.MSELoss()(modified_out,
                                  modified_in)
    model = torch.nn.Conv1d(
        in_channels=1,
        out_channels=1,
        kernel_size=k,
        bias=False
    )
    # collect the generated series into chunks of length n = 5
    data = []
    i = 0
    for e in gen_ar():
        if i % n == 0:
            data.append([])
        data[-1].append(e)
        i += 1
    torch_data = torch.FloatTensor(data).unsqueeze(1)
    assert torch_data.shape == (20, 1, 5)
    optimizer = torch.optim.SGD(model.parameters(),
                                lr=1e-3)
    num_epochs = 10**3
    for epoch in range(num_epochs):
        epoch_loss = 0.
        # 4 mini-batches of 5 sequences each (20 chunks total)
        for j in range(4):
            optimizer.zero_grad()
            chunk = torch_data[j*5:(j+1)*5, :, :]
            chunk_lp = left_pad_k(chunk, k-1)
            y = model(chunk_lp)
            loss = ar_loss_criterion(y, chunk, k)
            loss.backward()
            epoch_loss += loss.item()
            optimizer.step()
        if epoch % 100 == 0:
            print(f'epoch loss: {epoch_loss}')
    print(model.weight)
    b_conv = model.weight[0][0][0].data.numpy()
    a_conv = model.weight[0][0][1].data.numpy()
    try:
        assert abs(b_conv - B) / abs(B) < 10**-5, "trained weight B does not match model"
        assert abs(a_conv - A) / abs(A) < 10 ** -5, "trained weight A does not match model"
        print("success: trained weights match model.")
    except AssertionError as e:
        print(e)
    return model
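
# A toy sketch (illustrative; demo_alignment is not part of training) of the
# alignment rule used in ar_loss_criterion above: with k = 2 and left-padding
# by k - 1, output index i is computed from inputs at indices i - 1 and i, so
# output[:, :, k-1:-1] is a causal prediction of input[:, :, k:].
def demo_alignment():
    x = torch.arange(5, dtype=torch.float32).reshape(1, 1, 5)  # x0 .. x4
    x_padded = F.pad(x, pad=[1, 0])  # [0, x0, x1, x2, x3, x4]
    conv = torch.nn.Conv1d(1, 1, kernel_size=2, bias=False)
    y = conv(x_padded)  # length 5: y0 .. y4
    # y1, y2, y3 depend only on (x0, x1), (x1, x2), (x2, x3) respectively,
    # matching the targets x2, x3, x4 = x[:, :, 2:]
    assert y[:, :, 1:-1].shape == x[:, :, 2:].shape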
if __name__ == "__main__":
    model = train_conv_pytorch()
    test_prediction(model)