#!/usr/bin/python3.5
# Distributed CIFAR-10 training (ResNet-18) over MPI with centralized,
# decentralized, and package-drop communication strategies.
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
import torchvision
import torchvision.transforms as transforms
import os
import argparse
import math
from torch.autograd import Variable
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data.sampler import RandomSampler
from torch.optim.lr_scheduler import MultiStepLR
from torch.optim.lr_scheduler import LambdaLR
import numpy as np
import copy
from mpi4py import MPI
import random
import time

# One MPI process per GPU: rank and world size come from the MPI communicator.
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# Pin each rank to a single GPU on its machine (set before any CUDA context is created).
NGPU_PER_MACHINE = 8
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "%d" % (rank % NGPU_PER_MACHINE)
print ("Rank %d, Using GPU %s" % (rank, os.environ["CUDA_VISIBLE_DEVICES"]))
import sys

# Run configuration from positional command-line arguments.
LEARNING_RATE = float(sys.argv[1])
MOMENTUM = float(sys.argv[2])
COMMUNICATION = sys.argv[3]
PACKAGE_DROP_RATE = float(sys.argv[4])
PARTITIONED_LOAD = False

if rank == 0:
    print ("# LEARNING_RATE: %f" % LEARNING_RATE)
    print ("# MOMENTUM: %f" % MOMENTUM)
    print ("# COMMUNICATION: %s" % COMMUNICATION)
    print ("# PACKAGE_DROP_RATE: %f" % PACKAGE_DROP_RATE)
# CIFAR-10 per-channel mean/std normalization; the usual crop/flip augmentation is disabled here.
transform_train = transforms.Compose([
    # transforms.RandomCrop(32, padding=4),
    # transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
""" | |
class Bottleneck(nn.Module): | |
def __init__(self, in_planes, growth_rate): | |
super(Bottleneck, self).__init__() | |
self.bn1 = nn.BatchNorm2d(in_planes) | |
self.conv1 = nn.Conv2d(in_planes, 4*growth_rate, kernel_size=1, bias=False) | |
self.bn2 = nn.BatchNorm2d(4*growth_rate) | |
self.conv2 = nn.Conv2d(4*growth_rate, growth_rate, kernel_size=3, padding=1, bias=False) | |
def forward(self, x): | |
out = self.conv1(F.relu(self.bn1(x))) | |
out = self.conv2(F.relu(self.bn2(out))) | |
out = torch.cat([out,x], 1) | |
return out | |
class Transition(nn.Module): | |
def __init__(self, in_planes, out_planes): | |
super(Transition, self).__init__() | |
self.bn = nn.BatchNorm2d(in_planes) | |
self.conv = nn.Conv2d(in_planes, out_planes, kernel_size=1, bias=False) | |
def forward(self, x): | |
out = self.conv(F.relu(self.bn(x))) | |
out = F.avg_pool2d(out, 2) | |
return out | |
class DenseNet(nn.Module): | |
def __init__(self, block, nblocks, growth_rate=12, reduction=0.5, num_classes=10): | |
super(DenseNet, self).__init__() | |
self.growth_rate = growth_rate | |
num_planes = 2*growth_rate | |
self.conv1 = nn.Conv2d(3, num_planes, kernel_size=3, padding=1, bias=False) | |
self.dense1 = self._make_dense_layers(block, num_planes, nblocks[0]) | |
num_planes += nblocks[0]*growth_rate | |
out_planes = int(math.floor(num_planes*reduction)) | |
self.trans1 = Transition(num_planes, out_planes) | |
num_planes = out_planes | |
self.dense2 = self._make_dense_layers(block, num_planes, nblocks[1]) | |
num_planes += nblocks[1]*growth_rate | |
out_planes = int(math.floor(num_planes*reduction)) | |
self.trans2 = Transition(num_planes, out_planes) | |
num_planes = out_planes | |
self.dense3 = self._make_dense_layers(block, num_planes, nblocks[2]) | |
num_planes += nblocks[2]*growth_rate | |
out_planes = int(math.floor(num_planes*reduction)) | |
self.trans3 = Transition(num_planes, out_planes) | |
num_planes = out_planes | |
self.dense4 = self._make_dense_layers(block, num_planes, nblocks[3]) | |
num_planes += nblocks[3]*growth_rate | |
self.bn = nn.BatchNorm2d(num_planes) | |
self.linear = nn.Linear(num_planes, num_classes) | |
def _make_dense_layers(self, block, in_planes, nblock): | |
layers = [] | |
for i in range(nblock): | |
layers.append(block(in_planes, self.growth_rate)) | |
in_planes += self.growth_rate | |
return nn.Sequential(*layers) | |
def forward(self, x): | |
out = self.conv1(x) | |
out = self.trans1(self.dense1(out)) | |
out = self.trans2(self.dense2(out)) | |
out = self.trans3(self.dense3(out)) | |
out = self.dense4(out) | |
out = F.avg_pool2d(F.relu(self.bn(out)), 4) | |
out = out.view(out.size(0), -1) | |
out = self.linear(out) | |
return out | |
net = DenseNet(Bottleneck, [6,12,24,16], growth_rate=12) | |
net2 = DenseNet(Bottleneck, [6,12,24,16], growth_rate=12) | |
""" | |
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super(Bottleneck, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, self.expansion*planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(self.expansion*planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out
# ResNet-18: "net" is the locally trained model; "net2" receives the averaged
# parameters for the periodic evaluation below.
net = ResNet(BasicBlock, [2, 2, 2, 2])
net2 = ResNet(BasicBlock, [2, 2, 2, 2])
net.cuda()
net2.cuda()
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
if PARTITIONED_LOAD == False:
    # Every rank samples from the full training set.
    dsampler = RandomSampler(trainset)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=8192 // size, shuffle=False, sampler=dsampler)
else:
    # Each rank loads only its own partition of the training set.
    dsampler = DistributedSampler(trainset, size, rank)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=8192 // size, shuffle=False, sampler=dsampler)

# Loader used for the periodic (distributed) evaluation of the averaged model.
dsampler2 = DistributedSampler(trainset, size, rank)
evalloader = torch.utils.data.DataLoader(trainset, batch_size=8192 // size, shuffle=False, sampler=dsampler2)
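# Note: with batch_size = 8192 // size per rank, one "global" step across all
# ranks covers roughly 8192 CIFAR-10 images, i.e. about 50000/8192 ~ 6 global
# steps per epoch. That interval is what drives scheduler.step() and the
# periodic evaluation in the training loop below.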
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=LEARNING_RATE, momentum=MOMENTUM, weight_decay=0.0)
scheduler = MultiStepLR(optimizer, milestones=[50,], gamma=0.1)
#scheduler = LambdaLR(optimizer, lr_lambda=lambda epoch: LEARNING_RATE / math.sqrt(epoch+1))

model = net.cuda()
model2 = net2.cuda()
random.seed(rank)

## Model Avg -- everyone starts from the same model at the beginning.
for param in model.parameters():
    view = param.cpu().data.numpy()
    view2 = 0.0 * view
    comm.Allreduce(view, view2, op=MPI.SUM)
    param.data = torch.from_numpy(view2).cuda()
    param.data /= size
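# The CPU-copy / Allreduce / divide-by-size pattern above is reused for gradients
# and for the evaluation copy further down. A minimal helper sketch of that
# pattern (hypothetical name, not used by the code below, assuming the same
# mpi4py communicator):
def _allreduce_average(comm, tensor, world_size):
    # Average a CUDA tensor across all MPI ranks via a CPU staging buffer.
    local = tensor.cpu().numpy()
    averaged = 0.0 * local
    comm.Allreduce(local, averaged, op=MPI.SUM)
    return torch.from_numpy(averaged / world_size).cuda()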
i = 0
for iepoch in range(0, 100):
    if PARTITIONED_LOAD == True:
        # Partitioned loading: one pass over the loader is one epoch, so step the scheduler per epoch.
        scheduler.step()
    net.train()
    train_loss = 0
    correct = 0
    total = 0
    start = time.time()
    if (iepoch + 1) % 25 == 0:
        # Note: this only changes the value reported in the log lines below;
        # the optimizer's learning rate is driven by the scheduler.
        LEARNING_RATE = LEARNING_RATE / 10
    for batch_idx, (inputs, targets) in enumerate(trainloader):
        i = i + 1
        if PARTITIONED_LOAD == False:
            # Non-partitioned loading: step the scheduler once per "global epoch"
            # (about int(50000/8192) global steps).
            if (i % int(50000/8192)) == 0:
                scheduler.step()
        optimizer.zero_grad()
        inputs, targets = Variable(inputs.cuda()), Variable(targets.cuda())
        outputs = net(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        # Centralized communication: allreduce-average the gradients across all
        # ranks, then take a local SGD step (synchronous data-parallel SGD).
        if COMMUNICATION == "CENTRALIZED":
            for param in model.parameters():
                view = param.grad.cpu().data.numpy()
                view2 = 0.0 * view
                comm.Allreduce(view, view2, op=MPI.SUM)
                view2 = view2 / size
                #newmodel = param.cpu().data.numpy() - LEARNING_RATE * view2
                #param.data = torch.from_numpy(newmodel).cuda()
                param.grad.data = torch.from_numpy(view2).cuda()
            # TODO:
            # try impl SGD by itself
            optimizer.step()
        elif COMMUNICATION == "DECENTRALIZED":
            # Decentralized communication: take a local SGD step, then average the
            # parameters with the two ring neighbors (ranks rank-1 and rank+1).
            optimizer.step()
            for param in model.parameters():
                view = param.cpu().data.numpy()
                model_neighbor1 = 0.0 * view
                model_neighbor2 = 0.0 * view
                neighbor1 = (rank - 1) % size
                neighbor2 = (rank + 1) % size
                send_req1 = comm.isend(view, dest=neighbor1)
                send_req2 = comm.isend(view, dest=neighbor2)
                model_neighbor1 = comm.recv(source=neighbor1)
                model_neighbor2 = comm.recv(source=neighbor2)
                send_req1.wait()
                send_req2.wait()
                view2 = (model_neighbor1 + model_neighbor2 + view) / 3
                param.data = torch.from_numpy(view2).cuda()
        elif COMMUNICATION == "PACKAGEDROP":
            # Package-drop communication: take a local SGD step, then allreduce-average
            # the parameters, where each rank drops its outgoing contribution and/or
            # ignores the received average with probability PACKAGE_DROP_RATE.
            optimizer.step()
            for param in model.parameters():
                view = param.cpu().data.numpy()
                drop1_local = np.array([0.0,])
                if random.random() < PACKAGE_DROP_RATE:
                    # Drop the outgoing "package": contribute zeros to the sum.
                    view = 0.0 * view
                    drop1_local = drop1_local + 1
                    #print (" drop1 package rank", rank)
                drop1_global = np.array([0.0,])
                comm.Allreduce(drop1_local, drop1_global, op=MPI.SUM)
                drop1_global = drop1_global[0]
                #print ("drop1_global = ", drop1_global, "/", size)
                view2 = 0.0 * view
                comm.Allreduce(view, view2, op=MPI.SUM)
                # Average over the ranks that actually contributed.
                view2 = view2 / (size - drop1_global)
                if random.random() < PACKAGE_DROP_RATE:
                    # Drop the incoming "package": keep the local parameters unchanged.
                    pass
                    #print (" drop2 package rank", rank)
                else:
                    param.data = torch.from_numpy(view2).cuda()
        # Periodic evaluation: average the model into net2 across all ranks and
        # compute the training loss of the averaged model on the eval loader.
        if (i % int(50000/8192)) == 0:
            if rank == 0:
                elapsed = time.time() - start
            # Evaluation
            net2.train()
            net2.load_state_dict(net.state_dict())
            for param in model2.parameters():
                view = param.cpu().data.numpy()
                view2 = 1.0 * view
                comm.Allreduce(view, view2, op=MPI.SUM)
                param.data = torch.from_numpy(view2).cuda()
                param.data /= size
            _train_loss = 0
            for batch_idx, (inputs, targets) in enumerate(evalloader):
                inputs, targets = Variable(inputs.cuda()), Variable(targets.cuda())
                outputs = net2(inputs)
                loss = criterion(outputs, targets)
                _train_loss += loss.data[0]
            # Sum the per-rank losses, then report the per-batch loss averaged over ranks.
            _train_loss = np.array([_train_loss,])
            train_loss = 0.0 * _train_loss
            comm.Allreduce(_train_loss, train_loss, op=MPI.SUM)
            train_loss = train_loss[0]
            if PARTITIONED_LOAD == False:
                if rank == 0:
                    print ('%d Loss: %.3f | %f seconds | LR: %f ~' % (i // int(50000/8192), train_loss/size/(batch_idx+1), elapsed, LEARNING_RATE))
            else:
                if rank == 0:
                    print ('%d Loss: %.3f | %f seconds | LR: %f' % (iepoch, train_loss/size/(batch_idx+1), elapsed, LEARNING_RATE))
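# Summary of the three communication modes above (w_r = parameters on rank r,
# g_r = gradients on rank r, n = number of ranks):
#   CENTRALIZED:   g_r <- (1/n) * sum_s g_s, then a local SGD step (synchronous SGD).
#   DECENTRALIZED: local SGD step, then w_r <- (w_{r-1} + w_r + w_{r+1}) / 3 (ring gossip).
#   PACKAGEDROP:   local SGD step, then w_r <- (sum of non-dropped w_s) / (n - #dropped),
#                  with the update skipped entirely, with probability PACKAGE_DROP_RATE,
#                  on the receiving side.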