import numpy as np
import pandas as pd
from pandas.errors import EmptyDataError
from collections import defaultdict
import random
import time
import datetime
import math
import requests as R
import json
from dateutil import parser as dateparser
import hashlib
import argparse
pd.set_option('display.max_columns', 100)
pd.set_option('display.max_colwidth', 100)
SLEEP = 2 # sleep time between api calls
def event_stats(c_events, show=True, save=False):
    good_events = c_events[c_events["errors"].isnull()]
    public_events = good_events[good_events["created"].notnull()]
    bad_events = c_events[c_events["errors"].notnull()]
    if save:
        good_events.to_csv('crawl/valid_events.csv')
        public_events.to_csv('crawl/public_events.csv')
    if show:
        print("public_events = {}, public_limited_events = {}, bad_events = {}"
              .format(len(public_events), len(good_events), len(bad_events)))
        rsvps = public_events["yes_rsvp_count"]
        rsvps = rsvps[~np.isnan(rsvps)]
        print("average yes-RSVPs in public_events = {}, quantile = \n{}".format(
            rsvps.mean(), rsvps.quantile([.1, .25, .5, .75, .9])))
        event_sizes = public_events.groupby("group.id").size()
        print("average events per group in 2017.7-2017.9 = {}, quantile = \n{}".format(
            event_sizes.mean(), event_sizes.quantile([.1, .25, .5, .75, .9])))
    return public_events
def select_groups(c_events):
    ev_count = c_events.groupby("group.id").agg(
        {"yes_rsvp_count": ['mean', 'size']})
    ev_count = ev_count["yes_rsvp_count"]
    # filter by average RSVP count
    ev_count = ev_count[(ev_count['mean'] <= 30) & (ev_count['mean'] >= 10)]
    # filter by number of events per group
    ev_count = ev_count[(ev_count['size'] <= 30) & (ev_count['size'] >= 10)]
    return ev_count.reset_index()
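# Illustrative usage (not part of the original gist), mirroring the commented-out
# driver further below: load the previously crawled events, keep the public ones,
# then keep groups that held 10-30 events averaging 10-30 yes-RSVPs each.
#
#   c_events = pd.read_csv('crawl/events.csv')
#   public_events = event_stats(c_events, show=False)
#   rep_groups = select_groups(public_events)   # columns: group.id, mean, size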
# public_events = event_stats(pd.read_csv('crawl/events.csv'))
#### Get all events #####
# Get all events from creation to now, for designated groups and urlnames
# groups = pd.read_csv('data/groups.csv')
def generate_time_stamps(created_time, nepoch=4):
    created_date = dateparser.parse(created_time)
    year = created_date.year
    month = created_date.month
    res = []
    for _ in range(nepoch):
        start_str = "{}-{:02d}-01T00:00:00.000".format(year, month)
        #seen = False
        for _ in range(3):
            #if (year, month) in finished:
            #    seen = True
            year = year if month < 12 else year + 1
            month = month % 12 + 1
        end_str = "{}-{:02d}-01T00:00:00.000".format(year, month)
        #if not seen:
        res.append((start_str, end_str))
    return res
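# Illustrative example (made-up creation date): a group created on 2015-03-21
# yields four consecutive 3-month windows, i.e. its first full year:
#
#   generate_time_stamps("2015-03-21T10:30:00.000", nepoch=4)
#   -> [("2015-03-01T00:00:00.000", "2015-06-01T00:00:00.000"),
#       ("2015-06-01T00:00:00.000", "2015-09-01T00:00:00.000"),
#       ("2015-09-01T00:00:00.000", "2015-12-01T00:00:00.000"),
#       ("2015-12-01T00:00:00.000", "2016-03-01T00:00:00.000")]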
def get_events_in_interval(groupid, timestart, timeend):
    # assumption: one request per interval is enough (the response is capped at 200 events)
    url = "https://api.meetup.com/{}/events".format(groupid)
    params = {}
    params["status"] = "past"
    params["no_later_than"] = timeend
    params["no_earlier_than"] = timestart
    retry = 0
    while True:
        try:
            r = R.get(url, params=params)
            res = r.json()
            return pd.io.json.json_normalize(res)
        except Exception:
            if retry >= 3:
                print("failed request ", url, params)
                return pd.DataFrame()
            else:
                retry += 1
                time.sleep(SLEEP * retry)
                continue
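# Illustrative call (the urlname is made up): fetch one group's past events for a
# single quarter; the result is a flattened DataFrame with columns such as "id",
# "yes_rsvp_count", and "group.id".
#
#   ev = get_events_in_interval("some-meetup-group",
#                               "2017-07-01T00:00:00.000",
#                               "2017-10-01T00:00:00.000")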
def get_all_events_for_group(groupurlname, created_time):
    df = pd.DataFrame()
    for (start, end) in generate_time_stamps(created_time):
        print("getting all events for ", (groupurlname, created_time, start, end))
        time.sleep(SLEEP)
        df_curr = get_events_in_interval(groupurlname, timestart=start, timeend=end)
        df = df.append(df_curr, sort=True)
    return df
def crawl_full_events_impl(handle, groupids, urlnames, created_times, batch=20):
    # resume from the checkpoint file if it already exists
    try:
        with open(handle, "r") as f:
            df = pd.read_csv(f)
        finished = set(df["group.id"].values)
    except (EmptyDataError, FileNotFoundError):
        df = pd.DataFrame()
        finished = set()
    # Make a 2d array of ....
    # finished = defaultdict(set)
    # for i in range(len(df)):
    #     group_urlname = (df.iloc[i]["group.urlname"])
    #     created_time = int(df.iloc[i]["time"])
    #     event_date = datetime.datetime.fromtimestamp(created_time / 1000.0)
    #     timestamp = (event_date.year, event_date.month)
    #     finished[group_urlname].add(timestamp)
    # print(finished)
    # create a mask of groups still to crawl
    # use id to index because for some reason group urlnames can be renamed.....
    mask = np.array([id_ not in finished for id_ in groupids])
    n_tot = len(urlnames)
    params = np.array(list(zip(urlnames, created_times)))[mask]
    if len(params) == 0:
        print("{}/{} groups processed. nothing to do".format(n_tot, n_tot))
        return
    # split the remaining groups into ceil(len(params) / batch) batches
    batches = np.array_split(params, (len(params) + batch - 1) // batch)
    n_processed = n_tot - len(params)
    for batch_params in batches:
        for (g, t) in batch_params:
            df_curr = get_all_events_for_group(g, t)
            df = df.append(df_curr, sort=True)
        # checkpoint after every batch
        with open(handle, "w+") as f:
            df.to_csv(f, index=False)
        n_processed += len(batch_params)
        print("groups for which full events are processed: {} / {}".format(n_processed, n_tot))
    #print(df.tail(1))
def crawl_full_events(groups, dest=None, batch=20):
    crawl_full_events_impl(
        dest, groups["group.id"].values, groups["urlname"].values, groups["created"].values, batch=batch)
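# Illustrative call (the file name is an assumption): crawl the full event history
# for a frame of groups that has "group.id", "urlname", and "created" columns. The
# destination CSV doubles as a checkpoint, so rerunning skips groups already saved.
#
#   crawl_full_events(rep_groups.head(5), dest='crawl/full_events.csv', batch=20)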
### Get all Attendees ###
# Get all attendees for the (group_urlname, event_id) tuples
def get_attendees_for_event(group_urlname, event_id):
    # assumption: one request per event is enough (the response is capped at 200 RSVPs)
    url = "https://api.meetup.com/{}/events/{}/rsvps".format(
        group_urlname, event_id)
    params = {}
    params["fields"] = ",".join(["attendance_status", ])
    retry = 0
    while True:
        try:
            r = R.get(url, params=params)
            res = r.json()
            return pd.io.json.json_normalize(res)
        except Exception:
            if retry >= 3:
                print("failed request ", url, params)
                return pd.DataFrame()
            else:
                retry += 1
                time.sleep(SLEEP * retry)
                continue
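# Illustrative call (urlname and event id are made up): pull the RSVP list for a
# single event; real (urlname, id) pairs come out of the crawled events CSV.
#
#   rsvps = get_attendees_for_event("some-meetup-group", "245678901")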
def crawl_attendees_impl(handle, group_urlnames, event_ids, batch=20):
    # resume from the checkpoint file if it already exists
    try:
        with open(handle, "r") as f:
            df = pd.read_csv(f)
        finished = set(df["event.id"].values)
    except (EmptyDataError, FileNotFoundError):
        df = pd.DataFrame()
        finished = set()
    mask = np.array([event_id not in finished for event_id in event_ids])
    n_tot = len(group_urlnames)
    params = np.array(list(zip(group_urlnames, event_ids)))[mask]
    if len(params) == 0:
        print("{}/{} events processed. nothing to do".format(n_tot, n_tot))
        return
    # split the remaining events into ceil(len(params) / batch) batches
    batches = np.array_split(params, (len(params) + batch - 1) // batch)
    n_processed = n_tot - len(params)
    print("attendee processed: {} / {}".format(n_processed, n_tot))
    for batch_params in batches:
        for (u, e) in batch_params:
            print("crawl attendees for ", (u, e))
            df_curr = get_attendees_for_event(u, e)
            df = df.append(df_curr, sort=True)
            time.sleep(SLEEP)
        # checkpoint after every batch
        with open(handle, "w+") as f:
            df.to_csv(f, index=False)
        n_processed += len(batch_params)
        print("attendee processed: {} / {}".format(n_processed, n_tot))
    #print(df.tail(1))
def crawl_attendees(events, dest, batch=20):
    crawl_attendees_impl(
        dest, events["group.urlname"], events["id"], batch=batch)
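# Illustrative call (file names are assumptions): crawl RSVPs for every event in a
# previously crawled events file; it needs "group.urlname" and "id" columns, and
# as with crawl_full_events the destination CSV is also the resume checkpoint.
#
#   full_events = pd.read_csv('crawl/full_events.csv')
#   crawl_attendees(full_events, dest='crawl/one_yr_attendees.csv', batch=20)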
#### Crawler Logic #####
def event_attendee_crawler(rep_groups, event_dest, attendee_dest):
    # crawl the full event history for each group, then the attendees of every crawled event
    crawl_full_events(rep_groups, dest=event_dest)
    full_events = pd.read_csv(event_dest)
    crawl_attendees(full_events, dest=attendee_dest)
#def run_event_attendee_crawler(first_n=20, event_src='crawl/events.csv', event_dest='crawl/full_events.csv', attendee_dest='crawl/one_yr_attendees.csv'):
# # Select events based on crawl/events.csv, which contains all events of all groups during 2017.7-2017.10
# c_events = pd.read_csv(event_src)
# public_events = event_stats(c_events, show=False)
# rep_groups = select_groups(public_events)
#
# rep_groups = rep_groups.set_index("group.id").join(
# groups.set_index('group_id')).reset_index()
# #rep_events = rep_groups.set_index("group.id").join(public_events.set_index('group.id')).reset_index()
# event_attendee_crawler(rep_groups.head(
# first_n), event_dest=event_dest, attendee_dest=attendee_dest)
############# App Logic ####################
## We will be given the rep_groups, and a 'mod' number, and nothing more.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Crawl all events one year from creation for given group IDs")
    parser.add_argument('filename')
    parser.add_argument('n_channels', type=int)
    parser.add_argument('which_channel', type=int)
    args = parser.parse_args()

    rep_groups = pd.read_csv(args.filename)
    group_ids = rep_groups["group.id"].values.astype("int")
    which_channel, n_channels = args.which_channel, args.n_channels
    assert 0 <= which_channel < n_channels, "which_channel must be in [0, n_channels)"

    # tag output files with a short digest of the input group ids
    digest = hashlib.sha256()
    for s in group_ids:
        digest.update(str(s).encode())
    str_id = digest.hexdigest()[-6:]
    print("digest for file {} = {}".format(args.filename, str_id))
    print("running {} out of {} channels".format(which_channel, n_channels))

    # note: group_hashed is computed but not used; the sharding below uses group id % n_channels
    group_hashed = np.array([int(hashlib.sha1(str(s).encode()).hexdigest()[-5:], 16) for s in group_ids])
    rep_groups = rep_groups[group_ids % n_channels == which_channel]
    print("rep groups = {}".format(rep_groups.head(10)[["group.id", "group_name", "urlname"]]))
    event_attendee_crawler(
        rep_groups,
        event_dest="crawl/events_{}_{}_{}.csv".format(str_id, which_channel, n_channels),
        attendee_dest="crawl/attendees_{}_{}_{}.csv".format(str_id, which_channel, n_channels))
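
# Illustrative invocation (the script and CSV names are assumptions): shard the
# groups in rep_groups.csv across 4 channels and run channel 0; output lands in
# crawl/events_<digest>_0_4.csv and crawl/attendees_<digest>_0_4.csv.
#
#   python crawl_meetup_events.py rep_groups.csv 4 0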