%▶ ./member_card/main.py --email=jeff.hogan1@gmail.com --membership-sku=SQ3671268 -q
[I 220117 11:10:10 main:160] Stats: len(subscriptions)=2494 / len(active_subscriptions)=2030 / len(inactive_subscriptions)=464
[I 220117 11:10:10 main:168] Subscription found for jeff.hogan1@gmail.com:
[<AnnualSubscription jeff.hogan1@gmail.com 2022-01-07T19:56:44.782000+00:00 fulfillment_status=PENDING (active=True) ... >, <AnnualSubscription jeff.hogan1@gmail.com 2021-07-15T20:23:31.635000+00:00 fulfillment_status=FULFILLED (active=True) ... >]
<print membership card here>
Created
January 17, 2022 17:13
-
-
Save jeffwecan/50fc45bb53b2b5cba7d2ce357dea60a6 to your computer and use it in GitHub Desktop.
member_card/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import logging | |
import os | |
from collections import defaultdict | |
from datetime import datetime, timedelta | |
from typing import List | |
from zoneinfo import ZoneInfo | |
import logzero | |
from dateutil.parser import parse | |
from logzero import logger | |
from squarespace import Squarespace | |
class AnnualSubscription(object):
    """Wrap a Squarespace order dict that contains a membership purchase.

    Keys of the wrapped order can be read as attributes, either under their
    original name or in snake_case (``customer_email`` resolves to the
    ``customerEmail`` key).
    """

    # Cut-off used by ``is_active``. Evaluated once, when the class body
    # runs, not on every check.
    one_year_ago = datetime.now(tz=ZoneInfo("UTC")) - timedelta(days=366)

    def __init__(self, order):
        """Store ``order`` and parse its ``createdOn`` value into ``created_on``."""
        self._order = order
        self.created_on = parse(self._order["createdOn"])

    def __getattr__(self, key):
        """Resolve attributes that normal lookup did not find from the order.

        Raises:
            AttributeError: if neither ``key`` nor its camelCase form is a
                key of the wrapped order.
        """
        # ``__getattr__`` only runs when normal lookup fails, so a request
        # for ``_order`` itself means it was never set (e.g. an instance made
        # with ``__new__``). Fail fast instead of recursing without bound.
        if key == "_order":
            raise AttributeError(key)
        order = self._order

        # Membership tests (not truthiness) so that falsy values such as
        # 0, "" or False are returned rather than reported as missing.
        if key in order:
            return order[key]

        # convert key from snake to camel case
        # via: https://stackoverflow.com/a/19053800
        # We capitalize the first letter of each component except the first
        # one with the 'title' method and join them together.
        components = key.split("_")
        camel_key = components[0] + "".join(x.title() for x in components[1:])
        if camel_key in order:
            return order[camel_key]

        raise AttributeError(f"no {key=} in <AnnualSubscription _order... >")

    def __repr__(self):
        return f"<AnnualSubscription {self.customer_email} {self.created_on.isoformat()} fulfillment_status={self.fulfillment_status} (active={self.is_active}) ... >"

    @property
    def is_canceled(self):
        """True when the order's fulfillment status is ``"CANCELED"``."""
        return self.fulfillment_status == "CANCELED"

    @property
    def is_active(self):
        """True when the order is not canceled and newer than ``one_year_ago``."""
        if self.is_canceled:
            return False
        if self.created_on <= self.one_year_ago:
            return False
        return True
# def get_profile_by_email(squarespace, email): | |
# resp = squarespace.get_profile_by_email( | |
# email=email, | |
# ) | |
# logger.debug(f"get_profile_by_email(email={email}) => {resp}") | |
# print(json.dumps(resp)) | |
def load_membership_orders_datetime_window(
    squarespace, modified_before=None, modified_after=None, fulfillment_status=None
) -> List[AnnualSubscription]:
    """Load membership orders using modification-window and status filters.

    The three filter arguments are placed under the ``modifiedAfter``,
    ``modifiedBefore`` and ``fulfillmentStatus`` keys and handed, together
    with ``squarespace``, to ``load_all_membership_orders``, whose result is
    returned unchanged.
    """
    window_filters = {
        "modifiedAfter": modified_after,
        "modifiedBefore": modified_before,
        "fulfillmentStatus": fulfillment_status,
    }
    return load_all_membership_orders(
        squarespace=squarespace,
        order_params=window_filters,
    )
def load_all_membership_orders(
    squarespace, order_params, membership_sku=None
) -> List[AnnualSubscription]:
    """Return an ``AnnualSubscription`` for every order containing the membership SKU.

    Args:
        squarespace: client object; its ``all_orders(**params)`` is iterated.
        order_params: mapping of query parameters; entries whose value is
            ``None`` are dropped before the request is made.
        membership_sku: SKU that marks a line item as a membership. When
            omitted, the module-level ``membership_sku`` global (set by the
            script entry point) is used.

    Raises:
        ValueError: if no SKU is given and no module-level
            ``membership_sku`` is defined.
    """
    if membership_sku is None:
        # Backward compatibility: the SKU used to be read only from a
        # module-level global that exists when the file runs as a script.
        membership_sku = globals().get("membership_sku")
    if membership_sku is None:
        raise ValueError(
            "membership_sku must be passed or defined as a module-level global"
        )

    # remove "None"s
    order_params = {k: v for k, v in order_params.items() if v is not None}

    # Only the number of orders is reported, so count them instead of
    # keeping every order dict alive in a list.
    order_count = 0
    subscriptions = []
    logger.debug(f"Grabbing all orders with {order_params=}")
    for order in squarespace.all_orders(**order_params):
        order_count += 1
        line_items = order["lineItems"]
        order_product_names = [i["productName"] for i in line_items]

        if any(i["sku"] == membership_sku for i in line_items):
            logger.debug(
                f"{order['id']=} (#{order['orderNumber']}) includes {membership_sku=} in {order_product_names=}"
            )
            subscriptions.append(AnnualSubscription(order))
            continue

        logger.debug(
            f"#{order['orderNumber']} has no {membership_sku=} in {order_product_names=}"
        )

    logger.debug(
        f"len(all_orders)={order_count} loaded with {len(subscriptions)=} and whatnot"
    )
    return subscriptions
if __name__ == "__main__":
    # Script entry point: look up the membership orders for one e-mail
    # address and exit 0 if any of them is active, 1 otherwise.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-q",
        "--quiet",
        help="modify output verbosity",
        action="store_true",
    )
    parser.add_argument(
        "-e",
        "--email",
        default="jeff.hogan1@gmail.com",
    )
    parser.add_argument(
        "-m",
        "--membership-sku",
        default="SQ3671268",
    )
    parser.add_argument(
        "-f",
        "--fulfillment-status",
        # ``None`` cannot be typed on the command line; the default below
        # already covers the "no filter" case.
        choices=["PENDING", "FULFILLED", "CANCELED"],
        default=None,
    )
    args = parser.parse_args()

    if args.quiet:
        logzero.loglevel(logging.INFO)

    email = args.email
    # Kept as a module-level name: load_all_membership_orders reads it.
    membership_sku = args.membership_sku

    squarespace = Squarespace(api_key=os.environ["SQUARESPACE_API_KEY"])

    # get_profile_by_email(
    #     squarespace=squarespace,
    #     email=args.email,
    # )

    # Only used by the (currently disabled) windowed load below.
    modified_before_dt = datetime.now(tz=ZoneInfo("UTC"))
    modified_after_dt = modified_before_dt - timedelta(days=30)
    modified_after = modified_after_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    modified_before = modified_before_dt.strftime("%Y-%m-%dT%H:%M:%SZ")

    # subscriptions = load_membership_orders_datetime_window(
    #     squarespace=squarespace,
    #     modified_after=modified_after,
    #     modified_before=modified_before,
    #     fulfillment_status=args.fulfillment_status,
    # )
    subscriptions = load_all_membership_orders(
        squarespace=squarespace,
        # The query parameter is camelCase, matching the key used by
        # load_membership_orders_datetime_window; the snake_case spelling
        # would not be sent to the API under the expected name.
        order_params=dict(fulfillmentStatus=args.fulfillment_status),
    )

    active_subscriptions = [s for s in subscriptions if s.is_active]
    inactive_subscriptions = [s for s in subscriptions if not s.is_active]
    logger.info(
        f"Stats: {len(subscriptions)=} / {len(active_subscriptions)=} / {len(inactive_subscriptions)=}"
    )

    subscriptions_by_email = defaultdict(list)
    for subscription in subscriptions:
        subscriptions_by_email[subscription.customer_email].append(subscription)

    if matching_subscriptions := subscriptions_by_email.get(email):
        logger.info(f"Subscription found for {email}:\n{matching_subscriptions}")
        if any(s.is_active for s in matching_subscriptions):
            print("<print membership card here>")
            # SystemExit instead of the ``exit`` helper, which is injected
            # by the ``site`` module and is not guaranteed to exist.
            raise SystemExit(0)

    logger.warning("<no active subscriptions / matching subscriptions ever / access denied>")
    raise SystemExit(1)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from time import strftime, gmtime | |
import requests | |
# Client version; interpolated into the default User-Agent string.
__VERSION__ = "0.0.4"
# Defaults for the ``Squarespace`` constructor arguments of the same name.
api_baseurl = "https://api.squarespace.com"
api_version = "1.0"


class SquarespaceError(Exception):
    """Raised by ``Squarespace.order`` when no lookup key is supplied."""


class Squarespace(object):
    """Represents orders from a particular squarespace store.

    :api_key:
        Your Squarespace API key. Required.

    :api_baseurl:
        The baseuri for the Squarespace API. You shouldn't need to change
        this.

    :api_version:
        The version of the Squarespace API to target. If you change this
        without making any code changes you may break this library.

    :tracking_baseurl:
        Optional URL prefix; ``fulfill`` appends the tracking number to it
        to build a shipment's tracking URL when no per-call prefix is given.
    """

    def __init__(
        self,
        api_key,
        api_baseurl=api_baseurl,
        api_version=api_version,
        tracking_baseurl=None,
    ):
        self.api_key = api_key
        self.api_baseurl = api_baseurl
        self.api_version = api_version
        # ``fulfill`` falls back to this attribute, so it must always exist.
        self.tracking_baseurl = tracking_baseurl

        # Setup our HTTP session
        self.http = requests.Session()
        self.http.headers.update({"Authorization": "Bearer " + self.api_key})
        self.useragent = "Squarespace python API v%s by Zach White." % __VERSION__
        # Cursor of the page following the most recent ``orders`` call.
        self._next_page = None

    @property
    def useragent(self):
        """Get the current useragent."""
        return self._useragent

    @useragent.setter
    def useragent(self, agent_string):
        """Set the User-Agent that will be used."""
        self._useragent = agent_string
        self.http.headers.update({"User-Agent": self._useragent})

    def post(self, path, object):
        """Post an `object` to the Squarespace API.

        :object:
            A dictionary containing JSON compatible key/value combinations.
        """
        url = "%s/%s/%s" % (self.api_baseurl, self.api_version, path)
        logging.debug("url:%s object:%s", url, object)
        return self.process_request(self.http.post(url, json=object))

    def get(self, path, args=None):
        """Retrieve an endpoint from the Squarespace API."""
        if not args:
            args = {}

        url = "%s/%s/%s" % (self.api_baseurl, self.api_version, path)
        logging.debug("url:%s args:%s", url, args)
        return self.process_request(self.http.get(url, params=args))

    def process_request(self, request):
        """Process a request and return the data.

        Returns the decoded JSON body for 200/201 and ``True`` for 204.

        Raises:
            ValueError: on 401.
            NotImplementedError: on any other 2xx status.
            RuntimeError: on every remaining status.
        """
        status = request.status_code
        if status in (200, 201):
            return request.json()
        if status == 204:
            return True
        if status == 401:
            # Interpolate here: exceptions, unlike logging calls, do not
            # apply %-formatting to extra positional arguments.
            raise ValueError("The API key %s is not valid." % self.api_key)
        if 200 < status < 300:
            logging.warning("Squarespace success response %s:", status)
            logging.warning(request.text)
            raise NotImplementedError(
                "Squarespace sent us a success response we're not prepared for!"
            )

        logging.error("Squarespace error response %s:", status)
        logging.error("URL: %s", request.url)
        logging.error(request.text)

        # Half-open ranges so 499 and 599 fall into their own class.
        if 400 <= status < 500:
            raise RuntimeError("Squarespace thinks this request is bogus")
        if 500 <= status < 600:
            raise RuntimeError("Squarespace is having problems, try later.")
        raise RuntimeError("An unknown error occurred fetching your request.")

    def get_profile_by_email(self, email):
        """GET ``profiles/`` with an ``email,<email>`` filter argument."""
        return self.get(path="profiles/", args=dict(filter=f"email,{email}"))

    def order(self, order_id=None, order_number=None):
        """Retrieve a single order.

        With ``order_id`` the order is fetched directly. With
        ``order_number`` every order is scanned and the first match is
        returned, or ``None`` if nothing matches.

        Raises:
            SquarespaceError: if neither argument is given.
        """
        if order_id:
            return self.get("commerce/orders/" + order_id)
        if order_number:
            # Compare as strings so an int order number matches a string
            # ``orderNumber`` value and vice versa.
            wanted = str(order_number)
            for order in self.all_orders():
                if str(order["orderNumber"]) == wanted:
                    return order
            return None
        raise SquarespaceError(
            "You must specify one of `order_id` or `order_number`"
        )

    def orders(self, **args):
        """Retrieve one page of orders and remember the next-page cursor."""
        uri = "commerce/orders"

        result = self.get(uri, args)
        self._next_page = result["pagination"].get("nextPageCursor")

        return result["result"]

    def next_page(self):
        """Retrieve the next page of orders, or None if there are no more orders."""
        return self.orders(cursor=self._next_page) if self._next_page else None

    def all_orders(self, **args):
        """Yield every order, following next-page cursors until exhausted."""
        page = self.orders(**args)
        # Capture the cursor right after each fetch so that other calls
        # which update ``self._next_page`` while this generator is suspended
        # cannot derail the iteration.
        cursor = self._next_page
        yield from page

        while cursor:
            page = self.orders(cursor=cursor)
            cursor = self._next_page
            yield from page

    def fulfill(
        self,
        order_id,
        tracking_number=None,
        carrier_name=None,
        service_name=None,
        tracking_baseurl=None,
        send_notification=True,
    ):
        """Mark an order as shipped.

        :param tracking_number:
            The shipping carrier's tracking number.

        :param carrier_name:
            The name of the shipping carrier, EG 'USPS'.

        :param service_name:
            The shipping service used, EG "First Class Package"

        :param tracking_baseurl:
            URL prefix for the tracking link; defaults to the instance's
            ``tracking_baseurl``. When neither is set the shipment is sent
            without a ``trackingUrl``.

        :param send_notification:
            Value sent as ``shouldSendNotification``.

        A shipment entry is only included when tracking number, carrier and
        service are all given.
        """
        if not tracking_baseurl:
            tracking_baseurl = self.tracking_baseurl

        uri = "commerce/orders/%s/fulfillments" % order_id
        fulfillment = {"shouldSendNotification": send_notification, "shipments": []}

        if tracking_number and carrier_name and service_name:
            shipment = {
                "shipDate": strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()),
                "carrierName": carrier_name,
                "service": service_name,
                "trackingNumber": tracking_number,
            }
            if tracking_baseurl:
                shipment["trackingUrl"] = tracking_baseurl + tracking_number
            fulfillment["shipments"] = [shipment]

        return self.post(uri, fulfillment)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment