Proxy-enabled Selenium integrated into Scrapy downloader middleware
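To wire this file into a project, the middlewares would be registered in settings.py together with the settings the file reads. A minimal sketch follows; the module path, priority numbers, and every placeholder value are assumptions, not taken from the gist — only the setting names themselves come from the code below.

# settings.py -- hypothetical sketch; module path, priorities and values
# are placeholders. Only the setting names mirror what the file reads.
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.CouponsRetryMiddleware': 550,
    'myproject.middlewares.CouponsDownloaderMiddleware': 800,
}
SPIDER_MIDDLEWARES = {
    'myproject.middlewares.CouponsSpiderMiddleware': 543,
}

# Mail notifications (used by CouponsDownloaderMiddleware)
MAIL_ADDRESS = 'bot@example.com'
MAIL_PORT = 587
MAIL_USER = 'bot@example.com'
MAIL_PASSWORD = 'secret'
SMTP_HOST = 'smtp.example.com'
EMAIL_LIST = ['team@example.com']
CC_LIST = []

# Debug artifacts and proxy credentials
SCREENSHOTS_PATH = '/tmp/screenshots/'
SELENIUM_PROXY_HOST = 'proxy.example.com'
SELENIUM_PROXY_PORT = 22225
SELENIUM_PROXY_PASSWORD = 'proxy-secret'
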
# Define here the models for your spider middleware
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html
import codecs
import logging
import os
import string
import sys
import time
import zipfile
from datetime import datetime
from shutil import which

from scrapy import signals
from scrapy.downloadermiddlewares.retry import RetryMiddleware
from scrapy.http import HtmlResponse
from scrapy.mail import MailSender
from scrapy.utils.project import get_project_settings
from scrapy.utils.response import response_status_message

# useful for handling different item types with a single interface
from itemadapter import is_item, ItemAdapter

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains

import undetected_chromedriver as uc
from pyvirtualdisplay import Display

# Pin the Chrome major version for undetected_chromedriver on non-Linux hosts
if not (sys.platform == "linux" or sys.platform == "linux2"):
    uc.TARGET_VERSION = 90

settings = get_project_settings()

class CouponsRetryMiddleware(RetryMiddleware):

    def process_response(self, request, response, spider):
        if request.meta.get('dont_retry', False):
            return response
        if response.status in self.retry_http_codes:
            reason = response_status_message(response.status)
            return self._retry(request, reason, spider) or response
        # A 200 on a myofer request without a "token" cookie means the login
        # did not stick, so retry it as well.
        if (response.status == 200
                and request.meta.get('myoferToken')
                and not any(item for item in response.meta["cookieJar"]
                            if item["name"] == "token")):
            reason = "missing token cookie"
            spider.logger.info('Spider retrying: %s' % reason)
            return self._retry(request, reason, spider) or response
        return response

class CouponsSpiderMiddleware:
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the spider middleware does not modify the
    # passed objects.

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        return s

    def process_spider_input(self, response, spider):
        # Called for each response that goes through the spider
        # middleware and into the spider.
        # Should return None or raise an exception.
        return None

    def process_spider_output(self, response, result, spider):
        # Called with the results returned from the Spider, after
        # it has processed the response.
        # Must return an iterable of Request, or item objects.
        for i in result:
            yield i

    def process_spider_exception(self, response, exception, spider):
        # Called when a spider or process_spider_input() method
        # (from other spider middleware) raises an exception.
        # Should return either None or an iterable of Request or item objects.
        pass

    def process_start_requests(self, start_requests, spider):
        # Called with the start requests of the spider, and works
        # similarly to the process_spider_output() method, except
        # that it doesn't have a response associated.
        # Must return only requests (not items).
        for r in start_requests:
            yield r

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)

class CouponsDownloaderMiddleware:
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the downloader middleware does not modify the
    # passed objects.

    def __init__(self):
        mailfrom = settings.get("MAIL_ADDRESS")
        smtpport = settings.get("MAIL_PORT")
        smtpuser = settings.get("MAIL_USER")
        smtppass = settings.get("MAIL_PASSWORD")
        smtphost = settings.get("SMTP_HOST")
        self.mailer = MailSender(mailfrom=mailfrom, smtphost=smtphost,
                                 smtpport=smtpport, smtpuser=smtpuser, smtppass=smtppass)

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        s.cookie = ""
        if sys.platform == "linux" or sys.platform == "linux2":
            # Headless servers need a virtual display for the real browser.
            s.display = Display(visible=0, size=(800, 600))
            s.display.start()
            logging.info("Virtual Display Initiated")
        chrome_options = Options()
        if crawler.spider.undetectable:
            s.driver = uc.Chrome()
            if crawler.spider.proxy:
                # NOTE: s.user, s.ip and s.zone must be set on the middleware
                # before this point; they are not defined in this file.
                proxyauth_plugin_path = s.create_proxyauth_extension(
                    proxy_host=crawler.settings.get('SELENIUM_PROXY_HOST'),
                    proxy_port=crawler.settings.get('SELENIUM_PROXY_PORT'),
                    proxy_username=f"lum-customer-{s.user}-ip-{s.ip}-zone-{s.zone}",
                    proxy_password=crawler.settings.get('SELENIUM_PROXY_PASSWORD'),
                    scheme='http')
                options = uc.ChromeOptions()
                options.add_extension(proxyauth_plugin_path)
                s.driver = uc.Chrome(options=options)
        else:
            # driver_location = "/usr/bin/chromedriver"
            driver_location = which('chromedriver')
            # binary_location = "/usr/bin/google-chrome"
            userAgent = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                         "AppleWebKit/537.36 (KHTML, like Gecko) "
                         "Chrome/84.0.4147.56 Safari/537.36")
            # chrome_options.binary_location = binary_location
            chrome_options.add_argument(f'user-agent={userAgent}')
            chrome_options.add_argument("--ignore-certificate-errors")
            chrome_options.add_argument("--ignore-ssl-errors")
            chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            s.driver = webdriver.Chrome(executable_path=driver_location,
                                        chrome_options=chrome_options)  # your chosen driver
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(s.spider_closed, signal=signals.spider_closed)
        return s

    def create_proxyauth_extension(self,
                                   proxy_host,
                                   proxy_port,
                                   proxy_username,
                                   proxy_password,
                                   scheme='http',
                                   plugin_path=None):
        """Build a Chrome extension that configures an authenticated proxy.

        args:
            proxy_host (str): domain or ip address, e.g. proxy.domain.com
            proxy_port (int): port
            proxy_username (str): auth username
            proxy_password (str): auth password
        kwargs:
            scheme (str): proxy scheme, default http
            plugin_path (str): absolute path of the extension
        returns:
            str: plugin_path
        """
        if plugin_path is None:
            file = './chrome_proxy_helper'
            if not os.path.exists(file):
                os.mkdir(file)
            plugin_path = file + '/%s_%s@%s_%s.zip' % (
                proxy_username, proxy_password, proxy_host, proxy_port)
        manifest_json = """
        {
            "version": "1.0.0",
            "manifest_version": 2,
            "name": "Chrome Proxy",
            "permissions": [
                "proxy",
                "tabs",
                "unlimitedStorage",
                "storage",
                "<all_urls>",
                "webRequest",
                "webRequestBlocking"
            ],
            "background": {
                "scripts": ["background.js"]
            },
            "minimum_chrome_version": "22.0.0"
        }
        """
        background_js = string.Template("""
        var config = {
            mode: "fixed_servers",
            rules: {
                singleProxy: {
                    scheme: "${scheme}",
                    host: "${host}",
                    port: parseInt(${port})
                },
                bypassList: ["foobar.com"]
            }
        };
        chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});
        function callbackFn(details) {
            return {
                authCredentials: {
                    username: "${username}",
                    password: "${password}"
                }
            };
        }
        chrome.webRequest.onAuthRequired.addListener(
            callbackFn,
            {urls: ["<all_urls>"]},
            ['blocking']
        );
        """).substitute(
            host=proxy_host,
            port=proxy_port,
            username=proxy_username,
            password=proxy_password,
            scheme=scheme,
        )
        with zipfile.ZipFile(plugin_path, 'w') as zp:
            zp.writestr("manifest.json", manifest_json)
            zp.writestr("background.js", background_js)
        return plugin_path

    def popElement(self, interactElement):
        # Click an element by id via JavaScript; on failure, save a
        # screenshot and the page source for debugging.
        try:
            element = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.ID, interactElement)))
            self.driver.execute_script("arguments[0].click();", element)
        except Exception as ex:
            logging.error(ex)
            self.driver.save_screenshot(f"{settings.get('SCREENSHOTS_PATH')}{interactElement}_click_error.png")
            n = os.path.join(settings.get('SCREENSHOTS_PATH'), f"{interactElement}_PageSave.html")
            with codecs.open(n, "w", "utf-8") as f:
                f.write(self.driver.page_source)

    def xpath_pop_element(self, sel):
        # Same as popElement, but locates the element by XPath. Debug files
        # get fixed names because XPath strings are not filesystem-safe.
        try:
            element = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.XPATH, sel)))
            self.driver.execute_script("arguments[0].click();", element)
        except Exception as ex:
            logging.error(ex)
            self.driver.save_screenshot(f"{settings.get('SCREENSHOTS_PATH')}xpath_click_error.png")
            n = os.path.join(settings.get('SCREENSHOTS_PATH'), "xpath_PageSave.html")
            with codecs.open(n, "w", "utf-8") as f:
                f.write(self.driver.page_source)

    def selenium_login(self, usrEId, pwdEId, username, password, spider):
        try:
            usrElement = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.ID, usrEId)))
            usrElement.send_keys(username)
            if spider.name == 'ashmoret':
                # This login form needs an extra click before the password
                # field becomes available.
                element = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, '//*[@id="f_login"]/div[4]/input')))
                self.driver.execute_script("arguments[0].click();", element)
            self.driver.find_element_by_id(pwdEId).send_keys(password, Keys.ENTER)
            self.cookie = self.driver.get_cookies()
        except TimeoutException as timeex:
            logging.error(timeex)
        except NoSuchElementException as noElementex:
            logging.error(noElementex)

    def process_request(self, request, spider):
        # Only process tagged requests; delete this guard to handle all requests.
        if not (request.meta.get('selenium') or spider.undetectable):
            return
        if (not request.meta.get('login')) and (spider.name == 'hvr'):
            # Replay the cookies captured during the earlier login.
            for k in self.cookie:
                self.driver.add_cookie(k)
        self.driver.get(request.url)
        if request.meta.get('scroll'):
            self.scroll()
        if spider.wait:
            try:
                # Default to the spider-level element id; request meta can
                # override it with another id or a class name.
                elementId = spider.elementId
                element_present = EC.presence_of_element_located((By.ID, elementId))
                if request.meta.get('elementId'):
                    elementId = request.meta.get('elementId')
                    element_present = EC.presence_of_element_located((By.ID, elementId))
                if request.meta.get('elementClass'):
                    elementId = request.meta.get('elementClass')
                    element_present = EC.presence_of_element_located((By.CLASS_NAME, elementId))
                WebDriverWait(self.driver, 2).until(element_present)
            except TimeoutException:
                spider.logger.error('Spider %s took too long to load' % spider.name)
                return
        if request.meta.get('interactElement'):
            self.popElement(request.meta.get('interactElement'))
        if request.meta.get("interact-xpath"):
            self.xpath_pop_element(request.meta.get("interact-xpath"))
        if request.meta.get('login'):
            self.selenium_login(spider.usrEId, spider.pwdEId, spider.username, spider.password, spider)
        body = self.driver.page_source
        url = request.url
        response = HtmlResponse(url, body=body, encoding='utf-8', request=request)
        response.meta['cookieJar'] = self.driver.get_cookies()
        if request.meta.get("script"):
            response.meta['script_response'] = self.driver.execute_script(request.meta.get("script"))
        return response

    def scroll(self):
        SCROLL_PAUSE_TIME = 2
        # Get the initial scroll height
        last_height = self.driver.execute_script("return document.body.scrollHeight")
        main_scroll_count = 0
        while True:
            # Scroll down to the bottom
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            # Wait for the page to load
            time.sleep(SCROLL_PAUSE_TIME)
            # Compare the new scroll height with the last scroll height
            new_height = self.driver.execute_script("return document.body.scrollHeight")
            main_scroll_count = main_scroll_count + 1
            if new_height == last_height:
                break
            last_height = new_height

    def process_response(self, request, response, spider):
        # Called with the response returned from the downloader.
        # Must either:
        # - return a Response object
        # - return a Request object
        # - or raise IgnoreRequest
        return response

    def process_exception(self, request, exception, spider):
        # Called when a download handler or a process_request()
        # (from other downloader middleware) raises an exception.
        # Must either:
        # - return None: continue processing this exception
        # - return a Response object: stops process_exception() chain
        # - return a Request object: stops process_exception() chain
        pass

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)
        return self.mailer.send(
            to=settings.get("EMAIL_LIST"), cc=settings.get("CC_LIST"),
            subject=f"IGROUP Coupon Scraping - Spider {spider.name} status",
            body=f"Spider {spider.name} started at {datetime.now().strftime('%m/%d/%Y, %H:%M:%S')}")

    def spider_closed(self, spider):
        spider.logger.info('Spider closed: %s' % spider.name)
        if self.driver:
            # quit() ends the whole browser session, not just the current window
            self.driver.quit()
            self.driver = None
        if sys.platform == "linux" or sys.platform == "linux2":
            self.display.stop()
            spider.logger.info("Virtual Display killed")
        return self.mailer.send(
            to=settings.get("EMAIL_LIST"), cc=settings.get("CC_LIST"),
            subject=f"IGROUP Coupon Scraping - Spider {spider.name} status",
            body=f"Spider {spider.name} closed at {datetime.now().strftime('%m/%d/%Y, %H:%M:%S')}")
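
For context, process_request is driven entirely by request.meta flags and a few spider attributes. Below is a minimal, hypothetical usage sketch: the spider name, URL, and attribute values are assumptions, while the meta keys (selenium, scroll, interactElement) and the attributes (undetectable, proxy, wait, elementId) are the ones the middleware above actually reads.

# example_spider.py -- hypothetical sketch; only the meta keys and spider
# attributes mirror what the middleware reads.
import scrapy

class ExampleCouponSpider(scrapy.Spider):
    name = 'example'
    undetectable = False   # checked in from_crawler and process_request
    proxy = False          # triggers the proxy-auth extension when undetectable
    wait = True            # wait for elementId before grabbing page_source
    elementId = 'coupon-list'

    def start_requests(self):
        yield scrapy.Request(
            'https://example.com/coupons',
            meta={
                'selenium': True,                     # route through the driver
                'scroll': True,                       # scroll to the bottom first
                'interactElement': 'accept-cookies',  # element id to click
            },
            callback=self.parse)

    def parse(self, response):
        # The middleware attaches the driver's cookies to the response.
        self.logger.info('got %d cookies', len(response.meta['cookieJar']))

Login-driven spiders would additionally set usrEId, pwdEId, username, and password as attributes and pass meta={'login': True}, which routes the request through selenium_login.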