Last active July 25, 2024 20:59
High Performance Streaming Analytics in Julia
# This code is part of a presentation on streaming analytics in Julia
# It was inspired by a number of individuals and makes use of some of their ideas
# 1. got me thinking about inline processing after
# reading his great Vowpal Wabbit posts
# 2. John Lanford and his fantastic Vowpal Wabbit library.
# Check out his NYU video course to learn more (see below)
# 3. John Myles White's presentation on online SDG and his StreamStats.jl library
# Thank you all!
path = dirname(@__FILE__) # path to the script. Will fail if used from console (set path manually)
# Before we begin we're going to need some data.
# Below, we create two files with 5k and 500k rows respectively.
# Feel free to try this out with larger files, but if CSV format be prepared to wait!
# csv data files
faux_data_5k = joinpath(path,"faux_data_5k.csv")
faux_data_500k = joinpath(path,"faux_data_500k.csv")
function fauxCSVData(path, n, p)
iostrm = open(path,"w")
xs = rand(n,p) # e.g. 32 x 32 images
y = ones(n)
data = hcat(y,xs)
println("Completed Write out...")
# let's create some LARGE files with lots of features/columns
p = 32 * 32 # e.g. 1024 columns (32 x 32 images)
@time fauxCSVData(faux_data_5k, 5000, p) # <- 5k rows of 1024 columns (32 x 32) 93.6MB
# 1.68 seconds (0.25GB allocated, 9.77% gc time)
# WARNING: Don't run during pesentation (over 6 min!)
@time fauxCSVData(faux_data_500k, 500_000, p) # <-500k rows of 1024 columns (32 x 32) 9.36GB
# 339.7 seconds (25.8GB bytes allocated, 4.15% gc time)
# now, lets load this data into memory and calculate the mean of the 1st feature/column
function CSVread(path::String)
data = readcsv(path)
@time CSVread(faux_data_5k)
# elapsed time: 4.038636026 seconds (689176136 bytes allocated, 3.34% gc time)
# WARNING: Don't run during pesentation (over 13 min!!!)
@time CSVread(faux_data_500k) # <- needed about 1.9 GB of RAN at its peak
# elapsed time: 787.871814233 seconds (22229432176 bytes allocated, 1.72% gc time)
# Note
# ==============
# 1. This still works if your plan is to load it once and do some feature engineering
# 2. However, we're already taking a really long time and we're only at 500K rows
# 3. We can speed this up a lot by moving from
# "Load all -> manipulate/calc all" to "Load 1 row -> manipulate/calc 1 row"
function inlineCSVRead(path::String)
iostrm = open(path,"r") # open an IO stream to the file
total = 0.
n = 0
while !eof(iostrm)
l = readline(iostrm) # reads in a line as a string
v = split(l,",") # split it into an array of strings
n += 1
total += parse(Float64,v[1]) # cast string to a value
println("Completed read in... \ntotal = $total avg = $(total/n)")
@time inlineCSVRead(faux_data_5k)
# elapsed time: 0.594267434 seconds (412923464 bytes allocated, 43.24% gc time)
@time inlineCSVRead(faux_data_500k)
# elapsed time: 70.693168822 seconds (41283386680 bytes allocated, 41.23% gc time)
# Note
# ========
# 1. 5k went from 4.04 secs to 0.59 secs
# 2. 500k went from 787.87 secs to 70.69 secs
# 3. Garbage collection went through the roof at > 41%!!
# 4. There's an important shift from thinking of data as a big table
# to thinking of data as a series of event.
# Really fast IO
# ==================
# 1. To get really fast IO in Julia two conditions need to be met
# Turn all non-numeric data into numeric data (one-hot encoding, word-vecs, etc)
# Store it as binary data, not csv
# Let's create some binary data
faux_data_fast_5k = joinpath(path,"faux_data_fast_5k.dat")
faux_data_fast_500k = joinpath(path,"faux_data_fast_500k.dat")
faux_data_fast_5M = joinpath(path,"faux_data_fast_5M.dat")
function writeFast(path, n,p)
iostrm = open(path,"w")
x = zeros(Float32,p)
for y in 1:n
y += 1
write(iostrm,y) # <- we're writing a float directly to disk
rand!(x) # e.g. 32 x 32 images
write(iostrm,x) # <- we're writing a bunch of floats directly to disk
println("Completed write out...")
@time writeFast(faux_data_fast_5k, 5000, p)
# 5k rows of 1024 columns (32 x 32) 20.5MB and took 0.037 seconds (5128 bytes allocated))
@time writeFast(faux_data_fast_500k, 500_000, p)
# 5k rows of 1024 columns (32 x 32) 20.5MB and took 3.509 seconds (5128 bytes allocated))
# WARNING: Might not want to run. Prepare to talk during this part of pesentation (33 sec!!!)
@time writeFast(faux_data_fast_5M, 5_000_000, p)
# 5M rows of 1024 columns (32 x 32) 20GB takes about 33.15 secs
# Note
# ===========
# 1. 5k comparision: 1.68 seconds (csv) vs 0.037 seconds (binary)
# 2. 500k comparision: 339.7 seconds seconds (csv) vs 3.509 seconds (binary)
# 3. 5M no comparision but only took 33.509 seconds for about 20GB (binary)
# 4. Writing data in binary is both faster and uses less disk
# Next, let's try reading through it to calculate the mean of Y.
function fastRead(path::String, p)
iostrm = open(path,"r")
n = 0
total = 0.
while !eof(iostrm)
y = read(iostrm,Int64,1) # <-- lots of memory allocation happening here!
x = read(iostrm,Float32,p) # <-- and here!
n += 1
total += y
println("Completed read in... \ntotal = $total avg = $(total/(n))")
@time fastRead(faux_data_fast_5k, (32 * 32))
# elapsed time: 0.035616242 seconds (22122928 bytes allocated, 57.90% gc time)
@time fastRead(faux_data_fast_500k, (32 * 32))
# elapsed time: 2.705912819 seconds (2212002896 bytes allocated, 57.32% gc time)
@time fastRead(faux_data_fast_5M, (32 * 32))
# elapsed time: 42.250951046 seconds (22120002848 bytes allocated, 39.31% gc time
# Note
# ===========
# 1. 5k read: 0.59 seconds (inline csv) vs 0.035 seconds (binary)
# 2. 500k read: 70.69 seconds seconds (inline csv) vs 2.71 seconds (binary)
# 3. 5Mk read took 42.02 seconds to read through a 20GB file (binary)
# 4. As expected this is a lot faster but we've a lot of garbage collection happening
# 5. We can do better!
# let's get rid of the unnecessary GC
function fastRead_No_GC(path::String, p)
iostrm = open(path,"r")
n = 0
total = 0.
x = zeros(Float32,p) # <-- pre-allocated array of floats
y = zeros(Int64,1) # <-- pre-allocated array of ints (important: only 1 cell but still array!)
while !eof(iostrm)
read!(iostrm,y) # <-- re-using pre-allocated memory (very fast, no GC!)
read!(iostrm,x) # <-- re-using pre-allocated memory (very fast, no GC!)]
n += 1
total += y
println("Completed read in... \ntotal = $total avg = $(total/(n))")
@time fastRead_No_GC(faux_data_fast_5k, (32 * 32))
# elapsed time: 0.006328654 seconds (686096 bytes allocated)
@time fastRead_No_GC(faux_data_fast_500k, (32 * 32))
# elapsed time: 0.567468112 seconds (68006256 bytes allocated, 5.51% gc time)
@time fastRead_No_GC(faux_data_fast_5M, (32 * 32))
# elapsed time: 25.674385541 seconds (680007392 bytes allocated, 2.10% gc time)
# Note
# ===========
# 1. 5k read: 0.035 seconds (binary w/GC) vs 0.0063 seconds (binary low GC)
# 2. 500k read: 2.71 seconds (binary w/GC) vs 0.57 seconds (binary low GC)
# 3. 5M read took 25.6 seconds to read through a 20GB file (binary)!
# 4. Now that we have fast IO let's move on to inline analytics
# we're going to start by generating some data for a L2 logistic Regression
# you will need the Distributions and StreamStats.jl Package
# Pkg.add("Distributions")
# Pkg.clone("")
using StreamStats, Distributions
invlogit(z::Real) = 1 / (1 + exp(-z))
function fauxDataFast(path::String, n::Int64, β₀::Float64, β::Array{Float64,1}, addnoise::Bool)
iostrm = open(path,"w")
p = length(β) # number of features/columns
x = zeros(Float64,p)
pᵢ = 0.
y = 0
for i in 1:n
x[1:p] = randn(p)
pᵢ = invlogit(β₀ + dot(β, x))
y = addnoise? rand(Bernoulli(pᵢ)) : (pᵢ < 0.5?0:1)
write(iostrm,y) # writeout Y
write(iostrm,x) # writeout Xs
println("Completed faux data write out...")
p_data = 100 # number of features in our faux data
β₀ = randn() # the intercept we're trying to learn
β = randn(p_data) # the beta coefficients we're trying to learn
faux_data_no_noise_5k = joinpath(path,"faux_data_no_noise_5k.dat")
faux_data_with_noise_5k = joinpath(path,"faux_data_with_noise_5k.dat")
@time fauxDataFast(faux_data_no_noise_5k, 5_000, β₀, β, false)
# elapsed time: 0.013943915 seconds (4481656 bytes allocated)
@time fauxDataFast(faux_data_with_noise_5k, 5_000, β₀, β, true)
# elapsed time: 0.011365431 seconds (4481656 bytes allocated)
faux_data_no_noise_500k = joinpath(path,"faux_data_no_noise_500k.dat")
faux_data_with_noise_500k = joinpath(path,"faux_data_with_noise_500k.dat")
@time fauxDataFast(faux_data_no_noise_500k, 500_000, β₀, β, false)
# elapsed time: 1.283577979 seconds (448002008 bytes allocated)
@time fauxDataFast(faux_data_with_noise_500k,500_000, β₀, β, true)
# elapsed time: 1.357101841 seconds (448002216 bytes allocated)
faux_data_no_noise_5M = joinpath(path,"faux_data_no_noise_5M.dat")
faux_data_with_noise_5M = joinpath(path,"faux_data_with_noise_5M.dat")
@time fauxDataFast(faux_data_no_noise_5M, 5_000_000, β₀, β, false)
# elapsed time: 12.32556777 seconds
@time fauxDataFast(faux_data_with_noise_5M, 5_000_000, β₀, β, true)
# elapsed time: 12.91362036 seconds
function streamingLogistic(path::String, p, stat)
iostrm = open(path,"r")
x = zeros(Float64,p) # <-- pre-allocated array of floats
y = zeros(Int64,1) # <-- pre-allocated array of ints (important: only 1 cell but still array!)
n = 0 # number of examples
c = 0 # number of correct
while !eof(iostrm)
read!(iostrm,y) # <-- re-using pre-allocated memory (very fast, no GC!)
read!(iostrm,x) # <-- re-using pre-allocated memory (very fast, no GC!)]
yhat = invlogit(stat.β₀ + dot(stat.β, x)) # first predict using latest example
StreamStats.update!(stat, x, y[1]) # <- now use that example to update the model
c += (yhat < 0.5?0:1) == y[1]? 1:0 # number correct
n += 1
pct_corr = round(c/n*100,2)
println("$n examples processed. $(pct_corr)% correct")
return pct_corr
λ = 0.00001 # L2 penalty
α = 0.01 # learning rate
# 5k examples
logReg = StreamStats.ApproxL2Logit(length(β),λ,α)
@time corr = streamingLogistic(faux_data_no_noise_5k, length(β), logReg) # 1st pass
# 88.72 % correct elapsed time: 0.006953184 seconds (2284 bytes allocated)
@time corr = streamingLogistic(faux_data_no_noise_5k, length(β), logReg) # 2nd pass
# 95.18 % correct elapsed time: 0.008152086 seconds (2508 bytes allocated)
empty!(logReg) # reset model
@time corr = streamingLogistic(faux_data_with_noise_5k, length(β), logReg) # 1st pass
# 86.92 % correct elapsed time: 0.007270419 seconds (2284 bytes allocated)
@time corr = streamingLogistic(faux_data_with_noise_5k, length(β), logReg) # 2nd pass
# 93.00 % correct elapsed time: 0.007605418 seconds (2284 bytes allocated)
# 500K exmaples
empty!(logReg) # reset model
@time corr = streamingLogistic(faux_data_no_noise_500k, length(β), logReg) # 1st pass
# 99.27 % correct elapsed time: 0.727339275 seconds (2496 bytes allocated)
@time corr = streamingLogistic(faux_data_no_noise_500k, length(β), logReg) # 2nd pass
# 99.75 % correct elapsed time: 0.712567852 seconds (2496 bytes allocated)
empty!(logReg) # reset model
@time corr = streamingLogistic(faux_data_with_noise_500k, length(β), logReg) # 1st pass
# 94.35 % correct elapsed time: 0.694565161 seconds (2496 bytes allocated)
@time corr = streamingLogistic(faux_data_with_noise_500k, length(β), logReg) # 2nd pass
# 94.48 % correct elapsed time: 0.731819073 seconds (2496 bytes allocated)
# 5M exmaples 4.04 GB files
empty!(logReg) # reset model
@time corr = streamingLogistic(faux_data_no_noise_5M, length(β), logReg) # 1st pass
# 99.79 % correct elapsed time: 7.277882121 seconds (2496 bytes allocated)
@time corr = streamingLogistic(faux_data_no_noise_5M, length(β), logReg) # 2nd pass
# 99.91 % correct elapsed time: 7.264460519 seconds (2496 bytes allocated)
empty!(logReg) # reset model
@time corr = streamingLogistic(faux_data_with_noise_5M, length(β), logReg) # 1st pass
# 94.43 % correct elapsed time: 7.241979489 seconds (2496 bytes allocated)
@time corr = streamingLogistic(faux_data_with_noise_5M, length(β), logReg) # 2nd pass
# 94.44 % correct elapsed time: 7.265981804 seconds (2496 bytes allocated)
# ========================= Woohoo! =============================
# We just did a 5M row, 100 feature L2 regression in a little over 7.26 seconds. Wow!
# ===============================================================
# What's happening under the hood here?
# John Myles white's ApproxL2Logit update! from StreamStats.jl (annotated version)
function update!(stat::ApproxL2Logit, x::Vector{Float64}, y::Real)
stat.n += 1
y_pred = invlogit(stat.β₀ + dot(stat.β, x)) # <- current best predicttion
ε = y - y_pred # <- calculate the error (simple eh?)
# Intercept <- single pass update of intercept
gr₀ = ε - stat.λ * stat.β₀
stat.sum_gr₀_sq += gr₀^2
α₀ = stat.α / sqrt(stat.sum_gr₀_sq)
stat.β₀ += α₀ * gr₀
# Non-constant predictors <- single pass using adgrad to update of coefficients
for i in 1:length(x) # <- note the in-line update of each paramter in the vector (no GC)
grᵢ = ε * x[i] - stat.λ * stat.β[i]
stat.sum_gr_sq[i] += grᵢ^2 # <- in-line update
αᵢ = stat.α / sqrt(stat.sum_gr_sq[i])
stat.β[i] += αᵢ * grᵢ # <- in-line update
# ======= End of Gist ============
# Slides go on to explain NNGraph.jl and RecurrentNN.jl and...
# 1. "Sorry, L2 logistic regession is great, but I need non-linear algos"
# 2. Truns out that solution to any convex loss function can be approximated
# using online stochastic gradient descent (SGD)
# 3. How to cleanly deal with data schema using types
# Some interesting resources
# =========================
# 1. Zygmunt Zając's series on Vowpal Wabbit's and Ph
# VW blogs:
# Library (does alot in line processing)
# 2. John Langford's talk on Vowpal Wabbit's online learning
# Slides:
# part 1:
# part 2:
# 3. John Myles White's talk on online SDG
# 4. My own Reccurent.jl which use RMSProp (a variant of SDG) to learn deep recurrent nets
