@davins90
Created July 22, 2024 16:44

Parallel API Calls

ThreadPoolExecutor

I recently used ThreadPoolExecutor for concurrent API calls. Here's a snippet from my code:

def enrich_dataframe(df, cidr_column='cidr_range', max_workers=8):
    total_cidrs = len(df[cidr_column])
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_cidr, cidr): cidr for cidr in df[cidr_column]}
        results = []
        
        for future in tqdm_notebook(as_completed(futures), total=total_cidrs, desc="Processing CIDRs"):
            results.append(future.result())
    
    results_df = pd.DataFrame(results)
    return df.merge(results_df, on=cidr_column, how='left')

Why I Chose ThreadPoolExecutor

  • Concurrency: Enables simultaneous API calls
  • Simplicity: Easy to implement without low-level threading knowledge
  • Resource Management: Automatic thread pool handling
  • Progress Tracking: Simple integration with tqdm
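The pattern above can be reduced to a minimal, self-contained sketch. `slow_square` is a hypothetical stand-in for any blocking call, such as an API request:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_square(n):
    time.sleep(0.1)  # simulate I/O latency, e.g. waiting on an HTTP response
    return n * n

# submit() returns a Future; mapping futures back to their inputs lets us
# recover which task produced which result as they complete out of order
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(slow_square, n): n for n in range(8)}
    results = {futures[f]: f.result() for f in as_completed(futures)}

print(sorted(results.items()))
```

With 4 workers and 8 tasks of 0.1 s each, the whole batch finishes in roughly 0.2 s instead of the 0.8 s a sequential loop would take.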

Asyncio: The Alternative

While I opted for ThreadPoolExecutor, asyncio is another viable option.

Key Differences

  1. Concurrency Model: ThreadPoolExecutor uses multiple threads; asyncio uses a single-threaded event loop
  2. Use Case: ThreadPoolExecutor suits I/O-bound tasks such as API calls, since the GIL is released while a thread waits on the network; asyncio suits efficient single-threaded I/O at high volume
  3. Scalability: ThreadPoolExecutor limited by system threads; asyncio potentially handles more concurrent operations
  4. Compatibility: ThreadPoolExecutor works with any Python function; asyncio requires coroutines
  5. Learning Curve: ThreadPoolExecutor is simpler; asyncio offers more control but is more complex

When to Use Each

ThreadPoolExecutor:

  • I/O-bound tasks, where threads overlap time spent waiting (the GIL is released during blocking I/O)
  • Working with non-asyncio-compatible libraries
  • Quick implementation with minimal code changes

Asyncio:

  • Very large number of concurrent operations
  • Fine-grained control over asynchronous tasks
  • Building fully asynchronous applications
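For comparison, here is a hedged sketch of the asyncio approach. `fetch_one` is a placeholder coroutine (a real version would use an async HTTP client such as aiohttp, since `requests` blocks the event loop), and the Semaphore plays the role of `max_workers`:

```python
import asyncio

async def fetch_one(cidr):
    # placeholder for an async HTTP call; here we only simulate I/O latency
    await asyncio.sleep(0.05)
    return {'cidr_range': cidr, 'origins': []}

async def fetch_all(cidrs, limit=8):
    sem = asyncio.Semaphore(limit)  # cap in-flight requests, like max_workers

    async def bounded(cidr):
        async with sem:
            return await fetch_one(cidr)

    # gather() preserves input order, unlike as_completed()
    return await asyncio.gather(*(bounded(c) for c in cidrs))

results = asyncio.run(fetch_all(['193.0.0.0/21', '10.0.0.0/24']))
```

Note the compatibility cost: everything in the call chain must be a coroutine, which is why a threads-based approach was the smaller change here.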

Below is the full code used:

import pandas as pd
import requests

from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm.notebook import tqdm as tqdm_notebook

###

default_header = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"}

###

# 1) API Request
def fetch_routing_history(resource):
    base_url = "https://stat.ripe.net/data/routing-history/data.json"
    start_time = int(datetime(2022, 12, 31, tzinfo=timezone.utc).timestamp())
    end_time = int(datetime.now(timezone.utc).timestamp())
    
    params = {
        "data_overload_limit": "ignore",
        "resource": resource,
        "starttime": start_time,
        "endtime": end_time
    }
    
    response = requests.get(base_url, params=params, headers=default_header, timeout=30)
    response.raise_for_status()
    return response.json()

# 2) Extract Information from API response
def extract_unique_origins(data):
    origins = set()
    for item in data['data']['by_origin']:
        origins.add(item['origin'])
    return list(origins)

# 3) Creating output results
def process_cidr(cidr):
    try:
        data = fetch_routing_history(cidr)
        unique_origins = extract_unique_origins(data)
        return {
            'cidr_range': cidr,
            'was_announced_by_multiple_asn': 1 if len(unique_origins) > 1 else 0,
            'asn_announcer_list': ','.join(map(str, unique_origins))
        }
    except Exception as e:
        print(f"Error processing {cidr}: {str(e)}")
        return {
            'cidr_range': cidr,
            'was_announced_by_multiple_asn': None,
            'asn_announcer_list': None
        }
    
# 4) Execute everything and merge on original dataframe    

# with progress bar for notebook
def enrich_dataframe(df, cidr_column='cidr_range', max_workers=8):
    total_cidrs = len(df[cidr_column])
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_cidr, cidr): cidr for cidr in df[cidr_column]}
        results = []
        
        for future in tqdm_notebook(as_completed(futures), total=total_cidrs, desc="Processing CIDRs"):
            results.append(future.result())
    
    results_df = pd.DataFrame(results)
    return df.merge(results_df, on=cidr_column, how='left')

# without progress bar for other env than notebook
# def enrich_dataframe(df, cidr_column='cidr_range', max_workers=8):
#     with ThreadPoolExecutor(max_workers=max_workers) as executor:
#         future_to_cidr = {executor.submit(process_cidr, cidr): cidr for cidr in df[cidr_column]}
#         results = []
#         for future in as_completed(future_to_cidr):
#             results.append(future.result())
    
#     results_df = pd.DataFrame(results)
#     return df.merge(results_df, on=cidr_column, how='left')
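As a quick offline check of the enrichment flow, the API-backed process_cidr can be swapped for a deterministic stub. `fake_process_cidr` below is hypothetical and not part of the original code; it only exercises the executor-and-merge logic without network access:

```python
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed

def fake_process_cidr(cidr):
    # deterministic stand-in for the API-backed process_cidr
    return {'cidr_range': cidr,
            'was_announced_by_multiple_asn': 1 if cidr.endswith('/24') else 0,
            'asn_announcer_list': 'AS64500'}

def enrich(df, cidr_column='cidr_range', max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(fake_process_cidr, c): c for c in df[cidr_column]}
        results = [f.result() for f in as_completed(futures)]
    # how='left' keeps every input row, even ones whose lookup failed
    return df.merge(pd.DataFrame(results), on=cidr_column, how='left')

df = pd.DataFrame({'cidr_range': ['193.0.0.0/21', '10.0.0.0/24']})
out = enrich(df)
print(out['was_announced_by_multiple_asn'].tolist())  # [0, 1]
```

The left merge preserves the original row order regardless of the order in which futures complete.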