fetch and clean descriptions
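
The script below reads item ids from items.csv (one id per line), fans them out
over 4 processes with 100 threads each, fetches every item's description over
HTTP, strips boilerplate with justext, and stores the tokenized paragraphs in
redis. A minimal sketch of the expected input file, assuming ids whose first
three characters identify the site (the site_id = item_id[:3] slice below);
these ids are invented:

ABC123456789
ABC987654321
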
import hashlib
import justext
import re
import redis
import requests
from queue import Queue as ThreadQueue
from threading import Thread
from multiprocessing import Process, Queue

class DescriptionThreadWorker(Thread):

    def __init__(self, process_num, thread_num, queue, redis_sock):
        self.queue = queue
        self.retries = 3
        self.process_num = process_num
        self.thread_num = thread_num
        self.r = redis.Redis(unix_socket_path=redis_sock)
        self.tokenizer = re.compile(r'\w+', flags=re.UNICODE | re.MULTILINE | re.DOTALL | re.I)
        super(DescriptionThreadWorker, self).__init__()
    def _clean(self, dirty):
        if dirty is None or len(dirty) == 0:
            return []
        paragraphs = justext.justext(dirty, justext.get_stoplist('Spanish'))

        def valid(paragraph):
            # Keep a paragraph only if it is not boilerplate and has more
            # than 4 word tokens.
            if paragraph.is_boilerplate:
                return False
            return len(self.tokenizer.findall(paragraph.text)) > 4

        def to_text(paragraph):
            return paragraph.text

        # One token list per surviving paragraph.
        return [self.tokenizer.findall(to_text(paragraph)) for paragraph in paragraphs if valid(paragraph)]
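    # Example shape (hypothetical input): given HTML whose only surviving
    # paragraph reads 'Vendo bicicleta nueva rodado 29', _clean returns
    # [['Vendo', 'bicicleta', 'nueva', 'rodado', '29']].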
    def _fetch_description(self, item_id):
        retry = 0
        while retry < self.retries:
            try:
                retry += 1
                url = 'https://api.xxxxxxxxx.com/items/{}/description'.format(item_id)
                # Timeout so a stuck request cannot hang the thread forever.
                r = requests.get(url, timeout=10)
                if r.ok:
                    json_rsp = r.json()
                    return self._clean(json_rsp['text']), self._clean(json_rsp['plain_text'])
            except Exception:
                # Network or JSON error: fall through and retry.
                pass
        return [], []
    def _hash(self, val):
        return hashlib.sha224(val.encode('utf-8')).hexdigest()
    def _save(self, item_id, descriptions):
        if len(descriptions) == 0:
            return
        site_id = item_id[:3]
        try:
            mset_map = {}
            for description in descriptions:
                # Key on site prefix + hash of the concatenated tokens so
                # duplicate descriptions collapse into one entry; store the
                # tokens re-joined with spaces (redis values must be flat).
                mset_map[site_id + self._hash(''.join(description))] = ' '.join(description)
            self.r.mset(mset_map)
        except Exception:
            # ATM we don't care if it fails to save
            pass
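    # Resulting redis layout, using the invented id 'ABC123456789' and the
    # token list from the _clean example above:
    #   key   = 'ABC' + sha224 hex digest of 'Vendobicicletanuevarodado29'
    #   value = 'Vendo bicicleta nueva rodado 29'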
    def run(self):
        print('Starting Thread {}-{}'.format(self.process_num, self.thread_num))
        total_items = 0
        for item_id in iter(self.queue.get, None):
            try:
                if item_id == 'TIME_TO_DIE':
                    # Poison pill: stop this thread.
                    return
                if item_id is None or len(item_id) == 0:
                    continue
                print('Fetching {}'.format(item_id))
                text, plain_text = self._fetch_description(item_id)
                descriptions = text + plain_text
                self._save(item_id, descriptions)
                total_items += 1
                if total_items % 100 == 0:
                    print('P-{}-{} processed: {}'.format(self.process_num, self.thread_num, total_items))
            except Exception as e:
                print('Error fetching: {}'.format(e))

class DescriptionProcessWorker(Process):

    def __init__(self, process_num, q, redis_sock, num_threads=100):
        self.q = q
        self.process_num = process_num
        self.redis_sock = redis_sock
        self.num_threads = num_threads
        super(DescriptionProcessWorker, self).__init__()
    def run(self):
        queue = ThreadQueue()
        threads = []
        for i in range(self.num_threads):
            t = DescriptionThreadWorker(self.process_num, i, queue, self.redis_sock)
            threads.append(t)
            t.start()
        for item_id in iter(self.q.get, None):
            try:
                if item_id == 'TIME_TO_DIE':
                    # Forward one poison pill per thread, wait for them to
                    # drain the queue, then exit the process.
                    for _ in range(self.num_threads):
                        queue.put(item_id)
                    for t in threads:
                        t.join()
                    return
                if item_id is None or len(item_id) == 0:
                    continue
                queue.put(item_id)
            except Exception as e:
                print('Error delegating: {}'.format(e))

def read_item_ids(filename):
    with open(filename, 'r') as f:
        for line in f:
            yield line.strip()

if __name__ == '__main__':
    request_queue = Queue()
    redis_sock = '/tmp/redis.sock'
    jobs = []
    workers = 4
    for i in range(workers):
        p = DescriptionProcessWorker(i, request_queue, redis_sock)
        jobs.append(p)
        p.start()
    for item_id in read_item_ids('items.csv'):
        request_queue.put(item_id)
    # One poison pill per worker process for a clean shutdown.
    for _ in range(workers):
        request_queue.put('TIME_TO_DIE')
    for p in jobs:
        p.join()
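
To inspect what was stored, a minimal read-back sketch (assuming the same
/tmp/redis.sock socket and the 3-character site prefix used in the keys above;
scan_iter is standard redis-py):

import redis

r = redis.Redis(unix_socket_path='/tmp/redis.sock')
# Scan keys for the invented site prefix 'ABC' and print stored descriptions.
for key in r.scan_iter(match='ABC*'):
    print(key, r.get(key))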