Last active: March 22, 2021 13:23
Save davidscherer/2fb9aa34048c75470fec879df3c53f2a to your computer and use it in GitHub Desktop.
Crude document database with changefeeds, for illustrative purposes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# FoundationDB client plus stdlib helpers used throughout:
# uuid (changefeed tokens/ids), json (document serialization),
# struct (packing read versions into versionstamp byte strings), sys (stdout).
import fdb, uuid, json, struct, sys
# Pin the fdb client API to version 500 before any other fdb call.
fdb.api_version(500)
class Collection:
    """A crude JSON document collection on FoundationDB, with secondary
    indexes and watch-based changefeeds — illustrative, not production code.

    Layout (all under the module-global `root` directory):
      dir[pk]                      -> JSON document bytes (primary index)
      index:<name> subspace        -> (value, pk) -> b"" (secondary index)
      changefeed:<name> subspace:
        cf (the subspace key)      -> token written on each change (watched)
        cf[1][cfid]                -> b"" per active listener
        cf[2][versionstamp]        -> packed pk, an append-only change log

    NOTE(review): relies on module-level globals `root` and `db` existing
    before construction — confirm against the script bottom of this file.
    """

    def __init__(self,
                 name,
                 indexes):
        # `indexes` maps index name -> projection function(document) -> value.
        # The "primary_key" entry is mandatory and treated specially.
        self.name = name
        self.dir = root.create_or_open(db, name)
        self.pk = indexes["primary_key"]
        # Secondary indexes: name -> (projection, index subspace).
        self.indexes = {
            name: (proj, self.dir.create_or_open(db, "index:"+name))
            for name,proj in indexes.items()
            if name!="primary_key" }
        # One changefeed subspace per index, including the primary key.
        self.changefeeds = {
            name: self.dir.create_or_open(db, "changefeed:"+name)
            for name in indexes.keys() }

    @fdb.transactional
    def insert(self, tr, document):
        """Insert `document`; raises on duplicate primary key.

        Updates the primary index, every secondary index, and any active
        changefeeds, all within the single transaction `tr`.
        """
        pk = self.pk(document)
        # Check primary key constraint
        # NOTE(review): `!= None` (not `is not None`) appears deliberate —
        # presumably fdb's future/Value compares == None when the key is
        # absent; verify against the fdb Python binding docs.
        if tr[ self.dir[pk] ] != None:
            raise Exception("Duplicate primary key for " + self.name + ": " + repr(pk))
        # Insert into primary key index
        tr[ self.dir[pk] ] = json.dumps(document).encode("utf8")
        self._update_changefeed(tr, "primary_key", pk, pk)
        # Insert into other indexes
        for name, (proj, idir) in self.indexes.items():
            val = proj(document)
            tr[ idir[val][pk] ] = b""
            self._update_changefeed(tr, name, val, pk)

    @fdb.transactional
    def update(self, tr, document):
        """Replace an existing document (matched by primary key).

        Silently does nothing if no document with this pk exists.
        Secondary index entries are moved only when the projected value
        changed; both the old and new values' changefeeds are notified.
        """
        pk = self.pk(document)
        old_doc = self.get(tr, pk)
        if old_doc:
            # Update non-primary key indexes
            for name, (proj, idir) in self.indexes.items():
                old_val = proj(old_doc)
                new_val = proj(document)
                if old_val != new_val:
                    del tr[ idir[old_val][pk] ]
                    tr[ idir[new_val][pk] ] = b""
                    self._update_changefeed(tr, name, old_val, pk) #< Documents no longer qualifying for a changefeed should appear in its log?
                self._update_changefeed(tr, name, new_val, pk)
            # Update primary key index
            tr[ self.dir[pk] ] = json.dumps(document).encode("utf8")
            self._update_changefeed(tr, "primary_key", pk, pk)

    @fdb.transactional
    def get(self, tr, pk):
        """Return the document for `pk`, or None if absent."""
        v = tr[ self.dir[pk] ]
        # `== None` rather than `is None`: see the note in insert() about
        # fdb's Value/None comparison semantics.
        if v==None: return None
        return json.loads(v.decode("utf8"))

    @fdb.transactional
    def index_scan(self, tr, index, value):
        """Return all documents whose `index` projection equals `value`."""
        scanSubspace = self.indexes[index][1][value]
        pks = []
        for k,v in tr[ scanSubspace.range() ]:
            pk, = scanSubspace.unpack(k)
            pks.append(pk)
        # Issue all primary-key reads, then decode; within one transaction
        # this is a consistent snapshot, so every pk should still resolve.
        docs = [ tr[ self.dir[pk] ] for pk in pks ]
        return [ json.loads(v.decode("utf8")) for v in docs ]

    @fdb.transactional
    def changefeed(self, tr, index, value, unique_cfid):
        """Register a listener and return a generator of change batches.

        The generator yields a (possibly empty) list of current documents
        for each wakeup; it blocks on an fdb watch between batches and
        runs forever until the caller closes it, at which point the
        `finally` clause deregisters the listener (and deletes the whole
        feed subspace if it was the last listener).
        """
        cf = self.changefeeds[index][value]
        cflist = cf[1]   # per-listener registration keys
        cflog = cf[2]    # versionstamped log of changed pks
        # set up the change feed monitoring transactionally
        # byte_max with b"" ensures the watched key exists without
        # clobbering a token another transaction may have just written.
        tr.byte_max(cf, b"")
        tr[ cflist[unique_cfid] ] = b""
        watch = tr.watch(cf)
        # create and call a (generator) function which will
        # execute outside the transaction `tr`, watching for changes
        # and returning them
        def tail_changefeed(db, start_ver, watch):
            try:
                docs = []
                while True:
                    yield docs
                    watch.wait()
                    @fdb.transactional
                    def read_changefeed(tr):
                        # Read every log entry written at a version strictly
                        # greater than start_ver: versionstamps begin with
                        # the 8-byte commit version, so start_ver+1 with a
                        # zero batch index is the first candidate key.
                        log = [ fdb.tuple.unpack(v)[0] for k,v in tr[
                            cflog[fdb.tuple.Versionstamp(struct.pack(">QH",start_ver.wait()+1,0))]:
                            cflog.range().stop ] ]
                        log = list(set(log)) # "Squash" updates by PK. Our log doesn't (currently) contain enough info for unsquashed updates since it only stores the PK
                        # NOTE(review): get() may return None here for a pk
                        # deleted since it was logged — callers see that None.
                        docs = [ self.get(tr, pk) for pk in log ]
                        # Return a fresh watch so the next iteration blocks
                        # until the feed key changes again.
                        return docs, tr.get_read_version(), tr.watch( cf )
                    docs, start_ver, watch = read_changefeed( db )
            finally:
                # Closing the changefeed happens in a separate transaction
                @fdb.transactional
                def close_changefeed(tr):
                    del tr[ cflist[unique_cfid] ]
                    # Last listener gone -> garbage-collect the entire
                    # changefeed subspace (token, listener list, log).
                    if list( tr.get_range( cflist, cflist.range().stop, limit=1 ) ) == []:
                        del tr[ self.changefeeds[index][value] : self.changefeeds[index][value].range().stop ]
                close_changefeed(db)
        # start_ver is the registering transaction's read version (a
        # future); changes committed after it will appear in the feed.
        return tail_changefeed(tr.db, tr.get_read_version(), watch)

    def _update_changefeed(self, tr, name, val, pk):
        """If anyone is listening on (name, val), log `pk` to the feed.

        Writes a random token to the watched feed key (so watches fire)
        and appends the pk to the versionstamped log.
        """
        # check for and update changefeeds
        if tr[ self.changefeeds[name][val] ] != None:
            #print("Posting to changefeed", self.name, name, val, pk)
            tr[ self.changefeeds[name][val] ] = fdb.tuple.pack( (uuid.uuid4(),) )
            tr.set_versionstamped_key(
                self.changefeeds[name][val][2].pack_with_versionstamp((fdb.tuple.Versionstamp(),)),
                fdb.tuple.pack((pk,)) )
# Open the default cluster and start every run from a clean slate:
# drop and recreate the demo directory (Collection.__init__ reads the
# module-globals `db` and `root` created here).
db = fdb.open()
fdb.directory.remove_if_exists( db, "reactive_test" )
root = fdb.directory.create_or_open( db, "reactive_test" )
# Two demo collections: users keyed by "id"; posts keyed by "id" with a
# secondary index on "user_id".
users = Collection("users",
    indexes = {
        "primary_key" : lambda doc: doc["id"]
    })
posts = Collection("posts",
    indexes = {
        "primary_key" : lambda doc: doc["id"],
        "user" : lambda doc: doc["user_id"]
    })
def show_changes(collection, index, value):
    """Print every document that arrives on one changefeed, forever.

    Opens a changefeed on (index, value) with a fresh listener id and
    loops over the batches its generator yields, writing one line per
    changed document to stdout. Blocks between batches; never returns.
    """
    feed = iter( collection.changefeed(db, index, value, uuid.uuid4()) )
    for batch in feed:
        for doc in batch:
            line = "Change to " + collection.name + ": " + str(doc) + "\n"
            sys.stdout.write(line)
# Demo: spawn two changefeed tails (each blocks forever, so they run in
# daemon-less background threads), then perform writes that should show
# up on them. NOTE(review): mid-file import — conventionally this would
# sit at the top of the file.
import threading
users.insert( db, {"id": 100, "name": "Alice", "profile": "public"} )
# Tail user 100's primary-key feed and the posts-by-user feed for user 100.
threading.Thread( target=show_changes, args=(users,"primary_key",100) ).start()
threading.Thread( target=show_changes, args=(posts,"user",100) ).start()
# This insert is to a different user, so it should NOT appear on either feed.
users.insert( db, {"id": 101, "name": "Bob", "profile": "private"} )
posts.insert( db, {"id": 1000, "user_id": 100, "subject": "Hi!"} )
posts.insert( db, {"id": 1001, "user_id": 100, "subject": "Question"} )
users.update( db, {"id": 100, "name": "Alice the Great", "profile":"public"} )
posts.update( db, {"id": 1001, "user_id": 100, "subject": "Question??"} )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.