@Jules-Bertholet · Last active May 12, 2023 03:51
`spacedustrs` rate limit
use std::{
    cmp,
    str::FromStr,
    sync::{
        atomic::{self, AtomicU16},
        Arc,
    },
    time::Duration,
};

use chrono::{DateTime, Utc};
use log::{error, warn};
use once_cell::sync::Lazy;
use reqwest::{Request, Response, StatusCode};
use reqwest_middleware::{ClientWithMiddleware, Middleware};
use spacedust::apis::configuration::Configuration;
use task_local_extensions::Extensions;
use tokio::{
    sync::{Mutex, Semaphore, SemaphorePermit},
    time::sleep,
};

/// [`Configuration`] object for use in all API calls.
/// Sets API key and manages rate limit.
pub static CONFIGURATION: Lazy<Configuration> = Lazy::new(|| {
    let mut configuration = Configuration::new();
    configuration.bearer_access_token = Some(include_str!("../TOKEN").to_owned());
    let middleware: Box<[Arc<dyn Middleware>]> = Box::new([Arc::new(RateLimitMiddleware)]);
    configuration.client = ClientWithMiddleware::new(reqwest::Client::new(), middleware);
    configuration
});

/// The current documented burst limit.
const INITIAL_BURST_LIMIT: u16 = 10;

/// The current documented per-second limit.
const INITIAL_RATE_LIMIT: u16 = 2;

/// The burst limit according to the API headers.
static BURST_LIMIT: AtomicU16 = AtomicU16::new(INITIAL_BURST_LIMIT);

/// The per-second rate limit according to the API headers.
static RATE_LIMIT: AtomicU16 = AtomicU16::new(INITIAL_RATE_LIMIT);

// Exponential backoff constants.

/// For exponential backoff retry on server errors.
const BACKOFF_CONSTANT_SECONDS: f64 = 10.0;

/// For exponential backoff retry on server errors.
const BACKOFF_BASE: f64 = 1.5;
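// Worked example of the schedule these defaults produce: retry number n
// (counting from 0) sleeps BACKOFF_CONSTANT_SECONDS * BACKOFF_BASE.powi(n)
// seconds before re-sending, i.e. 10 s, 15 s, 22.5 s, 33.75 s, ...
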
/// A permit must be acquired from this pool before any API request.
static REQUEST_SEMAPHORE: Semaphore = Semaphore::const_new(INITIAL_BURST_LIMIT as usize);
struct ReturnPermit;
impl ReturnPermit {
    /// Waits for the appropriate amount of time (calculated from the rate limit),
    /// then returns the permit to the pool.
    async fn return_permit(&mut self, permit: SemaphorePermit<'_>) {
        sleep(Duration::from_millis(
            1000_u16
                .saturating_div(RATE_LIMIT.load(atomic::Ordering::Acquire))
                .into(),
        ))
        .await;
        drop(permit);
    }
}
/// This mutex must be locked before a permit is returned to the pool.
/// Locking ensures that waiting periods are sequential.
static RETURN: Mutex<ReturnPermit> = Mutex::const_new(ReturnPermit);
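// Example of the resulting pacing, assuming the documented defaults: with
// RATE_LIMIT = 2, `return_permit` sleeps 1000 / 2 = 500 ms before releasing a
// permit back to REQUEST_SEMAPHORE, so sustained throughput settles at ~2
// requests per second, while the 10 initial permits still allow a burst of
// 10 requests up front.
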
/// Middleware to enforce rate-limiting for the SpaceTraders API.
#[derive(Default)]
struct RateLimitMiddleware;
#[async_trait::async_trait]
impl reqwest_middleware::Middleware for RateLimitMiddleware {
    async fn handle(
        &self,
        mut req: Request,
        extensions: &mut Extensions,
        mut next: reqwest_middleware::Next<'_>,
    ) -> reqwest_middleware::Result<Response> {
        // Perform the request, retrying on failure.
        let mut retry_info;
        let mut server_error_retry_count: i32 = 0;
        let result = loop {
            retry_info = req
                .try_clone()
                .map(|cloned_req: Request| (cloned_req, next.clone()));

            // Acquire a permit, yield if burst limit attained.
            let permit = REQUEST_SEMAPHORE.acquire().await.unwrap();

            let result = next.run(req, extensions).await;

            // Return permit to the pool after an appropriate timeout.
            tokio::spawn(async move {
                let mut return_permit = RETURN.lock().await;
                return_permit.return_permit(permit).await;
                drop(return_permit);
            });

            match (retry_info, result) {
                // If this request isn't retryable, return the response no matter what it is.
                (None, result) => break result,
                // If this request was successful, return the response.
                (_, Ok(response)) if response.status().is_success() => break Ok(response),
                // On server error, log and retry with exponential backoff.
                (Some((cloned_req, cloned_next)), Ok(resp)) if resp.status().is_server_error() => {
                    let status = resp.status();
                    let text = resp.text().await.ok();
                    error!(
                        "Server error: {}. This request was previously tried {} times. Response body: {}",
                        status,
                        server_error_retry_count,
                        text.as_deref().unwrap_or("<bytes>")
                    );
                    sleep(Duration::from_secs_f64(
                        BACKOFF_CONSTANT_SECONDS * BACKOFF_BASE.powi(server_error_retry_count),
                    ))
                    .await;
                    server_error_retry_count += 1;
                    req = cloned_req;
                    next = cloned_next;
                }
                // If, despite our efforts, we've hit a rate limit, wait for the limits to reset and then retry.
                (Some((cloned_req, cloned_next)), Ok(response))
                    if response.status() == StatusCode::TOO_MANY_REQUESTS =>
                {
                    if let Some(reset) = response
                        .headers()
                        .get("x-ratelimit-reset")
                        .and_then(|h| h.to_str().ok())
                        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
                    {
                        warn!("Rate limit hit! Waiting and retrying.");
                        let delay = reset
                            .signed_duration_since(Utc::now())
                            .to_std()
                            .unwrap_or(Duration::ZERO);
                        sleep(delay).await;
                        req = cloned_req;
                        next = cloned_next;
                    } else {
                        break Ok(response);
                    }
                }
                // Otherwise, pass on the error.
                (_, response) => break response,
            }
        };

        // Adjust rate limits if they have changed.
        if let Ok(res) = &result {
            if let Some(limit) = res
                .headers()
                .get("x-ratelimit-limit-per-second")
                .and_then(|h| h.to_str().ok())
                .and_then(|h| u16::from_str(h).ok())
            {
                RATE_LIMIT.store(limit, atomic::Ordering::Release);
            }
            if let Some(burst_limit) = res
                .headers()
                .get("x-ratelimit-limit-burst")
                .and_then(|h| h.to_str().ok())
                .and_then(|h| u16::from_str(h).ok())
            {
                let old = BURST_LIMIT.swap(burst_limit, atomic::Ordering::AcqRel);
                match old.cmp(&burst_limit) {
                    cmp::Ordering::Less => {
                        REQUEST_SEMAPHORE.add_permits((burst_limit - old).into());
                    }
                    cmp::Ordering::Greater => {
                        tokio::spawn(async move {
                            for _ in 0..(old - burst_limit) {
                                REQUEST_SEMAPHORE.acquire().await.unwrap().forget();
                            }
                        });
                    }
                    cmp::Ordering::Equal => (),
                }
            }
        }

        result
    }
}
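
For reference, a minimal usage sketch (not part of the gist): any endpoint function generated by `spacedust` that takes a `&Configuration` can be pointed at `CONFIGURATION`, and the middleware will then pace every call through it. The `agents_api::get_my_agent` call below is an assumption about the generated client's API and stands in for whichever endpoint you actually need.

```rust
use spacedust::apis::agents_api;

#[tokio::main]
async fn main() {
    // All requests made through CONFIGURATION's client share the same
    // semaphore, so concurrent tasks are rate-limited together.
    // `get_my_agent` is assumed here; substitute any generated endpoint fn.
    match agents_api::get_my_agent(&CONFIGURATION).await {
        Ok(agent) => println!("{:?}", agent),
        Err(err) => eprintln!("request failed: {}", err),
    }
}
```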