Created
February 4, 2024 20:04
-
-
Save prydt/2ec58436c4aecec215300c6df76bf763 to your computer and use it in GitHub Desktop.
A very naive Datalog engine in Python.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from enum import Enum, auto | |
class Relations(Enum):
    """Relation (predicate) names; used as the first atom of a fact tuple."""

    PARENT = auto()
    ANCESTOR = auto()
class Vars(Enum):
    """Variable placeholders that may appear inside rule patterns.

    ``Fact.match`` treats any atom that is a ``Vars`` member as a variable to
    be bound; every other atom is compared literally.
    """

    X = auto()
    Y = auto()
    Z = auto()
class Fact:
    """A sequence of atoms used either as a ground fact or as a rule pattern.

    ``fact`` is stored as given (the examples in this file pass tuples).
    Atoms that are ``Vars`` members act as variables when the object is used
    as the pattern in :meth:`match`; all other atoms are compared with ``==``.
    """

    def __init__(self, fact):
        self.fact = fact

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of raising AttributeError on ``other.fact``.
        if not isinstance(other, Fact):
            return NotImplemented
        return self.fact == other.fact

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable; hash the
        # wrapped sequence so equal facts collapse in sets and dict keys.
        # Raises TypeError if ``self.fact`` itself is unhashable (e.g. a list).
        return hash(self.fact)

    def __repr__(self):
        return f"Fact({self.fact!r})"

    def __str__(self):
        return str(self.fact)

    def match(self, fact):
        """Match this pattern against ``fact`` and return the bindings.

        Args:
            fact: The ``Fact`` to test; its atoms are compared positionally
                with this pattern's atoms.

        Returns:
            A dict mapping each ``Vars`` member in the pattern to the atom it
            matched, or ``False`` when the match fails.  A pattern without
            variables that matches returns an *empty* dict, which is falsy:
            callers must test the result with ``is False`` rather than by
            truthiness.
        """
        # A pattern can only match a fact of the same arity.
        if len(self.fact) != len(fact.fact):
            return False

        binding = {}
        for atom, other_atom in zip(self.fact, fact.fact):
            if isinstance(atom, Vars):
                # First occurrence binds the variable; later occurrences must
                # agree with the value already bound.
                if binding.setdefault(atom, other_atom) != other_atom:
                    return False
            elif atom != other_atom:
                return False
        return binding

    def make_from_bindings(self, binding):
        """Return a new ``Fact`` with every bound atom replaced by its value.

        Atoms that are not keys of ``binding`` are copied unchanged.
        """
        return Fact(tuple(binding.get(atom, atom) for atom in self.fact))
class Rule:
    """A derivation rule: ``output`` holds whenever all ``triggers`` hold.

    ``output`` and every element of ``triggers`` are pattern objects that
    provide ``match(fact)`` (returning a binding dict or ``False``) and, for
    ``output``, ``make_from_bindings(binding)``.
    """

    def __init__(self, output, triggers):
        self.output = output
        self.triggers = triggers

    @staticmethod
    def _merge(left, right):
        """Return the union of two bindings, or ``None`` if they disagree."""
        merged = dict(left)
        for var, value in right.items():
            if var in merged and merged[var] != value:
                return None
            merged[var] = value
        return merged

    def triggered(self, fact):
        """Check whether a single ``fact`` satisfies every trigger at once.

        Returns:
            The combined binding dict when each trigger matches ``fact`` and
            the individual bindings agree on every shared variable, otherwise
            ``False``.  The dict may be empty (triggers without variables), so
            compare the result with ``is False`` instead of using truthiness.
        """
        combined = {}
        for trigger in self.triggers:
            binding = trigger.match(fact)
            # An empty dict is a successful match; only ``False`` is failure.
            if binding is False:
                return False
            combined = self._merge(combined, binding)
            if combined is None:
                return False
        return combined

    def apply_to_db(self, db):
        """Derive the facts this rule produces from ``db``.

        Each trigger may be satisfied by a different fact; the bindings are
        joined trigger by trigger, and a candidate is dropped as soon as it
        conflicts with a variable bound earlier.  A rule with no triggers
        derives its output unconditionally.

        Args:
            db: Iterable of known facts.

        Returns:
            A list of derived facts that are not already in ``db``, without
            duplicates, in derivation order.
        """
        known = list(db)

        # Partial bindings that satisfy all triggers processed so far.
        partials = [{}]
        for trigger in self.triggers:
            # Match the trigger against the database once, then join.
            matches = []
            for fact in known:
                binding = trigger.match(fact)
                if binding is not False:
                    matches.append(binding)

            extended = []
            for partial in partials:
                for binding in matches:
                    merged = self._merge(partial, binding)
                    if merged is not None:
                        extended.append(merged)
            partials = extended
            if not partials:
                break

        outlist = []
        for binding in partials:
            new_fact = self.output.make_from_bindings(binding)
            if new_fact not in known and new_fact not in outlist:
                outlist.append(new_fact)
        return outlist
# Example from Wikipedia (https://en.m.wikipedia.org/wiki/Datalog)

# Ground facts: parent(xerces, brooke). parent(brooke, damocles).
facts = [
    Fact((Relations.PARENT, "xerces", "brooke")),
    Fact((Relations.PARENT, "brooke", "damocles")),
]

rules = [
    # ancestor(X, Y) :- parent(X, Y).
    Rule(
        Fact((Relations.ANCESTOR, Vars.X, Vars.Y)),
        [Fact((Relations.PARENT, Vars.X, Vars.Y))],
    ),
    # ancestor(X, Y) :- parent(X, Z), parent(Z, Y).
    # NOTE(review): both body atoms use PARENT here, so this rule only
    # reaches grandparents; the Wikipedia example is believed to use
    # ancestor(Z, Y) as the second atom -- confirm which is intended.
    Rule(
        Fact((Relations.ANCESTOR, Vars.X, Vars.Y)),
        [
            Fact((Relations.PARENT, Vars.X, Vars.Z)),
            Fact((Relations.PARENT, Vars.Z, Vars.Y)),
        ],
    ),
]
def fixpoint(initial_facts=None, rule_set=None):
    """Apply the rules repeatedly until no new fact can be derived.

    Every round applies each rule to the whole database collected so far and
    prints the facts that round added.

    Args:
        initial_facts: Iterable of starting facts; defaults to the
            module-level ``facts``.  It is copied, never mutated.
        rule_set: Iterable of objects exposing ``apply_to_db(db)``; defaults
            to the module-level ``rules``.

    Returns:
        A list holding the starting facts followed by every derived fact.
    """
    # Copy so the caller's list (or the module-level one) is left untouched.
    db = list(facts if initial_facts is None else initial_facts)
    active_rules = list(rules if rule_set is None else rule_set)

    while True:
        to_add = []
        for rule in active_rules:
            # Rules see the full database so that facts derived in earlier
            # rounds can combine with older ones.
            for fact in rule.apply_to_db(db):
                if fact not in db and fact not in to_add:
                    to_add.append(fact)

        if not to_add:
            break

        print("NEW FACTS: ")
        for f in to_add:
            print(f" - {f}")
        db.extend(to_add)

    return db
if __name__ == "__main__":
    fixpoint()

# Output recorded by the author (at that point every trigger of a rule was
# matched against one and the same fact, so only the one-trigger rule fired):
# NEW FACTS:
# - (<Relations.ANCESTOR: 2>, 'xerces', 'brooke')
# - (<Relations.ANCESTOR: 2>, 'brooke', 'damocles')
# A Rule that joins its triggers across different facts would additionally
# derive (<Relations.ANCESTOR: 2>, 'xerces', 'damocles').
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Realized there's a bug: the binding feature does not backtrack. Will fix later.