Created
March 14, 2024 10:51
-
-
Save Object905/6cafd5e8e56dd60670149296411a407f to your computer and use it in GitHub Desktop.
Pingora Kubernetes discovery example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::collections::{BTreeSet, HashMap}; | |
use std::net::{IpAddr, Ipv4Addr, SocketAddrV4}; | |
use std::pin::Pin; | |
use std::str::FromStr; | |
use std::sync::atomic::Ordering; | |
use std::sync::Arc; | |
use async_trait::async_trait; | |
use atomic_option::AtomicOption; | |
use futures::future::{select, Either}; | |
use futures::{FutureExt, Stream, StreamExt}; | |
use k8s_openapi::api::discovery::v1::EndpointSlice; | |
use kube::runtime::reflector::Store; | |
use pingora::prelude::{Error, Result as PingoraResult}; | |
use pingora_core::protocols::l4::socket::SocketAddr; | |
use pingora_core::server::ShutdownWatch; | |
use pingora_core::services::background::BackgroundService; | |
use pingora_load_balancing::discovery::ServiceDiscovery; | |
use pingora_load_balancing::selection::RoundRobin; | |
use pingora_load_balancing::{Backend, LoadBalancer}; | |
pub struct EndpointSliceDiscovery { | |
pub ctx: Arc<ProxyContext>, | |
pub store: Arc<Store<EndpointSlice>>, | |
pub port: u16, | |
} | |
impl EndpointSliceDiscovery { | |
fn kube_to_backends(&self, kube: Arc<EndpointSlice>) -> PingoraResult<Vec<Backend>> { | |
if kube.address_type != "IPv4" { | |
return Err(Error::create( | |
pingora_core::ErrorType::Custom("Can handle only IPv4 from endpoin slices"), | |
pingora_core::ErrorSource::Internal, | |
None, | |
None, | |
)); | |
} | |
let ips: Vec<Ipv4Addr> = kube | |
.endpoints | |
.iter() | |
.filter(|&e| { | |
if let Some(conditions) = e.conditions.as_ref() { | |
if let Some(ready) = conditions.ready { | |
if ready { | |
return true; | |
} | |
} | |
} | |
false | |
}) | |
.map(|endpoint| { | |
endpoint.addresses.iter().map(|ip| { | |
Ipv4Addr::from_str(ip).expect("Failed to parse endpoint IPv4 address") | |
}) | |
}) | |
.flatten() | |
.collect(); | |
let backends: Vec<Backend> = ips | |
.into_iter() | |
.map(|ip| SocketAddr::Inet(std::net::SocketAddr::V4(SocketAddrV4::new(ip, self.port)))) | |
.map(|addr| Backend { | |
addr: addr, | |
weight: 1, | |
}) | |
.collect(); | |
return Ok(backends); | |
} | |
} | |
#[async_trait] | |
impl ServiceDiscovery for EndpointSliceDiscovery { | |
async fn discover(&self) -> PingoraResult<(BTreeSet<Backend>, HashMap<u64, bool>)> { | |
let state = self.store.state(); | |
let backends: PingoraResult<Vec<Vec<Backend>>> = state | |
.into_iter() | |
.map(|kube| self.kube_to_backends(kube)) | |
.collect(); | |
let backends = backends?; | |
let result: BTreeSet<Backend> = backends.into_iter().flatten().collect(); | |
Ok((result, HashMap::new())) | |
} | |
} | |
pub struct EndpointSliceDiscoveryService { | |
pub stream: AtomicOption<Pin<Box<dyn Stream<Item = EndpointSlice> + Send>>>, | |
pub lb: Arc<LoadBalancer<RoundRobin>>, | |
} | |
impl EndpointSliceDiscoveryService { | |
async fn update_backends(&self, _event: EndpointSlice) { | |
self.lb.update().await.expect("Failed to update backends"); | |
} | |
} | |
#[async_trait] | |
impl BackgroundService for EndpointSliceDiscoveryService { | |
async fn start(&self, mut _shutdown: ShutdownWatch) { | |
let owned_stream = *self.stream.take(Ordering::SeqCst).unwrap(); | |
let stream = owned_stream | |
.for_each(|_event| async { | |
self.update_backends(_event).await; | |
() | |
}) | |
.boxed(); | |
let completed = select(stream, _shutdown.changed().boxed()).await; | |
match completed { | |
Either::Left(_) => { | |
panic!("Kubernetes update stream ended abruptly"); | |
}, | |
Either::Right(_) => { | |
// terminating | |
}, | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Bail out when there is no Kubernetes client or the load balancer has
// already been set up.
let kube_client = match self.ctx.kube() {
    Some(client) if self.lb.is_none() => client,
    _ => return,
};

// Watch the EndpointSlices of this service in the "default" namespace.
let endpoint_slices = Api::<EndpointSlice>::namespaced(kube_client.clone(), "default");
let selector = format!("kubernetes.io/service-name={}", self.service_name.as_str());
let filter = Config::default().labels(&selector);

// The reflector keeps `store` in sync with what the watcher observes.
let (reader, writer) = reflector::store();
let rf = reflector(writer, watcher(endpoint_slices, filter));
let store = Arc::new(reader);

// Load balancer whose backends come from the reflector store.
let kube_discovery = EndpointSliceDiscovery {
    ctx: self.ctx.clone(),
    store: store.clone(),
    port: self.port,
};
let mut kube_upstream: LoadBalancer<RoundRobin> =
    LoadBalancer::from_backends(Backends::new(Box::new(kube_discovery)));
kube_upstream.set_health_check(health_check::TcpHealthCheck::new());
kube_upstream.health_check_frequency = Some(Duration::from_secs(5));

// Register the load balancer as a background service and keep a handle to it.
let health_service_name = format!("TcpHealthCheck {}", self);
let background = background_service(&health_service_name, kube_upstream);
let upstream = background.task();
server.add_service(background);

// Second background service: refresh the load balancer on every object the
// reflector applies; watcher errors are dropped from the stream.
let applied = rf
    .applied_objects()
    .filter_map(|event| ready(event.ok()))
    .boxed();
let kube_upd_service = EndpointSliceDiscoveryService {
    stream: AtomicOption::new(Box::new(applied)),
    lb: upstream.clone(),
};
let updater_service_name = format!("Reflector updater {}", self);
server.add_service(background_service(&updater_service_name, kube_upd_service));

self.lb = Some(upstream);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment