Skip to content

Instantly share code, notes, and snippets.

@tsibley
Created September 3, 2024 19:58
Show Gist options
  • Save tsibley/d069064af42c6c1da670106a55666245 to your computer and use it in GitHub Desktop.
Save tsibley/d069064af42c6c1da670106a55666245 to your computer and use it in GitHub Desktop.
From 4b07dc59f847f08a51addf7a9826d572484accf2 Mon Sep 17 00:00:00 2001
From: Thomas Sibley <tsibley@fredhutch.org>
Date: Tue, 3 Sep 2024 12:55:14 -0700
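Subject: [PATCH] wip! merge: import metadata inputs concurrently

Import each metadata input into its own temporary SQLite database using a
thread pool, then attach all of those databases to run the merge query.

Known limitations:

- SQLite caps the number of attached databases (SQLITE_MAX_ATTACHED,
  10 by default), so merging more inputs than that will fail.
- Input names are used as attached schema names, so names such as "main"
  or "temp" collide with SQLite's built-in schemas.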
---
augur/merge.py | 146 ++++++++++++++++++++++++++++++-------------------
1 file changed, 90 insertions(+), 56 deletions(-)
diff --git a/augur/merge.py b/augur/merge.py
index 134c344b..b43670a3 100644
--- a/augur/merge.py
+++ b/augur/merge.py
@@ -33,6 +33,7 @@ future. The SQLite 3 CLI, sqlite3, must be available. If it's not on PATH (or
you want to use a version different from what's on PATH), set the SQLITE3
environment variable to path of the desired sqlite3 executable.
"""
+import concurrent.futures as futures
import gettext
import os
import re
@@ -62,20 +63,70 @@ T = TypeVar('T')
_n = gettext.NullTranslations().ngettext
-class NamedMetadata(Metadata):
+# Locate how to re-invoke ourselves (_this_ specific Augur).
+if sys.executable:
+ AUGUR = f"{shquote(sys.executable)} -m augur"
+else:
+ # A bit unusual we don't know our own Python executable, but assume we
+ # can access ourselves as the ``augur`` command.
+ AUGUR = f"augur"
+
+
+class SQLiteMetadata(Metadata):
name: str
"""User-provided descriptive name for this metadata file."""
+ db_path: str
+ """Temporary, on-disk SQLite database file path."""
+
table_name: str
"""Generated SQLite table name for this metadata file, based on *name*."""
def __init__(self, name: str, *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = name
+
+ # Give each input its own temporary, on-disk SQLite database under a path we
+ # control so separate sqlite3 processes (concurrent imports, then the merge) can open it.
+ db_fd, db_path = mkstemp(prefix=f"augur-metadata-{self.name}-", suffix=".sqlite")
+ os.close(db_fd)
+
+ self.db_path = db_path
self.table_name = f"metadata_{self.name}"
def __repr__(self):
- return f"<NamedMetadata {self.name}={self.path}>"
+ return f"<SQLiteMetadata {self.name}={self.path} db_path={self.db_path}>"
+
+ def load_db(self):
+ """
+ Import metadata file *path* into table *table_name* of the SQLite database at *db_path*, with a unique index on its id column.
+ """
+ # All other metadata reading in Augur (i.e. via the csv module)
+ # uses Python's "universal newlines"¹ definition and accepts \n,
+ # \r\n, and \r as newlines interchangeably (even mixed within the
+ # same file!). We accomplish the same behaviour here with SQLite's
+ # less flexible newline handling by relying on the universal
+ # newline translation of `augur read-file`.
+ # -trs, 24 July 2024
+ #
+ # ¹ <https://docs.python.org/3/glossary.html#term-universal-newlines>
+ newline = os.linesep
+
+ sqlite3(self.db_path,
+ f'.mode csv',
+ f'.separator {sqlite_quote_dot(self.delimiter)} {sqlite_quote_dot(newline)}',
+ f'.import {sqlite_quote_dot(f"|{AUGUR} read-file {shquote(self.path)}")} {sqlite_quote_dot(self.table_name)}',
+
+ f'create unique index {sqlite_quote_id(f"{self.table_name}_id")} on {sqlite_quote_id(self.table_name)}({sqlite_quote_id(self.id_column)});',
+
+ # <https://sqlite.org/pragma.html#pragma_optimize>
+ f'pragma optimize;')
+
+ # Ensure self.columns matches the columns SQLite's .import created so
+ # callers (e.g. the merge query built in run()) can rely on it. Cases where
+ # SQLite renames columns on import can be addressed later.
+ assert self.columns == (table_columns := sqlite3_table_columns(self.db_path, self.table_name)), \
+ f"{self.columns!r} == {table_columns!r}"
def register_parser(parent_subparsers):
@@ -151,26 +202,12 @@ def run(args):
# Infer delimiters and id columns
metadata = [
- NamedMetadata(name, path, [delim for name_, delim in metadata_delimiters if not name_ or name_ == name] or DEFAULT_DELIMITERS,
- [column for name_, column in metadata_id_columns if not name_ or name_ == name] or DEFAULT_ID_COLUMNS)
+ SQLiteMetadata(name, path, [delim for name_, delim in metadata_delimiters if not name_ or name_ == name] or DEFAULT_DELIMITERS,
+ [column for name_, column in metadata_id_columns if not name_ or name_ == name] or DEFAULT_ID_COLUMNS)
for name, path in metadata]
- # Locate how to re-invoke ourselves (_this_ specific Augur).
- if sys.executable:
- augur = f"{shquote(sys.executable)} -m augur"
- else:
- # A bit unusual we don't know our own Python executable, but assume we
- # can access ourselves as the ``augur`` command.
- augur = f"augur"
-
-
- # Work with a temporary, on-disk SQLite database under a name we control so
- # we can access it from multiple (serial) processes.
- db_fd, db_path = mkstemp(prefix="augur-merge-", suffix=".sqlite")
- os.close(db_fd)
-
- # Clean up database file by default
+ # Clean up database files by default
delete_db = True
# Track columns as we see them, in order. The first metadata's id column
@@ -197,36 +234,21 @@ def run(args):
try:
# Read all metadata files into a SQLite db
+ with futures.ThreadPoolExecutor() as executor:
+ def load(m):
+ # XXX FIXME: better UI here! This prints from worker threads, so message order is nondeterministic; consider richer terminal output.
+ print_info(f"Reading {m.name!r} metadata from {m.path!r}…")
+ m.load_db()
+
+ done, not_done = futures.wait([executor.submit(load, m) for m in metadata], return_when=futures.FIRST_EXCEPTION)
+
+ for future in done:
+ future.result() # raise first exception, if any
+
+
+ # Track which columns appear in which metadata inputs, preserving
+ # the order of both.
for m in metadata:
- # All other metadata reading in Augur (i.e. via the csv module)
- # uses Python's "universal newlines"¹ definition and accepts \n,
- # \r\n, and \r as newlines interchangably (even mixed within the
- # same file!). We accomplish the same behaviour here with SQLite's
- # less flexible newline handling by relying on the universal
- # newline translation of `augur read-file`.
- # -trs, 24 July 2024
- #
- # ¹ <https://docs.python.org/3/glossary.html#term-universal-newlines>
- newline = os.linesep
-
- print_info(f"Reading {m.name!r} metadata from {m.path!r}…")
- sqlite3(db_path,
- f'.mode csv',
- f'.separator {sqlite_quote_dot(m.delimiter)} {sqlite_quote_dot(newline)}',
- f'.import {sqlite_quote_dot(f"|{augur} read-file {shquote(m.path)}")} {sqlite_quote_dot(m.table_name)}',
-
- f'create unique index {sqlite_quote_id(f"{m.table_name}_id")} on {sqlite_quote_id(m.table_name)}({sqlite_quote_id(m.id_column)});',
-
- # <https://sqlite.org/pragma.html#pragma_optimize>
- f'pragma optimize;')
-
- # We're going to use Metadata.columns to generate the select
- # statement, so ensure it matches what SQLite's .import created.
- assert m.columns == (table_columns := sqlite3_table_columns(db_path, m.table_name)), \
- f"{m.columns!r} == {table_columns!r}"
-
- # Track which columns appear in which metadata inputs, preserving
- # the order of both.
for column in m.columns:
# Match different id column names in different metadata files
# since they're logically equivalent. Any non-id columns that
@@ -236,7 +258,7 @@ def run(args):
output_column = output_id_column if column == m.id_column else column
output_columns.setdefault(output_column, [])
- output_columns[output_column] += [(m.table_name, column)]
+ output_columns[output_column] += [(m.name, m.table_name, column)]
# Construct query to produce merged metadata.
@@ -246,12 +268,12 @@ def run(args):
for output_column, input_columns in output_columns.items()),
# Source columns
- *(f"""{sqlite_quote_id(m.table_name, m.id_column)} is not null as {sqlite_quote_id(f'__source_metadata_{m.name}')}"""
+ *(f"""{sqlite_quote_id(m.name, m.table_name, m.id_column)} is not null as {sqlite_quote_id(f'__source_metadata_{m.name}')}"""
for m in metadata)]
from_list = [
- sqlite_quote_id(metadata[0].table_name),
- *(f"full outer join {sqlite_quote_id(m.table_name)} on {sqlite_quote_id(m.table_name, m.id_column)} in ({', '.join(sqlite_quote_id(m.table_name, m.id_column) for m in reversed(preceding))})"
+ sqlite_quote_id(metadata[0].name, metadata[0].table_name),
+ *(f"full outer join {sqlite_quote_id(m.name, m.table_name)} on {sqlite_quote_id(m.name, m.table_name, m.id_column)} in ({', '.join(sqlite_quote_id(m.name, m.table_name, m.id_column) for m in reversed(preceding))})"
for m, preceding in [(m, metadata[:i]) for i, m in enumerate(metadata[1:], 1)])]
# Take some small pains to make the query readable since it makes
@@ -272,11 +294,13 @@ def run(args):
# Assume TSV like nearly all other extant --output-metadata options.
print_info(f"Merging metadata and writing to {args.output_metadata!r}…")
print_debug(query)
- sqlite3(db_path,
+ sqlite3("",
+ *[f'attach database {sqlite_quote_string(m.db_path)} as {sqlite_quote_id(m.name)};'
+ for m in metadata],
f'.mode csv',
f'.separator "\\t" "\\n"',
f'.headers on',
- f'.once {sqlite_quote_dot(f"|{augur} write-file {shquote(args.output_metadata)}")}',
+ f'.once {sqlite_quote_dot(f"|{AUGUR} write-file {shquote(args.output_metadata)}")}',
query)
except SQLiteError as err:
@@ -284,10 +308,20 @@ def run(args):
raise AugurError(str(err)) from err
finally:
+ db_paths = [m.db_path for m in metadata]
+
if delete_db:
- os.unlink(db_path)
+ for db in db_paths:
+ os.unlink(db)
else:
- print_info(f"WARNING: Skipped deletion of {db_path} due to error, but you may want to clean it up yourself (e.g. if it's large).")
+ print_info(dedent(f"""\
+ WARNING: Skipped deletion of temporary SQLite databases due to
+ error, but you may want to clean them up yourself with:
+
+ rm -f {' '.join(map(shquote, db_paths))}
+
+ as they may be large.
+ """))
def sqlite3(*args, **kwargs):
--
2.46.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment