Created
May 13, 2024 20:34
-
-
Save kwdowicz/897f150a9271ccd5ea206b2580826610 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use rdkafka::config::ClientConfig; | |
use rdkafka::consumer::{Consumer, StreamConsumer, CommitMode}; | |
use rdkafka::message::{Message, BorrowedMessage}; | |
use futures::stream::StreamExt; | |
use tokio; | |
#[tokio::main] | |
async fn main() { | |
let consumer: StreamConsumer = ClientConfig::new() | |
.set("group.id", "test-group") | |
.set("bootstrap.servers", "localhost:9092") | |
.set("enable.auto.commit", "false") | |
.set("auto.offset.reset", "earliest") | |
.create() | |
.expect("Consumer creation failed"); | |
consumer.subscribe(&["test-topic"]).expect("Subscription failed"); | |
let mut message_stream = consumer.stream(); | |
while let Some(message) = message_stream.next().await { | |
match message { | |
Ok(m) => { | |
log_message(&m); | |
if let Err(e) = consumer.commit_message(&m, CommitMode::Async) { | |
println!("Failed to commit message: {:?}", e); | |
} | |
}, | |
Err(e) => println!("Error receiving message: {:?}", e), | |
} | |
} | |
} | |
fn log_message(msg: &BorrowedMessage) { | |
let payload_result = msg.payload_view::<str>(); | |
let payload = match payload_result { | |
Some(Ok(text)) => text, | |
Some(Err(_)) => "<invalid UTF-8>", | |
None => "<empty>", | |
}; | |
println!("Received message: {}", payload); | |
println!("Topic: {}, Partition: {}, Offset: {}, Timestamp: {:?}", | |
msg.topic(), msg.partition(), msg.offset(), msg.timestamp()); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use rdkafka::config::ClientConfig; | |
use rdkafka::consumer::{BaseConsumer, Consumer}; | |
use rdkafka::message::Message; | |
fn main() { | |
let consumer: BaseConsumer = ClientConfig::new() | |
.set("group.id", "test-group") | |
.set("bootstrap.servers", "localhost:9092") | |
.set("auto.offset.reset", "earliest") | |
.create() | |
.expect("Consumer creation failed"); | |
consumer.subscribe(&["test-topic"]).expect("Subscription failed"); | |
for message in consumer.iter() { | |
match message { | |
Ok(m) => { | |
if let Some(payload) = m.payload() { | |
println!("Received message: {:?}", std::str::from_utf8(payload).unwrap()); | |
} | |
println!("Key: '{:?}', Topic: '{}', Partition: {}, Offset: {}", | |
m.key(), m.topic(), m.partition(), m.offset()); | |
} | |
Err(e) => println!("Error receiving message: {:?}", e), | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use rdkafka::config::ClientConfig; | |
use rdkafka::producer::{FutureProducer, FutureRecord}; | |
#[tokio::main] | |
async fn main() { | |
let producer: FutureProducer = ClientConfig::new() | |
.set("bootstrap.servers", "localhost:9092") | |
.set("message.timeout.ms", "5000") | |
.create() | |
.expect("Producer creation error"); | |
let topic = "test-topic"; | |
let payload = "Kamil!"; | |
let delivery_status = producer | |
.send( | |
FutureRecord::to(topic) | |
.payload(payload) | |
.key("rusty-key"), | |
std::time::Duration::from_secs(0), | |
) | |
.await; | |
match delivery_status { | |
Ok(delivery) => println!("Message delivered: {:?}", delivery), | |
Err((e, _)) => println!("Message delivery failed: {:?}", e), | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use rdkafka::config::ClientConfig; | |
use rdkafka::producer::{BaseProducer, BaseRecord}; | |
fn main() { | |
let producer: BaseProducer = ClientConfig::new() | |
.set("bootstrap.servers", "localhost:9092") | |
.create() | |
.expect("Producer creation error"); | |
producer.send( | |
BaseRecord::to("test-topic") | |
.payload("hello world") | |
.key("key") | |
).expect("Failed to send message"); | |
// Ensure all outstanding messages are delivered | |
producer.flush(std::time::Duration::from_secs(1)); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment