@ColeMurray - created August 10, 2024
Automated Website Data Extraction
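A single-file Python pipeline: an LLM generates an extraction schema for the task, the schema is turned into a Pydantic model, URLs are pulled from the site's sitemap and filtered for relevance, the relevant pages are crawled, structured data is extracted and validated against the model, and the per-page results are consolidated into one summary. Judging by the imports, it needs requests, beautifulsoup4, openai, tenacity, pydantic (v2, with the email extra for EmailStr), and ultimate-sitemap-parser, plus an OpenAI API key.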
import json
import logging
import time
from datetime import datetime, timezone
from typing import Any, Dict, List, Literal, Optional

import requests
from bs4 import BeautifulSoup
from openai import OpenAI
from pydantic import (BaseModel, ConfigDict, EmailStr, Field, HttpUrl,
                      ValidationError, constr, create_model)
from requests.exceptions import RequestException
from tenacity import retry, stop_after_attempt, wait_random_exponential
from usp.tree import sitemap_tree_for_homepage

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
client = OpenAI()
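
# The OpenAI client above reads its API key from the OPENAI_API_KEY environment variable by default.
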
def generate_schema(task_description: str, model: str) -> Dict[str, Any]:
    """Ask the LLM to design a field-by-field extraction schema for the task."""
    logger.info("Generating complex schema based on task description")
    prompt = f"""
Task: Create a detailed Pydantic schema for extracted data based on the following task description:
"{task_description}"
Instructions:
1. Analyze the task description carefully.
2. Create a JSON object representing a complex schema with nested structures if necessary.
3. Use the following format for each field:
   "field_name": {{
       "type": "<python_type>",
       "description": "<brief description>",
       "optional": true/false,
       "validation": {{
           // Additional validation rules (if any)
       }}
   }}
4. For nested structures, use "type": "dict" and include a "properties" key with sub-fields.
5. For lists of objects, use "type": "list" and include an "items" key with the object schema.
6. Available Python types: str, int, float, bool, datetime, url, email, list, dict
7. Always include "url" and "confidence" fields in the root of the schema.
Example of a valid complex schema:
{{
    "url": {{
        "type": "str",
        "description": "Source URL of the extracted data",
        "optional": false
    }},
    "title": {{
        "type": "str",
        "description": "Title of the content",
        "optional": false,
        "validation": {{
            "min_length": 5,
            "max_length": 200
        }}
    }},
    "publish_date": {{
        "type": "datetime",
        "description": "Publication date of the content",
        "optional": true
    }},
    "author": {{
        "type": "dict",
        "description": "Author information",
        "optional": true,
        "properties": {{
            "name": {{
                "type": "str",
                "description": "Author's name",
                "optional": false
            }},
            "email": {{
                "type": "email",
                "description": "Author's email",
                "optional": true
            }}
        }}
    }},
    "tags": {{
        "type": "list",
        "description": "List of tags associated with the content",
        "optional": true,
        "items": {{
            "type": "str"
        }}
    }},
    "confidence": {{
        "type": "float",
        "description": "Confidence score of the extraction",
        "optional": false,
        "validation": {{
            "min_value": 0.0,
            "max_value": 1.0
        }}
    }}
}}
Now, generate a complex schema for the given task. Respond with only the JSON schema:
"""
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"}
    )
    try:
        schema = json.loads(response.choices[0].message.content)
        validate_schema(schema)
        logger.info(f"Generated complex schema: {json.dumps(schema, indent=2)}")
        return schema
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON from model output: {str(e)}")
        raise
    except ValueError as e:
        logger.error(f"Invalid schema structure: {str(e)}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error in schema generation: {str(e)}")
        raise

def validate_schema(schema: Dict[str, Any]):
    required_fields = ["url", "confidence"]
    for field in required_fields:
        if field not in schema:
            raise ValueError(f"Schema is missing required field: {field}")

    def validate_field(field_name: str, field_schema: Dict[str, Any]):
        required_keys = ["type", "description", "optional"]
        for key in required_keys:
            if key not in field_schema:
                raise ValueError(f"Field '{field_name}' is missing required key: {key}")
        valid_types = ["str", "int", "float", "bool", "datetime", "url", "email", "list", "dict"]
        if field_schema["type"] not in valid_types:
            raise ValueError(f"Field '{field_name}' has invalid type: {field_schema['type']}")
        if field_schema["type"] == "dict" and "properties" not in field_schema:
            raise ValueError(f"Field '{field_name}' of type 'dict' is missing 'properties'")
        if field_schema["type"] == "list" and "items" not in field_schema:
            raise ValueError(f"Field '{field_name}' of type 'list' is missing 'items'")

    for field_name, field_schema in schema.items():
        validate_field(field_name, field_schema)

def create_pydantic_model(schema: Dict[str, Any]):
    """Build a Pydantic model class from the LLM-generated schema dict."""
    logger.info("Creating complex Pydantic model from schema")

    def create_field(field_schema: Dict[str, Any]):
        field_type = field_schema["type"]
        is_optional = field_schema.get("optional", False)
        validation = field_schema.get("validation", {})
        if field_type == "str":
            if "enum" in validation:
                field = Literal[tuple(validation["enum"])]
            else:
                field = constr(**{k: v for k, v in validation.items() if k in ["min_length", "max_length"]})
        elif field_type == "int":
            field = int
        elif field_type == "float":
            field = float
        elif field_type == "bool":
            field = bool
        elif field_type == "datetime":
            field = datetime
        elif field_type == "url":
            field = HttpUrl
        elif field_type == "email":
            field = EmailStr
        elif field_type == "list":
            item_field = create_field(field_schema["items"])
            field = List[item_field]
        elif field_type == "dict":
            nested_model = create_pydantic_model(field_schema["properties"])
            field = nested_model
        else:
            logger.warning(f"Unknown field type: {field_type}. Defaulting to str.")
            field = str
        return Optional[field] if is_optional else field

    fields = {}
    for field_name, field_schema in schema.items():
        field_type = create_field(field_schema)
        field_description = field_schema.get("description", "")
        is_optional = field_schema.get("optional", False)
        validation = field_schema.get("validation", {})
        field_info = Field(
            default=None if is_optional else ...,
            description=field_description,
            **{k: v for k, v in validation.items() if k in ["gt", "ge", "lt", "le", "min_items", "max_items"]}
        )
        fields[field_name] = (field_type, field_info)

    class ExtractedDataModel(BaseModel):
        model_config = ConfigDict(arbitrary_types_allowed=True)

    ExtractedData = create_model('ExtractedData', __base__=ExtractedDataModel, **fields)
    logger.info(f"Created complex ExtractedData model: {ExtractedData.model_json_schema()}")
    return ExtractedData
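
# Example (hypothetical schema, just to show the dict shape create_pydantic_model expects):
#   Model = create_pydantic_model({
#       "url": {"type": "str", "description": "Source URL", "optional": False},
#       "confidence": {"type": "float", "description": "Extraction confidence", "optional": False},
#   })
#   Model(url="https://example.com", confidence=0.9)
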
def get_sitemap(site_url: str, max_urls: int = 100) -> List[str]:
    logger.info(f"Fetching sitemap from {site_url}")
    try:
        def sort_key(page):
            # Normalize last_modified to an aware UTC datetime so pages can be compared.
            if page.last_modified is None:
                return datetime.min.replace(tzinfo=timezone.utc)
            if page.last_modified.tzinfo is None:
                return page.last_modified.replace(tzinfo=timezone.utc)
            return page.last_modified.astimezone(timezone.utc)

        tree = sitemap_tree_for_homepage(site_url)
        pages = list(tree.all_pages())
        pages = sorted(pages, key=sort_key, reverse=True)
        logger.info(f"Found {len(pages)} pages in sitemap")
        urls = [page.url for page in pages[:max_urls]]
        # Deduplicate; note that set() does not preserve the recency ordering above.
        urls = list(set(urls))
        logger.info(f"Returning {len(urls)} URLs for processing")
        return urls
    except Exception as e:
        logger.error(f"Error fetching sitemap: {e}")
        raise

@retry(stop=stop_after_attempt(3), wait=wait_random_exponential(min=1, max=60))
def analyze_urls(urls: List[str], extraction_task: str, model: str) -> List[Dict[str, Any]]:
    logger.info("Analyzing URLs for relevance")
    prompt = f"""
Given the following extraction task: '{extraction_task}', analyze these URLs:
{json.dumps(urls, indent=2)}
For each URL, determine if it's relevant to the task and provide a brief reason.
You are currently in crawl mode "low-budget" - be strict; only crawl a page if it is likely to be relevant.
Respond in the following JSON format:
{{
    "results": [
        {{
            "url": string,
            "reason": string,
            "is_relevant": boolean
        }},
        ...
    ]
}}
"""
    logger.info(f"Prompt: {prompt}")
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system",
                 "content": "You are an expert data analysis assistant that analyzes URLs based on their relevance to a given task. In the reason, think through step by step why this is relevant. Be concise and critical."},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"}
        )
        analyzed_urls = json.loads(response.choices[0].message.content)
        logger.info(f"Analysis response: {analyzed_urls}")
        validated_urls = [url_data for url_data in analyzed_urls['results'] if url_data['is_relevant']]
        logger.info(f"Analyzed {len(validated_urls)} relevant URLs")
        return validated_urls
    except Exception as e:
        logger.error(f"Error analyzing URLs: {e}")
        raise

@retry(stop=stop_after_attempt(3), wait=wait_random_exponential(min=1, max=60))
def crawl_url(url):
    logger.info(f"Crawling URL: {url}")
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.text
    except RequestException as e:
        logger.error(f"Error crawling URL {url}: {e}")
        raise


def extract_main_content(html):
    logger.info("Extracting main content from HTML")
    soup = BeautifulSoup(html, 'html.parser')
    for script in soup(["script", "style"]):
        script.decompose()
    return soup.get_text()

@retry(stop=stop_after_attempt(3), wait=wait_random_exponential(min=1, max=60))
def extract_information(url, content, extraction_task, ExtractedData, model):
    """Extract schema-conforming data from page content and validate it against the model."""
    logger.info("Extracting complex information using LLM")
    prompt = f"""
Given the following content from {url}, extract information relevant to this task: '{extraction_task}'
Respond in the following JSON format, adhering strictly to the provided schema:
{json.dumps(ExtractedData.model_json_schema(), indent=2)}
Ensure all required fields are filled and optional fields are included only if the information is available.
For nested structures and lists, provide complete and valid data.
Content:
{content[:8000]}
"""
    logger.info(f"Prompt: {prompt}")
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system",
                 "content": "You are a helpful assistant that extracts specific, structured information from text based on a given task and schema. Only use the provided context; do not make up information."},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"}
        )
        logger.info(f"Response: {response.choices[0].message.content}")
        extracted_data = json.loads(response.choices[0].message.content)
        validated_data = ExtractedData(**extracted_data)
        return validated_data
    except ValidationError as e:
        logger.error(f"Validation error: {e}")
        raise
    except Exception as e:
        logger.error(f"Error extracting information: {e}")
        raise

@retry(stop=stop_after_attempt(3), wait=wait_random_exponential(min=1, max=60))
def consolidate_results(results: List[Dict[str, Any]], extraction_task: str, model: str) -> Dict[str, Any]:
    logger.info("Consolidating extraction results")
    prompt = f"""
Task: Consolidate the following extraction results into a single, comprehensive response.
Extraction task: '{extraction_task}'
Extracted data from multiple pages:
{json.dumps(results, indent=2)}
Instructions:
1. Analyze all the extracted data carefully.
2. Create a consolidated summary that includes all relevant information from all pages.
3. Organize the information logically and avoid redundancy.
4. If there are conflicting pieces of information, mention the discrepancies.
5. Provide an overall confidence score for the consolidated information.
6. Format your response as a JSON object with appropriate fields based on the extraction task.
7. Include a 'sources' field that lists the URLs from which information was extracted.
Respond with only the JSON object containing the consolidated information.
"""
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system",
                 "content": "You are an expert data analyst tasked with consolidating and summarizing information extracted from multiple web pages."},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"}
        )
        consolidated_data = json.loads(response.choices[0].message.content)
        logger.info("Successfully consolidated extraction results")
        return consolidated_data
    except Exception as e:
        logger.error(f"Error consolidating results: {e}")
        raise

def main(sitemap_url, extraction_task, model):
    """Run the full pipeline: schema -> model -> sitemap -> relevance filter -> crawl -> extract -> consolidate."""
    logger.info("Starting complex data extraction workflow")
    try:
        schema = generate_schema(extraction_task, model)
        ExtractedData = create_pydantic_model(schema)
        urls = get_sitemap(sitemap_url)
        analyzed_urls = analyze_urls(urls, extraction_task, model)
        results = []
        for url_analysis in analyzed_urls:
            try:
                html = crawl_url(url_analysis['url'])
                content = extract_main_content(html)
                extracted_info = extract_information(url_analysis['url'], content, extraction_task,
                                                     ExtractedData, model)
                # mode="json" keeps datetimes/URLs JSON-serializable for json.dump below.
                results.append(extracted_info.model_dump(mode="json"))
                time.sleep(1)  # Be polite to the server
            except Exception as e:
                logger.error(f"Error processing URL {url_analysis['url']}: {e}")
                continue
        logger.info("Saving individual extraction results")
        with open("individual_extraction_results.json", "w") as f:
            json.dump(results, f, indent=2)
        logger.info("Consolidating results")
        consolidated_result = consolidate_results(results, extraction_task, model)
        logger.info("Saving consolidated extraction result")
        with open("consolidated_extraction_result.json", "w") as f:
            json.dump(consolidated_result, f, indent=2)
        logger.info("Complex extraction and consolidation complete!")
    except Exception as e:
        logger.exception(f"An error occurred during the complex extraction process: {e}")

if __name__ == "__main__":
    sitemap_url = "https://example.com"
    extraction_task = input("Please describe the complex extraction task: ")
    model = "gpt-4o-mini"  # You can change this to allow user input if desired
    main(sitemap_url, extraction_task, model)
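
# Example run (hypothetical file name, key, and task):
#   $ export OPENAI_API_KEY=sk-...
#   $ python automated_website_data_extraction.py
#   Please describe the complex extraction task: Extract product names, prices, and availability
# Output is written to individual_extraction_results.json and consolidated_extraction_result.json.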