#!/usr/bin/python3.5
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
import torchvision
import torchvision.transforms as transforms
import os
import sys
import argparse
import math
from torch.autograd import Variable
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data.sampler import RandomSampler
from torch.optim.lr_scheduler import MultiStepLR
from torch.optim.lr_scheduler import LambdaLR
import numpy as np
import copy
from mpi4py import MPI
import random
import time

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
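# Pin each MPI rank to a single local GPU (rank modulo the number of GPUs per
# machine) by restricting CUDA_VISIBLE_DEVICES before any CUDA context is created.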
NGPU_PER_MACHINE = 8
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"]= "%d" % (rank % NGPU_PER_MACHINE)
print ("Rank %d, Using GPU %s" % (rank, os.environ["CUDA_VISIBLE_DEVICES"]))
LEARNING_RATE = float(sys.argv[1])
MOMENTUM = float(sys.argv[2])
COMMUNICATION = sys.argv[3]
PACKAGE_DROP_RATE = float(sys.argv[4])
PARTITIONED_LOAD = False
if rank == 0:
    print ("# LEARNING_RATE: %f" % LEARNING_RATE)
    print ("# MOMENTUM: %f" % MOMENTUM)
    print ("# COMMUNICATION: %s" % COMMUNICATION)
    print ("# PACKAGE_DROP_RATE: %f" % PACKAGE_DROP_RATE)
transform_train = transforms.Compose([
    # transforms.RandomCrop(32, padding=4),
    # transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
"""
class Bottleneck(nn.Module):
def __init__(self, in_planes, growth_rate):
super(Bottleneck, self).__init__()
self.bn1 = nn.BatchNorm2d(in_planes)
self.conv1 = nn.Conv2d(in_planes, 4*growth_rate, kernel_size=1, bias=False)
self.bn2 = nn.BatchNorm2d(4*growth_rate)
self.conv2 = nn.Conv2d(4*growth_rate, growth_rate, kernel_size=3, padding=1, bias=False)
def forward(self, x):
out = self.conv1(F.relu(self.bn1(x)))
out = self.conv2(F.relu(self.bn2(out)))
out = torch.cat([out,x], 1)
return out
class Transition(nn.Module):
def __init__(self, in_planes, out_planes):
super(Transition, self).__init__()
self.bn = nn.BatchNorm2d(in_planes)
self.conv = nn.Conv2d(in_planes, out_planes, kernel_size=1, bias=False)
def forward(self, x):
out = self.conv(F.relu(self.bn(x)))
out = F.avg_pool2d(out, 2)
return out
class DenseNet(nn.Module):
def __init__(self, block, nblocks, growth_rate=12, reduction=0.5, num_classes=10):
super(DenseNet, self).__init__()
self.growth_rate = growth_rate
num_planes = 2*growth_rate
self.conv1 = nn.Conv2d(3, num_planes, kernel_size=3, padding=1, bias=False)
self.dense1 = self._make_dense_layers(block, num_planes, nblocks[0])
num_planes += nblocks[0]*growth_rate
out_planes = int(math.floor(num_planes*reduction))
self.trans1 = Transition(num_planes, out_planes)
num_planes = out_planes
self.dense2 = self._make_dense_layers(block, num_planes, nblocks[1])
num_planes += nblocks[1]*growth_rate
out_planes = int(math.floor(num_planes*reduction))
self.trans2 = Transition(num_planes, out_planes)
num_planes = out_planes
self.dense3 = self._make_dense_layers(block, num_planes, nblocks[2])
num_planes += nblocks[2]*growth_rate
out_planes = int(math.floor(num_planes*reduction))
self.trans3 = Transition(num_planes, out_planes)
num_planes = out_planes
self.dense4 = self._make_dense_layers(block, num_planes, nblocks[3])
num_planes += nblocks[3]*growth_rate
self.bn = nn.BatchNorm2d(num_planes)
self.linear = nn.Linear(num_planes, num_classes)
def _make_dense_layers(self, block, in_planes, nblock):
layers = []
for i in range(nblock):
layers.append(block(in_planes, self.growth_rate))
in_planes += self.growth_rate
return nn.Sequential(*layers)
def forward(self, x):
out = self.conv1(x)
out = self.trans1(self.dense1(out))
out = self.trans2(self.dense2(out))
out = self.trans3(self.dense3(out))
out = self.dense4(out)
out = F.avg_pool2d(F.relu(self.bn(out)), 4)
out = out.view(out.size(0), -1)
out = self.linear(out)
return out
net = DenseNet(Bottleneck, [6,12,24,16], growth_rate=12)
net2 = DenseNet(Bottleneck, [6,12,24,16], growth_rate=12)
"""
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super(Bottleneck, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, self.expansion*planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(self.expansion*planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

# ResNet-18 for CIFAR-10: net is trained locally, net2 holds the rank-averaged model for evaluation.
net = ResNet(BasicBlock, [2, 2, 2, 2])
net2 = ResNet(BasicBlock, [2, 2, 2, 2])
net.cuda()
net2.cuda()
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
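# Data loading: with PARTITIONED_LOAD disabled, every rank draws its own random
# samples from the full training set; with it enabled, DistributedSampler gives
# each rank a disjoint shard. The evaluation loader is always sharded so the
# evaluation pass is split across ranks. The global batch size is 8192.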
if PARTITIONED_LOAD == False:
    dsampler = RandomSampler(trainset)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=8192 // size, shuffle=False, sampler=dsampler)
else:
    dsampler = DistributedSampler(trainset, size, rank)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=8192 // size, shuffle=False, sampler=dsampler)

dsampler2 = DistributedSampler(trainset, size, rank)
evalloader = torch.utils.data.DataLoader(trainset, batch_size=8192 // size, shuffle=False, sampler=dsampler2)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=LEARNING_RATE, momentum=MOMENTUM, weight_decay=0.0)
scheduler = MultiStepLR(optimizer, milestones=[50,], gamma=0.1)
#scheduler = LambdaLR(optimizer, lr_lambda=lambda epoch: LEARNING_RATE / math.sqrt(epoch+1))
model = net.cuda()
model2 = net2.cuda()
random.seed(rank)
## Model averaging -- every rank starts from the same model at the beginning.
for param in model.parameters():
    view = param.cpu().data.numpy()
    view2 = 0.0 * view
    comm.Allreduce(view, view2, op=MPI.SUM)
    param.data = torch.from_numpy(view2).cuda()
    param.data /= size
i = 0
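# Main loop: each rank runs SGD on its local mini-batches and synchronizes with
# the other ranks according to COMMUNICATION. Every int(50000/8192) iterations
# (roughly one pass over CIFAR-10 at the global batch size) the rank-averaged
# model is evaluated on the training set and rank 0 logs the loss.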
for iepoch in range(0, 100):

    if PARTITIONED_LOAD == True:
        scheduler.step()

    net.train()
    train_loss = 0
    correct = 0
    total = 0

    start = time.time()

    if (iepoch + 1) % 25 == 0:
        # Note: this only changes the value logged below; the optimizer's
        # learning rate is governed by the scheduler above.
        LEARNING_RATE = LEARNING_RATE / 10

    for batch_idx, (inputs, targets) in enumerate(trainloader):
        i = i + 1
        if PARTITIONED_LOAD == False:
            if (i % int(50000/8192)) == 0:
                scheduler.step()

        optimizer.zero_grad()
        inputs, targets = Variable(inputs.cuda()), Variable(targets.cuda())
        outputs = net(inputs)
        loss = criterion(outputs, targets)
        loss.backward()

        # Centralized communication: allreduce-average the gradients across all
        # ranks, then take a local SGD step.
        if COMMUNICATION == "CENTRALIZED":
            for param in model.parameters():
                view = param.grad.cpu().data.numpy()
                view2 = 0.0 * view
                comm.Allreduce(view, view2, op=MPI.SUM)
                view2 = view2 / size
                #newmodel = param.cpu().data.numpy() - LEARNING_RATE * view2
                #param.data = torch.from_numpy(newmodel).cuda()
                param.grad.data = torch.from_numpy(view2).cuda()
            # TODO:
            # try impl SGD by itself
            optimizer.step()

        # Decentralized communication: after the local SGD step, average the
        # model with the two ring neighbours' models.
        elif COMMUNICATION == "DECENTRALIZED":
            optimizer.step()
            for param in model.parameters():
                view = param.cpu().data.numpy()
                model_neighbor1 = 0.0 * view
                model_neighbor2 = 0.0 * view
                neighbor1 = (rank - 1) % size
                neighbor2 = (rank + 1) % size
                send_req1 = comm.isend(view, dest=neighbor1)
                send_req2 = comm.isend(view, dest=neighbor2)
                model_neighbor1 = comm.recv(source=neighbor1)
                model_neighbor2 = comm.recv(source=neighbor2)
                send_req1.wait()
                send_req2.wait()
                view2 = (model_neighbor1 + model_neighbor2 + view) / 3
                param.data = torch.from_numpy(view2).cuda()

        # Model averaging with simulated packet drops: with probability
        # PACKAGE_DROP_RATE a rank contributes zeros to the allreduce (its send
        # is dropped), and with the same probability it ignores the averaged
        # result (its receive is dropped). The average is rescaled by the number
        # of ranks that actually contributed.
        elif COMMUNICATION == "PACKAGEDROP":
            optimizer.step()
            for param in model.parameters():
                view = param.cpu().data.numpy()

                drop1_local = np.array([0.0,])
                if random.random() < PACKAGE_DROP_RATE:
                    view = 0.0 * view
                    drop1_local = drop1_local + 1
                    #print (" drop1 package rank", rank)
                drop1_global = np.array([0.0,])
                comm.Allreduce(drop1_local, drop1_global, op=MPI.SUM)
                drop1_global = drop1_global[0]
                #print ("drop1_global = ", drop1_global, "/", size)

                view2 = 0.0 * view
                comm.Allreduce(view, view2, op=MPI.SUM)
                view2 = view2 / (size - drop1_global)

                if random.random() < PACKAGE_DROP_RATE:
                    pass
                    #print (" drop2 package rank", rank)
                else:
                    param.data = torch.from_numpy(view2).cuda()

        if (i % int(50000/8192)) == 0:
            if rank == 0:
                elapsed = time.time() - start

            # Evaluation: average the model across ranks into net2, then compute
            # the training loss on the sharded evaluation loader.
            net2.train()
            net2.load_state_dict(net.state_dict())
            for param in model2.parameters():
                view = param.cpu().data.numpy()
                view2 = 1.0 * view
                comm.Allreduce(view, view2, op=MPI.SUM)
                param.data = torch.from_numpy(view2).cuda()
                param.data /= size

            _train_loss = 0
            for batch_idx, (inputs, targets) in enumerate(evalloader):
                inputs, targets = Variable(inputs.cuda()), Variable(targets.cuda())
                outputs = net2(inputs)
                loss = criterion(outputs, targets)
                _train_loss += loss.data[0]

            _train_loss = np.array([_train_loss,])
            train_loss = 0.0 * _train_loss
            comm.Allreduce(_train_loss, train_loss, op=MPI.SUM)
            train_loss = train_loss[0]

            if PARTITIONED_LOAD == False:
                if rank == 0:
                    print ('%d Loss: %.3f | %f seconds | LR: %f ~' % (i / int(50000/8192), train_loss/size/(batch_idx+1), elapsed, LEARNING_RATE))
            else:
                if rank == 0:
                    print ('%d Loss: %.3f | %f seconds | LR: %f' % (iepoch, train_loss/size/(batch_idx+1), elapsed, LEARNING_RATE))