# Code from my NESS 2016 Short Course
###########################################
# Install required packages.
###########################################
install.packages(c("iotools", "rbokeh",
"tidyr", "foreach", "doParallel",
"datadr", "trelliscope"))
# Download the Airline OnTime dataset.
url_prefix = "http://stat-computing.org/dataexpo/2009/"
file_nums = as.character(1987:2008)
for (file_num in file_nums) {
  url = paste0(url_prefix, file_num, ".csv.bz2")
  new_file_name = paste0(file_num, ".csv.bz2")
  download.file(url, destfile=new_file_name)
}
file_names = paste0(1987:2008, ".csv.bz2")
# Count the rows of every file serially.
system.time({
  total_lines = 0
  for (file_name in file_names) {
    total_lines = total_lines + nrow(read.csv(bzfile(file_name)))
  }
  print(total_lines)
})
# [1] 123534969
#      user   system  elapsed
#  2619.732   49.698 2671.296
###########################################
# Parallel processing example 1 - bzipped files
###########################################
library(foreach)
library(doParallel)
cl = makeCluster(4)
registerDoParallel(cl)
system.time({
  total_lines = foreach(file_name=file_names, .combine=`+`) %dopar% {
    nrow(read.csv(bzfile(file_name)))
  }
  print(total_lines)
  stopCluster(cl)
})
# [1] 123534969
#   user  system  elapsed
#  0.039   0.107  996.029
###########################################
# Parallel processing example 2 - decompress files
###########################################
cl = makeCluster(4)
registerDoParallel(cl)
foreach (file_name=file_names) %dopar% {
  x = read.csv(bzfile(file_name))
  # Strip stray single quotes from column 11 (the tail-number column)
  # before writing the decompressed csv.
  x[, 11] = gsub("'", "", x[, 11])
  write.csv(x, gsub(".bz2", "", file_name), row.names=FALSE, quote=FALSE)
}
stopCluster(cl)
# Work with the decompressed csv files from here on.
file_names = gsub(".bz2", "", file_names)
###########################################
# Parallel processing example 3 - iotools
###########################################
cl = makeCluster(4)
registerDoParallel(cl)
system.time({
  total_lines = foreach(file_name=file_names, .combine=`+`) %dopar% {
    library(iotools)
    nrow(read.csv.raw(file_name))
  }
  stopCluster(cl)
})
###########################################
# Parallel processing example 4 - rds files
###########################################
# Create the RDS files
library(iotools)
cl = makeCluster(4)
registerDoParallel(cl)
system.time({
  # Convert each csv file to an rds file in parallel.
  foreach(file_name=file_names) %dopar% {
    library(iotools)
    x = read.csv.raw(file_name)
    saveRDS(x, gsub(".csv", ".rds", file_name))
  }
  stopCluster(cl)
})
# Load and get the line counts.
cl = makeCluster(4)
registerDoParallel(cl)
system.time({
  total_lines = foreach(file_name=file_names, .combine=`+`) %dopar% {
    library(iotools)
    nrow(readRDS(gsub(".csv", ".rds", file_name)))
  }
  stopCluster(cl)
})
###########################################
# A quick note on pipes (tidyr re-exports magrittr's %>% operator)
###########################################
library(tidyr)
add_one = function(x) x+1
add = function(x, y) x+y
3 %>% add_one %>% add(10)
# [1] 14
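# The pipe passes the left-hand side as the first argument of the right-hand
# side, so the expression above is just a more readable way of writing:
add(add_one(3), 10)
# [1] 14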
###########################################
# Creating a single airline file
###########################################
# Convert DepTime from hhmm form (e.g. 1435) to minutes after midnight.
normalize_df = function(x) {
  rownames(x) = NULL
  x$DepTime = sprintf("%04d", x$DepTime)
  x$DepTime =
    as.numeric(substr(x$DepTime, 1, 2)) * 60 +
    as.numeric(substr(x$DepTime, 3, 4))
  x
}
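# Quick illustrative check of the DepTime conversion (not part of the original
# course code): 1435 (2:35 pm) becomes 14*60 + 35 = 875 minutes after midnight.
normalize_df(data.frame(DepTime = c(5, 1435)))$DepTime
# [1]   5 875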
# Column names are taken from a small sample of one of the decompressed files.
col_names = names(read.csv("1995.csv", nrows=5, as.is=TRUE))
out_file = file("airline.csv", "wb")
write(paste(col_names, collapse=","), out_file)
for (file_name in file_names) {
  file_name %>% read.csv.raw %>%
    normalize_df %>% as.output(sep=",") %>%
    writeBin(out_file)
}
close(out_file)
###########
# Use chunk.apply to count the rows in airline.csv
###########
xs = read.csv("1995.csv", nrows=50,
as.is=TRUE)
col_names = names(xs)
col_types = as.vector(sapply(xs, class))
chunk.apply(input="airline.csv",
FUN=function(chunk) {
nrow(dstrsplit(chunk, col_types, ","))
}, CH.MERGE=sum, parallel=6)
# [1] 86289324
#############
# Create an airline_apply function
############
airline_apply = function(fun, ch_merge=list, ..., parallel=6) {
  chunk.apply(input="airline.csv",
    FUN=function(chunk) {
      x = dstrsplit(chunk, col_types, ",")
      names(x) = col_names
      fun(x, ...)
    }, CH.MERGE=ch_merge, parallel=parallel)
}
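# A minimal usage sketch (assumes airline.csv, col_names, and col_types from
# above): count the rows of each chunk and sum the results. This should
# reproduce the row count computed earlier.
airline_apply(nrow, ch_merge=sum)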
########
# rbokeh examples
#######
x = read.csv.raw("1995.csv")
x = na.omit(x[x$Month==1 & x$Dest=="JFK",])
figure() %>%
ly_hist(ArrDelay, data=x, breaks=40,
freq=FALSE) %>%
ly_density(ArrDelay, data=x)
figure() %>%
ly_points(ArrDelay, DepDelay, data=x,
color=UniqueCarrier,
hover=list(ArrDelay, DepDelay,
UniqueCarrier,
Origin))
#######
# trelliscope example
######
library(datadr)
library(trelliscope)
x = read.csv.raw("2008.csv")
conn = vdbConn("airline-db", name="airline-db", autoYes=TRUE)
bdma = divide(x, c("Month", "DayofMonth", "Dest"))
class(bdma)
# [1] "ddf" "ddo" "kvMemory"
length(bdma)
# [1] 103753
bdma[[1]]$key
# [1] "Month= 1|DayofMonth= 1|Dest=ABE"
figure() %>%
  ly_points(ArrDelay, DepDelay, data=bdma[[1]]$value,
            color=UniqueCarrier,
            hover=list(ArrDelay, DepDelay, UniqueCarrier, Origin))
bdma_cog_fun = function(x) {
  list(num_flights=cog(nrow(x), desc="Flights"),
       num_airlines=cog(length(unique(x$UniqueCarrier)), desc="Carriers"),
       num_origins=cog(length(unique(x$Origin)), desc="Origins"),
       avg_dist=cog(mean(x$Distance, na.rm=TRUE), desc="Mean Flight Distance"),
       sd_dist=cog(sd(x$Distance, na.rm=TRUE), desc="SD Flight Distance"),
       avg_dep_delay=cog(mean(x$DepDelay, na.rm=TRUE), desc="Mean Dep Delay"),
       avg_arrival_delay=cog(mean(x$ArrDelay, na.rm=TRUE),
                             desc="Mean Arrival Delay"))
}
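# Illustrative spot check (assumes bdma from above): compute the cognostics
# for a single subset.
bdma_cog_fun(bdma[[1]]$value)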
delay_plot = function(x) {
  figure() %>%
    ly_points(ArrDelay, DepDelay, data=x,
              color=UniqueCarrier,
              hover=list(ArrDelay, DepDelay, UniqueCarrier, Origin))
}
makeDisplay(bdma,
            name="Delays",
            group="DayMonthAirport",
            desc="Delays by Day, Month, and Destination Airport",
            panelFn=delay_plot,
            cogFn=bdma_cog_fun)
view()
####
# OLS
###
# OLS coefficients via the normal equations, dropping rank-deficient columns
# identified by a pivoted QR decomposition of X'X.
my_lm = function(X, y) {
  XTy = crossprod(X, y)
  XTX = crossprod(X)
  QR_XTX = qr(XTX)
  keep_vars = QR_XTX$pivot[1:QR_XTX$rank]
  solve(XTX[keep_vars, keep_vars]) %*% XTy[keep_vars]
}
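# Quick illustrative sanity check on simulated, well-conditioned data (not part
# of the original course code): my_lm should match lm() without an intercept.
set.seed(1)
X = cbind(1, matrix(rnorm(200), ncol=2))
y = X %*% c(1, 2, 3) + rnorm(100)
cbind(my_lm(X, y), coef(lm(y ~ X - 1)))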
####
# Numerical Stability
###
# x2 is almost an exact multiple of x1, so X'X is nearly singular.
x1 = 1:100
x2 = x1 * 1.01 + rnorm(100, 0, sd=7e-6)
X = cbind(x1, x2)
y = x1 + x2
qr.solve(t(X) %*% X) %*% t(X) %*% y
# Error in qr.solve(t(X) %*% X) : singular matrix 'a' in solve
beta = qr.solve(t(X) %*% X, tol=0) %*% t(X) %*% y
beta
#          [,1]
# x1  0.9869174
# x2  1.0170043
sd(y - X %*% beta)
# [1] 0.1187066
# lm() works from a QR decomposition of X itself, which is far more stable here.
sd(lm(y ~ x1 + x2 - 1)$residuals)
# [1] 9.546656e-15
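# Why the normal equations struggle here (illustrative check, not from the
# course code): forming X'X squares the condition number of X.
kappa(X)          # already very large
kappa(t(X) %*% X) # roughly the square of kappa(X)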