Last active
October 28, 2023 18:21
-
-
Save tinshade/2c603f4d795dc61b07c1dcfc5dbab241 to your computer and use it in GitHub Desktop.
Find rooms for cherroro
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Other Imports
import csv
import os
import time

# Selenium Imports
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
class RoomFinder:
    """Collect room listings from SpareRoom search-result pages via Selenium.

    One instance handles one saved search (identified by its search id).
    ``main()`` drives the whole run: it starts Chrome, accepts the cookie
    prompt, visits the result pages and appends every listing that passes
    the budget filter to ``self.results`` as
    ``{"title": ..., "link": ..., "price": ...}`` dictionaries.
    """

    # Seconds to poll for the cookie-consent button before assuming it is absent.
    COOKIE_WAIT_SECONDS = 10
    # Seconds to poll for listing elements after a result page has been requested.
    LISTING_WAIT_SECONDS = 5

    def __init__(self, target, counter=99999, should_overwrite=False, new_install=False, output_option=None, budget=None, run_headless=True):
        """Store the run configuration; no browser is started here.

        Args:
            target: Search id substituted into the SpareRoom search URL.
            counter: Upper bound on the number of result pages to visit.
            should_overwrite: When true, ``generate_csv`` always writes
                ``./Listings/Listings.csv``; otherwise the file name carries
                the search id and the start timestamp.
            new_install: When true, the Chrome driver is fetched through
                ``ChromeDriverManager``; otherwise ``./chromedriver.exe`` is used.
            output_option: Stored only; the class itself never reads it.
            budget: Maximum accepted price; a falsy value disables filtering.
            run_headless: When true, Chrome is started with ``--headless=new``.
        """
        self.url = "https://www.spareroom.co.uk/flatshare/?offset={0}&search_id={1}&"
        self.target_search_id = target
        # fabricate_url reads self.url, so it must be assigned first.
        self.target = self.fabricate_url(target)
        self.page_limit = counter
        self.results = []
        self.driver = None
        # Last exception raised inside scraper(); None until a page fails.
        self.scrapper_exception = None
        self.should_overwrite = should_overwrite
        # Millisecond timestamp, used to make CSV file names unique per run.
        self.start_timestamp = str(round(time.time() * 1000))
        self.chrome_driver_path = (
            ChromeDriverManager().install() if new_install else "./chromedriver.exe"
        )
        self.output_option = output_option
        self.budget = budget
        self.headless_mode = run_headless

    def _wait_for_elements(self, by, value, timeout):
        """Poll until at least one element matches; return ``[]`` on timeout.

        ``WebDriverWait`` only waits when ``until`` is called; constructing it
        alone does nothing. ``until`` keeps polling while the callable returns
        a falsy value (an empty list here) and raises ``TimeoutException``
        once ``timeout`` seconds have elapsed.
        """
        try:
            return WebDriverWait(self.driver, timeout).until(
                lambda driver: driver.find_elements(by, value)
            )
        except TimeoutException:
            return []

    def initalize_scrapper(self):
        """Start Chrome with the configured options and open ``self.target``."""
        options = Options()
        if self.headless_mode:
            options.add_argument("--headless=new")
        self.driver = webdriver.Chrome(
            service=Service(self.chrome_driver_path), options=options
        )
        self.driver.get(self.target)

    def generate_csv(self, data) -> bool:
        """Write ``data`` (an iterable of dicts) to a CSV file under ``./Listings``.

        Each row is the dict's values in insertion order, below a fixed
        ``Title, Link, Price`` header.

        Returns:
            True when the file was written, False when any error occurred
            (the error is printed, not raised).
        """
        try:
            print("Writing to CSV...")
            headers = ["Title", "Link", "Price"]
            filename = (
                "./Listings/Listings.csv"
                if self.should_overwrite
                else f"./Listings/Listings_{self.target_search_id}_{self.start_timestamp}.csv"
            )
            # open() does not create missing parent directories.
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            with open(filename, "w", newline="", encoding='UTF-8') as file:
                writer = csv.writer(file)
                writer.writerow(headers)
                writer.writerows(list(each.values()) for each in data)
            return True
        except Exception as e:
            print(e)
            return False
        finally:
            print("Done writing to CSV")

    def handle_cookie_prompt(self) -> bool:
        """Accept the cookie-consent prompt if it appears.

        Returns:
            True when the prompt was accepted or never appeared within
            ``COOKIE_WAIT_SECONDS``; False when handling it raised.
        """
        try:
            cookie_button = self._wait_for_elements(
                By.XPATH,
                '//*[@id="onetrust-accept-btn-handler"]',
                self.COOKIE_WAIT_SECONDS,
            )
            if cookie_button:
                # Click through JavaScript rather than WebElement.click().
                self.driver.execute_script("arguments[0].click();", cookie_button[0])
                print("Cookie prompt was accepted!")
            else:
                print("Cookie prompt wasn't shown!")
            return True
        except Exception as e:
            print("Something went wrong while handling the cookie prompt!", e)
            return False

    def is_within_budget(self, price):
        """Return whether the scraped ``price`` text is at or below the budget.

        Anything that cannot be reduced to a plain whole number (for example
        a range such as ``£60 - £120``) is accepted, as a false positive
        instead of showing nothing. Everything is accepted when no budget is set.
        """
        # TODO: Handle ranges like : £60 - £120
        # NOTE(review): weekly ("pw") and monthly ("pcm") prices are compared
        # against the same budget without conversion - confirm this is intended.
        if not self.budget:
            return True
        price = price.replace('£', '').replace('pcm', '').replace('pw', '').replace(',', '').strip()
        # isdecimal() matches exactly the digit strings int() can parse.
        if not price.isdecimal():
            return True
        return int(price) <= self.budget

    @staticmethod
    def _clean_price_markup(price_html):
        """Strip the ``<abbr>`` wrapper from a price element's inner HTML."""
        return (
            price_html.replace('<abbr title="">', "")
            .replace('<abbr title="per calendar month">', "")
            .replace('<abbr title="per week">', "")
            .split("</abbr>")[0]
        )

    def scraper(self) -> bool:
        """Collect the listings on the currently loaded page.

        Waits up to ``LISTING_WAIT_SECONDS`` for ``listing-result`` elements,
        then appends every listing that passes ``is_within_budget`` to
        ``self.results``.

        Returns:
            True when the page was processed (even if it held no listings);
            False when any listing raised, in which case the exception is
            kept in ``self.scrapper_exception``.
        """
        try:
            listings = self._wait_for_elements(
                By.CLASS_NAME, "listing-result", self.LISTING_WAIT_SECONDS
            )
            if listings:
                print("Page and listings loaded")
            for listing in listings:
                # Title, link and price all hang off the same header anchor.
                anchor = (
                    listing.find_element(By.TAG_NAME, "article")
                    .find_element(By.TAG_NAME, "header")
                    .find_element(By.TAG_NAME, "a")
                )
                title = anchor.get_attribute("title")
                link = anchor.get_property("href")
                price_html = anchor.find_element(By.TAG_NAME, "strong").get_attribute(
                    "innerHTML"
                )
                price = self._clean_price_markup(price_html)
                if self.is_within_budget(price):
                    self.results.append({"title": title, "link": link, "price": price})
            return True
        except Exception as e:
            self.scrapper_exception = e
            print("Scrapper Exception", e)
            return False

    def validate_target(self, target):
        """Return True when ``target`` is a non-empty string containing "spareroom"."""
        if not target or "spareroom" not in target:
            print(
                "Target required!\n Target link has to be a SpareRooms link as follows : https://www.spareroom.co.uk/flatshare/?search_id=1256661132&"
            )
            return False
        return True

    def safely_exit_driver(self):
        """Quit the browser if one was started; safe to call more than once."""
        if self.driver is None:
            return
        try:
            self.driver.quit()
        finally:
            self.driver = None

    @staticmethod
    def _parse_last_page(nav_html):
        """Extract the page limit from the ``navcurrent`` element's inner HTML.

        Takes the text between ``</strong> of <strong>`` and the following
        ``</strong>``. Raises IndexError when that marker is missing.
        """
        after_of = nav_html.split('</strong> of <strong>')[1]
        figure = after_of.split('</strong>')[0].replace('<strong>', '')
        # NOTE(review): only the FIRST character of the figure is used, so
        # "12" yields 1. Left as-is because it is unclear from this file
        # whether the figure is a page count or a result count (and whether
        # it can carry a suffix) - confirm against the live page before
        # parsing the whole number.
        return int(figure[0])

    def get_last_page(self) -> int:
        """Return the page limit read from the current page's ``navcurrent`` element."""
        nav_html = self.driver.find_element(By.CLASS_NAME, "navcurrent").get_attribute("innerHTML")
        return self._parse_last_page(nav_html)

    def fabricate_url(self, search_id, offset='0'):
        """Build the search URL for ``search_id`` at the given ``offset``."""
        return self.url.format(offset, str(search_id))

    def safe_typecast(self, user_input):
        """Return ``int(user_input)``, or ``user_input`` unchanged if not convertible."""
        try:
            return int(user_input)
        except (TypeError, ValueError, OverflowError):
            return user_input

    def main(self) -> bool:
        """Run the full scrape for this search and fill ``self.results``.

        The browser is started, used and shut down inside a single
        ``try/finally`` so that it is quit on every exit path, including a
        failed cookie prompt or a failure during start-up.

        Returns:
            True when the page loop completed (individual pages may still
            have failed; see the printed pass/fail list), False otherwise.
        """
        if not self.validate_target(self.target):
            return False
        fp_ratio = []
        try:
            self.initalize_scrapper()
            if not self.handle_cookie_prompt():
                print("Cookie was not handled!")
                return False
            max_pages = self.get_last_page()
            print("Max number of pages available for this search: ", max_pages)
            self.page_limit = min(self.page_limit, max_pages)
            for count in range(0, self.page_limit):
                # NOTE(review): the page index is passed as the URL "offset";
                # if the site counts offset in listings rather than pages,
                # consecutive iterations overlap - confirm.
                self.target = self.fabricate_url(self.target_search_id, count)
                print(self.target)
                self.driver.get(self.target)
                print("Waiting for the page to load..")
                print(f"Visiting page: {count+1}")
                # scraper() performs the actual wait for the listings.
                fp_ratio.append("Passed" if self.scraper() else "Failed")
            print("Pages that have passed / failed : ", fp_ratio)
            return True
        except Exception as e:
            print(
                "Something went wrong while trying to scrap the website. Please account for the errors manually!",
                e,
            )
            return False
        finally:
            print("Shutting down driver")
            self.safely_exit_driver()
if __name__ == "__main__":

    def find_rooms(search_id):
        """Scrape one saved search and print or export what was found.

        The output mode comes from the finder's ``output_option``; when that
        is falsy the user is asked. ``2`` writes a CSV, any other integer
        prints the results, and a non-integer answer falls back to the CSV.
        """
        rf = RoomFinder(target=search_id, new_install=True, output_option=2, budget=650)  # Install new web-driver for Chrome (Static path is breaking for Chrome 115^)
        if not rf.main():
            print(
                "Something went wrong while trying to get the listings you were looking for."
            )
            return
        results = rf.results
        decide = rf.output_option
        if not decide:
            decide = input("Do you want to print the output or generate a CSV file out of it?: \n[[1]] Print\n[2] Generate\n\n")
            # safe_typecast is a RoomFinder method; there is no `self` in this function.
            decide = rf.safe_typecast(decide)
        if not isinstance(decide, int):
            print(
                "Your input has to be either a 1 or a 2. I will generate the CSV for you this time"
            )
            rf.generate_csv(results)
        elif decide == 2:
            rf.generate_csv(results)
        else:
            print(results)

    def loop_targets(target_arr):
        """Run ``find_rooms`` for every search id in ``target_arr``, in order."""
        for search_id in target_arr:
            find_rooms(search_id)

    # Saved SpareRoom search ids, one per area of interest.
    target = [
        "1256661132",  # Boundary Lane
        "1258817846",  # Trafford
        "1259003482",  # M16
        "1259044053",  # M32
        "1259044177",  # M15
        "1259044216",  # M14
        "1259044250",  # M13
        "1259044315",  # M12
        "1259044358",  # M1
        "1259044379",  # M2
        "1259044413",  # M3
        "1259044430",  # M4
    ]
    loop_targets(target)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment