Created
January 4, 2019 11:00
-
-
Save annarailton/0e2bf53a2593c5502cfbdab88ffe3e41 to your computer and use it in GitHub Desktop.
Object that periodically dumps new contents to disk
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
""" | |
Toy example using DumpObject, a way of periodically dumping out large objects to | |
file as it grows. This example watches a file directory and creates a dictionary | |
with | |
* key: file name | |
* value: first line of file | |
Includes use of `watchdog` to watch for real time effects (here: new files | |
being created in the directory) and `atexit` to gracefully deal with the program | |
terminating unexpectedly. | |
This pattern was used for creating vocab dictionaries in a streaming way before | |
the use of `luigi`. | |
""" | |
import atexit | |
import csv | |
import logging | |
import os | |
import queue | |
import random | |
import sys | |
import tempfile | |
import threading | |
import time | |
import uuid | |
from watchdog.events import FileSystemEventHandler | |
from watchdog.observers import Observer | |
class DumpObject(object):
    """Periodically dump a growing dictionary to disk in CSV format.

    Entries accumulate via :meth:`add_entry`; once the number of stored
    keys reaches ``n_keys_to_dump`` they are written to a uniquely named
    CSV file in ``directory`` and the in-memory buffer is cleared.  Any
    remaining entries are flushed on interpreter exit via ``atexit``.

    Inspired by @pawel (data science intern at Semmle).

    Args:
        directory (str): directory to which the data should be dumped
        file_prefix (str, optional): string to prefix dump object's file name
        n_keys_to_dump (int, optional): number of keys to dump after
            (default 1,000,000)

    Example of the pattern to use for vocab updating:

        batch_saver = DumpObject('/tmp/dumping_dir', 'test', 100)
        universal = dict()
        for i in range(200):
            if i not in universal:
                universal[i] = i + 1
                batch_saver.add_entry(i, i + 1)
    """

    def __init__(self, directory, file_prefix=None, n_keys_to_dump=None):
        self._directory = directory
        self._try_to_create_directory()
        self._file_prefix = file_prefix
        self._data_to_dump = dict()
        # Treat None (or 0) as "use the default threshold".
        self._n_keys_to_dump = n_keys_to_dump or 1000000
        atexit.register(self.dump_data)  # flush whatever is left on exit

    def _try_to_create_directory(self):
        """Create the dump directory; no action if it already exists.

        Raises:
            ValueError: if the path exists but is not a directory.
        """
        if os.path.exists(
                self._directory) and not os.path.isdir(self._directory):
            raise ValueError("Directory {} cannot be created.".format(
                self._directory))
        # exist_ok avoids a check-then-create race with other processes.
        os.makedirs(self._directory, exist_ok=True)

    def add_entry(self, key, value):
        """Add data to the _data_to_dump dictionary. Any key/value type is OK.

        Existing keys are NOT overwritten.  Triggers a dump (and buffer
        reset) once the key-count threshold is reached.
        """
        if key not in self._data_to_dump:
            self._data_to_dump[key] = value
        # >= rather than == so an externally mutated buffer can never
        # step past the threshold and grow without bound.
        if len(self._data_to_dump) >= self._n_keys_to_dump:
            self.dump_data()

    def dump_data(self):
        """Write pending entries to a fresh CSV file and clear the buffer.

        Uses the file_prefix attribute, if set, to name the file.  The
        buffer is cleared here so that a manual call followed by the
        atexit flush cannot write duplicate files.
        """
        if not self._data_to_dump:
            return  # Don't dump empty dict
        # Loop until an unused file name is found (uuid4 collisions are
        # vanishingly rare, so this effectively runs once).
        while True:
            name = str(uuid.uuid4()) + ".csv"
            if self._file_prefix:
                name = self._file_prefix + "_" + name
            file_path = os.path.join(self._directory, name)
            if not os.path.exists(file_path):
                break
        # newline="" is required by the csv module to avoid blank rows
        # being written on Windows.
        with open(file_path, "w", newline="") as csv_out:
            writer = csv.writer(csv_out)
            writer.writerows(self._data_to_dump.items())
        self._data_to_dump = dict()  # re-initialise: prevents double dumps
        logging.info("Dumped vocab to file %s", file_path)
class Handler(FileSystemEventHandler):
    """Forward filesystem-creation events into a queue.

    Each path reported by watchdog's ``on_created`` callback is pushed
    onto ``file_queue`` for a consumer thread to process.
    """

    def __init__(self, file_queue):
        super().__init__()
        self.file_queue = file_queue

    def on_created(self, event):
        # Only the created path matters downstream; drop the rest of the
        # event object.
        self.file_queue.put(event.src_path)
class RandomFileFactory(object):
    """Use a thread to produce random files in a background process.

    Every ``interval`` seconds a file with a uuid4 name containing one
    random integer is written to ``directory``, up to ``max_n_files``
    files in total.

    Based on http://sebastiandahlgren.se/2014/06/27/running-a-method-as-a-background-thread-in-python/

    Args:
        directory (str): directory in which files are created.
        max_n_files (int, optional): total number of files to create.
        interval (float, optional): seconds to pause between files.
    """

    def __init__(self, directory, max_n_files=100, interval=0.1):
        self.directory = directory
        self.max_n_files = max_n_files
        self.interval = interval
        # exist_ok avoids a check-then-create race with other processes.
        os.makedirs(self.directory, exist_ok=True)
        thread = threading.Thread(target=self.run, args=())
        # Running the thread in daemon mode allows the main application to exit
        # even though the thread is running. It will also therefore make it
        # possible to use ctrl+c to terminate the application.
        thread.daemon = True
        thread.start()

    def run(self):
        """Write ``max_n_files`` single-line files, pausing between each."""
        for _ in range(self.max_n_files):
            file_path = os.path.join(self.directory, str(uuid.uuid4()))
            with open(file_path, 'w') as f_out:
                f_out.write("{}\n".format(random.randint(0, 10000)))
            # BUG FIX: the original slept a hard-coded 0.1s here, silently
            # ignoring the configurable ``interval`` argument.
            time.sleep(self.interval)
def toy_dump_object_example(wait_for_new_files=False):
    """Watch a temp directory and aggregate the first line of each file.

    Spawns a RandomFileFactory thread that creates random files under a
    temp "source_files" directory, watches that directory with watchdog,
    and records {file basename: file contents} pairs through a DumpObject
    that flushes to CSV every 25 keys.

    Args:
        wait_for_new_files (bool): if True, block forever waiting for
            more files (terminate with Ctrl+C); if False, stop as soon as
            the queue of pending files drains.
    """
    watched_dir = os.path.join(tempfile.gettempdir(), "dump_object_example", "source_files")
    aggregate_dir = os.path.join(tempfile.gettempdir(), "dump_object_example", "dump_object")
    if not os.path.exists(watched_dir):
        os.makedirs(watched_dir)
    if not os.path.exists(aggregate_dir):
        os.makedirs(aggregate_dir)
    # Start thread making random files
    _ = RandomFileFactory(watched_dir)
    batch_saver = DumpObject(aggregate_dir, n_keys_to_dump=25)
    universal_file_dict = dict()
    # Populate file queue with files already there
    file_queue = queue.Queue()
    for file in os.listdir(watched_dir):
        file_queue.put(os.path.join(watched_dir, file))
    # Watch for more files added
    event_handler = Handler(file_queue)
    observer = Observer()
    observer.schedule(event_handler, path=watched_dir, recursive=True)
    observer.start()
    try:
        while True:
            # Blocks until a file is available; new files keep arriving
            # from the watchdog Handler while we wait.
            file = file_queue.get()
            if file not in universal_file_dict:
                # NOTE(review): the on_created event may fire before the
                # producer has finished writing, so contents can be
                # empty — acceptable for a toy example.
                with open(file, 'r') as f_in:
                    contents = f_in.read().strip()
                basename = os.path.basename(file)
                universal_file_dict[basename] = contents
                batch_saver.add_entry(basename, contents)
            if file_queue.empty() and not wait_for_new_files:
                # Reuse the Ctrl+C path below for a clean shutdown.
                raise KeyboardInterrupt
    except KeyboardInterrupt:
        observer.stop()
        observer.join()
if __name__ == '__main__':
    print("Program will not terminate: do Ctrl+C to exit")
    # Route INFO-level records (e.g. DumpObject's "dumped" messages) to
    # stdout so the dumps are visible while the example runs.
    root_logger = logging.getLogger()
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.INFO)
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(stdout_handler)
    toy_dump_object_example(wait_for_new_files=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment