bench parquet vs sqlite vs raw — times writing 100,000 rows and selecting 100 random keys from a Parquet file (queried via PySpark), a SQLite database (with and without a PRIMARY KEY constraint), and a plain text file.
#!/usr/bin/env python3
import sqlite3
import random
import string
from uuid import uuid4
from typing import List, Tuple
from statistics import mean

import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow.lib import Schema
from pyspark.sql import SparkSession, SQLContext
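
# Benchmark overview: generate 100,000 rows (a UUID key plus four random
# 40-50 character strings), write them to a plain text file, to SQLite
# (with and without a PRIMARY KEY on txt1), and to Parquet, then time
# point lookups of 100 random keys in each store. Parquet is queried
# through PySpark, both via Spark SQL and via the DataFrame API.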
def random_str():
    str_length = random.randint(40, 50)
    return ''.join(random.choice(string.ascii_lowercase) for _ in range(str_length))


def sql_init(db_file: str):
    conn = sqlite3.connect(db_file)
    c = conn.cursor()
    configure_sqlite(c)
    c.execute('''CREATE TABLE IF NOT EXISTS some_text
                 (txt1 TEXT, txt2 TEXT, txt3 TEXT, txt4 TEXT, txt5 TEXT)''')
    return conn


def sql_init_with_constraint(db_file: str):
    conn = sqlite3.connect(db_file)
    c = conn.cursor()
    configure_sqlite(c)
    c.execute('''CREATE TABLE IF NOT EXISTS some_text
                 (txt1 TEXT PRIMARY KEY, txt2 TEXT, txt3 TEXT, txt4 TEXT, txt5 TEXT)''')
    return conn


def configure_sqlite(cursor: sqlite3.Cursor):
    cursor.execute('PRAGMA optimize')
    cursor.execute('PRAGMA journal_mode=WAL')        # write-ahead logging: faster bulk writes
    cursor.execute('PRAGMA locking_mode=EXCLUSIVE')  # single-writer benchmark, skip lock churn
    cursor.execute('PRAGMA synchronous=NORMAL')      # safe with WAL, fewer fsyncs
    # cache_size and threads are the runtime pragmas; SQLITE_DEFAULT_CACHE_SIZE,
    # SQLITE_CONFIG_MULTITHREAD and THREAD are compile-time options and are
    # silently ignored when issued as pragmas.
    cursor.execute('PRAGMA cache_size=-4000')        # ~4 MB page cache
    cursor.execute('PRAGMA threads=4')               # up to 4 helper threads for sorting
def parquet_init() -> Schema:
    my_schema = pa.schema([('txt1', pa.string()),
                           ('txt2', pa.string()),
                           ('txt3', pa.string()),
                           ('txt4', pa.string()),
                           ('txt5', pa.string())])
    return my_schema


def spark_init() -> Tuple[SparkSession, SQLContext]:
    spark = SparkSession.builder.master("local") \
                                .appName("Word selector") \
                                .getOrCreate()
    sc = SQLContext(spark.sparkContext, sparkSession=spark)
    return spark, sc
def sql_write(conn: sqlite3.Connection, data: List[List[str]]):
    cursor = conn.cursor()
    cursor.executemany('''INSERT INTO some_text VALUES
                          (?,?,?,?,?)''', data)
    conn.commit()
    conn.close()


def txt_write(txt_file: str, data: List[List[str]]):
    with open(txt_file, 'w') as f:
        for row in data:
            f.write(', '.join(row) + '\n')


def parquet_write(parquet_file: str, schema, data: List[List[str]]):
    # pivot the row-oriented data into columns; each row has the same number of fields
    columns = [[] for _ in range(len(data[0]))]
    for row in data:
        for i, item in enumerate(row):
            columns[i].append(item)
    arrays = [pa.array(column) for column in columns]
    table = pa.Table.from_arrays(arrays, schema=schema)
    with pq.ParquetWriter(parquet_file, table.schema,
                          use_dictionary=True, version='2.0') as writer:
        writer.write_table(table)  # write a single row group
def sql_select(conn: sqlite3.Connection, to_select: List[str]):
    cursor = conn.cursor()
    query = f'''SELECT * FROM some_text
                WHERE txt1 IN ({','.join(['?']*len(to_select))})'''
    cursor.execute(query, to_select)
    return cursor.fetchall()


def txt_select(txt_file: str, to_select: List[str]):
    result = []
    with open(txt_file, 'r') as f:
        for line in f:
            # rows were written as ', '-separated fields, so split on that
            # delimiter; a bare split() would leave a trailing comma on txt1
            # and the membership test would never match
            fields = line.rstrip('\n').split(', ')
            if fields[0] in to_select:
                result.append(line)
    return result


def parquet_sql_select(parquet_file: str, sc: SQLContext, to_select: List[str]):
    data_frame = sc.read.parquet(parquet_file)
    data_frame.createOrReplaceTempView("some_text")
    query = f'''SELECT * FROM some_text
                WHERE txt1 IN ({','.join(f'"{txt1}"' for txt1 in to_select)})'''
    results = sc.sql(query)
    return results.collect()


def parquet_sql_select2(parquet_file: str, sc: SQLContext, spark: SparkSession, to_select: List[str]):
    # join-based variant; defined for comparison but not timed below
    data_frame = sc.read.parquet(parquet_file)
    filter_df = spark.createDataFrame(to_select, data_frame.schema['txt1'].dataType)
    return data_frame.join(filter_df, data_frame['txt1'] == filter_df["value"])


def parquet_dataframe_select(parquet_file: str, sc: SQLContext, to_select: List[str]):
    data_frame = sc.read.parquet(parquet_file)
    results = data_frame.where(data_frame.txt1.isin(to_select))
    return results.collect()  # collect() forces execution; DataFrames are lazy
nb_rows = 100000
rows = [[str(uuid4())] + [random_str() for _ in range(4)] for _ in range(nb_rows)]
txt1_field_selector = [rows[random.randint(0, nb_rows-1)][0] for _ in range(100)]
db_file = 'perf_pq_db_txt.db'
db_constraint_file = 'perf_pq_db_txt.constraint.db'
txt_file = 'perf_pq_db_txt.txt'
pq_file = 'perf_pq_db_txt.parquet'
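
# Each timing below uses timeit.repeat with 6 repetitions of a single run.
# The per-store setup (removing stale files, opening connections, starting
# Spark) executes once per repetition and is excluded from the measured
# time; the reported figure is the mean over the 6 runs.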
if __name__ == '__main__':
    from timeit import repeat

    txt_write_t1 = repeat(stmt='txt_write(txt_file,rows)', repeat=6, number=1, setup='''
from __main__ import txt_write, rows, txt_file
import os
if os.path.exists(txt_file):
    os.remove(txt_file)
''')
    sql_write_t1 = repeat(stmt='sql_write(conn,rows)', repeat=6, number=1, setup='''
from __main__ import sql_init, sql_write, rows, db_file
import os
if os.path.exists(db_file):
    os.remove(db_file)
conn = sql_init(db_file)
''')
    sql_write_t2 = repeat(stmt='sql_write(conn,rows)', repeat=6, number=1, setup='''
from __main__ import sql_init_with_constraint, sql_write, rows, db_constraint_file
import os
if os.path.exists(db_constraint_file):
    os.remove(db_constraint_file)
conn = sql_init_with_constraint(db_constraint_file)
''')
    parquet_write_t1 = repeat(stmt='parquet_write(pq_file,schema,rows)', repeat=6, number=1, setup='''
from __main__ import parquet_init, parquet_write, rows, pq_file
import os
if os.path.exists(pq_file):
    os.remove(pq_file)
schema = parquet_init()
''')
    txt_select_t1 = repeat(stmt='txt_select(txt_file,txt1_field_selector)', repeat=6, number=1, setup='''
from __main__ import txt_select, txt_file, txt1_field_selector
''')
    sql_select_t1 = repeat(stmt='sql_select(conn,txt1_field_selector)', repeat=6, number=1, setup='''
from __main__ import sql_init, sql_select, txt1_field_selector, db_file
conn = sql_init(db_file)
''')
    sql_select_t2 = repeat(stmt='sql_select(conn,txt1_field_selector)', repeat=6, number=1, setup='''
from __main__ import sql_init, sql_select, txt1_field_selector, db_constraint_file
conn = sql_init(db_constraint_file)
''')
    parquet_select_t1 = repeat(stmt='parquet_sql_select(pq_file,sc,txt1_field_selector)', repeat=6, number=1, setup='''
from __main__ import spark_init, parquet_sql_select, txt1_field_selector, pq_file
spark, sc = spark_init()
''')
    parquet_select_t2 = repeat(stmt='parquet_dataframe_select(pq_file,sc,txt1_field_selector)', repeat=6, number=1, setup='''
from __main__ import spark_init, parquet_dataframe_select, txt1_field_selector, pq_file
spark, sc = spark_init()
''')
print(f'{"text":<40} was writed in {mean(txt_write_t1):.3f} seconds.') | |
print(f'{"database":<40} was writed in {mean(sql_write_t1):.3f} seconds.') | |
print(f'{"database with constraint":<40} was writed in {mean(sql_write_t2):.3f} seconds.') | |
print(f'{"parquet":<40} was writed in {mean(parquet_write_t1):.3f} seconds.') | |
print(f'{"text":<40} selected in {mean(txt_select_t1):.3f} seconds.') | |
print(f'{"database":<40} selected in {mean(sql_select_t1):.3f} seconds.') | |
print(f'{"database with constraint":<40} selected in {mean(sql_select_t2):.3f} seconds.') | |
print(f'{"parquet SQL":<40} selected in {mean(parquet_select_t1):.3f} seconds.') | |
print(f'{"parquet using dataframe":<40} selected in {mean(parquet_select_t2):.3f} seconds.') |