Created
March 11, 2023 00:04
-
-
Save toksdotdev/5c2f2e8f808d29dea75facfa3308ab64 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use super::RoundRobinStrategy; | |
use crate::balancer::StrategyProducer; | |
use crate::Target; | |
use futures::future::abortable; | |
use futures::stream::AbortHandle; | |
use futures::stream::Abortable; | |
use std::sync::atomic::AtomicBool; | |
use std::sync::atomic::AtomicUsize; | |
use std::sync::atomic::Ordering; | |
use std::sync::Arc; | |
use std::time::Duration; | |
use tokio::sync::Mutex; | |
use tokio::task::JoinHandle; | |
use tokio::time; | |
#[async_trait::async_trait] | |
pub trait Rank { | |
type Item; | |
/// Rank values within self based on a property. | |
async fn rank(&mut self); | |
} | |
#[async_trait::async_trait] | |
impl Rank for Vec<Target> { | |
type Item = Target; | |
async fn rank(&mut self) { | |
todo!() | |
} | |
} | |
pub struct OptimalRoundRobinProducer<T> { | |
left_bucket: Arc<Mutex<T>>, | |
right_bucket: Arc<Mutex<T>>, | |
left_bucket_start_index: Arc<AtomicUsize>, | |
right_bucket_start_index: Arc<AtomicUsize>, | |
/// Toggle to switch from left bucket to right bucket. | |
/// | |
/// Setting the toggle to true causes values to be picked from | |
/// the left bucket. | |
read_from_left_bucket: Arc<AtomicBool>, | |
handle: (Abortable<JoinHandle<()>>, AbortHandle), | |
} | |
impl<T> OptimalRoundRobinProducer<T> | |
where | |
T: Rank + Send + Sync + Clone + 'static, | |
{ | |
pub fn new(haystack: T) -> Self { | |
let left_bucket = Arc::new(Mutex::new(haystack.clone())); | |
let right_bucket = Arc::new(Mutex::new(haystack)); | |
let left_bucket_start_index = Arc::new(AtomicUsize::new(0)); | |
let right_bucket_start_index = Arc::new(AtomicUsize::new(0)); | |
let read_from_left_bucket = Arc::new(AtomicBool::new(true)); | |
let handle = abortable(tokio::spawn(Self::periodically_arrange( | |
left_bucket.clone(), | |
right_bucket.clone(), | |
left_bucket_start_index.clone(), | |
right_bucket_start_index.clone(), | |
read_from_left_bucket.clone(), | |
))); | |
Self { | |
left_bucket_start_index, | |
right_bucket_start_index, | |
read_from_left_bucket, | |
left_bucket, | |
right_bucket, | |
handle, | |
} | |
} | |
fn get_bucket(&self) -> (T, Arc<AtomicUsize>) { | |
if self.read_from_left_bucket.load(Ordering::SeqCst) { | |
( | |
self.left_bucket.blocking_lock().clone(), | |
self.left_bucket_start_index.clone(), | |
) | |
} else { | |
( | |
self.right_bucket.blocking_lock().clone(), | |
self.right_bucket_start_index.clone(), | |
) | |
} | |
} | |
async fn periodically_arrange( | |
left_bucket: Arc<Mutex<T>>, | |
right_bucket: Arc<Mutex<T>>, | |
left_bucket_start_index: Arc<AtomicUsize>, | |
right_bucket_start_index: Arc<AtomicUsize>, | |
read_from_left_bucket: Arc<AtomicBool>, | |
) { | |
// TODO: make this interval confifurable. | |
let mut interval = time::interval(Duration::from_secs(10)); | |
loop { | |
// if false is returned, read_from_left_bucket becomes true. | |
if read_from_left_bucket.fetch_xor(true, Ordering::SeqCst) { | |
right_bucket_start_index.store(0, Ordering::SeqCst); | |
left_bucket.lock().await.rank().await; | |
} else { | |
left_bucket_start_index.store(0, Ordering::SeqCst); | |
right_bucket.lock().await.rank().await; | |
} | |
interval.tick().await; | |
} | |
} | |
} | |
impl StrategyProducer<Target> for OptimalRoundRobinProducer<Vec<Target>> { | |
fn produce(&self) -> Box<dyn crate::balancer::Strategy<Item = Target>> { | |
let (bucket, start_index) = self.get_bucket(); | |
let _ = start_index.compare_exchange(bucket.len(), 0, Ordering::SeqCst, Ordering::SeqCst); | |
Box::new(RoundRobinStrategy::new( | |
start_index.fetch_add(1, Ordering::SeqCst), | |
Arc::new(bucket), | |
)) | |
} | |
} | |
impl<T> Drop for OptimalRoundRobinProducer<T> { | |
fn drop(&mut self) { | |
self.handle.1.abort(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment