Skip to content

Instantly share code, notes, and snippets.

@zhangce
Created February 5, 2018 10:54
Show Gist options
  • Save zhangce/0044869827f254da82381fce78237e41 to your computer and use it in GitHub Desktop.
Save zhangce/0044869827f254da82381fce78237e41 to your computer and use it in GitHub Desktop.
from mpi4py import MPI
from io import StringIO
import numpy as np
import math
import sys
import time
# ---------------------------------------------------------------------------
# MPI handles, command-line configuration and model/gradient buffers.
# ---------------------------------------------------------------------------
comm = MPI.COMM_WORLD
size = comm.Get_size()  # number of ranks in the communicator
rank = comm.Get_rank()  # index of this rank

# Run it like
#   python decent.py ALLREDUCE 0.1 0.1 "_shuffle"
#
# Command-line parameters:
#   sys.argv[1]  communication scheme        (see COMMUNICATION)
#   sys.argv[2]  initial learning rate       (see CONSTANT_RATE)
#   sys.argv[3]  decay of the learning rate  (see EXPO_DECAY)
#   sys.argv[4]  suffix of the data folder   (see DATA_FOLDER)
if len(sys.argv) < 5:
    # Fail with a readable message instead of a bare IndexError further down.
    raise SystemExit(
        "usage: python decent.py COMMUNICATION CONSTANT_RATE EXPO_DECAY DATA_SUFFIX"
    )

NCLASS = 16  # Number of classes -- equal to number of nodes
REGULARIZATION = 0.001  # Regularization term
COMMUNICATION = sys.argv[1]  # ALLREDUCE, LAG, RING, VAR_REDUCTION, TRI, MESH, STUPIDALLREDUCE

# Data are stored in four folders:
#   inception_features_noaug
#   inception_features_noaug_head
#   inception_features_noaug_shuffle
#   inception_features_noaug_head_shuffle
#
# `head` means small version, `shuffle` means data are shuffled, otherwise
# each node only sees a single class
DATA_FOLDER = "/mnt/local/czhang/inception_features_noaug" + sys.argv[4]
FILE_PREFIX = "features"
NLOCALTUPLES = 2600  # Number of examples per machine
BATCHSIZE = 100  # Batch size
DIMENSIONS = 2048  # Number of features
NEPOCHS = 100  # Number of epochs

# Alternative settings for the `head` data set:
#DATA_FOLDER = "/mnt/local/czhang/inception_features_noaug_head"
#FILE_PREFIX = "features"
#NLOCALTUPLES = 100
#BATCHSIZE = 10
#DIMENSIONS = 2048
#NEPOCHS = 100

# Alternative settings for the `head_shuffle` data set:
#DATA_FOLDER = "/mnt/local/czhang/inception_features_noaug_head_shuffle"
#FILE_PREFIX = "features"
#NLOCALTUPLES = 500
#BATCHSIZE = 10
#DIMENSIONS = 2048
#NEPOCHS = 20

CONSTANT_RATE = float(sys.argv[2])  # Initial learning rate
EXPO_DECAY = float(sys.argv[3])  # Decay of the learning rate

# Every rank reads its own file: <DATA_FOLDER>/<FILE_PREFIX><rank>
LOCAL_FILE = DATA_FOLDER + "/" + FILE_PREFIX + "%d" % rank

# Single-argument print(...) calls emit the same text on Python 2 and 3.
print("%s %s" % (rank, LOCAL_FILE))
if rank == 0:
    # Only rank 0 echoes the run configuration.
    print(COMMUNICATION)
    print("ISSHUFFLE: %s" % sys.argv[4])
    print("BATCHSIZE, DIMENSIONS, NEPOCHS: %s %s %s" % (BATCHSIZE, DIMENSIONS, NEPOCHS))
    print("CONSTANT_RATE, EXPO_DECAY: %s %s" % (CONSTANT_RATE, EXPO_DECAY))

# Model and gradient buffers, one row per class and one column per feature.
# The *_previous / *_previous_previous arrays are history slots maintained by
# the training loop.
model_previous_previous = np.zeros((NCLASS, DIMENSIONS))
model_previous = np.zeros((NCLASS, DIMENSIONS))
model = np.zeros((NCLASS, DIMENSIONS))
grad_previous_previous = np.zeros((NCLASS, DIMENSIONS))
grad_previous = np.zeros((NCLASS, DIMENSIONS))
grad = np.zeros((NCLASS, DIMENSIONS))
def enumerate_one_epoch(filename, batch_size=None):
    """Yield successive mini-batches of examples read from *filename*.

    Every line must consist of three fields separated by ``"; "``: an
    integer label, a field that is ignored, and the feature values in a
    form that ``np.loadtxt`` can parse.

    Args:
        filename: Path of the text file to read.
        batch_size: Number of examples per yielded batch. When omitted, the
            module-level ``BATCHSIZE`` is used.

    Yields:
        Lists of exactly ``batch_size`` ``(label, features)`` tuples, where
        ``label`` is an ``int`` and ``features`` is the array returned by
        ``np.loadtxt``. Examples left over at the end of the file that do
        not fill a complete batch are not yielded.

    Raises:
        ValueError: If a line does not split into exactly three fields or
            its label is not an integer.
    """
    if batch_size is None:
        batch_size = BATCHSIZE
    pending = []
    # The context manager closes the file once iteration ends; the original
    # left the handle open until garbage collection.
    with open(filename) as lines:
        for line in lines:
            label, _misc, features = line.split("; ")
            # u"" + ... forces a text string so StringIO accepts it on Python 2.
            pending.append((int(label), np.loadtxt(StringIO(u"" + features))))
            if len(pending) == batch_size:
                yield pending
                pending = []
# ---------------------------------------------------------------------------
# Training loop: every rank computes a mini-batch gradient of the softmax
# log-likelihood on its local file, then updates / mixes its model according
# to COMMUNICATION. After each epoch the rank-averaged model is evaluated on
# the local data and the loss averaged over ranks is printed by rank 0.
#
# NOTE(review): the nesting below was reconstructed from the statement order;
# confirm it against the original indentation if that is available.
# ---------------------------------------------------------------------------

_KNOWN_SCHEMES = (
    "ALLREDUCE", "LAG", "RING", "VAR_REDUCTION", "TRI", "MESH", "STUPIDALLREDUCE",
)
if COMMUNICATION not in _KNOWN_SCHEMES:
    # Without this check an unknown scheme would run every epoch without ever
    # updating the model.
    raise ValueError(
        "unknown COMMUNICATION scheme %r; expected one of %s"
        % (COMMUNICATION, ", ".join(_KNOWN_SCHEMES))
    )


def _softmax(logits):
    """Return softmax(logits); the max is subtracted first so exp cannot overflow."""
    weights = np.exp(logits - np.max(logits))
    return weights / weights.sum()


def _neg_log_likelihood(logits, label):
    """Return -log(softmax(logits)[label]), computed in log-space to avoid log(0)."""
    shifted = logits - np.max(logits)
    return math.log(np.exp(shifted).sum()) - shifted[label]


def _exchange_with(peers, local_model):
    """Send local_model to every rank in peers; return their models in the same order."""
    # Post all non-blocking sends before the first blocking receive, then
    # complete the sends.
    pending = [comm.isend(local_model, dest=peer) for peer in peers]
    received = [comm.recv(source=peer) for peer in peers]
    for request in pending:
        request.wait()
    return received


LEARNING_RATE = CONSTANT_RATE
i = 0  # number of mini-batch updates done so far (never reset between epochs)
NEXTINTERVAL = 5  # only used by the disabled resync experiment below
CURRENT = 0  # only used by the disabled resync experiment below

# Neighbours of this rank on the ring 0 .. size-1.
ring_prev = (rank - 1) % size
ring_next = (rank + 1) % size

for iepoch in range(0, NEPOCHS):
    LEARNING_RATE = CONSTANT_RATE * math.exp(-EXPO_DECAY * iepoch)

    # Disabled experiment (kept for reference): periodically resynchronise
    # all models and halve the learning rate.
    #
    # if iepoch > 0 and iepoch == CURRENT + NEXTINTERVAL: # and iepoch % 25 == 0:
    #     print "RESYNC MODEL..."
    #     global_model = 0.0 * model
    #     comm.Allreduce(model, global_model)
    #     global_model = global_model / size
    #     model = global_model
    #     LEARNING_RATE = LEARNING_RATE / 2
    #     CURRENT = iepoch
    #     NEXTINTERVAL = NEXTINTERVAL * 2
    #     #LEARNING_RATE = CONSTANT_RATE / math.sqrt(iepoch + 1)
    #     i = 0

    if rank == 0:
        print(" lr = %s   %s" % (LEARNING_RATE, iepoch))

    #start = time.time()
    # enumerate each batch
    for batch in enumerate_one_epoch(LOCAL_FILE):
        # Fresh zero buffer. `0.0 * grad` would keep any NaN/inf alive forever,
        # and a new array is required because grad_previous aliases the old one.
        grad = np.zeros_like(grad)
        for (label, features) in batch:
            probs = _softmax(np.dot(features, model.T))
            # Per-class coefficient: (1 - p) for the true class, -p otherwise.
            residual = -probs
            residual[label] += 1.0
            grad += np.outer(residual, features) / BATCHSIZE
        i = i + 1

        # Shift the history. At the update below, the *_previous_previous
        # slots hold the values saved one iteration earlier, while
        # grad_previous / model_previous alias the current grad / model.
        grad_previous_previous = grad_previous
        model_previous_previous = model_previous
        grad_previous = grad
        model_previous = model

        if COMMUNICATION == "ALLREDUCE":
            # Step along the gradient combined over all ranks, divided by size.
            global_updates = np.zeros_like(grad)
            comm.Allreduce(grad, global_updates)
            model = (1 - REGULARIZATION * LEARNING_RATE) * model + LEARNING_RATE * global_updates / size

        elif COMMUNICATION == "LAG":
            # Exchange the pre-update models, then take a local step plus a
            # pull towards both ring neighbours.
            model_neighbor1, model_neighbor2 = _exchange_with((ring_prev, ring_next), model)
            #model = (model + model_neighbor1 + model_neighbor2) / 3
            model = ((1 - REGULARIZATION * LEARNING_RATE) * model + LEARNING_RATE * grad
                     - 0.4 * (model - model_neighbor1 + model - model_neighbor2))

        elif COMMUNICATION == "RING":
            # Local step first, then mix with both ring neighbours.
            model = (1 - REGULARIZATION * LEARNING_RATE) * model + LEARNING_RATE * grad
            model_neighbor1, model_neighbor2 = _exchange_with((ring_prev, ring_next), model)
            #model = (model + model_neighbor1 + model_neighbor2) / 3
            model = model + 0.45 * ((model_neighbor1 - model) + (model_neighbor2 - model))

        elif COMMUNICATION == "VAR_REDUCTION":
            if i == 1:
                # Very first update: no history yet, plain local step.
                model = (1 - REGULARIZATION * LEARNING_RATE) * model + LEARNING_RATE * grad
            else:
                # Extrapolate from the previous model and apply the difference
                # between the current and the previous regularised update.
                model = (2 * model - model_previous_previous
                         + (LEARNING_RATE * grad - REGULARIZATION * LEARNING_RATE * model)
                         - (LEARNING_RATE * grad_previous_previous
                            - REGULARIZATION * LEARNING_RATE * model_previous_previous))
            model_neighbor1, model_neighbor2 = _exchange_with((ring_prev, ring_next), model)
            model = (2.0 / 3 * model + 1.0 / 6 * model_neighbor1 + 1.0 / 6 * model_neighbor2)

        elif COMMUNICATION == "TRI":
            # Local step, then uniform average with both ring neighbours and
            # the rank half-way around the ring.
            model = (1 - REGULARIZATION * LEARNING_RATE) * model + LEARNING_RATE * grad
            # Floor division keeps the rank an integer on Python 3 as well.
            opposite = (rank + size // 2) % size
            model_neighbor1, model_neighbor2, model_neighbor3 = _exchange_with(
                (ring_prev, ring_next, opposite), model)
            model = (model + model_neighbor1 + model_neighbor2 + model_neighbor3) / 4

        elif COMMUNICATION == "MESH":
            # Uniform average with the ranks at offsets +1, -1, +4 and -4,
            # followed by the local step (using the gradient taken before
            # the averaging).
            mesh_peers = (
                (rank + 1) % size,
                (rank - 1) % size,
                (rank + 4) % size,
                (rank - 4) % size,
            )
            model_neighbor1, model_neighbor2, model_neighbor3, model_neighbor4 = _exchange_with(
                mesh_peers, model)
            model = (model + model_neighbor1 + model_neighbor2 + model_neighbor3 + model_neighbor4) / 5
            model = (1 - REGULARIZATION * LEARNING_RATE) * model + LEARNING_RATE * grad
            #model = model + 0.2 * ( (model_neighbor1 - model) + (model_neighbor2 - model) + (model_neighbor3 - model) + (model_neighbor4 - model) )

        elif COMMUNICATION == "STUPIDALLREDUCE":
            # Local step, then average the models of all ranks using
            # point-to-point messages only.
            model = (1 - REGULARIZATION * LEARNING_RATE) * model + LEARNING_RATE * grad
            others = [peer for peer in range(0, size) if peer != rank]
            sum_models = np.zeros_like(model)
            for peer_model in _exchange_with(others, model):
                sum_models = sum_models + peer_model
            model = (model + sum_models) / size

    #end = time.time()
    #print rank, "time", end - start

    # Evaluate the rank-averaged model on this rank's local data.
    global_model = np.zeros_like(model)
    comm.Allreduce(model, global_model)
    global_model = global_model / size

    loss = np.zeros(1)
    sumloss = np.zeros(1)
    for batch in enumerate_one_epoch(LOCAL_FILE):
        for (label, features) in batch:
            logits = np.dot(features, global_model.T)
            loss = loss + _neg_log_likelihood(logits, label) / NLOCALTUPLES
    print("  %s %s" % (rank, loss))

    # Combine the per-rank losses; rank 0 reports the result divided by size.
    comm.Allreduce(loss, sumloss)
    if rank == 0:
        print(sumloss / size)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment