Created
February 14, 2021 19:46
-
-
Save mskyttner/5140bcd074b844263de5c3a2bef4bded to your computer and use it in GitHub Desktop.
Load a .tsv file into duckdb in a chunkwise fashion using R and vroom
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
library(dplyr) | |
library(duckdb) | |
library(vroom) | |
#' Report the version of the duckdb engine as an integer triple
#'
#' Opens a throw-away connection, runs `PRAGMA version;` and extracts the
#' first `major.minor.patch` pattern from the reported version string.
#'
#' @return An integer vector of length three, `c(major, minor, patch)`.
#'   All three components are `NA` when no `x.y.z` pattern is found.
duckdb_version <- function() {
  con <- DBI::dbConnect(duckdb::duckdb())
  on.exit(DBI::dbDisconnect(con, shutdown = TRUE), add = TRUE)

  res <- DBI::dbGetQuery(con, "PRAGMA version;")

  parse_semver <- function(x) {
    # `(\\d+)` rather than `(\\d)+`: a repeated group only keeps its last
    # repetition, which would truncate a multi-digit major version
    re <- "(\\d+)\\.(\\d+)\\.(\\d+).*$"
    # element 1 is the full match, elements 2:4 are the captured groups
    as.integer(unlist(regmatches(x, regexec(re, x)))[1 + (1:3)])
  }

  # Parse a single string instead of the whole data frame.
  # NOTE(review): assumes the version string is the first column of the
  # PRAGMA result - confirm against the duckdb documentation.
  parse_semver(as.character(res[[1L]])[1L])
}
#' Guess the column specification of a delimited file with vroom
#'
#' Reads the first `n_lookahead` rows with `vroom::vroom()` and summarises
#' the guessed column specification in several forms.
#'
#' @param file Path of the delimited file to inspect.
#' @param n_lookahead Number of rows passed to vroom as `n_max`.
#' @return A list with the elements
#'   `fields` (column names), `coltypes` (the vroom spec object),
#'   `concise` (one letter per column), `condensed` (one letter per column
#'   remaining after `vroom::cols_condense()`), `default` (letter of the
#'   condensed default collector) and `delim` (the delimiter stored in the
#'   condensed spec).
guess_colspec_concise <- function(file, n_lookahead = 100) {

  message("Column specifications guess from vroom for ", file)

  # Collapse a list of collectors into a string holding the first letter
  # that follows the "collector_" prefix of each collector's leading class.
  abbreviate_collectors <- function(collectors) {
    leading_class <- vapply(collectors, class, character(2))[1, ]
    initials <- gsub(pattern = "^collector_(.{1}).*$", "\\1", leading_class)
    paste(initials, collapse = "")
  }

  preview <- vroom::vroom(file, n_max = n_lookahead)
  full_spec <- vroom::spec(preview)
  compact_spec <- vroom::cols_condense(full_spec)

  list(
    fields = names(full_spec$cols),
    coltypes = full_spec,
    concise = abbreviate_collectors(full_spec$cols),
    condensed = abbreviate_collectors(compact_spec$cols),
    default = abbreviate_collectors(list(.default = compact_spec$default)),
    delim = compact_spec$delim
  )
}
#' Read a delimited file chunk by chunk and hand each chunk to a callback
#'
#' Repeatedly calls `vroom::vroom()` with an increasing `skip` offset and
#' `n_max = n_chunksize`, invokes `chunk_callback_fn` on every non-empty
#' chunk, and records timing and memory statistics per chunk. Reading stops
#' after the first chunk that holds fewer than `n_chunksize` rows.
#'
#' @param file Path of the file to read; its first line is skipped as a
#'   header (the first chunk is read with `skip = 1`).
#' @param n_chunksize Number of rows to read per chunk.
#' @param chunk_callback_fn Function taking one argument, the chunk.
#' @param colspec List as returned by `guess_colspec_concise()`; the
#'   elements `delim`, `fields` and `coltypes` are used.
#' @param encoding Accepted for interface compatibility; it is not used by
#'   the function body.
#' @param ... Further arguments forwarded to `vroom::vroom()`.
#' @return A data frame with one row per chunk and the columns `file`,
#'   `chunk`, `beg`, `end`, `is_done`, `nrow`, `t_read`, `t_write`,
#'   `mem_chunk`, `mem_used` and `free_disk`. `t_read` and `t_write` are
#'   difftimes in seconds.
read_vroom_chunks <- function(file, n_chunksize = 5e5, chunk_callback_fn,
                              colspec = guess_colspec_concise(file),
                              encoding = Sys.getenv("encoding", "UTF-8"), ...) {

  stopifnot(is.function(chunk_callback_fn))

  # Format a byte count through the public format() method for object_size.
  # as.numeric() instead of as.integer(): sizes above ~2 GiB overflow the
  # integer range and would turn into NA.
  format_bytes <- function(x) {
    format(structure(as.numeric(x), class = "object_size"), units = "auto")
  }

  i <- 0
  is_done <- FALSE
  stats_list <- list()

  while (!is_done) {

    t0 <- Sys.time()
    # skip the header line plus all rows consumed by earlier chunks
    beg <- 1 + i * n_chunksize

    chunk <- vroom::vroom(
      file = file, delim = colspec$delim,
      col_names = colspec$fields, col_types = colspec$coltypes$cols,
      skip = beg, n_max = n_chunksize,
      ...
    )

    # stats for the chunk
    i <- i + 1
    n <- nrow(chunk)
    # a short (or empty) chunk means the end of the file has been reached
    is_done <- n < n_chunksize
    mem <- pryr::object_size(chunk)
    mu <- pryr::mem_used()
    # free space of the root file system as reported by `df`; `[1L]` yields
    # exactly one value (NA when the command returns nothing) so the stats
    # row below always has length one
    fd <- system("df -h | grep '/$' | awk '{print $4}'", intern = TRUE)[1L]
    t1 <- Sys.time()
    # fixed units keep the per-chunk timings comparable and summable
    t_read <- difftime(t1, t0, units = "secs")

    # process the chunk
    if (n > 0) chunk_callback_fn(chunk)
    t_write <- difftime(Sys.time(), t1, units = "secs")

    # update stats
    statz <- tibble::tibble(
      file,
      chunk = i, beg = beg, end = beg + n_chunksize,
      is_done, nrow = n, t_read, t_write,
      mem_chunk = format_bytes(mem), mem_used = format_bytes(mu),
      free_disk = fd
    )
    print(statz)
    stats_list[[length(stats_list) + 1L]] <- statz

    Sys.sleep(0.001)
  }

  # bind once after the loop instead of growing a data frame per iteration
  stats <- do.call(rbind.data.frame, stats_list)

  message(sprintf("Total lines read excl header: %s", (beg + n - 1)))
  message(sprintf("Read times: %s total, %s avg",
    sum(stats$t_read), mean(stats$t_read)))
  message(sprintf("Write times: %s total, %s avg",
    sum(stats$t_write), mean(stats$t_write)))

  stats
}
#' Bulk load a delimited file into a duckdb table, chunk by chunk
#'
#' Reads `csvfile` with `read_vroom_chunks()` and appends every chunk to
#' `tablename` in the duckdb database file `dbfile`. The table is created
#' from the first chunk. After each chunk the data is either checkpointed
#' (duckdb >= 0.2.5) or written through a per-chunk connection.
#'
#' @param csvfile Path of the delimited file to load; must exist.
#' @param dbfile Path of the duckdb database file.
#' @param tablename Name of the target table. It is interpolated into the
#'   SQL statements as is, so it must be a valid, trusted identifier.
#' @param overwrite If `TRUE`, an existing table is dropped first; if
#'   `FALSE`, an existing table raises an error.
#' @param csv_colspec Column specification as returned by
#'   `guess_colspec_concise()`.
#' @param csv_lookahead Number of rows used for guessing `csv_colspec`.
#' @param csv_chunksize Number of rows per chunk.
#' @param duckdb_mem_limit Value in GB used for the `memory_limit` and
#'   `checkpoint_threshold` pragmas.
#' @param ... Forwarded to the internal per-chunk writer, which accepts
#'   `checkpoint` (default `TRUE`).
#' @return The per-chunk statistics returned by `read_vroom_chunks()`.
duck_copy_csv <- function(csvfile, dbfile, tablename, overwrite = FALSE,
    csv_colspec = guess_colspec_concise(csvfile, n_lookahead = csv_lookahead),
    csv_lookahead = 1000L, csv_chunksize = 5e5, duckdb_mem_limit = 8L, ...) {

  # validate the input before touching (and possibly creating) the database
  if (!file.exists(csvfile)) {
    stop("File not found: ", csvfile)
  }

  file <- normalizePath(csvfile)
  # the database file may not exist yet; connecting creates it
  db <- normalizePath(dbfile, mustWork = FALSE)

  con <- DBI::dbConnect(duckdb::duckdb(dbdir = db))
  on.exit(DBI::dbDisconnect(con, shutdown = TRUE), add = TRUE)

  stopifnot(file.exists(db))

  # Compare versions as versions: an element-wise `>=` on c(major, minor,
  # patch) would reject e.g. 0.3.0 because its patch level is below 5.
  ver <- duckdb_version()
  has_checkpointing <- !anyNA(ver) &&
    numeric_version(paste(ver, collapse = ".")) >= "0.2.5"

  if (has_checkpointing) {
    DBI::dbExecute(con,
      sprintf("PRAGMA checkpoint_threshold='%sGB';", duckdb_mem_limit))
  } else {
    message("Checkpointing needs > v 0.2.4 of duckdb",
      "; falling back to using per-chunk reconnections to flush WAL incrementally")
  }

  DBI::dbExecute(con, sprintf("PRAGMA memory_limit='%sGB'", duckdb_mem_limit))

  if (DBI::dbExistsTable(con, tablename)) {
    if (!overwrite) {
      stop("Table ", tablename, " exists in ", db, " , please use `overwrite = T`")
    }
    DBI::dbRemoveTable(con, tablename)
  }

  # Append one chunk to `tablename`, creating the table on first use.
  select_into <- function(data, tablename, checkpoint = TRUE) {

    con_chunk <- con

    if (checkpoint && !has_checkpointing) {
      # fallback to connect/disconnect for each chunk
      con_chunk <- DBI::dbConnect(duckdb::duckdb(dbdir = db))
      on.exit(DBI::dbDisconnect(con_chunk, shutdown = TRUE), add = TRUE)
    }

    # chunk fits in RAM -> register it under the name "chunk" and copy it
    # into tablename
    duckdb::duckdb_register(con_chunk, "chunk", data)

    sql <- if (DBI::dbExistsTable(con_chunk, tablename)) {
      sprintf("INSERT INTO %s SELECT * FROM chunk;", tablename)
    } else {
      sprintf("CREATE TABLE %s AS SELECT * FROM chunk;", tablename)
    }

    DBI::dbExecute(con_chunk, sql)
    duckdb::duckdb_unregister(con_chunk, "chunk")

    if (checkpoint && has_checkpointing) {
      DBI::dbExecute(con_chunk, "checkpoint;")
    }
  }

  chunk_stats <- read_vroom_chunks(
    file, n_chunksize = csv_chunksize, colspec = csv_colspec,
    chunk_callback_fn = function(x) select_into(x, tablename, ...)
  )

  # NOTE(review): is_ok_transaction() is not defined in this part of the
  # file; presumably it verifies the loaded table against the csv - confirm.
  if (!is_ok_transaction(db, tablename, csvfile)) {
    message("Bulk load failed, pls remove table ", tablename, " from ", db,
      " and try again :)")
  }

  chunk_stats
}
# chunk_stats <- | |
# duck_copy_csv( | |
# csvfile = "data-raw/ark/hcaf_species_native.tsv", | |
# dbfile = "duckdb_database", | |
# tablename = "hcaf_species_native", overwrite = T, | |
# checkpoint = TRUE, csv_chunksize = 5e6 | |
# ) | |
# | |
# library(ggplot2) | |
# | |
# visual <- | |
# chunk_stats %>% | |
# select(chunk, end, t_read, t_write, free_disk, mem_chunk) %>% | |
# mutate_at(vars(starts_with("t_")), as.numeric) %>% | |
# mutate(krps_read = 1e-3 * end / cumsum(t_read)) %>% | |
# mutate(krps_write = 1e-3 * end / cumsum(t_write)) %>% | |
# mutate(disk = readr::parse_number(chartr("G,", " .", free_disk))) %>% | |
# mutate(disk_used = max(disk) - disk) %>% | |
# mutate(chunk_mem = as.numeric(gsub(" Mb$", "", mem_chunk))) %>% | |
# select(chunk, t_read, t_write, krps_read, krps_write, disk_used, chunk_mem) %>% | |
# tidyr::pivot_longer(cols = -chunk, names_to = "variable", values_to = "value")
# | |
# p1 <- | |
# ggplot(visual, aes(x = chunk, y = value)) + | |
# geom_step(aes(color = variable)) + | |
# #geom_line(aes(color = variable), size = 1) + | |
# scale_color_brewer(palette = "Set2", guide = FALSE) + | |
# theme_minimal() + | |
# facet_wrap(~variable, scales = "free", ncol = 2) + | |
# theme(axis.title = element_blank()) | |
# | |
# ggsave("~/duck_csv_01.png", p1) | |
# | |
# knitr::kable(chunk_stats %>% select(-file)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment