import numpy as np
import pandas as pd
from pandas.errors import EmptyDataError
from collections import defaultdict
import random
import time
import datetime
import math
import requests as R
import json
from dateutil import parser as dateparser
import hashlib
import argparse

pd.set_option('display.max_columns', 100)
pd.set_option('display.max_colwidth', 100)

SLEEP = 2  # sleep time (seconds) between API calls

def event_stats(c_events, show=True, save=False):
    """Summarize crawled events and return the public (non-errored, dated) ones."""
    good_events = c_events[c_events["errors"].isnull()]
    public_events = good_events[good_events["created"].notnull()]
    bad_events = c_events[c_events["errors"].notnull()]
    if save:
        good_events.to_csv('crawl/valid_events.csv')
        public_events.to_csv('crawl/public_events.csv')
    if show:
        print("public_events = {}, valid_events = {}, bad_events = {}"
              .format(len(public_events), len(good_events), len(bad_events)))
        rsvps = public_events["yes_rsvp_count"]
        rsvps = rsvps[~np.isnan(rsvps)]
        print("average yes-RSVPs in public_events = {}, quantile = \n{}".format(
            rsvps.mean(), rsvps.quantile([.1, .25, .5, .75, .9])))
        event_sizes = public_events.groupby("group.id").size()
        print("average events per group in 2017.7-2017.9 = {}, quantile = \n{}".format(
            event_sizes.mean(), event_sizes.quantile([.1, .25, .5, .75, .9])))
    return public_events
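
# Note (added for clarity): event_stats relies on columns of the flattened event JSON
# produced by the crawler functions below ("errors", "created", "yes_rsvp_count",
# "group.id"); the input is assumed to be a CSV written by those functions.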

def select_groups(c_events):
    ev_count = c_events.groupby("group.id").agg(
        {"yes_rsvp_count": ['mean', 'size']})
    ev_count = ev_count["yes_rsvp_count"]
    # filter by average RSVP count (keep groups averaging 10-30 yes-RSVPs)
    ev_count = ev_count[(ev_count['mean'] <= 30) & (ev_count['mean'] >= 10)]
    # filter by number of events (keep groups with 10-30 events in the window)
    ev_count = ev_count[(ev_count['size'] <= 30) & (ev_count['size'] >= 10)]
    return ev_count.reset_index()
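
# Illustrative use (a sketch, with `public_events` as returned by event_stats above):
#   rep_groups = select_groups(public_events)
#   # -> one row per kept group, with columns "group.id", "mean", "size"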

# public_events = event_stats(pd.read_csv('crawl/events.csv'))

#### Get all events #####
# Get all events for one year from each group's creation, for designated group ids and urlnames
# groups = pd.read_csv('data/groups.csv')

def generate_time_stamps(created_time, nepoch=4):
    # Build `nepoch` consecutive 3-month [start, end) windows starting at the
    # month the group was created, formatted for the Meetup API.
    # (An earlier version skipped intervals already recorded in `finished`.)
    created_date = dateparser.parse(created_time)
    year = created_date.year
    month = created_date.month
    res = []
    for _ in range(nepoch):
        start_str = "{}-{:02d}-01T00:00:00.000".format(year, month)
        for _ in range(3):  # advance by three months
            year = year if month < 12 else year + 1
            month = month % 12 + 1
        end_str = "{}-{:02d}-01T00:00:00.000".format(year, month)
        res.append((start_str, end_str))
    return res
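
# For example (illustrative date), a group created on 2016-07-15 yields four quarterly windows:
#   generate_time_stamps("2016-07-15T00:00:00.000")
#   -> [("2016-07-01T00:00:00.000", "2016-10-01T00:00:00.000"),
#       ("2016-10-01T00:00:00.000", "2017-01-01T00:00:00.000"),
#       ("2017-01-01T00:00:00.000", "2017-04-01T00:00:00.000"),
#       ("2017-04-01T00:00:00.000", "2017-07-01T00:00:00.000")]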

def get_events_in_interval(groupid, timestart, timeend):
    # Fetch past events for one group in [timestart, timeend).
    # `groupid` is the group's urlname, which the Meetup API uses in the path.
    # Assumption: a single response is capped at 200 events, so quarterly
    # windows are small enough to avoid pagination.
    url = "https://api.meetup.com/{}/events".format(groupid)
    params = {}
    params["status"] = "past"
    params["no_later_than"] = timeend
    params["no_earlier_than"] = timestart
    retry = 0
    while True:
        try:
            r = R.get(url, params=params)
            res = r.json()
            return pd.json_normalize(res)
        except Exception:
            if retry >= 3:
                print("failed request ", url, params)
                return pd.DataFrame()
            retry += 1
            time.sleep(SLEEP * retry)  # back off before retrying
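
# Example call (a sketch; "hackerns" is a made-up group urlname):
#   df_q = get_events_in_interval("hackerns", "2017-07-01T00:00:00.000", "2017-10-01T00:00:00.000")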

def get_all_events_for_group(groupurlname, created_time):
    # Crawl one group's events quarter by quarter for a year after creation.
    df = pd.DataFrame()
    for (start, end) in generate_time_stamps(created_time):
        print("getting all events for ", (groupurlname, created_time, start, end))
        time.sleep(SLEEP)
        df_curr = get_events_in_interval(groupurlname, timestart=start, timeend=end)
        df = pd.concat([df, df_curr], sort=True)
    return df

def crawl_full_events_impl(handle, groupids, urlnames, created_times, batch=20):
    # Resume-friendly: groups already present in the output CSV are skipped.
    try:
        with open(handle, "r") as f:
            df = pd.read_csv(f)
        finished = set(df["group.id"].values)
    except (EmptyDataError, FileNotFoundError):
        df = pd.DataFrame()
        finished = set()
    # Make a 2d array of ....
    # finished = defaultdict(set)
    # for i in range(len(df)):
    #     group_urlname = (df.iloc[i]["group.urlname"])
    #     created_time = int(df.iloc[i]["time"])
    #     event_date = datetime.datetime.fromtimestamp(created_time / 1000.0)
    #     timestamp = (event_date.year, event_date.month)
    #     finished[group_urlname].add(timestamp)
    # print(finished)
    # create a mask of groups that still need crawling;
    # use id (not urlname) to index because group urlnames can be renamed
    mask = np.array([id_ not in finished for id_ in groupids])
    n_tot = len(urlnames)
    params = np.array(list(zip(urlnames, created_times)))[mask]
    if len(params) == 0:
        print("{}/{} groups processed. nothing to do".format(n_tot, n_tot))
        return
    # split the remaining work into batches; the CSV is rewritten after each batch
    batches = np.array_split(params, (len(params) + batch - 1) // batch)
    n_processed = n_tot - len(params)
    for chunk in batches:
        for (g, t) in chunk:
            df_curr = get_all_events_for_group(g, t)
            df = pd.concat([df, df_curr], sort=True)
        with open(handle, "w+") as f:
            df.to_csv(f, index=False)
        n_processed += len(chunk)
        print("groups for which full events are processed: {} / {}".format(n_processed, n_tot))
        # print(df.tail(1))

def crawl_full_events(groups, dest=None, batch=20):
    crawl_full_events_impl(
        dest, groups["group.id"].values, groups["urlname"].values, groups["created"].values, batch=batch)
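
# Usage sketch (illustrative; the path is hypothetical, and the input must carry the
# columns crawl_full_events reads: "group.id", "urlname", "created"):
#   rep_groups = pd.read_csv('crawl/rep_groups.csv')
#   crawl_full_events(rep_groups, dest='crawl/full_events.csv')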

### Get all Attendees ###
# Get all attendees for the (group_urlname, event_id) tuples

def get_attendees_for_event(group_urlname, event_id):
    # Fetch RSVPs (with attendance status) for a single event.
    # Assumption: a single response is capped at 200 RSVPs.
    url = "https://api.meetup.com/{}/events/{}/rsvps".format(
        group_urlname, event_id)
    params = {}
    params["fields"] = ",".join(["attendance_status", ])
    retry = 0
    while True:
        try:
            r = R.get(url, params=params)
            res = r.json()
            return pd.json_normalize(res)
        except Exception:
            if retry >= 3:
                print("failed request ", url, params)
                return pd.DataFrame()
            retry += 1
            time.sleep(SLEEP * retry)  # back off before retrying
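
# Example call (a sketch; the urlname and event id are made up):
#   rsvps = get_attendees_for_event("hackerns", "241234567")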

def crawl_attendees_impl(handle, group_urlnames, event_ids, batch=20):
    # Resume-friendly: events already present in the output CSV are skipped.
    try:
        with open(handle, "r") as f:
            df = pd.read_csv(f)
        finished = set(df["event.id"].values)
    except (EmptyDataError, FileNotFoundError):
        df = pd.DataFrame()
        finished = set()
    mask = np.array([event_id not in finished for event_id in event_ids])
    n_tot = len(group_urlnames)
    params = np.array(list(zip(group_urlnames, event_ids)))[mask]
    if len(params) == 0:
        print("{}/{} events processed. nothing to do".format(n_tot, n_tot))
        return
    batches = np.array_split(params, (len(params) + batch - 1) // batch)
    n_processed = n_tot - len(params)
    print("attendee processed: {} / {}".format(n_processed, n_tot))
    for chunk in batches:
        for (u, e) in chunk:
            print("crawl attendees for ", (u, e))
            df_curr = get_attendees_for_event(u, e)
            df = pd.concat([df, df_curr], sort=True)
            time.sleep(SLEEP)
        with open(handle, "w+") as f:
            df.to_csv(f, index=False)
        n_processed += len(chunk)
        print("attendee processed: {} / {}".format(n_processed, n_tot))
        # print(df.tail(1))

def crawl_attendees(events, dest, batch=20):
    crawl_attendees_impl(
        dest, events["group.urlname"], events["id"], batch=batch)
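
# Usage sketch (illustrative; the events DataFrame needs "group.urlname" and "id"
# columns, e.g. the output of crawl_full_events; paths are hypothetical):
#   full_events = pd.read_csv('crawl/full_events.csv')
#   crawl_attendees(full_events, dest='crawl/attendees.csv')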

#### Crawler Logic #####

def event_attendee_crawler(rep_groups, event_dest, attendee_dest):
    # crawl the full events for each group, then the attendees for each event
    crawl_full_events(rep_groups, dest=event_dest)
    full_events = pd.read_csv(event_dest)
    crawl_attendees(full_events, dest=attendee_dest)

# def run_event_attendee_crawler(first_n=20, event_src='crawl/events.csv',
#                                event_dest='crawl/full_events.csv',
#                                attendee_dest='crawl/one_yr_attendees.csv'):
#     # Select events based on crawl/events.csv, which contains all events of
#     # all groups during 2017.7-2017.10
#     c_events = pd.read_csv(event_src)
#     public_events = event_stats(c_events, show=False)
#     rep_groups = select_groups(public_events)
#
#     rep_groups = rep_groups.set_index("group.id").join(
#         groups.set_index('group_id')).reset_index()
#     # rep_events = rep_groups.set_index("group.id").join(public_events.set_index('group.id')).reset_index()
#     event_attendee_crawler(rep_groups.head(first_n),
#                            event_dest=event_dest, attendee_dest=attendee_dest)

############# App Logic ####################
# We are given the rep_groups CSV, a channel count, and a channel index, and nothing more.

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Crawl all events one year from creation for given group IDs")
    parser.add_argument('filename')
    parser.add_argument('n_channels', type=int)
    parser.add_argument('which_channel', type=int)
    args = parser.parse_args()

    rep_groups = pd.read_csv(args.filename)
    group_ids = rep_groups["group.id"].values.astype("int")
    which_channel, n_channels = args.which_channel, args.n_channels
    assert 0 <= which_channel < n_channels, "channel specified must be smaller than n_channels"

    # fingerprint the input file so parallel runs write to distinct output files
    digest = hashlib.sha256()
    for s in group_ids:
        digest.update(str(s).encode())
    str_id = digest.hexdigest()[-6:]
    print("digest for file {} = {}".format(args.filename, str_id))
    print("running {} out of {} channels".format(which_channel, n_channels))

    # per-group hash (currently unused; the split below shards on the raw group id)
    group_hashed = np.array([int(hashlib.sha1(str(s).encode()).hexdigest()[-5:], 16) for s in group_ids])
    rep_groups = rep_groups[group_ids % n_channels == which_channel]
    print("rep groups = {}".format(rep_groups.head(10)[["group.id", "group_name", "urlname"]]))

    event_attendee_crawler(
        rep_groups,
        event_dest="crawl/events_{}_{}_{}.csv".format(str_id, which_channel, n_channels),
        attendee_dest="crawl/attendees_{}_{}_{}.csv".format(str_id, which_channel, n_channels)
    )
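
# Example invocation (a sketch; the script name and CSV path are placeholders),
# running channel 0 of 4 so four such processes can shard the groups:
#   python crawl_meetup_events.py crawl/rep_groups.csv 4 0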