Created
July 5, 2022 21:40
-
-
Save danizen/e4ca984fa3fead2ca1da491a0347290d to your computer and use it in GitHub Desktop.
Index a MARC file so you can read only the parts you need
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
import logging | |
import os | |
import mmap | |
import time | |
from collections.abc import Sequence | |
from struct import pack, unpack | |
LOG = logging.getLogger(__name__) | |
def parse_offset_id(f):
    """
    Bypass pymarc and parse MARC to determine the offset and control_id of each record

    Reads exactly one record from ``f`` and leaves the file positioned
    immediately after it, so repeated calls walk the file record by record.

    :param f: binary file object positioned at the start of a record;
        must support ``tell()`` and ``read(n)``
    :return: tuple ``(offset, control_id)`` where ``offset`` is the position
        of the record in the file and ``control_id`` is the raw bytes of the
        001 field without its terminator. ``control_id`` is ``None`` when
        there is no further record to read.
    :raises ValueError: if the leader is incomplete or not numeric, the
        record body is truncated, or the first directory entry is not 001
    """
    offset = f.tell()
    ldr = f.read(24)
    if not ldr:
        # clean end of file
        return offset, None
    if len(ldr) != 24:
        raise ValueError(
            'truncated MARC leader at offset %d: got %d of 24 bytes'
            % (offset, len(ldr))
        )
    try:
        record_len = int(ldr[:5])
        base_address = int(ldr[12:17])
    except ValueError as err:
        raise ValueError(
            'invalid MARC leader at offset %d: %r' % (offset, ldr)
        ) from err
    # a negative read size would not mean "read nothing", so reject it here
    if record_len < 24:
        raise ValueError(
            'invalid MARC record length %d at offset %d' % (record_len, offset)
        )
    # resid is the remaining length of the record
    resid = record_len - 24
    dir_plus_data = f.read(resid)
    if not dir_plus_data:
        # nothing follows the leader; treated the same as end of file
        return offset, None
    # explicit check rather than assert, which is stripped under ``python -O``
    if len(dir_plus_data) != resid:
        raise ValueError(
            'truncated MARC record at offset %d: expected %d more bytes, got %d'
            % (offset, resid, len(dir_plus_data))
        )
    # the base address counts from the start of the record, leader included
    base_of_data = base_address - 24
    directory = dir_plus_data[:base_of_data]
    data = dir_plus_data[base_of_data:]
    # first field is 001 controlfield
    if directory[:3] != b'001':
        raise ValueError(
            'first field of MARC record at offset %d is %r, expected 001'
            % (offset, directory[:3])
        )
    # length of field without field terminator
    field_len = int(directory[3:7]) - 1
    control_id = data[:field_len]
    return offset, control_id
def yield_offset_id(path, buffering=io.DEFAULT_BUFFER_SIZE):
    """
    Generate an ``(offset, control_id)`` pair for every record in a MARC file

    :param path: path of the MARC file, opened in binary mode
    :param buffering: buffer size passed through to ``open()``
    :return: generator of ``(offset, control_id)`` tuples in file order
    """
    with open(path, 'rb', buffering=buffering) as marc_file:
        entry = parse_offset_id(marc_file)
        # a control_id of None marks the end of the file
        while entry[1] is not None:
            yield entry
            entry = parse_offset_id(marc_file)
class MARCIndex(Sequence):
    """
    Read-only index over a MARC file, sorted by control_id

    The index is a separate file of fixed-size 32 byte entries, each packed
    with the struct format ``'24pQ'``: a length-prefixed control_id slot
    (at most 23 bytes of id) followed by the offset of the record in the
    MARC file. The index file is memory-mapped, and each item of the
    sequence is a ``(control_id, offset)`` tuple with ``control_id`` as bytes.

    An index whose file does not exist yet, or is empty, behaves as an empty
    sequence until ``build()`` is called.
    """

    def __init__(self, path, index_path=None, buffering=io.DEFAULT_BUFFER_SIZE):
        """
        :param path: path of the MARC file
        :param index_path: path of the index file; defaults to ``path`` with
            a ``.mrc`` suffix replaced by ``.idx``, or with ``.idx`` appended
        :param buffering: buffer size used when reading the MARC file
        """
        # set first so close() and __del__ are safe even if the rest raises
        self.__mbuf = None
        self.__size = 0
        self.path = path
        if index_path is None:
            if path.endswith('.mrc'):
                index_path = path[:-4] + '.idx'
            else:
                index_path = path + '.idx'
        self.index_path = index_path
        self.buffering = buffering
        try:
            self.open()
        except FileNotFoundError:
            # no index file yet: stay empty until build() creates one
            pass

    def __len__(self):
        """Return the number of entries in the index."""
        return self.__size

    def __del__(self):
        try:
            self.close()
        except AttributeError:
            # __init__ never ran far enough to create the attributes
            pass

    def __getitem__(self, key):
        """
        Return the ``(control_id, offset)`` entry at an index, or a list of
        entries for a slice.

        :raises IndexError: if an integer index is out of range; the
            iteration and search methods inherited from ``Sequence`` rely
            on this to detect the end of the index
        """
        if isinstance(key, slice):
            return [self[ii] for ii in range(*key.indices(len(self)))]
        pos = int(key)
        size = self.__size
        if pos < 0:
            # negative indices count from the end, as for any sequence
            pos += size
        if not 0 <= pos < size:
            raise IndexError('MARCIndex index out of range')
        off = pos * 32
        return unpack('24pQ', self.__mbuf[off:off + 32])

    def __setitem__(self, key, value):
        raise TypeError('this object is immutable')

    def __delitem__(self, key):
        raise TypeError('this object is immutable')

    def open(self):
        """
        Memory-map the index file, replacing any mapping already held.

        :raises FileNotFoundError: if the index file does not exist; the
            previous mapping, if any, is left in place
        """
        with open(self.index_path, 'rb') as f:
            index_size = os.fstat(f.fileno()).st_size
            count = index_size // 32
            if count:
                mbuf = mmap.mmap(f.fileno(), index_size, access=mmap.ACCESS_READ)
            else:
                # an empty file cannot be memory-mapped; treat it as an
                # empty index instead of failing
                mbuf = None
        previous = self.__mbuf
        self.__mbuf = mbuf
        self.__size = count
        if previous is not None:
            previous.close()

    def close(self):
        """Release the memory map; afterwards the index reads as empty."""
        if self.__mbuf is not None:
            self.__mbuf.close()
            self.__mbuf = None
        # keep len() consistent with the data that can actually be read
        self.__size = 0

    def build(self):
        """
        Builds the index path based on the MARC Path.
        Re-opens the index when completed
        :return: None
        """
        stime = time.perf_counter()
        records = [
            (control_id, offset)
            for offset, control_id in yield_offset_id(self.path, self.buffering)
        ]
        duration = time.perf_counter() - stime
        LOG.info('Read %d records in %.1f seconds', len(records), duration)
        stime = time.perf_counter()
        records.sort(key=lambda p: p[0])
        duration = time.perf_counter() - stime
        LOG.info('Sorted %d records in %.1f seconds', len(records), duration)
        # the 24 byte 'p' slot holds one length byte plus at most 23 bytes of
        # control_id; pack() silently cuts off anything longer
        overlong = sum(1 for control_id, _ in records if len(control_id) > 23)
        if overlong:
            LOG.warning(
                '%d control ids are longer than 23 bytes and will be truncated',
                overlong,
            )
        # drop the mapping of the old index before its file is rewritten
        self.close()
        # the format of each record is a 24 byte length-prefixed control_id
        # slot + an 8 byte integer offset
        stime = time.perf_counter()
        with open(self.index_path, 'wb') as f:
            f.writelines(
                pack('24pQ', control_id, offset)
                for control_id, offset in records
            )
        duration = time.perf_counter() - stime
        LOG.info('Wrote %d records in %.1f seconds', len(records), duration)
        self.open()

    def lookup(self, control_id):
        """
        Binary search for the position of the control_id over the index
        :param control_id: byte or str representation of the control_id
        :return: position in the index, or None if the control_id is absent
        """
        # entries hold bytes, so compare as bytes
        if isinstance(control_id, str):
            control_id = control_id.encode('ascii')
        low = 0
        high = len(self) - 1
        while low <= high:
            mid = (low + high) // 2
            middle_id, _ = self[mid]
            if middle_id < control_id:
                low = mid + 1
            elif middle_id > control_id:
                high = mid - 1
            else:
                return mid
        return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment