I recently used ThreadPoolExecutor
for concurrent API calls. Here's a snippet from my code:
def enrich_dataframe(df, cidr_column='cidr_range', max_workers=8):
    """Enrich *df* with per-CIDR routing-history columns, fetching concurrently.

    One process_cidr task is submitted per value in df[cidr_column]; results
    are collected as they complete (with a notebook progress bar) and
    left-merged back onto *df* on *cidr_column*.
    """
    total_cidrs = len(df[cidr_column])
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its CIDR so a failure can be traced to its input.
        futures = {executor.submit(process_cidr, cidr): cidr for cidr in df[cidr_column]}
        results = []
        for future in tqdm_notebook(as_completed(futures), total=total_cidrs, desc="Processing CIDRs"):
            results.append(future.result())
    results_df = pd.DataFrame(results)
    return df.merge(results_df, on=cidr_column, how='left')
- Concurrency: Enables simultaneous API calls
- Simplicity: Easy to implement without low-level threading knowledge
- Resource Management: Automatic thread pool handling
- Progress Tracking: Simple integration with tqdm
While I opted for ThreadPoolExecutor, asyncio is another viable option.
- Concurrency Model: ThreadPoolExecutor uses multiple threads; asyncio uses a single-threaded event loop
- Use Case: ThreadPoolExecutor for I/O-bound tasks (note that CPython's GIL prevents threads from using multiple cores for CPU-bound work — the benefit comes from overlapping blocking waits); asyncio for efficient single-threaded I/O operations
- Scalability: ThreadPoolExecutor limited by system threads; asyncio potentially handles more concurrent operations
- Compatibility: ThreadPoolExecutor works with any Python function; asyncio requires coroutines
- Learning Curve: ThreadPoolExecutor is simpler; asyncio offers more control but is more complex
ThreadPoolExecutor:
- I/O-bound tasks, where threads overlap blocking waits (the GIL is released during I/O)
- Working with non-asyncio-compatible libraries
- Quick implementation with minimal code changes
Asyncio:
- Very large number of concurrent operations
- Fine-grained control over asynchronous tasks
- Building fully asynchronous applications
Below is the full code used:
import pandas as pd
import requests
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm.notebook import tqdm as tqdm_notebook
###
# Browser-like User-Agent sent with every RIPEstat request.
# NOTE(review): presumably used to avoid being throttled/blocked as an
# anonymous client — the API docs don't require it; confirm it is needed.
default_header = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"}
###
# 1) API Request
def fetch_routing_history(resource):
    """Query the RIPEstat routing-history endpoint for *resource* (a CIDR/prefix).

    The query window runs from 2022-12-31 00:00 UTC to the current time.
    Returns the decoded JSON response body as a dict.

    Raises:
        requests.HTTPError: on a non-2xx response (instead of handing an
            HTML/JSON error page to the caller's parser).
        requests.Timeout: if the server does not respond within 30 seconds —
            without a timeout a stalled connection would hang its worker
            thread in the pool forever.
    """
    base_url = "https://stat.ripe.net/data/routing-history/data.json"
    start_time = int(datetime(2022, 12, 31, tzinfo=timezone.utc).timestamp())
    end_time = int(datetime.now(timezone.utc).timestamp())
    params = {
        # Ask RIPEstat not to truncate large result sets.
        "data_overload_limit": "ignore",
        "resource": resource,
        "starttime": start_time,
        "endtime": end_time,
    }
    response = requests.get(base_url, params=params, headers=default_header,
                            timeout=30)
    response.raise_for_status()
    return response.json()
# 2) Extract Information from API response
def extract_unique_origins(data):
    """Return the distinct origin ASNs from a routing-history API response.

    *data* is the JSON dict returned by fetch_routing_history; origins are
    read from data['data']['by_origin'][*]['origin'].

    The result is sorted: iterating a plain set gives a nondeterministic
    order, which made the downstream comma-joined 'asn_announcer_list'
    column differ between runs on identical input.
    """
    return sorted({item['origin'] for item in data['data']['by_origin']})
# 3) Creating output results
def process_cidr(cidr):
    """Fetch routing history for one CIDR and summarise its announcing ASNs.

    Returns a dict with:
        'cidr_range': the input CIDR (used as the merge key downstream)
        'was_announced_by_multiple_asn': 1 if more than one origin ASN was
            seen, else 0 (None on failure)
        'asn_announcer_list': comma-joined origin ASNs (None on failure)

    Any exception (network, HTTP, JSON shape) is caught and reported so a
    single bad CIDR never aborts the whole batch.
    """
    try:
        data = fetch_routing_history(cidr)
        unique_origins = extract_unique_origins(data)
        return {
            'cidr_range': cidr,
            'was_announced_by_multiple_asn': 1 if len(unique_origins) > 1 else 0,
            # str() guards against the API returning numeric origins:
            # str.join raises TypeError on non-string elements.
            'asn_announcer_list': ','.join(map(str, unique_origins)),
        }
    except Exception as e:
        # Deliberate best-effort: record the failure and keep going.
        print(f"Error processing {cidr}: {e}")
        return {
            'cidr_range': cidr,
            'was_announced_by_multiple_asn': None,
            'asn_announcer_list': None,
        }
# 4) Execute everything and merge on original dataframe
# with progress bar for notebook
def enrich_dataframe(df, cidr_column='cidr_range', max_workers=8):
    """Concurrently enrich *df* with origin-ASN columns for each CIDR.

    Submits one process_cidr task per *unique*, non-null CIDR: duplicates
    would both waste API calls and produce duplicate keys in the results
    frame, turning the left merge into a many-to-many row explosion.
    Progress is shown with a notebook progress bar.

    Returns a new DataFrame: *df* left-merged with the per-CIDR results on
    *cidr_column*; rows whose CIDR failed (or was null) keep NaN in the
    added columns.
    """
    unique_cidrs = df[cidr_column].dropna().unique()
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_cidr, cidr): cidr for cidr in unique_cidrs}
        for future in tqdm_notebook(as_completed(futures), total=len(unique_cidrs),
                                    desc="Processing CIDRs"):
            # process_cidr catches all exceptions internally, so result() is safe.
            results.append(future.result())
    if not results:
        # pd.DataFrame([]) has no columns, so merging on cidr_column would raise.
        return df.copy()
    return df.merge(pd.DataFrame(results), on=cidr_column, how='left')
# Alternative without a progress bar, for environments other than notebooks
# def enrich_dataframe(df, cidr_column='cidr_range', max_workers=8):
# with ThreadPoolExecutor(max_workers=max_workers) as executor:
# future_to_cidr = {executor.submit(process_cidr, cidr): cidr for cidr in df[cidr_column]}
# results = []
# for future in as_completed(future_to_cidr):
# results.append(future.result())
# results_df = pd.DataFrame(results)
# return df.merge(results_df, on=cidr_column, how='left')