A simple implementation of `DataFrames.transform` for generic Tables.jl tables, built on Dagger. Compared with the original it is restricted in some respects and extended in others.

using Distributed
# Make sure at least two processes exist; if not, add two workers.
nprocs() >= 2 || addprocs(2)
# Load these packages on every process so code evaluated there can use them.
@everywhere using LinearAlgebra
@everywhere using TrackingTimers
@everywhere begin
    # borrowed from DataFrames.jl
    # Wrapper marking `f` as a function that is applied element by element
    # rather than to whole columns.
    struct ByRow{F}
        f::F
    end

    # Call the wrapped function once per index, passing the i-th element of every
    # argument, and gather the results in a vector.
    function (b::ByRow)(args...)
        return [b.f((arg[i] for arg in args)...) for i in eachindex(args...)]
    end

    # Example row function returning two named outputs (`x` and `y`).
    function hi(a, b)
        return (; x=a * b, y=a + b)
    end

    # Singular values of the product `x * y'`.
    svdfn(x, y) = svdvals(x * y')

    # Allocation-heavy example: builds four `n × n × n` random arrays (the first
    # three are discarded immediately), sums the last one over its third dimension
    # and returns the diagonal of that sum plus `a`.
    function garbo(a)
        n = length(a)
        # NOTE(review): the exact extent of the timed region is an assumption; the
        # sample output only shows that it covers the allocations and that the
        # `return` lies outside of it (otherwise nothing would be printed).
        @time begin
            m = rand(n, n, n)
            m = rand(n, n, n)
            m = rand(n, n, n)
            m = rand(n, n, n)
            m = dropdims(sum(m; dims=3); dims=3)
            r = diag(m) .+ a
        end
        return r
    end
end
# Label a `ByRow` wrapper as `ByRow(<label of the wrapped function>)`.
function stringify(f::ByRow)
    inner = stringify(f.f)
    return string("ByRow(", inner, ")")
end
# Two-column column table used as the input.
test_table = (; a=randn(1000), b=rand(1000))

# One `input_cols => f => output_cols` spec per line; later specs consume columns
# produced by earlier ones (`:c`, `:svd`, `:sum`, `:y`).
result, log, t = @time transform(
    test_table,
    :a => (x -> 2x) => :c,
    (:a, :c) => svdfn => :svd,
    (:svd, :a) => (+) => :sum,
    (:sum, :svd) => ByRow(hi) => [:x, :y],
    [:y, :a] => (+) => :z,
    (:a, :b) => svdfn => :svd_ab,
    :b => garbo => :g,
);
# Write the plan derived from the collected event log to `logs.gv`; the `do` form
# closes the file even if rendering the plan throws.
open("logs.gv"; write=true) do io
    return Dagger.show_plan(io, Dagger.get_logs!(log))
end
using Dagger, Tables, OrderedCollections
using PrettyTables, TrackingTimers
"""
    decompose_pairs(p::Pair{<:Any,<:Pair})

Unpack the nested pair syntax `input_cols => f => output_cols` into the tuple
`(input_cols, f, output_cols)`.
"""
function decompose_pairs(p::Pair{<:Any,<:Pair})
    # `a => b => c` parses as `a => (b => c)`, so destructure the inner pair too.
    input, (f, output) = p
    return input, f, output
end
# Compact textual representation of `f`.
function stringify(f)
    return repr(f; context=(:compact => true))
end
"""
    instrument(t::TrackingTimer, p::Pair{<:Any,<:Pair})

Return the spec `p` (`input => f => output`) with `f` replaced by `t(f, name)`, i.e. `f`
wrapped by the timer `t` under a label built from the input columns, the `stringify`
form of `f`, and the output columns.
"""
function instrument(t::TrackingTimer, p::Pair{<:Any,<:Pair})
    input, f, output = decompose_pairs(p)
    # Separate the three parts; with empty separators they would run together
    # into a single unreadable token in the timing table.
    name = string(input, " => ", stringify(f), " => ", output)
    return input => t(f, name) => output
end
# Normalise a column selector so it can always be iterated: a lone `Symbol` becomes
# a one-element tuple, anything else is passed through unchanged.
wrap(input::Symbol) = (input,)
wrap(input) = identity(input)
# Wrap `f` so that whatever it returns is passed through `Tables.columns`.
function columnify(f)
    return function (args...)
        return Tables.columns(f(args...))
    end
end
"""
    transform(table, ps...)

Apply the transformations `ps` to `table`, scheduling every column as a Dagger thunk.

# Arguments
- `table`: any Tables.jl table.
- `ps...`: specs of the form `input_cols => f => output_cols`. `input_cols` is a single
  `Symbol` or a collection of them and may name columns created by earlier specs. When
  `output_cols` holds more than one name, the result of `f` is passed through
  `Tables.columns` and each named column is extracted from it.

# Returns
A tuple `(result, log, t)`:
- `result`: an `OrderedDict` of columns, which is a Tables.jl column table, holding the
  existing columns followed by the new ones;
- `log`: the `Dagger.LocalEventLog` attached to the context used for the computation;
- `t`: the `TrackingTimer` that wrapped every transformation function.
"""
function transform(table, ps...)
    t = TrackingTimer()
    ctx = Context()
    log = Dagger.LocalEventLog()
    ctx.log_sink = log
    tab = Tables.columns(table)
    delayed_cols = OrderedDict{Symbol,Thunk}()

    # Pre-populate with existing columns
    for c in Tables.columnnames(tab)
        delayed_cols[c] = delayed(Tables.getcolumn)(tab, c)
    end

    # Add in new columns from transformations
    for p in ps
        input, f, output = decompose_pairs(instrument(t, p))
        cols = (delayed_cols[i] for i in wrap(input))
        if length(wrap(output)) > 1
            # One thunk produces the whole multi-column result; every output column
            # is then a separate thunk extracting its column from that result.
            table_thunk = delayed(columnify(f); cache=true)(cols...)
            for col in output
                # not sure if `getcolumn` should be cached here... it should be very cheap
                delayed_cols[col] = delayed(t(Tables.getcolumn); cache=true)(
                    table_thunk, col
                )
            end
        else
            delayed_cols[output] = delayed(f; cache=true)(cols...)
        end
    end

    # Collect results
    result = OrderedDict{Symbol,AbstractVector}()
    for (k, v) in delayed_cols
        result[k] = collect(ctx, v)
    end
    @info "Timing information" t
    return result, log, t
end
julia> include("test.jl")
      From worker 3:     30.067755 seconds (187 allocations: 29.810 GiB, 5.14% gc time)
┌ Info: Timing information
│   t =
│    TrackingTimer: 31.38 s since creation (98% measured).
│                    name                   time    gctime  n_allocs    allocs    thread ID  proc ID 
│    ────────────────────────────────────────────────────────────────────────────────────────────────
│     b  garbo  g                        30.10 s      5%       236  29.810 GiB          2        3
│     (:a, :b)  svdfn  svd_ab             0.30 s      2%      1093  15.891 MiB          2        3
│     (:a, :c)  svdfn  svd                0.28 s      0%        44  15.841 MiB          2        3
│     (:sum, :svd)  ByRow(hi)  [:x, :y]   0.01 s      0%       575  45.547 KiB          2        3
│     (:svd, :a)  +  sum                  0.00 s      0%         2   7.969 KiB          2        3                                                                   4 rows omitted

