Created
March 5, 2018 20:52
-
-
Save versae/b4631fda466670206f240b7eb2712d45 to your computer and use it in GitHub Desktop.
NREGA scraping
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
# # NREGA Scraping
# | |
# ## Setup the driver | |
# In[1]: | |
import itertools | |
import time | |
import os | |
import numpy as np | |
from selenium import webdriver | |
from selenium.webdriver.support.ui import Select | |
from selenium.webdriver.chrome.options import Options | |
from tqdm import tqdm | |
# Directory the browser is told to save downloaded reports into.
download_directory = os.path.join(os.path.expanduser("~"), "Downloads", "nrega")
# Create it up front so the later isfile()/rename steps always have a real
# directory to work in.
os.makedirs(download_directory, exist_ok=True)

options = webdriver.ChromeOptions()
# `Options.set_headless()` is deprecated (and gone in newer Selenium
# releases); passing the command-line switch directly works on every version.
options.add_argument("--headless")
# Historically recommended alongside --headless; harmless where not needed.
options.add_argument("--disable-gpu")
options.add_argument("--incognito")
options.add_argument("--disable-extensions")
# Download preferences: save silently into `download_directory`.
options.add_experimental_option("prefs", {
    "download.default_directory": download_directory,
    "download.prompt_for_download": False,
    "download.directory_upgrade": True,
    "safebrowsing.enabled": True,
})
# FirefoxProfile fxProfile = new FirefoxProfile(); | |
# fxProfile.setPreference("browser.download.folderList",2); | |
# fxProfile.setPreference("browser.download.manager.showWhenStarting",false); | |
# fxProfile.setPreference("browser.download.dir","c:\\mydownloads"); | |
# fxProfile.setPreference("browser.helperApps.neverAsk.saveToDisk","text/csv"); | |
# In[2]: | |
def click_all(elements, delay=0):
    """Scroll to and click every element, ignoring elements that fail.

    Args:
        elements: iterable of objects exposing the
            ``location_once_scrolled_into_view`` attribute and a ``click()``
            method (Selenium web elements in this script).
        delay: seconds to sleep after each element, whether or not the
            click succeeded.

    Clicking is best-effort: an element whose scroll or click raises is
    skipped. Only ``Exception`` subclasses are suppressed, so Ctrl-C
    (``KeyboardInterrupt``) and ``SystemExit`` still stop the run.
    """
    for element in elements:
        try:
            # Reading this attribute has the side effect of scrolling the
            # element into view.
            element.location_once_scrolled_into_view
            element.click()
        except Exception:
            # Best-effort by design: one unclickable element must not abort
            # the whole batch.
            pass
        time.sleep(delay)
# In[3]: | |
def expand_click(lis, delay=0):
    """Click the ".accordion" elements of each item whose text is not "-".

    Args:
        lis: iterable of elements to search for ".accordion" descendants.
        delay: forwarded to ``click_all`` as the per-click pause.
    """
    def not_a_dash(accordion):
        return accordion.text.strip() != "-"

    for item in lis:
        accordions = item.find_elements_by_css_selector(".accordion")
        # `filter` is lazy, so each accordion's text is read right before it
        # is clicked rather than all up front.
        click_all(filter(not_a_dash, accordions), delay)
# In[4]: | |
def select_by_id(element_id, value=None, text=None):
    """Pick an option in the <select> element with the given id.

    Args:
        element_id: id of the element, looked up on the global ``browser``.
        value: if truthy, the option is selected by its value attribute.
        text: if truthy, the option is selected by its visible text.

    Both selections are applied when both arguments are truthy; value and
    text are converted with ``str`` before use.
    """
    dropdown = browser.find_element_by_id(element_id)
    # Attribute access scrolls the element into view as a side effect.
    dropdown.location_once_scrolled_into_view
    if value:
        by_value = Select(dropdown)
        by_value.select_by_value(str(value))
    if text:
        by_text = Select(dropdown)
        by_text.select_by_visible_text(str(text))
# In[5]: | |
def enable_download_in_headless_chrome(browser, download_dir):
    """Send the "Page.setDownloadBehavior" command so downloads are allowed.

    Args:
        browser: driver object exposing ``command_executor._commands`` and
            ``execute``.
        download_dir: path passed as the command's ``downloadPath``.
    """
    # https://bugs.chromium.org/p/chromium/issues/detail?id=696481#c86
    # Register the "send_command" endpoint on the command executor before
    # using it.
    endpoint = ("POST", '/session/$sessionId/chromium/send_command')
    browser.command_executor._commands["send_command"] = endpoint

    download_behavior = {'behavior': 'allow', 'downloadPath': download_dir}
    browser.execute(
        "send_command",
        {'cmd': 'Page.setDownloadBehavior', 'params': download_behavior},
    )
# In[6]: | |
# Shared driver used by every helper below. `options=` replaces the
# deprecated `chrome_options=` keyword, which newer Selenium releases no
# longer accept.
browser = webdriver.Chrome(options=options)
# ## Regions | |
# In[7]: | |
def select_regions(state_names=None, delay=0):
    """Expand the region tree and tick the checkbox of the chosen states.

    Args:
        state_names: container of state names to keep; a state box matches
            when its text, minus the first two characters, is in it. When
            falsy, every state box is used.
        delay: per-click pause forwarded to ``expand_click``.
    """
    select_by_id("regionselect", text="GP")
    container = browser.find_element_by_id("middlecontainer")

    state_boxes = container.find_elements_by_class_name("statebox")
    states = [
        box for box in state_boxes
        if not state_names or box.text[2:] in state_names
    ]
    expand_click(states, delay)

    # run it twice each since sometimes the requests fail
    for class_name in ("districtbox", "blockbox"):
        for _attempt in range(2):
            expand_click(container.find_elements_by_class_name(class_name), delay)

    for box in states:
        box.find_element_by_tag_name("input").click()
# ## Indicators, years, and download | |
# In[8]: | |
def select_indicators(person_days=None, age_range=None, month=None):
    """Click all panel inputs, then apply the optional dropdown filters.

    Args:
        person_days: visible text to choose in "DdlstTxtBox1"; when falsy
            the "TxtBox1" elements are clicked instead.
        age_range: visible text to choose in "DdlstTxtBox9"; when falsy the
            "TxtBox9" elements are clicked instead.
        month: visible text to choose in "DdlstTxtBox5".."DdlstTxtBox8";
            when falsy the "TxtBox5".."TxtBox8" elements are clicked instead.
    """
    for panel_id in ("UpdatePanelmiddle", "UpdatePanelleft", "UpdatePanelright"):
        panel = browser.find_element_by_id(panel_id)
        click_all(panel.find_elements_by_css_selector("input"))

    if not person_days:
        click_all(browser.find_elements_by_id("TxtBox1"))
    else:
        select_by_id("DdlstTxtBox1", text=person_days)

    if not age_range:
        click_all(browser.find_elements_by_id("TxtBox9"))
    else:
        select_by_id("DdlstTxtBox9", text=age_range)

    if not month:
        for box_id in ("TxtBox5", "TxtBox6", "TxtBox7", "TxtBox8"):
            click_all(browser.find_elements_by_id(box_id))
    else:
        for dropdown_id in ("DdlstTxtBox5", "DdlstTxtBox6", "DdlstTxtBox7", "DdlstTxtBox8"):
            select_by_id(dropdown_id, text=month)
# In[9]: | |
def select_year(year):
    """Select `year` by option value in the "DdlstFinYear" dropdown."""
    select_by_id(element_id="DdlstFinYear", value=year)
# ## Main | |
# In[10]: | |
# Page holding the report form.
url = "http://nregarep2.nic.in/netnrega/dynamic2/DynamicReport_new4.aspx"

# Values to iterate over; an empty string in a tuple is a falsy entry that
# the selection helpers treat as "no specific choice".
state_names = (
    'Andaman and Nicobar',
    'Andhra Pradesh',
    'Arunachal Pradesh',
    'Assam',
    'Bihar',
    'Chhattisgarh',
    'Dadra & Nagar Haveli',
    'Daman & Diu',
    'Goa',
    'Gujarat',
    'Haryana',
    'Himachal Pradesh',
    'Jammu and Kashmir',
    'Jharkhand',
    'Karnataka',
    'Kerala',
    'Lakshadweep',
    'Madhya Pradesh',
    'Maharashtra',
    'Manipur',
    'Meghalaya',
    'Mizoram',
    'Nagaland',
    'Odisha',
    'Puducherry',
    'Punjab',
    'Rajasthan',
    'Sikkim',
    'Tamil Nadu',
    'Tripura',
    'Uttar Pradesh',
    'Uttarakhand',
    'West Bengal',
)
years = list(map(str, range(2011, 2018)))  # 2011 doesn't quite work
persons_days = (
    "", ">14", "1-10", "11-20", "21-30", "31-40", "41-50",
    "51-60", "61-70", "71-80", "81-99", "100", ">100",
)
age_ranges = ("", "18-30 yrs", "30-40 yrs", "40-50 yrs", "50-60 yrs", ">60 yrs")
months = (
    "", "April", "May", "June", "July", "August", "September",
    "October", "November", "December", "January", "February", "March",
)

# Order matters: main() unpacks each combination as
# (state, year, month, persons_day, age_range).
indicator_list = [state_names, years, months, persons_days, age_ranges]
# Lazy cartesian product of all of the above.
indicators = itertools.product(*indicator_list)
def main(indicators, url, total=None, click_delay=0.5, download_delay=5):
    """Submit the report form once per indicator combination and save each file.

    For every combination the page is reloaded, the form is filled in, the
    download button is clicked and, after `download_delay` seconds, a
    downloaded "report.xls" is renamed after the combination. Progress is
    printed as "<count>. <indicator values>... OK" or "... FAIL"; new SEVERE
    browser-log messages (other than favicon ones) are printed as errors.

    Args:
        indicators: iterable of sequences whose first five items are
            (state name, year, month, persons-days, age range). Falsy items
            are left out of the printed header and of the file name.
        url: address of the report form page.
        total: number of combinations, used only for the opening message.
        click_delay: per-click pause used while expanding the region tree.
        download_delay: seconds to wait after clicking the download button.
    """
    print(f"Total form submissions to make: {total or 0}")
    report_path = os.path.join(download_directory, "report.xls")

    for count, indicator in enumerate(indicators, start=1):
        log_set = set()
        browser.get(url)
        state_name, year, month, persons_day, age_range, *_ = indicator
        enable_download_in_headless_chrome(browser, download_directory)
        select_year(year)
        select_indicators(persons_day, age_range, month)
        header = "{count:5}. {indicator}... ".format(
            count=count,
            indicator=", ".join(filter(bool, indicator))
        )
        print(header, end="")
        select_regions([state_name], delay=click_delay)

        is_severe = False
        for log in browser.get_log("browser") or ():
            if (log["level"] == "SEVERE"
                    and "favicon" not in log["message"]
                    and log["message"] not in log_set):
                is_severe = True
                print(f"\n\tERROR: {log['message']}", end="")
                log_set.add(log["message"])
        if is_severe:
            print("\n\t", "-" * len(header), sep="")

        # A report left behind by an earlier iteration (for example a
        # download that finished after its wait expired) would otherwise be
        # renamed under THIS indicator's name, so clear it before downloading.
        try:
            os.remove(report_path)
        except FileNotFoundError:
            pass

        # download
        browser.find_element_by_id("dwnldDummy").click()
        time.sleep(download_delay)
        if os.path.isfile(report_path):
            # header ends with "... " (4 characters); drop it and the padding.
            # os.replace overwrites an existing target on every platform, so
            # re-running the scrape behaves the same everywhere.
            os.replace(
                report_path,
                os.path.join(download_directory, f"{header[:-4].strip()}.xls"),
            )
            print("OK")
        elif not is_severe:
            print("FAIL")
        print()
# Run the scrape; the browser is always shut down afterwards, even if a
# step raises or the run is interrupted, so no Chrome process is left behind.
try:
    main(
        indicators,
        url,
        # Number of combinations = product of the lengths of every value list.
        total=np.prod([len(values) for values in indicator_list]),
        click_delay=0.5,
        download_delay=5,
    )
finally:
    browser.quit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment