Skip to content

Instantly share code, notes, and snippets.

@glesica
Created April 15, 2020 17:14
Show Gist options
  • Save glesica/140350cae9be82055d546159ec5af2f6 to your computer and use it in GitHub Desktop.
Save glesica/140350cae9be82055d546159ec5af2f6 to your computer and use it in GitHub Desktop.
A naive count-min sketch implementation in Python.
import sys
import time
from cmsketch import CMSketch
RECEIVED = 1000000
EVENTS = 1000
if len(sys.argv) != 3:
print("usage: benchmark.py <error> <prob>")
exit(1)
error = float(sys.argv[1])
prob = float(sys.argv[2])
add_time = 0.0
count_time = 0.0
s = CMSketch(error, prob)
for i in range(RECEIVED):
event = i % EVENTS
before = time.perf_counter()
s.add(event)
after = time.perf_counter()
add_time += (after - before)
for i in range(RECEIVED):
event = i % EVENTS
before = time.perf_counter()
s.count(event)
after = time.perf_counter()
count_time += (after - before)
print("add()")
print(f"mean: {add_time / RECEIVED:0.9f}")
print(f"total: {add_time:0.9f}")
print("")
print("count()")
print(f"mean: {count_time / RECEIVED:0.9f}")
print(f"total: {count_time:0.9f}")
import hashlib as hl
import math
from typing import Callable, List, Tuple
def make_hash_fn(seed: int, mod: int) -> Callable[[int], int]:
def hash(value: int) -> int:
h = hl.sha1()
h.update(bytes(seed))
h.update(bytes(value))
return int.from_bytes(h.digest()[:8], "big") % mod
return hash
class CMSketch:
"""
A naive count-min sketch implementation.
>>> c = CMSketch(0.01, 0.95)
>>> c.count(1)
0
>>> c.add(1)
>>> c.count(1)
1
>>> c.add(1)
>>> c.count(1)
2
>>> c.count(2)
0
>>> c.add(2)
>>> c.count(2)
1
"""
_table: List[List[int]]
_cols: int
_rows: int
_hashes: List[Callable[[int], int]]
def __init__(self, error: float, prob: float):
cols = math.ceil(math.e / error)
rows = math.ceil(math.log(1 / (1 - prob)))
self._table = [[0 for _ in range(cols)] for _ in range(rows)]
self._cols = cols
self._rows = rows
self._hashes = []
for row in range(rows):
self._hashes.append(make_hash_fn(row, cols))
def add(self, event: int) -> None:
for row in range(self._rows):
col = self._hashes[row](event)
self._table[row][col] += 1
def count(self, event: int) -> int:
min_count: int = -1
for row in range(self._rows):
col = self._hashes[row](event)
this_count = self._table[row][col]
if this_count >= min_count or min_count == -1:
min_count = this_count
return min_count
def dim(self) -> Tuple[int, int]:
return self._rows, self._cols
if __name__ == "__main__":
import doctest
doctest.testmod()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment