Created
April 10, 2022 00:28
-
-
Save charasyn/e874b210e87668dcd17318e24d5b644b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import heapq
import random
from math import ceil
from typing import List, Tuple
# Gates the diagnostic print() calls guarded by `if VERBOSE:` throughout this script.
VERBOSE = True
# generate Catador arrays for compression | |
class ArrayGenerator:
    """Produce fixed-size arrays cut as windows out of one shared random buffer.

    Because every array is a slice of the same buffer, windows that land close
    together share elements, which is what gives a compressor something to find.

    Attributes:
        rng: Private ``random.Random`` instance; reseed it through ``seed()``.
        ARR_SIZE: Length of each generated array.
        NUM_SELECTIONS: Number of arrays to generate.
        ELEM_RANGE: Elements are drawn from ``range(ELEM_RANGE)``.
        SELECTION_SPACE: Length of the shared buffer the windows are cut from.
    """

    def __init__(self, arr_size, num_selections, elem_range=10, selection_space=None) -> None:
        """Store the generation parameters.

        Args:
            arr_size: Length of each generated array.
            num_selections: How many arrays to generate.
            elem_range: Exclusive upper bound for element values.
            selection_space: Length of the shared buffer. When ``None`` it
                defaults to 1.4x the total number of requested elements
                (``arr_size * num_selections``), rounded up.
        """
        self.rng = random.Random()
        self.ARR_SIZE = arr_size
        self.NUM_SELECTIONS = num_selections
        self.ELEM_RANGE = elem_range
        if selection_space is None:
            self.SELECTION_SPACE = int(ceil(self.ARR_SIZE * self.NUM_SELECTIONS * 1.4))
        else:
            self.SELECTION_SPACE = selection_space

    def seed(self, *args, **kwargs):
        """Reseed the private RNG; arguments are forwarded to ``Random.seed``."""
        self.rng.seed(*args, **kwargs)

    def randentry(self):
        """Return one random element value from ``range(ELEM_RANGE)``."""
        return self.rng.randrange(self.ELEM_RANGE)

    def randindex(self):
        """Return a random window start such that the window fits in the buffer.

        ``randrange`` raises ``ValueError`` when ``SELECTION_SPACE < ARR_SIZE``
        because the range of valid start positions is then empty.
        """
        return self.rng.randrange(self.SELECTION_SPACE - self.ARR_SIZE + 1)

    def generate_arrays(self) -> Tuple[List[List[int]], float]:
        """Generate the arrays and the ratio of distinct buffer cells they cover.

        Returns:
            A ``(arrays, ratio)`` tuple. ``arrays`` holds ``NUM_SELECTIONS``
            lists of ``ARR_SIZE`` ints each. ``ratio`` is the number of
            distinct buffer indices covered by at least one window divided by
            the total element count ``NUM_SELECTIONS * ARR_SIZE``.

        Raises:
            ZeroDivisionError: If ``NUM_SELECTIONS * ARR_SIZE`` is zero.
        """
        full_data = [self.randentry() for _ in range(self.SELECTION_SPACE)]
        arrays = []
        # Buffer positions touched by at least one window.
        covered_indices = set()
        for _ in range(self.NUM_SELECTIONS):
            idx = self.randindex()
            data = full_data[idx:idx+self.ARR_SIZE]
            covered_indices.update(range(idx, idx+self.ARR_SIZE))
            arrays.append(data)
        ratio = len(covered_indices)/(self.NUM_SELECTIONS*self.ARR_SIZE)
        if VERBOSE:
            print(f'"Generated" compression ratio: {len(covered_indices)}/{self.NUM_SELECTIONS*self.ARR_SIZE} = {ratio}')
            print([full_data[x] for x in sorted(covered_indices)])
        return arrays, ratio
# compress Catador arrays
# Sample input: 32 arrays of 32 values each drawn from range(256), together
# with the coverage ratio reported by the generator for comparison later on.
data, og_ratio = ArrayGenerator(
    arr_size=32,
    num_selections=32,
    elem_range=256,
).generate_arrays()
def get_overlap(a: List[int], b: List[int]):
    """Return the length of the longest suffix of ``a`` that is also a prefix of ``b``.

    Candidate lengths are tried from longest to shortest, so the first match
    is the maximum. Returns 0 when no non-empty suffix/prefix pair matches.
    """
    longest_possible = min(len(a), len(b))
    return next(
        (
            size
            for size in range(longest_possible, 0, -1)
            if a[-size:] == b[:size]
        ),
        0,
    )
# Number of input arrays; used to size the overlap matrix and the offsets table.
N_ARRS = len(data)
class OverlapHeap:
    """Max-priority queue of ``(overlap, preidx, postidx)`` edges.

    ``heapq`` only provides a min-heap, so the overlap is stored negated;
    the largest overlap therefore comes out first. Ties fall back to the
    ascending order of ``preidx`` and then ``postidx``.
    """

    def __init__(self):
        # Backing list managed exclusively through heapq.
        self.h = []

    def push(self, overlap, preidx, postidx):
        """Queue an edge from array ``preidx`` to array ``postidx``."""
        entry = (-overlap, preidx, postidx)
        heapq.heappush(self.h, entry)

    def pop(self):
        """Remove and return the edge with the largest overlap."""
        entry = heapq.heappop(self.h)
        return -entry[0], entry[1], entry[2]

    def popiter(self):
        """Yield edges in priority order, draining the heap as it goes."""
        while len(self.h) > 0:
            yield self.pop()
# overlap[pre][post] holds how many trailing elements of data[pre] coincide
# with the leading elements of data[post]; the diagonal stays 0.
overlap = [[0] * N_ARRS for _ in range(N_ARRS)]
# Every positive overlap is also queued as a candidate edge, best first.
oh = OverlapHeap()
for pre in range(N_ARRS):
    overlap_row = overlap[pre]
    for post in range(N_ARRS):
        if pre != post:
            shared = get_overlap(data[pre], data[post])
            if shared > 0:
                overlap_row[post] = shared
                oh.push(shared, pre, post)
if VERBOSE:
    # One line per array: its index, its contents, and its overlap row.
    for i, (rd, ro) in enumerate(zip(data, overlap)):
        print(f'{i}:', f'{rd},', ro)
# Put each original array index into sequence (build chains of indices)
# using the edges with the highest overlap first.
#
# Invariants maintained by the loop:
#   * every index appears in at most one chain;
#   * an index is in `already_pres` once it has a successor, and in
#     `already_posts` once it has a predecessor;
#   * chains always hold at least two indices (singletons are added afterwards).
already_pres = set()
already_posts = set()
chains: List[List[int]] = []
for wt, preidx, postidx in oh.popiter():
    if VERBOSE:
        print(f'chain {preidx} -> {postidx} (wt {wt}): ', end='')
    # Discard any edges for which the source vertex already points somewhere,
    # or the destination vertex is already pointed to.
    if preidx in already_pres or postidx in already_posts:
        if VERBOSE:
            print('skipping, node already used')
        continue
    # Locate the chain ending in preidx and the chain starting with postidx
    # (-1 means "no such chain").
    chpreidx = -1
    chpostidx = -1
    for chidx, chain in enumerate(chains):
        prematch = chain[-1] == preidx
        postmatch = chain[0] == postidx
        if postmatch:
            chpostidx = chidx
        if prematch:
            chpreidx = chidx
        if chpreidx >= 0 and chpostidx >= 0:
            break
    # Joining the tail of a chain to its own head would close a loop.
    if chpreidx == chpostidx and chpreidx != -1:
        if VERBOSE:
            print('skipping to prevent a cycle in chain')
        continue
    # Note that we've already used an edge going from/to pre/post idx,
    # so that we can skip using it again
    already_pres.add(preidx)
    already_posts.add(postidx)
    if chpreidx == -1 and chpostidx == -1:
        # New chain
        if VERBOSE:
            print('new')
        chains.append([preidx, postidx])
    elif chpreidx == -1:
        # Prepend to chpostidx
        if VERBOSE:
            print(f'prepend to {chains[chpostidx]}')
        chains[chpostidx].insert(0, preidx)
    elif chpostidx == -1:
        # Append to chpreidx
        if VERBOSE:
            print(f'append to {chains[chpreidx]}')
        chains[chpreidx].append(postidx)
    else:
        # Link two chains together
        if VERBOSE:
            print(f'link {chains[chpreidx]} -> {chains[chpostidx]}')
        # The target list is looked up before pop() runs, so the index shift
        # caused by removing chpostidx cannot redirect the extend().
        chains[chpreidx].extend(chains.pop(chpostidx))
# Add individual elements that aren't already in a chain, as a "chain of one".
# This makes the chain->array conversion code below easier.
# The union is computed once up front rather than once per candidate index.
chained_indices = already_pres | already_posts
chains.extend([x] for x in range(N_ARRS) if x not in chained_indices)
if VERBOSE:
    for chain in chains:
        print(chain)
# Convert chain form into (data array, offsets)
dataout = []
offsets = [None] * N_ARRS
cur_offset = 0
# Lay the chains out one after another, recording where each array begins.
for chain in chains:
    # Each chain starts exactly at the current end of the output.
    assert cur_offset == len(dataout)
    last_overlap = 0
    # Pair every link with its successor; the final link gets None.
    successors = chain[1:] + [None]
    for linkidx, nextidx in zip(chain, successors):
        linkdata = data[linkidx]
        # Emit only the part not already supplied by the previous link,
        # and remember where this array starts in the output.
        dataout += linkdata[last_overlap:]
        offsets[linkidx] = cur_offset
        # The next array begins inside this one by the size of their overlap;
        # the last link of a chain overlaps nothing.
        cur_overlap = 0 if nextidx is None else overlap[linkidx][nextidx]
        cur_offset += len(linkdata) - cur_overlap
        last_overlap = cur_overlap
# At this point dataout is the merged data array and offsets[k] is the
# position in dataout where original array k begins.
if VERBOSE:
    print(dataout)
    print(offsets)
datalen = sum(map(len, data))
my_ratio = len(dataout) / datalen
print(f'Actual compression ratio: {len(dataout)}/{datalen} = {my_ratio}')
# Exit with a failure status when the result is worse than the generator's ratio.
if my_ratio > og_ratio:
    import sys
    sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment