Skip to content

Instantly share code, notes, and snippets.

@mbafford
Created April 8, 2023 13:18
Show Gist options
  • Save mbafford/6f06b032751a50cb58133aee76f2ba06 to your computer and use it in GitHub Desktop.
Save mbafford/6f06b032751a50cb58133aee76f2ba06 to your computer and use it in GitHub Desktop.
ruleswrapper for beancount-import

Rough guide to using my ruleswrapper.py module to hard-code certain rules in beancount-import.

See the example config - add the ruleswrapper as the data source, and specify the module to be wrapped as wrapped_module.

Any configuration parameters meant for the wrapped module should be added to the config passed to ruleswrapper.

Code may not compile exactly as-is - roughly edited to remove sensitive data.

TODO:

  • split rules into standalone file, so ruleswrapper.py can be shared more easily
# Example beancount-import configuration excerpt: ruleswrapper is listed as the
# data source, and the module it should wrap is named via wrapped_module.
# NOTE: the enclosing list is not closed within this excerpt.
data_sources = [
dict(
# the rules wrapper is the module named as the data source
module=ruleswrapper.__name__,
# module that the wrapper imports and delegates load() to
wrapped_module=plaid.__name__,
# every other key is configuration intended for the wrapped module
dbfile=os.path.join(data_dir, 'plaid/transactions.db'),
earliest_date=datetime.date(2019, 1, 1)
#dir=os.path.join(data_dir, 'plaid/')
),
from typing import Any, Dict, List, OrderedDict
import re
from beancount.core.data import Posting, Transaction, Balance
from beancount_import.journal_editor import JournalEditor
from beancount_import.source import LogFunction, Source, SourceResults, SourceSpec
import importlib
import numbers
from beancount.core.number import MISSING, D, ZERO
def instr(needle):
    """Build a predicate that reports whether ``needle`` occurs in its argument.

    The returned callable takes a single value (``hay``) and evaluates
    ``needle in hay``. It is used below as the matching logic of rule criteria.
    """
    def contains(hay):
        return needle in hay

    return contains
# Hard-coded rewrite rules. Each rule is a dict with two keys:
#
#   'criteria'   - list of (attribute, logic) pairs; a rule applies only when
#                  every pair matches (see matches_all_criteria). ``logic`` may
#                  be a callable predicate, a string (compared
#                  case-insensitively) or a number. The attribute "name"
#                  resolves to the plaid_name / plaid_pending_name / ofx_name
#                  metadata, and "plaid" passes the whole metadata dict.
#   'attributes' - values to apply to a matching transaction. Allowed keys are
#                  'payee', 'narration', 'account', 'tags' and 'flag'
#                  (see merge_rule_attributes).
#
# All matching rules are merged in list order, so a later rule overrides the
# scalar attributes of an earlier one.
RULES = [
    # Amazon purchases, whatever spelling the name starts with.
    {
        'criteria': [
            (
                "name",
                lambda name: re.match(r'^(amazon|AMZNGrcy|AMZN Mktp)', name, re.I) is not None,
            ),
        ],
        'attributes': {
            'payee': "Amazon",
            'account': "Expenses:FIXME",
        },
    },
    # Uber Eats charges whose absolute amount is exactly 5 are tagged as tips.
    {
        'criteria': [
            ("name", lambda name: re.search(r'uber eats', name, re.I)),
            ("amount", lambda amount: abs(amount.number) == D('5')),
        ],
        'attributes': {
            'payee': 'Uber Eats',
            'tags': ['tips'],
        },
    },
    # WITHDRAWAL TRANSFER TO LOAN 0001
    {
        'criteria': [
            ("name", lambda name: re.search(r'TRANSFER TO LOAN', name, re.I) is not None),
            ("account_id", lambda account_id: account_id == "pzZp6rN8J3XSJq1Jxq"),
        ],
        'attributes': {
            'payee': 'Auto Loan Payment',
            'account': 'Assets:Payments:Auto:ThatCar',
        },
    },
    # Simple case-sensitive substring matches on the name.
    {
        'criteria': [("name", instr("WHOLEFDS"))],
        'attributes': {'payee': 'Whole Foods'},
    },
    {
        'criteria': [("name", instr("CHIPOTLE"))],
        'attributes': {
            'payee': 'Chipotle',
            'narration': '',
            'account': "Expenses:Food:Restaurants",
        },
    },
    {
        'criteria': [("name", instr(" CAVA "))],
        'attributes': {
            'payee': 'Cava',
            'narration': '',
            'account': "Expenses:Food:Restaurants",
        },
    },
    # Exact (case-insensitive) names used for credit card payments.
    {
        'criteria': [("name", "AUTOPAY PAYMENT - THANK YOU")],
        'attributes': {'payee': "Credit Card Payment"},
    },
    {
        'criteria': [("name", "AUTOMATIC PAYMENT - THANK")],
        'attributes': {'payee': 'Credit Card Payment'},
    },
    {
        'criteria': [("name", "CHASE CREDIT CRD AUTOPAY")],
        'attributes': {'payee': 'Credit Card Payment'},
    },
    {
        'criteria': [("name", instr("YouTubePremium"))],
        'attributes': {
            'payee': "YouTube Premium",
            # NOTE(review): merge_rule_attributes drops None values, so this
            # leaves the narration untouched (other rules use '' to blank it)
            # - confirm which behaviour is intended.
            'narration': None,
            'account': 'Expenses:Entertainment:Music',
        },
    },
]
def matches_all_criteria(plaid_entry: dict, rule: dict):
    """Return True when ``plaid_entry`` satisfies every criterion of ``rule``.

    Args:
        plaid_entry: metadata dict of the posting being examined.
        rule: dict whose ``'criteria'`` entry is a list of
            ``(attribute, logic)`` pairs.

    Each pair is evaluated as follows:

    * attribute ``"plaid"`` tests the whole ``plaid_entry`` dict; ``"name"``
      tests ``plaid_name``, falling back to ``plaid_pending_name`` and then
      ``ofx_name`` when the earlier key is absent; any other attribute is
      looked up directly in ``plaid_entry``.
    * a callable ``logic`` is called with the value and its truthiness is the
      result; a string is compared case-insensitively; a number is compared
      for equality after conversion with ``D``.

    Returns:
        True if all criteria match (trivially True for an empty criteria
        list), False as soon as one does not.

    Raises:
        RuntimeError: if ``logic`` is neither callable, a string nor a number.
    """
    for attr, logic in rule['criteria']:
        if attr == "plaid":
            value = plaid_entry
        elif attr == "name":
            value = plaid_entry.get(
                'plaid_name',
                plaid_entry.get('plaid_pending_name', plaid_entry.get('ofx_name')),
            )
        else:
            value = plaid_entry.get(attr)

        # can't match the criteria if there's no value for the specified
        # attribute; note that any falsy value (None, '', 0, empty container)
        # is treated as missing
        if not value:
            return False

        if callable(logic):
            match = logic(value)
        elif isinstance(logic, str):
            # a non-string value can never equal a string criterion; guard so
            # it reports "no match" instead of raising AttributeError
            match = isinstance(value, str) and value.lower() == logic.lower()
        elif isinstance(logic, numbers.Number):
            # compare against the rule's number (the original compared the
            # value with itself, so the criterion was never consulted)
            # NOTE(review): the "amount" lambda in RULES reads ``.number``, so
            # amount values may not be plain numbers - confirm before relying
            # on numeric criteria for that attribute.
            match = value == D(str(logic))
        else:
            raise RuntimeError("Unsupported match criteria type: %s" % type(logic))

        # can't match ALL criteria if one doesn't match
        if not match:
            return False

    return True
def match_rules(posting: Posting) -> List[Dict[str, Any]]:
    """Collect the attribute overrides of every rule that matches ``posting``.

    Args:
        posting: the posting whose metadata is tested against ``RULES``.

    Returns:
        The ``'attributes'`` dicts of all matching rules, in ``RULES`` order.
        The list is empty when nothing matches.
    """
    # A posting's meta may be None; treat that as "no metadata" so such a
    # posting simply matches nothing instead of failing on ``None.get``.
    entry_dict: Dict[str, Any] = posting.meta or {}
    return [
        rule['attributes']
        for rule in RULES
        if matches_all_criteria(entry_dict, rule)
    ]
def merge_rule_attributes(to_apply: list) -> Dict[str, Any]:
    """Merge the attribute dicts of all matching rules into one override dict.

    Args:
        to_apply: attribute dicts (as returned by ``match_rules``), applied in
            order. Allowed keys are ``payee``, ``narration``, ``account``,
            ``tags`` and ``flag``.

    Returns:
        A dict holding only the attributes that were actually set. Scalar
        attributes take the value from the last rule that supplies them;
        ``tags`` accumulate across rules (a single string counts as one tag).
        Attributes left as None, and an empty ``tags`` set, are omitted so the
        caller can use ``.get(attr, DEFAULT)``.

    Raises:
        RuntimeError: if a rule names an attribute outside the allowed keys.
    """
    merged: Dict[str, Any] = {
        'payee': None,
        'narration': None,
        'account': None,
        'tags': set(),
        'flag': None,
    }

    for attributes in to_apply:
        for attr, value in attributes.items():
            if attr not in merged:
                raise RuntimeError("Plaid module: Rule updates unknown attribute [%s]" % attr)

            if isinstance(merged[attr], set):
                if isinstance(value, str):
                    value = [value]
                # ``or ()`` lets a rule say ``'tags': None`` without crashing
                merged[attr].update(value or ())
            else:
                merged[attr] = value

    # Remove unset entries so the caller can use .get(attr, DEFAULT). An empty
    # tag set counts as unset: returning it would make the caller replace a
    # transaction's existing tags with nothing whenever any rule matched.
    return {
        attr: value
        for attr, value in merged.items()
        if value is not None and not (isinstance(value, set) and not value)
    }
def load(spec: SourceSpec, log_status: LogFunction):
    """Load the wrapped source and patch its ``prepare`` to apply ``RULES``.

    Args:
        spec: source configuration. Must contain ``wrapped_module`` (the
            importable name of the module to wrap); that key is popped from
            ``spec`` and the remaining keys are passed to the wrapped module's
            ``load``.
        log_status: logging callback, forwarded to the wrapped module and used
            to report transactions the rules cannot be applied to.

    Returns:
        The source object returned by the wrapped module, with its ``prepare``
        attribute replaced by a version that rewrites pending transactions
        according to the matching rules.

    Raises:
        Exception: if ``wrapped_module`` is missing from ``spec``.
    """
    if 'wrapped_module' not in spec:
        raise Exception("ruleswrapper source requires wrapped_module specifying the module being wrapped")

    original_module = importlib.import_module(spec.pop('wrapped_module'))
    loaded: Source = original_module.load(spec, log_status)

    old_prepare = loaded.prepare

    def new_prepare(journal: JournalEditor, results: SourceResults):
        """Run the wrapped ``prepare``, then apply rule overrides in place."""
        old_prepare(journal, results)

        for result in results.pending:  # Type: ImportResult
            for i, entry in enumerate(result.entries):  # Type: Directive
                if not isinstance(entry, Transaction):
                    # only supports Transactions, although it might make sense
                    # to extend to other types of Directives
                    continue

                # current logic assumes a single Posting with an account
                # owned by this source, and a single Posting with an account
                # not owned by this source - any more than that, and not sure
                # how to proceed
                owned_postings = [p for p in entry.postings if p.account in results.accounts]
                other_postings = [p for p in entry.postings if p.account not in results.accounts]
                if len(owned_postings) != 1 or len(other_postings) != 1:
                    log_status("Unable to process rules - needed 1 owned posting and 1 other posting, got: %d owned, %d other" % (len(owned_postings), len(other_postings)))
                    continue

                owned, other = owned_postings[0], other_postings[0]

                matches = match_rules(owned)
                if not matches:
                    continue
                rule_overrides = merge_rule_attributes(matches)

                # Only the account of the non-owned posting can be overridden.
                other = other._replace(
                    account=rule_overrides.get('account', other.account),
                )

                # Rule tags are added to whatever tags the entry already has;
                # when the rules supply none, the entry's tags are kept as-is
                # rather than being replaced by an empty set.
                rule_tags = rule_overrides.get('tags')
                if rule_tags:
                    tags = frozenset(entry.tags or ()) | frozenset(rule_tags)
                else:
                    tags = entry.tags

                result.entries[i] = entry._replace(
                    # 'flag' is an accepted rule attribute, so honour it here
                    flag=rule_overrides.get('flag', entry.flag),
                    payee=rule_overrides.get('payee', entry.payee),
                    narration=rule_overrides.get('narration', entry.narration),
                    tags=tags,
                    # the owned posting always comes first in the rewritten entry
                    postings=[owned, other],
                )

    loaded.prepare = new_prepare

    return loaded
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment