Skip to content

Instantly share code, notes, and snippets.

@charasyn
Created April 10, 2022 00:28
Show Gist options
  • Save charasyn/e874b210e87668dcd17318e24d5b644b to your computer and use it in GitHub Desktop.
Save charasyn/e874b210e87668dcd17318e24d5b644b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import heapq
import random
import sys
from math import ceil
from typing import List, Tuple
# When True, the diagnostic print() calls throughout this script are enabled.
VERBOSE = True


# generate Catador arrays for compression
class ArrayGenerator:
    """Produce fixed-length arrays cut from one shared random sequence.

    A single random sequence of ``SELECTION_SPACE`` values is generated and
    ``NUM_SELECTIONS`` windows of ``ARR_SIZE`` consecutive values are sliced
    out of it at random positions, so the resulting arrays may overlap.

    Args:
        arr_size: Length of every generated array.
        num_selections: Number of arrays to generate.
        elem_range: Values are drawn from ``range(elem_range)``.
        selection_space: Length of the shared sequence. When ``None`` it
            defaults to ``ceil(arr_size * num_selections * 1.4)``.
    """

    def __init__(self, arr_size, num_selections, elem_range=10, selection_space=None) -> None:
        self.rng = random.Random()
        self.ARR_SIZE = arr_size
        self.NUM_SELECTIONS = num_selections
        self.ELEM_RANGE = elem_range
        if selection_space is None:
            # math.ceil already returns an int in Python 3.
            self.SELECTION_SPACE = ceil(self.ARR_SIZE * self.NUM_SELECTIONS * 1.4)
        else:
            self.SELECTION_SPACE = selection_space

    def seed(self, *args, **kwargs):
        """Seed the private RNG; arguments are forwarded to ``Random.seed``."""
        self.rng.seed(*args, **kwargs)

    def randentry(self):
        """Return one random element value from ``range(ELEM_RANGE)``."""
        return self.rng.randrange(self.ELEM_RANGE)

    def randindex(self):
        """Return a random start index for a full-length window.

        Raises:
            ValueError: If the shared sequence is shorter than one array, so
                no window of ``ARR_SIZE`` elements fits.
        """
        num_starts = self.SELECTION_SPACE - self.ARR_SIZE + 1
        if num_starts <= 0:
            # randrange() would raise ValueError here anyway; say why.
            raise ValueError(
                f'selection space ({self.SELECTION_SPACE}) is smaller than '
                f'the array size ({self.ARR_SIZE})'
            )
        return self.rng.randrange(num_starts)

    def generate_arrays(self) -> Tuple[List[List[int]], float]:
        """Generate the arrays and the ratio of distinct data they contain.

        Returns:
            A pair ``(arrays, ratio)``. ``arrays`` holds ``NUM_SELECTIONS``
            lists of ``ARR_SIZE`` values. ``ratio`` is the number of distinct
            positions of the shared sequence covered by at least one array,
            divided by ``NUM_SELECTIONS * ARR_SIZE``.

        Raises:
            ValueError: Propagated from ``randindex``.
            ZeroDivisionError: If ``NUM_SELECTIONS * ARR_SIZE`` is zero.
        """
        full_data = [self.randentry() for _ in range(self.SELECTION_SPACE)]
        arrays = []
        covered_indices = set()
        for _ in range(self.NUM_SELECTIONS):
            start = self.randindex()
            stop = start + self.ARR_SIZE
            covered_indices.update(range(start, stop))
            arrays.append(full_data[start:stop])
        total = self.NUM_SELECTIONS * self.ARR_SIZE
        ratio = len(covered_indices) / total
        if VERBOSE:
            print(f'"Generated" compression ratio: {len(covered_indices)}/{total} = {ratio}')
            print([full_data[x] for x in sorted(covered_indices)])
        return arrays, ratio
# compress Catador arrays
# Input for the compression below: 32 arrays of 32 elements each, with values
# drawn from range(256). og_ratio is the ratio reported by the generator
# (distinct source positions covered / total elements); the ratio achieved by
# this script is compared against it at the end.
data, og_ratio = ArrayGenerator(32, 32, 256).generate_arrays()
def get_overlap(a: List[int], b: List[int]) -> int:
    """Return the length of the longest suffix of *a* that is a prefix of *b*.

    Candidate lengths are tried from the largest possible
    (``min(len(a), len(b))``) downwards, so the first match is the longest.

    Returns:
        The overlap length, or 0 when no non-empty suffix of *a* matches a
        prefix of *b* (including when either list is empty).
    """
    for n in range(min(len(a), len(b)), 0, -1):
        # n >= 1 here, so a[-n:] is exactly the last n elements of a.
        if a[-n:] == b[:n]:
            return n
    return 0
# Number of input arrays; used to size the overlap table and the offsets list.
N_ARRS = len(data)
class OverlapHeap:
    """Priority queue of ``(overlap, preidx, postidx)`` edges, largest first.

    ``heapq`` is a min-heap, so the overlap is stored negated. Ties on the
    overlap are broken by the smaller ``preidx``, then the smaller
    ``postidx``, because the remaining tuple fields compare in that order.
    """

    def __init__(self) -> None:
        # Heap-ordered list of (-overlap, preidx, postidx) tuples.
        self.h = []

    def push(self, overlap, preidx, postidx) -> None:
        """Queue the edge ``preidx -> postidx`` with the given overlap."""
        heapq.heappush(self.h, (-overlap, preidx, postidx))

    def pop(self):
        """Remove and return the edge with the largest overlap.

        Returns:
            A tuple ``(overlap, preidx, postidx)``.

        Raises:
            IndexError: If the heap is empty.
        """
        neg_overlap, preidx, postidx = heapq.heappop(self.h)
        return -neg_overlap, preidx, postidx

    def popiter(self):
        """Yield edges in descending overlap order until the heap is empty."""
        while self.h:
            yield self.pop()
# overlap[pre][post] is the number of trailing elements of data[pre] that
# equal the leading elements of data[post]. Entries stay 0 when there is no
# overlap, and on the diagonal because an array is never paired with itself.
overlap = [[0] * N_ARRS for _ in range(N_ARRS)]
# Every non-zero overlap is also queued as a candidate edge pre -> post.
oh = OverlapHeap()
for arrpre_idx, arrpre in enumerate(data):
    for arrpost_idx, arrpost in enumerate(data):
        if arrpre_idx == arrpost_idx:
            continue
        ol = get_overlap(arrpre, arrpost)
        if ol > 0:
            overlap[arrpre_idx][arrpost_idx] = ol
            oh.push(ol, arrpre_idx, arrpost_idx)
if VERBOSE:
    # One line per array: its index, its contents, and its overlap row.
    for i, (rd, ro) in enumerate(zip(data, overlap)):
        print(f'{i}:', f'{rd},', ro)
# Put each original array index into sequence (build chains of indices)
# using the edges with the highest overlap first.
# already_pres: indices that already have an outgoing edge (a successor).
# already_posts: indices that already have an incoming edge (a predecessor).
already_pres = set()
already_posts = set()
chains: List[List[int]] = []
for wt, preidx, postidx in oh.popiter():
    if VERBOSE:
        print(f'chain {preidx} -> {postidx} (wt {wt}): ', end='')
    # Discard any edges for which the source vertex already points somewhere,
    # or the destination vertex is already pointed to.
    if preidx in already_pres or postidx in already_posts:
        if VERBOSE:
            print('skipping, node already used')
        continue
    # Find the chain ending in preidx and the chain starting with postidx
    # (-1 means no such chain exists yet).
    chpreidx = -1
    chpostidx = -1
    for chidx, chain in enumerate(chains):
        prematch = chain[-1] == preidx
        postmatch = chain[0] == postidx
        if postmatch:
            chpostidx = chidx
        if prematch:
            chpreidx = chidx
        if chpreidx >= 0 and chpostidx >= 0:
            break
    # The edge would join the tail of a chain to its own head.
    if chpreidx == chpostidx and chpreidx != -1:
        if VERBOSE:
            print('skipping to prevent a cycle in chain')
        continue
    # Note that we've already used an edge going from/to pre/post idx,
    # so that we can skip using it again
    already_pres.add(preidx)
    already_posts.add(postidx)
    if chpreidx == -1 and chpostidx == -1:
        # New chain
        if VERBOSE:
            print('new')
        chains.append([preidx, postidx])
    elif chpreidx == -1:
        # Prepend to chpostidx
        if VERBOSE:
            print(f'prepend to {chains[chpostidx]}')
        chains[chpostidx].insert(0, preidx)
    elif chpostidx == -1:
        # Append to chpreidx
        if VERBOSE:
            print(f'append to {chains[chpreidx]}')
        chains[chpreidx].append(postidx)
    else:
        # Link two chains together
        if VERBOSE:
            print(f'link {chains[chpreidx]} -> {chains[chpostidx]}')
        # chains[chpreidx] is looked up before the pop() argument is
        # evaluated, so removing the other chain cannot shift it.
        chains[chpreidx].extend(chains.pop(chpostidx))
# Add individual elements that aren't already in a chain, as a "chain of one".
# This makes the chain->array conversion code below easier.
# The union is built once here rather than once per candidate index.
linked = already_pres | already_posts
chains.extend([x] for x in range(N_ARRS) if x not in linked)
if VERBOSE:
    for chain in chains:
        print(chain)
# Convert chain form into (data array, offsets)
dataout = []
# offsets[i] will hold the index in dataout where original array i starts.
offsets = [None] * N_ARRS
cur_offset = 0
# For each chain, append it to our existing data and add the offsets.
for chain in chains:
    # Each chain starts exactly where the previous chain's data ended.
    assert cur_offset == len(dataout)
    last_overlap = 0
    for i, linkidx in enumerate(chain):
        linkdata = data[linkidx]
        # Add data into array and note where it starts.
        # The first last_overlap elements were already emitted as the tail
        # of the previous array in the chain, so they are skipped.
        dataout.extend(linkdata[last_overlap:])
        offsets[linkidx] = cur_offset
        # Advance ahead our pointer for where the offset of the next
        # array should be. The last array in a chain has no successor,
        # so nothing of it is shared.
        is_last = i == len(chain) - 1
        cur_overlap = 0 if is_last else overlap[linkidx][chain[i + 1]]
        cur_offset += len(linkdata) - cur_overlap
        last_overlap = cur_overlap
# Now dataout should contain the full data array,
# and offsets should contain the index where each of the original arrays starts.
if VERBOSE:
    print(dataout)
    print(offsets)
datalen = sum(len(x) for x in data)
my_ratio = len(dataout) / datalen
print(f'Actual compression ratio: {len(dataout)}/{datalen} = {my_ratio}')
# Exit with status 1 when the result is larger than the generator's ratio.
if my_ratio > og_ratio:
    sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment