Skip to content

Instantly share code, notes, and snippets.

@kwdowicz
Created May 13, 2024 20:34
Show Gist options
  • Save kwdowicz/897f150a9271ccd5ea206b2580826610 to your computer and use it in GitHub Desktop.
Save kwdowicz/897f150a9271ccd5ea206b2580826610 to your computer and use it in GitHub Desktop.
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer, CommitMode};
use rdkafka::message::{Message, BorrowedMessage};
use futures::stream::StreamExt;
use tokio;
/// Streams messages from `test-topic` and commits each one after it has been logged.
///
/// The consumer is configured with `enable.auto.commit` set to `false`, so every
/// successfully received message is committed explicitly with `CommitMode::Async`.
/// Receive and commit errors are printed and the loop carries on with the next message.
///
/// # Panics
///
/// Panics if the consumer cannot be created or the subscription fails.
#[tokio::main]
async fn main() {
    let mut settings = ClientConfig::new();
    settings
        .set("group.id", "test-group")
        .set("bootstrap.servers", "localhost:9092")
        .set("enable.auto.commit", "false")
        .set("auto.offset.reset", "earliest");
    let consumer: StreamConsumer = settings.create().expect("Consumer creation failed");

    let topics = ["test-topic"];
    consumer.subscribe(&topics).expect("Subscription failed");

    let mut incoming = consumer.stream();
    loop {
        let received = match incoming.next().await {
            Some(Ok(received)) => received,
            Some(Err(e)) => {
                println!("Error receiving message: {:?}", e);
                continue;
            }
            // The stream has ended; nothing more to consume.
            None => break,
        };

        log_message(&received);
        if let Err(e) = consumer.commit_message(&received, CommitMode::Async) {
            println!("Failed to commit message: {:?}", e);
        }
    }
}
/// Prints a message's payload, then its topic, partition, offset and timestamp.
///
/// The payload is viewed as `str`. When that view fails the placeholder
/// `<invalid UTF-8>` is printed instead, and when there is no payload at all
/// the placeholder is `<empty>`.
fn log_message(msg: &BorrowedMessage) {
    let text = msg
        .payload_view::<str>()
        .map_or("<empty>", |view| view.unwrap_or("<invalid UTF-8>"));
    println!("Received message: {}", text);

    let topic = msg.topic();
    let partition = msg.partition();
    let offset = msg.offset();
    let timestamp = msg.timestamp();
    println!(
        "Topic: {}, Partition: {}, Offset: {}, Timestamp: {:?}",
        topic, partition, offset, timestamp
    );
}
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::message::Message;
/// Iterates over messages from `test-topic` with a `BaseConsumer` and prints each one.
///
/// For every received message the payload (when present) is printed, followed by
/// the key, topic, partition and offset. Receive errors are printed and the loop
/// continues with the next item.
///
/// # Panics
///
/// Panics if the consumer cannot be created or the subscription fails.
fn main() {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("group.id", "test-group")
        .set("bootstrap.servers", "localhost:9092")
        .set("auto.offset.reset", "earliest")
        .create()
        .expect("Consumer creation failed");

    consumer
        .subscribe(&["test-topic"])
        .expect("Subscription failed");

    for message in consumer.iter() {
        match message {
            Ok(m) => {
                if let Some(payload) = m.payload() {
                    // The payload is arbitrary bytes written by some producer, so it
                    // may not be valid UTF-8. Decode lossily instead of unwrapping:
                    // valid text prints exactly as before, invalid sequences show up
                    // as U+FFFD rather than aborting the whole consumer.
                    println!(
                        "Received message: {:?}",
                        String::from_utf8_lossy(payload)
                    );
                }
                println!(
                    "Key: '{:?}', Topic: '{}', Partition: {}, Offset: {}",
                    m.key(),
                    m.topic(),
                    m.partition(),
                    m.offset()
                );
            }
            Err(e) => println!("Error receiving message: {:?}", e),
        }
    }
}
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
#[tokio::main]
async fn main() {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");
let topic = "test-topic";
let payload = "Kamil!";
let delivery_status = producer
.send(
FutureRecord::to(topic)
.payload(payload)
.key("rusty-key"),
std::time::Duration::from_secs(0),
)
.await;
match delivery_status {
Ok(delivery) => println!("Message delivered: {:?}", delivery),
Err((e, _)) => println!("Message delivery failed: {:?}", e),
}
}
use rdkafka::config::ClientConfig;
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
/// Sends a single record to `test-topic` through a `BaseProducer`, then flushes it.
///
/// # Panics
///
/// Panics if the producer cannot be created, if the record cannot be handed to
/// the producer, or if the final flush reports an error.
fn main() {
    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()
        .expect("Producer creation error");

    producer
        .send(
            BaseRecord::to("test-topic")
                .payload("hello world")
                .key("key"),
        )
        .expect("Failed to send message");

    // Ensure all outstanding messages are delivered. The flush outcome is checked
    // rather than discarded so that a failed or timed-out flush is not mistaken
    // for a successful run.
    // NOTE(review): assumes an rdkafka release where `flush` returns a `Result`
    // (believed to be 0.30 and later) — confirm against the pinned version.
    producer
        .flush(std::time::Duration::from_secs(1))
        .expect("Failed to flush outstanding messages");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment