Skip to content

Instantly share code, notes, and snippets.

@mbafford
Created February 19, 2024 11:55
Show Gist options
  • Save mbafford/ff2ece412ce3188f9eab692b3823e917 to your computer and use it in GitHub Desktop.
Save mbafford/ff2ece412ce3188f9eab692b3823e917 to your computer and use it in GitHub Desktop.
plaid-sync sqlite Source for beancount-importer
"""
Meant to be used with:
https://github.com/mbafford/plaid-sync/
and:
https://github.com/jbms/beancount-import
Set up as a data source in your beancount-import config:
dict(
module=ruleswrapper.__name__,
wrapped_module=plaid.__name__,
dbfile=os.path.join(data_dir, 'plaid/transactions.db'),
earliest_date=datetime.date(2019, 1, 1)
),
"""
from typing import Any, List, Dict, Union, Optional, Set
import json
import datetime
import collections
import re
import os
import numbers
import sqlite3
from beancount.core.data import Transaction, Posting, Balance, EMPTY_SET
from beancount.core.amount import Amount
from beancount.core.flags import FLAG_OKAY
from beancount.core.number import MISSING, D, ZERO
from beancount_import.source import link_based_source, description_based_source, Source
from beancount_import.source import ImportResult, SourceResults
from beancount_import.matching import FIXME_ACCOUNT
from beancount_import.journal_editor import JournalEditor
# Immutable record describing a single Plaid transaction.
# `account_id` is the Plaid account identifier; it is translated to a journal
# account elsewhere in this module.
# NOTE: field order matters -- posting metadata is generated by iterating
# `_asdict()`, so reordering fields reorders the emitted `plaid_*` keys.
PlaidEntry = collections.namedtuple(
    'PlaidEntry',
    (
        'date',
        'transaction_id',
        'pending_transaction_id',
        'pending',
        'account_id',
        'amount',
        'name',
        'payment_channel',
        'transaction_type',
        'payment_processor',
        'filename',
        'categories',
        'address',
        'city',
        'region',
        'postal',
    ),
)
# Immutable record for one account balance; the field order matches the
# column order selected from the `balances` table.
PlaidBalance = collections.namedtuple(
    'PlaidBalance',
    (
        'account_id',
        'account_type',
        'date',
        'balance_current',
        'currency_code',
    ),
)
# strptime format of the date strings handled by `parse_date`.
plaid_date_format = '%Y-%m-%d'


def parse_date(val: Optional[str]) -> Optional[datetime.date]:
    """Parse a `YYYY-MM-DD` string into a `datetime.date`.

    Args:
        val: The date string, or a falsy value (`None`, `''`).

    Returns:
        The parsed date, or `None` when `val` is falsy.

    Raises:
        RuntimeError: If `val` is not a string in `plaid_date_format`; the
            underlying `ValueError`/`TypeError` is chained as the cause.
    """
    if not val:
        return None
    try:
        return datetime.datetime.strptime(val, plaid_date_format).date()
    except (TypeError, ValueError) as e:
        # Only parsing failures are translated; anything else propagates
        # unchanged rather than being mislabelled as an invalid date.
        raise RuntimeError('Invalid date: %r' % val) from e
def load_entry(data: Dict, filename) -> PlaidEntry:
    """Convert one decoded Plaid transaction JSON object into a `PlaidEntry`.

    Args:
        data: The decoded transaction object. `account_id`, `pending`,
            `date`, `amount`, `name`, `payment_channel` and
            `iso_currency_code` must be present as keys.
        filename: Recorded verbatim on the entry as its origin.

    Returns:
        The populated `PlaidEntry`. The amount has its sign flipped relative
        to the JSON value and is rounded to two decimal places.

    Raises:
        KeyError: If one of the required keys is missing.
        RuntimeError: If `date` cannot be parsed (raised by `parse_date`).
    """
    # Optional / nullable sub-objects: treat "missing" and "null" the same so
    # a sparse transaction does not abort the whole import.
    location = data.get('location') or {}
    payment_meta = data.get('payment_meta') or {}
    category = data.get('category')

    # NOTE(review): Plaid documents `iso_currency_code` as null whenever
    # `unofficial_currency_code` is set, so fall back to the latter instead of
    # building an Amount without a currency.
    currency = data['iso_currency_code'] or data.get('unofficial_currency_code')

    # str() to lock in the decimal places parsed from the JSON (double value),
    # otherwise, calling D() directly on the amount from the JSON results in
    # floating point precision issues
    amount = -1 * round(D(str(data['amount'])), 2)

    return PlaidEntry(
        account_id=data['account_id'],
        transaction_id=data.get('transaction_id'),
        pending_transaction_id=data.get('pending_transaction_id'),
        pending=data['pending'],
        date=parse_date(data['date']),
        address=location.get('address'),
        city=location.get('city'),
        region=location.get('region'),
        postal=location.get('postal_code'),
        amount=Amount(amount, currency),
        name=data['name'],
        payment_channel=data['payment_channel'],
        transaction_type=data.get('transaction_type'),
        payment_processor=payment_meta.get('payment_processor'),
        filename=filename,
        categories=" / ".join(category) if category else None,
    )
def load_transactions_from_sqlitedb(dbfile: str, earliest_date: Optional[datetime.date] = None) -> "List[PlaidEntry]":
    """Load all non-archived transactions from a plaid-sync SQLite database.

    Args:
        dbfile: Path to the SQLite database file.
        earliest_date: Only transactions whose JSON `date` is on or after this
            date are returned. Defaults to 1900-01-01.

    Returns:
        One `PlaidEntry` per matching row of the `transactions` table, with
        `dbfile` recorded as the entry's filename.
    """
    if earliest_date is None:
        earliest_date = datetime.date(1900, 1, 1)
    # Bind an ISO string rather than a date object: relying on sqlite3's
    # implicit date adapter is deprecated as of Python 3.12. Non-date values
    # (e.g. an already formatted string) are passed through untouched.
    if isinstance(earliest_date, datetime.date):
        earliest_date = earliest_date.isoformat()

    con = sqlite3.connect(dbfile)
    try:
        rows = con.execute("""
            select plaid_json from transactions
            where archived is null
            and json_extract(plaid_json, '$.date') >= ?
        """, [earliest_date]).fetchall()
    finally:
        # Release the database handle even if the query fails.
        con.close()

    return [load_entry(json.loads(row[0]), dbfile) for row in rows]
def load_balances_from_sqlitedb(dbfile: str) -> "List[PlaidBalance]":
    """Load every row of the `balances` table from a SQLite database.

    Args:
        dbfile: Path to the SQLite database file.

    Returns:
        One `PlaidBalance` per row, in the order returned by the query.
    """
    con = sqlite3.connect(dbfile)
    try:
        rows = con.execute("""
            select account_id, account_type, date, balance_current, currency_code from balances
        """).fetchall()
    finally:
        # Release the database handle even if the query fails.
        con.close()
    return [PlaidBalance(*row) for row in rows]
def load_transactions_from_file(filename: str, currency: str = 'USD') -> "List[PlaidEntry]":
    """Load transactions from a JSON-L file (one JSON object per line).

    Args:
        filename: Path to the file; it is made absolute before being recorded
            on each entry.
        currency: Unused; kept so existing callers keep working.

    Returns:
        The entries sorted by date. The file order is reversed before the
        (stable) sort, so entries sharing a date appear in reverse file order.

    Raises:
        RuntimeError: If the file cannot be read or a line cannot be
            converted; the original exception is chained as the cause.
    """
    try:
        filename = os.path.abspath(filename)
        entries = []
        with open(filename, 'r', encoding='utf-8', newline='') as f:
            for line in f:
                # Tolerate blank separator/trailing lines instead of failing
                # the whole file on json.loads('').
                if not line.strip():
                    continue
                entries.append(load_entry(json.loads(line), filename))
        entries.reverse()
        entries.sort(key=lambda x: x.date)  # sort by date
        return entries
    except Exception as e:
        raise RuntimeError('Plaid JSON-L file has incorrect format', filename) from e
def get_info(raw_entry: "PlaidEntry") -> dict:
    """Build the `info` dictionary attached to an import result.

    Only the entry's `filename` is read; the `type` value is a fixed string.
    """
    info = {
        'type': 'text/csv',
        'filename': raw_entry.filename,
    }
    # No `line` key is emitted: the entry carries no line number.
    return info
def _make_balance_entry(account: str, balance: PlaidBalance) -> ImportResult:
    """Build an import result holding a single `Balance` directive.

    Args:
        account: Journal account the balance assertion applies to.
        balance: The balance record; `balance.date` is parsed with
            `parse_date`.

    Returns:
        An `ImportResult` whose only entry is the `Balance` directive. The
        balance is rounded to two decimal places and negated for every
        account type other than `'depository'`.
    """
    # Parse once and use the same date object for both the result and the
    # directive; previously the result carried the raw string while the
    # directive carried a parsed date.
    balance_date = parse_date(balance.date)

    amount = round(D(str(balance.balance_current)), 2)
    if balance.account_type != 'depository':
        amount = -1 * amount

    directive = Balance(
        account=account,
        date=balance_date,
        meta=None,
        amount=Amount(amount, balance.currency_code),
        tolerance=None,
        diff_amount=None,
    )
    return ImportResult(date=balance_date, info=None, entries=[directive])
def _make_import_result(account: str, plaid_entry: PlaidEntry) -> ImportResult:
    """Build the pending two-posting transaction for one Plaid entry.

    Args:
        account: Journal account that receives the source posting.
        plaid_entry: The Plaid transaction to convert.

    Returns:
        An `ImportResult` with one `Transaction`: a source posting on
        `account` carrying the `plaid_*` metadata, and a balancing posting on
        `FIXME_ACCOUNT` with the negated amount.
    """
    # Every non-None field except `filename` and `pending` becomes a
    # `plaid_<field>` metadata key, in field order.
    meta = collections.OrderedDict()
    for field, value in plaid_entry._asdict().items():
        if field in ('filename', 'pending'):
            continue
        if value is not None:
            meta['plaid_%s' % field] = value

    # in order to avoid conflicts when merging in a cleared entry over a pending entry
    # need to rename some fields so there won't be conflicts between the cleared and pending
    # this also allows us to clearly identify pending vs. cleared entries
    if plaid_entry.pending:
        if plaid_entry.pending_transaction_id and plaid_entry.transaction_id:
            # Unexpected combination: report it and leave the keys untouched.
            print("PENDING entry has both transaction_id and pending_transaction_id")
        else:
            # rename key metadata to have _pending_ in the name
            # date, amount, and name often change between the pending transaction and the final
            # transaction - so name them plaid_pending_ to avoid the matcher thinking they are
            # different transactions
            # transaction_id ALWAYS changes between pending and final - and final has a key
            # plaid_pending_transaction_id that's the same as the pending entry's transaction_id
            for attr in ('transaction_id', 'date', 'amount', 'name'):
                key = 'plaid_%s' % attr
                # A field that was None never made it into `meta`; skip it
                # instead of raising KeyError.
                if key in meta:
                    meta['plaid_pending_%s' % attr] = meta.pop(key)

    src_posting = Posting(
        account=account,
        units=plaid_entry.amount,
        cost=None,
        price=None,
        flag=None,
        meta=meta,
    )
    dst_posting = Posting(
        account=FIXME_ACCOUNT,
        units=-plaid_entry.amount,
        cost=None,
        price=None,
        flag=None,
        meta=None,
    )

    # TODO: should pending entries be flagged '!' via a configuration option?
    transaction = Transaction(
        meta=None,
        date=plaid_entry.date,
        flag=FLAG_OKAY,
        payee=plaid_entry.name,
        narration=None,
        tags=EMPTY_SET,
        # Transactions are identified via posting metadata, not links.
        links=EMPTY_SET,
        postings=[src_posting, dst_posting],
    )
    return ImportResult(
        date=plaid_entry.date,
        info=get_info(plaid_entry),
        entries=[transaction],
    )
# Previously, this would be based on the links concept, but I'm not sure what value links have
# over just using the metadata property. One definite disadvantage of links is they pollute the
# Fava web interface search/filter drop-down.
#
# Searches through all existing Transactions and pulls out the existing `plaid_transaction_id`
# values.
#
# OLD approach: self.get_entries_with_link( all_entries=journal.all_entries, results=results, valid_links=transaction_ids)
def _get_existing_plaid_transaction_ids( all_entries ):
tids = set()
ptids = set()
for entry in all_entries:
if not isinstance(entry, Transaction): continue
for posting in entry.postings:
tid = posting.meta.get('plaid_transaction_id')
if tid: tids.add(tid)
ptid = posting.meta.get('plaid_pending_transaction_id')
if ptid: ptids.add(ptid)
return (tids, ptids,)
class PlaidSource(link_based_source.LinkBasedSource, Source):
    """beancount-import data source backed by Plaid transaction data.

    Transactions are loaded once, at construction time, from exactly one of
    (checked in this order): a directory of `.jsonl` files, a SQLite
    database, or a single JSON-L file.
    """

    def __init__(self,
                 filename: Optional[str] = None,
                 dir: Optional[str] = None,
                 dbfile: Optional[str] = None,
                 earliest_date: Optional[datetime.date] = None,
                 # TODO: include balances like Mint does? balances_directory: Optional[str] = None,
                 **kwargs) -> None:
        """Load the Plaid entries from the configured location.

        Args:
            filename: Path to a single JSON-L file.
            dir: Directory whose `*.jsonl` files are all loaded.
            dbfile: Path to a SQLite database with `transactions` and
                `balances` tables.
            earliest_date: Lower date bound; only applied when loading from
                `dbfile`.
            **kwargs: Forwarded to the base class constructor.

        Raises:
            RuntimeError: If none of `dir`, `dbfile` or `filename` is given.
        """
        super().__init__(link_prefix='plaid.', **kwargs)
        # Report the location that will actually be used (same precedence as
        # the branches below) rather than always `filename`.
        self.log_status('Plaid: loading %s' % (dir or dbfile or filename))

        # Only the database provides balances; default to an empty list so
        # the attribute exists whichever loader ran.
        self.plaid_balances = []

        if dir:
            self.plaid_entries = []
            for basename in os.listdir(dir):
                if not basename.endswith('.jsonl'):
                    continue
                path = os.path.join(dir, basename)
                entries = load_transactions_from_file(filename=path)
                self.log_status("Loaded %d entries from: %s" % (len(entries), path))
                self.plaid_entries.extend(entries)
        elif dbfile:
            self.plaid_entries = load_transactions_from_sqlitedb(dbfile, earliest_date)
            self.plaid_balances = load_balances_from_sqlitedb(dbfile)
            self.log_status("Loaded %d entries from: %s" % (len(self.plaid_entries), dbfile))
        elif filename:
            self.plaid_entries = load_transactions_from_file(filename=filename)
            self.log_status("Loaded %d entries from: %s" % (len(self.plaid_entries), filename))
        else:
            raise RuntimeError("Plaid module: Must specify one of `dir`, `dbfile` or `filename`")

    def prepare(self, journal: JournalEditor, results: SourceResults) -> None:
        """Add every not-yet-imported Plaid entry to `results` as pending.

        Journal accounts are matched to Plaid accounts through their
        `plaid_id` metadata. Entries for Plaid accounts with no journal
        counterpart are skipped and reported via `log_status`.
        """
        bc2plaid, plaid2bc = description_based_source.get_account_mapping(journal.accounts, 'plaid_id')
        existing_tids, existing_ptids = _get_existing_plaid_transaction_ids(journal.all_entries)
        unknown_accounts = set()

        # NOTE: balance assertions (`_make_balance_entry` over
        # `self.plaid_balances`) are currently not emitted.

        for entry in self.plaid_entries:
            # Already in the journal as a cleared transaction.
            if entry.transaction_id in existing_tids:
                continue
            # Pending entry whose id the journal already records as a
            # `plaid_pending_transaction_id`.
            if entry.pending and entry.transaction_id in existing_ptids:
                continue

            account = plaid2bc.get(entry.account_id, None)
            if not account:
                unknown_accounts.add(entry.account_id)
            else:
                results.add_pending_entry(_make_import_result(account, entry))

        if unknown_accounts:
            self.log_status("Transactions from the following Plaid accounts were ignored.")
            for account_id in unknown_accounts:
                self.log_status("    %s" % account_id)
            self.log_status("Please add these accounts to your journal, marked with plaid_id attributes.")

        results.add_accounts(bc2plaid.keys())

    def get_example_key_value_pairs(self, transaction: Transaction, posting: Posting):
        """Return the posting metadata used as example key/value pairs.

        Only `plaid_name`, `plaid_pending_name` and `plaid_account_id` are
        considered, and only when present with a non-None value. A posting
        without metadata yields an empty dict.
        """
        meta = posting.meta or {}
        ret = {}
        for key in ('plaid_name', 'plaid_pending_name', 'plaid_account_id'):
            value = meta.get(key)
            if value is not None:
                ret[key] = value
        return ret

    def is_posting_cleared(self, posting: Posting):
        """Return True when the posting carries a `plaid_transaction_id`."""
        if not posting.meta:
            return False
        return posting.meta.get('plaid_transaction_id') is not None

    @property
    def name(self):
        """Short identifier of this source."""
        return 'plaid'
def load(spec, log_status):
    """Construct a `PlaidSource` from a data-source spec.

    `spec` is expanded into keyword arguments and `log_status` is passed
    through as the status callback.
    (Presumably the factory hook beancount-import looks up on the module.)
    """
    source = PlaidSource(log_status=log_status, **spec)
    return source
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment