Created
February 19, 2024 11:55
-
-
Save mbafford/ff2ece412ce3188f9eab692b3823e917 to your computer and use it in GitHub Desktop.
plaid-sync sqlite Source for beancount-import
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Meant to be used with: | |
https://github.com/mbafford/plaid-sync/ | |
and: | |
https://github.com/jbms/beancount-import | |
Set up as a data source in your beancount-import config: | |
dict( | |
module=ruleswrapper.__name__, | |
wrapped_module=plaid.__name__, | |
dbfile=os.path.join(data_dir, 'plaid/transactions.db'), | |
earliest_date=datetime.date(2019, 1, 1) | |
), | |
""" | |
from typing import Any, List, Dict, Union, Optional, Set | |
import json | |
import datetime | |
import collections | |
import re | |
import os | |
import numbers | |
import sqlite3 | |
from beancount.core.data import Transaction, Posting, Balance, EMPTY_SET | |
from beancount.core.amount import Amount | |
from beancount.core.flags import FLAG_OKAY | |
from beancount.core.number import MISSING, D, ZERO | |
from beancount_import.source import link_based_source, description_based_source, Source | |
from beancount_import.source import ImportResult, SourceResults | |
from beancount_import.matching import FIXME_ACCOUNT | |
from beancount_import.journal_editor import JournalEditor | |
# NOTE(review): the old comment here said "account may be either the mint_id
# or the journal account name" — that looks copied from the Mint source;
# here account_id is the Plaid account id.
#
# One Plaid transaction, flattened from the raw plaid JSON.  Field order is
# significant: _asdict() iteration order determines the order of the
# `plaid_*` metadata keys emitted on postings, so keep it stable.
PlaidEntry = collections.namedtuple(
    'PlaidEntry',
    'date '
    'transaction_id pending_transaction_id '
    'pending '
    'account_id '
    'amount '
    'name '
    'payment_channel '
    'transaction_type '
    'payment_processor '
    'filename '
    'categories '
    'address city region postal',
)
# One row of the plaid-sync `balances` table: a point-in-time balance
# snapshot for a single Plaid account.
PlaidBalance = collections.namedtuple(
    'PlaidBalance',
    'account_id account_type date balance_current currency_code',
)
# Plaid serializes all dates as ISO 'YYYY-MM-DD' strings.
plaid_date_format = '%Y-%m-%d'

def parse_date(val: str):
    """Parse a Plaid 'YYYY-MM-DD' string into a datetime.date.

    Returns None for empty/falsy input; raises RuntimeError (chained to the
    underlying parse error) for anything that is not in Plaid's date format.
    """
    if not val:
        return None
    try:
        parsed = datetime.datetime.strptime(val, plaid_date_format)
    except Exception as e:
        raise RuntimeError('Invalid date: %r' % val) from e
    return parsed.date()
def load_entry(data: Dict, filename) -> PlaidEntry:
    """Flatten one decoded Plaid transaction JSON object into a PlaidEntry.

    `filename` records which dump/db the entry came from.  The amount sign
    is flipped: Plaid reports outflows as positive, beancount postings want
    them negative.
    """
    location = data['location']
    # str() first to lock in the decimal places parsed from the JSON double;
    # calling D() directly on the float would surface binary floating-point
    # precision noise.
    signed_amount = -1 * round(D(str(data['amount'])), 2)
    raw_categories = data['category']
    return PlaidEntry(
        date=parse_date(data['date']),
        transaction_id=data.get('transaction_id'),
        pending_transaction_id=data.get('pending_transaction_id'),
        pending=data['pending'],
        account_id=data['account_id'],
        amount=Amount(signed_amount, data['iso_currency_code']),
        name=data['name'],
        payment_channel=data['payment_channel'],
        transaction_type=data['transaction_type'],
        payment_processor=data['payment_meta']['payment_processor'],
        filename=filename,
        categories=" / ".join(raw_categories) if raw_categories else None,
        address=location['address'],
        city=location['city'],
        region=location['region'],
        postal=location['postal_code'],
    )
def load_transactions_from_sqlitedb(dbfile: str, earliest_date: datetime.date = None) -> List[PlaidEntry]:
    """Load non-archived transactions from a plaid-sync sqlite database.

    Args:
        dbfile: path to the plaid-sync transactions database.
        earliest_date: skip transactions dated before this; defaults to
            1900-01-01 (i.e. everything).

    Returns a list of PlaidEntry, one per row.
    """
    if earliest_date is None:
        earliest_date = datetime.date(1900, 1, 1)
    con = sqlite3.connect(dbfile)
    try:
        # BUG FIX: pass the date as an ISO string instead of a datetime.date.
        # '$.date' extracts a JSON string, and 'YYYY-MM-DD' string comparison
        # matches date ordering; it also avoids relying on sqlite3's
        # deprecated default date adapter (deprecated since Python 3.12).
        rows = con.execute(
            """
            select plaid_json from transactions
            where archived is null
              and json_extract(plaid_json, '$.date') >= ?
            """,
            [earliest_date.isoformat()],
        ).fetchall()
    finally:
        # BUG FIX: the connection was previously never closed (leak).
        con.close()
    return [load_entry(json.loads(row[0]), dbfile) for row in rows]
def load_balances_from_sqlitedb(dbfile: str) -> List[PlaidBalance]:
    """Load all account balance snapshots from a plaid-sync sqlite database.

    Returns a list of PlaidBalance, one per row of the `balances` table.
    """
    con = sqlite3.connect(dbfile)
    try:
        rows = con.execute("""
            select account_id, account_type, date, balance_current, currency_code from balances
        """).fetchall()
    finally:
        # BUG FIX: the connection was previously never closed (leak).
        con.close()
    return [PlaidBalance(*row) for row in rows]
def load_transactions_from_file(filename: str, currency: str = 'USD') -> List[PlaidEntry]:
    """Load PlaidEntry records from a newline-delimited Plaid JSON (.jsonl) file.

    Any failure (I/O, bad JSON, missing fields) is wrapped in a RuntimeError
    naming the offending file.  Result is sorted ascending by date; entries
    sharing a date come out in reverse file order (the dumps are newest-first).
    """
    try:
        filename = os.path.abspath(filename)
        with open(filename, 'r', encoding='utf-8', newline='') as fh:
            loaded = [load_entry(json.loads(line), filename) for line in fh]
        # Reverse before the stable sort so same-date entries end up in
        # reverse file order.
        loaded.reverse()
        loaded.sort(key=lambda entry: entry.date)
        return loaded
    except Exception as e:
        raise RuntimeError('Plaid JSON-L file has incorrect format', filename) from e
def get_info(raw_entry: PlaidEntry) -> dict:
    """Describe the source document for an entry (shown in beancount-import's UI)."""
    return {
        'type': 'text/csv',
        'filename': raw_entry.filename,
        # line=raw_entry.line,
    }
def _make_balance_entry(account: str, balance: PlaidBalance):
    """Build an ImportResult containing a single Balance assertion.

    Plaid reports liability balances (anything not 'depository') as positive
    numbers; in the journal those accounts carry negative balances, so the
    sign is flipped for non-depository account types.
    """
    date = parse_date(balance.date)
    # str() to lock in decimal places before handing the value to D().
    account_balance = round(D(str(balance.balance_current)), 2)
    amount = account_balance
    if balance.account_type != 'depository':
        amount = -1 * amount
    amount = Amount(amount, balance.currency_code)
    return ImportResult(
        # BUG FIX: previously the raw date *string* was used here while the
        # Balance directive below got the parsed date; use the parsed
        # datetime.date for both.
        date=date,
        info=None,
        entries=[
            Balance(
                account=account,
                date=date,
                meta=None,
                amount=amount,
                tolerance=None,
                diff_amount=None)
        ])
def _make_import_result(account: str, plaid_entry: PlaidEntry) -> ImportResult:
    """Convert a PlaidEntry into an ImportResult holding one Transaction.

    The transaction has two postings: the source posting on `account`,
    carrying every non-None PlaidEntry field as `plaid_*` metadata, and a
    balancing posting on FIXME_ACCOUNT for the user to categorize.
    """
    meta = collections.OrderedDict()
    for field, value in plaid_entry._asdict().items():
        # filename is surfaced via ImportResult.info instead; pending is
        # encoded by the plaid_pending_* renaming below.
        if field == "filename": continue
        if field == "pending": continue
        if value is not None:
            meta["plaid_%s" % field] = value
    # in order to avoid conflicts when merging in a cleared entry over a pending entry
    # need to rename some fields so there won't be conflicts between the cleared and pending
    # this also allows us to clearly identify pending vs. cleared entries
    if plaid_entry.pending:
        if plaid_entry.pending_transaction_id and plaid_entry.transaction_id:
            # BUG FIX: this branch previously followed the print with
            # `import pdb; pdb.set_trace` — a debugger left in production
            # code (and a silent no-op anyway: the call parens were
            # missing).  Just report the unexpected shape.
            print("PENDING entry has both transaction_id and pending_transaction_id")
        else:
            # rename key metadata to have _pending_ in the name
            # date, amount, and name often change between the pending transaction and the final
            # transaction - so name them plaid_pending_ to avoid the matcher thinking they are
            # different transactions
            # transaction_id ALWAYS changes between pending and final - and final has a key
            # plaid_pending_transaction_id that's the same as the pending entry's transaction_id
            for attr in ['transaction_id', 'date', 'amount', 'name']:
                meta['plaid_pending_%s' % attr] = meta['plaid_%s' % attr]
                del meta['plaid_%s' % attr]
    # transaction_ids = [ 'plaid.%s' % plaid_entry.transaction_id ]
    # if plaid_entry.pending_transaction_id:
    #     transaction_ids.append( 'plaid.%s' % plaid_entry.pending_transaction_id )
    src_posting = Posting(
        account=account,
        units=plaid_entry.amount,
        cost=None,
        price=None,
        flag=None,
        meta=meta,
    )
    dst_posting = Posting(
        account=FIXME_ACCOUNT,  # rule_overrides.get('account', FIXME_ACCOUNT),
        units=-plaid_entry.amount,
        cost=None,
        price=None,
        flag=None,
        meta=None,
    )
    flag = FLAG_OKAY  # rule_overrides.get('flag', FLAG_OKAY)
    # TODO: should this be a configuration option?
    # if plaid_entry.pending:
    #     flag = '!'
    transaction = Transaction(
        meta=None,
        date=plaid_entry.date,
        flag=flag,
        payee=plaid_entry.name,  # rule_overrides.get('payee', plaid_entry.name),
        narration=None,  # rule_overrides.get('narration', None),
        tags=EMPTY_SET,  # rule_overrides.get('tags', EMPTY_SET),
        links=EMPTY_SET,  # frozenset(transaction_ids),
        postings=[
            src_posting,
            dst_posting,
        ])
    return ImportResult(
        date=plaid_entry.date, info=get_info(plaid_entry), entries=[transaction])
# Previously, this would be based on the links concept, but I'm not sure what value links have | |
# over just using the metadata property. One definite disadvantage of links is they pollute the | |
# Fava web interface search/filter drop-down. | |
# | |
# Searches through all existing Transactions and pulls out the existing `plaid_transaction_id` | |
# values. | |
# | |
# OLD approach: self.get_entries_with_link( all_entries=journal.all_entries, results=results, valid_links=transaction_ids) | |
def _get_existing_plaid_transaction_ids( all_entries ): | |
tids = set() | |
ptids = set() | |
for entry in all_entries: | |
if not isinstance(entry, Transaction): continue | |
for posting in entry.postings: | |
tid = posting.meta.get('plaid_transaction_id') | |
if tid: tids.add(tid) | |
ptid = posting.meta.get('plaid_pending_transaction_id') | |
if ptid: ptids.add(ptid) | |
return (tids, ptids,) | |
class PlaidSource(link_based_source.LinkBasedSource, Source):
    """beancount-import Source backed by plaid-sync transaction data.

    Exactly one of the data locations must be supplied:
      - `dir`: directory of newline-delimited JSON (.jsonl) dump files,
      - `dbfile`: plaid-sync sqlite database (also provides balances),
      - `filename`: a single .jsonl dump file.
    """

    def __init__(self,
                 filename: str = None,
                 dir: str = None,
                 dbfile: str = None,
                 earliest_date: datetime.date = None,
                 # TODO: include balances like Mint does? balances_directory: Optional[str] = None,
                 **kwargs) -> None:
        super().__init__(link_prefix='plaid.', **kwargs)
        # BUG FIX: balances are only loaded from the sqlite db; initialize
        # unconditionally so code reading self.plaid_balances never hits an
        # AttributeError on the other two code paths.
        self.plaid_balances = []
        if dir:
            self.plaid_entries = []
            for fn in os.listdir(dir):
                if not fn.endswith('.jsonl'): continue
                fn = os.path.join(dir, fn)
                entries = load_transactions_from_file(filename=fn)
                self.log_status("Loaded %d entries from: %s" % ( len(entries), fn ) )
                self.plaid_entries.extend( entries )
        elif dbfile:
            self.plaid_entries = load_transactions_from_sqlitedb(dbfile, earliest_date)
            self.plaid_balances = load_balances_from_sqlitedb(dbfile)
            self.log_status("Loaded %d entries from: %s" % ( len(self.plaid_entries), dbfile ) )
        elif filename:
            self.plaid_entries = load_transactions_from_file(filename=filename)
            self.log_status("Loaded %d entries from: %s" % ( len(self.plaid_entries), filename ) )
        else:
            # BUG FIX: the old message omitted `dbfile` as a valid option.
            raise RuntimeError("Plaid module: Must specify one of `dir`, `dbfile`, or `filename`")

    def prepare(self, journal: JournalEditor, results: SourceResults) -> None:
        """Diff loaded Plaid entries against the journal; queue new ones as pending."""
        bc2plaid, plaid2bc = description_based_source.get_account_mapping(journal.accounts, 'plaid_id')
        (existing_tids, existing_ptids) = _get_existing_plaid_transaction_ids( journal.all_entries )
        unknown_accounts = set()
        # for balance in self.plaid_balances:
        #     account = plaid2bc.get( balance.account_id, None )
        #     if not account:
        #         unknown_accounts.add( balance.account_id )
        #     else:
        #         results.add_pending_entry(_make_balance_entry(account, balance))
        for entry in self.plaid_entries:
            # Already imported as a cleared transaction.
            if entry.transaction_id in existing_tids: continue
            # Pending entry whose cleared counterpart is already imported
            # (the cleared entry records plaid_pending_transaction_id).
            if entry.pending and entry.transaction_id in existing_ptids: continue
            account = plaid2bc.get( entry.account_id, None )
            if not account:
                unknown_accounts.add( entry.account_id )
            else:
                results.add_pending_entry(_make_import_result(account, entry))
        if len(unknown_accounts) > 0:
            self.log_status("Transactions from the following Plaid accounts were ignored.")
            for account_id in unknown_accounts:
                self.log_status(" %s" % account_id)
            self.log_status("Please add these accounts to your journal, marked with plaid_id attributes.")
        results.add_accounts(bc2plaid.keys())

    def get_example_key_value_pairs(self, transaction: Transaction, posting: Posting):
        """Return metadata examples the classifier should learn target accounts from."""
        ret = {}
        def maybe_add_key(key):
            x = posting.meta.get(key)
            if x is not None:
                ret[key] = x
        maybe_add_key('plaid_name')
        maybe_add_key('plaid_pending_name')
        maybe_add_key('plaid_account_id')
        # maybe_add_key('plaid_pending_categories')
        # maybe_add_key('plaid_categories')
        return ret

    def is_posting_cleared(self, posting: Posting):
        """A posting is cleared iff it carries plaid_transaction_id metadata."""
        if not posting.meta:
            return False
        return posting.meta.get('plaid_transaction_id') is not None

    @property
    def name(self):
        return 'plaid'
def load(spec, log_status):
    """beancount-import hook: build a PlaidSource from the config dict `spec`."""
    kwargs = dict(spec)
    return PlaidSource(log_status=log_status, **kwargs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment