Skip to content

Instantly share code, notes, and snippets.

@annarailton
Created January 4, 2019 11:00
Show Gist options
  • Save annarailton/0e2bf53a2593c5502cfbdab88ffe3e41 to your computer and use it in GitHub Desktop.
Object that periodically dumps new contents to disk
#!/usr/bin/python3
"""
Toy example using DumpObject, a way of periodically dumping out large objects to
file as it grows. This example watches a file directory and creates a dictionary
with
* key: file name
* value: first line of file
Includes use of `watchdog` to watch for real time effects (here: new files
being created in the directory) and `atexit` to gracefully deal with the program
terminating unexpectedly.
This pattern was used for creating vocab dictionaries in a streaming way before
the use of `luigi`.
"""
import atexit
import csv
import logging
import os
import queue
import random
import sys
import tempfile
import threading
import time
import uuid
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
class DumpObject(object):
    """Accumulate key/value entries and periodically dump them to CSV files.

    Entries are buffered in an in-memory dict; once ``n_keys_to_dump``
    distinct keys have been collected the batch is written to a uniquely
    named CSV file in ``directory`` and the buffer is reset. Any leftover
    entries are flushed on interpreter exit via ``atexit``.

    Inspired by @pawel (data science intern at Semmle).

    Args:
        directory (str): directory to which the data should be dumped
        file_prefix (str, optional): string to prefix dump object's file name
        n_keys_to_dump (int, optional): number of keys to dump after
            (default 1,000,000)

    Example of the pattern to use for vocab updating:

        batch_saver = DumpObject('/tmp/dumping_dir', 'test', 100)
        universal = dict()
        for i in range(200):
            if i not in universal:
                universal[i] = i + 1
                batch_saver.add_entry(i, i + 1)
    """

    def __init__(self, directory, file_prefix=None, n_keys_to_dump=None):
        self._directory = directory
        self._try_to_create_directory()
        self._file_prefix = file_prefix
        self._data_to_dump = dict()
        # Falsy values (None, 0) fall back to the default threshold,
        # matching the original behaviour.
        self._n_keys_to_dump = n_keys_to_dump if n_keys_to_dump else 1000000
        atexit.register(self.dump_data)  # dump any leftover data on program exit

    def _try_to_create_directory(self):
        """Create the dump directory; no action taken if it already exists.

        Raises:
            ValueError: if the path exists but is not a directory.
        """
        if os.path.exists(self._directory) and not os.path.isdir(self._directory):
            raise ValueError("Directory {} cannot be created.".format(
                self._directory))
        # exist_ok avoids the check-then-create race of the original
        # (another process could create the directory between the two calls).
        os.makedirs(self._directory, exist_ok=True)

    def add_entry(self, key, value):
        """Add data to the _data_to_dump dictionary. Any key/value type is OK.

        Existing keys are left untouched. Once the buffer holds the
        threshold number of keys it is dumped to disk and re-initialised.
        """
        if key not in self._data_to_dump:
            self._data_to_dump[key] = value
        # >= rather than == so the dump still triggers even if the buffer
        # somehow grows past the exact boundary.
        if len(self._data_to_dump) >= self._n_keys_to_dump:
            self.dump_data()
            self._data_to_dump = dict()  # re-initialise

    def dump_data(self):
        """Save data in _data_to_dump to the directory. Use the file_prefix
        attribute if it exists."""
        if not self._data_to_dump:
            return  # Don't dump empty dict
        # Regenerate the UUID-based name until it does not collide.
        # Iterative retry replaces the original's unbounded recursion.
        while True:
            file_name = str(uuid.uuid4()) + ".csv"
            if self._file_prefix:
                file_name = self._file_prefix + "_" + file_name
            file_path = os.path.join(self._directory, file_name)
            if not os.path.exists(file_path):
                break
        # newline="" is the documented way to open files for csv.writer
        # (prevents blank interleaved rows on Windows).
        with open(file_path, "w", newline="") as csv_out:
            writer = csv.writer(csv_out)
            writer.writerows(list(self._data_to_dump.items()))
        logging.info("Dumped vocab to file {}".format(file_path))
class Handler(FileSystemEventHandler):
    """Bridge watchdog creation events onto a queue.

    Every path the observer reports as newly created is pushed onto
    ``file_queue`` for the consumer loop to drain.
    """

    def __init__(self, file_queue):
        super().__init__()
        self.file_queue = file_queue

    def on_created(self, event):
        # Only the created path matters downstream; enqueue it as-is.
        self.file_queue.put(event.src_path)
class RandomFileFactory(object):
    """Produce random files in a background daemon thread.

    Writes up to ``max_n_files`` files into ``directory`` (created if
    missing), each named with a fresh UUID and containing one random
    integer, pausing ``interval`` seconds between files.

    Based on http://sebastiandahlgren.se/2014/06/27/running-a-method-as-a-background-thread-in-python/

    Args:
        directory (str): directory to write the files into
        max_n_files (int, optional): number of files to create (default 100)
        interval (float, optional): seconds between files (default 0.1)
    """

    def __init__(self, directory, max_n_files=100, interval=0.1):
        self.directory = directory
        self.max_n_files = max_n_files
        self.interval = interval
        if not os.path.exists(self.directory):
            os.makedirs(self.directory)
        thread = threading.Thread(target=self.run, args=())
        # Running the thread in daemon mode allows the main application to exit
        # even though the thread is running. It will also therefore make it
        # possible to use ctrl+c to terminate the application.
        thread.daemon = True
        thread.start()

    def run(self):
        """Write the files; runs on the background thread."""
        for _ in range(self.max_n_files):
            path = os.path.join(self.directory, str(uuid.uuid4()))
            with open(path, 'w') as f_out:
                f_out.write("{}\n".format(random.randint(0, 10000)))
            # Bug fix: honour the configured interval — the original slept
            # a hard-coded 0.1 seconds and ignored self.interval entirely.
            time.sleep(self.interval)
def toy_dump_object_example(wait_for_new_files=False):
    """Demonstrate DumpObject by aggregating first lines of watched files.

    A background RandomFileFactory drops files into a watched directory;
    this function reads each one, records its contents keyed by basename,
    and batches the results to disk via DumpObject.

    Args:
        wait_for_new_files (bool): if True, block forever waiting for new
            files (terminate with ctrl+c); if False, stop once the queue
            is drained.
    """
    base_dir = os.path.join(tempfile.gettempdir(), "dump_object_example")
    watched_dir = os.path.join(base_dir, "source_files")
    aggregate_dir = os.path.join(base_dir, "dump_object")
    if not os.path.exists(watched_dir):
        os.makedirs(watched_dir)
    if not os.path.exists(aggregate_dir):
        os.makedirs(aggregate_dir)

    # Start thread making random files
    _ = RandomFileFactory(watched_dir)
    batch_saver = DumpObject(aggregate_dir, n_keys_to_dump=25)
    universal_file_dict = dict()

    # Populate file queue with files already there
    file_queue = queue.Queue()
    for existing in os.listdir(watched_dir):
        file_queue.put(os.path.join(watched_dir, existing))

    # Watch for more files added
    event_handler = Handler(file_queue)
    observer = Observer()
    observer.schedule(event_handler, path=watched_dir, recursive=True)
    observer.start()
    try:
        while True:
            path = file_queue.get()
            basename = os.path.basename(path)
            # Bug fix: dedup on basename — the dict is keyed by basename,
            # so the original full-path membership test never matched and
            # duplicated queue entries were re-read every time.
            if basename not in universal_file_dict:
                with open(path, 'r') as f_in:
                    contents = f_in.read().strip()
                universal_file_dict[basename] = contents
                batch_saver.add_entry(basename, contents)
            if file_queue.empty() and not wait_for_new_files:
                # Reuse the ctrl+c path to shut the observer down cleanly.
                raise KeyboardInterrupt
    except KeyboardInterrupt:
        observer.stop()
        observer.join()
if __name__ == '__main__':
    print("Program will not terminate: do Ctrl+C to exit")
    # Route INFO-level logging (DumpObject reports each dump) to stdout.
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setLevel(logging.INFO)
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(stream_handler)
    toy_dump_object_example(wait_for_new_files=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment