Created
January 9, 2023 13:19
-
-
Save jakzal/8ae335148e0686ca6875e3de7f2efd90 to your computer and use it in GitHub Desktop.
Learn how to run Debezium with PostgreSQL and Kafka with Junit 5
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.kaffeinelabs.debezium; | |
import io.debezium.testing.testcontainers.DebeziumContainer; | |
import org.testcontainers.containers.GenericContainer; | |
import org.testcontainers.containers.KafkaContainer; | |
import org.testcontainers.containers.Network; | |
import org.testcontainers.containers.PostgreSQLContainer; | |
import org.testcontainers.lifecycle.Startable; | |
import org.testcontainers.utility.DockerImageName; | |
import java.util.stream.Stream; | |
public class DebeziumContainers implements Startable { | |
private static final Network network = Network.newNetwork(); | |
public static final PostgreSQLContainer<?> postgres = | |
new PostgreSQLContainer<>(DockerImageName.parse("debezium/postgres:11").asCompatibleSubstituteFor("postgres")) | |
.withNetwork(network) | |
.withNetworkAliases("postgres"); | |
public static final KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka")) | |
.withNetwork(network); | |
public static final DebeziumContainer debezium = | |
new DebeziumContainer("debezium/connect:2.1.1.Final") | |
.withNetwork(network) | |
.withKafka(kafka) | |
.dependsOn(kafka); | |
@Override | |
public void start() { | |
Stream.of(postgres, kafka).parallel().forEach(GenericContainer::start); | |
debezium.start(); | |
} | |
@Override | |
public void stop() { | |
debezium.stop(); | |
Stream.of(postgres, kafka).parallel().forEach(GenericContainer::stop); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.kaffeinelabs.debezium; | |
import com.jayway.jsonpath.JsonPath; | |
import io.debezium.testing.testcontainers.ConnectorConfiguration; | |
import org.apache.kafka.clients.consumer.ConsumerConfig; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import org.apache.kafka.common.serialization.StringDeserializer; | |
import org.junit.jupiter.api.Test; | |
import org.rnorth.ducttape.unreliables.Unreliables; | |
import org.testcontainers.junit.jupiter.Container; | |
import org.testcontainers.junit.jupiter.Testcontainers; | |
import java.sql.Connection; | |
import java.sql.DriverManager; | |
import java.sql.SQLException; | |
import java.sql.Statement; | |
import java.time.Duration; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.UUID; | |
import java.util.concurrent.TimeUnit; | |
import static org.junit.jupiter.api.Assertions.assertEquals; | |
// @see https://debezium.io/documentation/reference/stable/integrations/testcontainers.html | |
@Testcontainers | |
public class LearnDebeziumTest { | |
@Container | |
private static final DebeziumContainers debeziumContainers = new DebeziumContainers(); | |
@Test | |
public void canRegisterPostgreSqlConnector() throws Exception { | |
try (Connection connection = getConnection(); | |
Statement statement = connection.createStatement(); | |
KafkaConsumer<String, String> consumer = getConsumer( | |
)) { | |
statement.execute("create schema todo"); | |
statement.execute("create table todo.Todo (id int8 not null, title varchar(255), primary key (id))"); | |
statement.execute("alter table todo.Todo replica identity full"); | |
statement.execute("insert into todo.Todo values (1, 'Learn CDC')"); | |
statement.execute("insert into todo.Todo values (2, 'Learn Debezium')"); | |
ConnectorConfiguration connector = ConnectorConfiguration | |
.forJdbcContainer(debeziumContainers.postgres) | |
.with("topic.prefix", "dbserver1"); | |
debeziumContainers.debezium.registerConnector("my-connector", connector); | |
consumer.subscribe(List.of("dbserver1.todo.todo")); | |
List<ConsumerRecord<String, String>> changeEvents = drain(consumer, 2); | |
assertEquals(1, JsonPath.<Integer>read(changeEvents.get(0).key(), "$.id")); | |
assertEquals("r", JsonPath.<String>read(changeEvents.get(0).value(), "$.op")); | |
assertEquals("Learn CDC", JsonPath.<String>read(changeEvents.get(0).value(), "$.after.title")); | |
assertEquals(2, JsonPath.<Integer>read(changeEvents.get(1).key(), "$.id")); | |
assertEquals("r", JsonPath.<String>read(changeEvents.get(1).value(), "$.op")); | |
assertEquals("Learn Debezium", JsonPath.<String>read(changeEvents.get(1).value(), "$.after.title")); | |
consumer.unsubscribe(); | |
} | |
} | |
private Connection getConnection() throws SQLException { | |
return DriverManager.getConnection(debeziumContainers.postgres.getJdbcUrl(), debeziumContainers.postgres.getUsername(), debeziumContainers.postgres.getPassword()); | |
} | |
private KafkaConsumer<String, String> getConsumer() { | |
return new KafkaConsumer<>( | |
Map.of( | |
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, debeziumContainers.kafka.getBootstrapServers(), | |
ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(), | |
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" | |
), | |
new StringDeserializer(), | |
new StringDeserializer()); | |
} | |
private List<ConsumerRecord<String, String>> drain(KafkaConsumer<String, String> consumer, int expectedRecordCount) { | |
List<ConsumerRecord<String, String>> allRecords = new ArrayList<>(); | |
Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> { | |
consumer.poll(Duration.ofMillis(50)) | |
.iterator() | |
.forEachRemaining(allRecords::add); | |
return allRecords.size() == expectedRecordCount; | |
}); | |
return allRecords; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment