/**
 * End-to-end check of a Kafka source chained to a Pulsar function.
 *
 * <p>Steps: create a Kafka topic, register a STRING schema on {@code pulsarTopic},
 * subscribe to {@code pulsarTopic + "-output"}, start the Kafka source and the
 * exclamation function, publish 10 values to Kafka, and assert that each value
 * arrives on the output topic, in order, with a trailing {@code "!"}.
 *
 * @param pulsarTopic the Pulsar topic the Kafka source writes to and the function reads from;
 *                    its tenant and namespace are reused for the source and the function
 * @throws Exception if any setup step, Kafka send, or Pulsar receive fails
 */
protected void testKafkaSourcePulsarFunction(TopicName pulsarTopic) throws Exception {
    final String tenantName = pulsarTopic.getTenant();
    final String namespaceName = pulsarTopic.getNamespacePortion();

    // create kafka topic
    final String kafkaTopic = "test-kafka-source-pulsar-function-" + Base58.randomString(8);
    final NewTopic newKafkaTopic = new NewTopic(kafkaTopic, 1, (short) 1);
    // Topic creation is asynchronous (assuming kafkaAdmin is a Kafka AdminClient):
    // wait for the result so a creation failure surfaces here and the topic exists
    // before the source and the producer start using it.
    kafkaAdmin.createTopics(
        Sets.newHashSet(newKafkaTopic),
        new CreateTopicsOptions()
    ).all().get();

    // register Schema.STRING to the pulsar topic. because Kafka source is running
    // first, it will trigger creating the pulsar topic with Schema.BYTES. Hence when
    // running ExclamationFunction, the function will fail to run because it attempts
    // to consume the topic (with Schema.BYTES) using Schema.STRING.
    admin.schemas().createSchema(pulsarTopic.toString(), Schema.STRING.getSchemaInfo());

    // create pulsar output topic; subscribing from Earliest before anything is
    // produced means no function output can be missed.
    final String outputPulsarTopic = pulsarTopic.toString() + "-output";
    @Cleanup
    final Consumer<String> pulsarConsumer = client.newConsumer(Schema.STRING)
        .subscriptionName("test-sub")
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .topic(outputPulsarTopic)
        .subscribe();

    // create source
    final String sourceName = "test-kafka-source";
    createKafkaSource(
        tenantName,
        namespaceName,
        sourceName,
        kafkaTopic,
        pulsarTopic.toString()
    );

    // create function
    final String functionName = "test-exclamation-function";
    createExclamationFunction(
        tenantName,
        namespaceName,
        functionName,
        pulsarTopic.toString(),
        outputPulsarTopic
    );

    final int numMessages = 10;
    // once both the kafka source and the function are running, produce the data.
    try (Producer<String, String> producer = newKafkaProducer(
        new StringSerializer(), new StringSerializer()
    )) {
        for (int i = 0; i < numMessages; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(
                kafkaTopic,
                "value-" + i
            );
            // Wait for the send result so a failed publish fails the test here
            // instead of showing up later as a receive() that never returns.
            producer.send(record).get();
        }
    }

    // receive messages from pulsar output topic (written by Exclamation Function)
    for (int i = 0; i < numMessages; i++) {
        Message<String> message = pulsarConsumer.receive();
        assertEquals("value-" + i + "!", message.getValue());
    }
}
/**
 * Decides whether a single source instance counts as running.
 *
 * <p>Being protected and non-final, a subclass could override it with a different check.
 *
 * @param status status data reported for one source instance
 * @return the value of {@code status.isRunning()}
 */
protected boolean assertSourceInstanceRunning(SourceInstanceStatusData status) {
    final boolean running = status.isRunning();
    return running;
}
/**
 * Decides whether a single function instance counts as running.
 *
 * <p>Being protected and non-final, a subclass could override it with a different check.
 *
 * @param status status data reported for one function instance
 * @return the value of {@code status.isRunning()}
 */
protected boolean assertFunctionInstanceRunning(FunctionInstanceStatusData status) {
    final boolean running = status.isRunning();
    return running;
}
/**
 * Creates the built-in Kafka source and polls its status until it reports running.
 *
 * <p>The source is configured against the Kafka endpoint derived from the cluster
 * name, with consumer group {@code "test-group"} and parallelism 1. Polling goes through
 * {@code TestUtils.retryUntil} (presumably retrying until the predicate returns
 * {@code true}; confirm against that helper).
 *
 * @param tenantName      tenant that owns the source
 * @param namespaceName   namespace that owns the source
 * @param sourceName      name to register the source under
 * @param kafkaTopic      Kafka topic the source reads from
 * @param pulsarTopicName Pulsar topic the source writes to
 * @throws Exception if the source cannot be created (a {@code PulsarAdminException}
 *                   is logged and rethrown) or a later admin call fails
 */
protected void createKafkaSource(String tenantName,
                                 String namespaceName,
                                 String sourceName,
                                 String kafkaTopic,
                                 String pulsarTopicName) throws Exception {
    // Kafka endpoint: "<cluster name>-<kafka service name>" on port 9092.
    final String bootstrapServers = "PLAINTEXT://" + pulsarService().getClusterName()
        + "-" + ExternalServices.KAFKA + ":9092";
    final Map<String, Object> connectorSettings =
        buildKafkaSourceConfig(bootstrapServers, kafkaTopic, "test-group");
    log.info("Source config : {}", connectorSettings);

    final SourceConfig requested = SourceConfig.builder()
        .archive("builtin://kafka")
        .tenant(tenantName)
        .namespace(namespaceName)
        .name(sourceName)
        .parallelism(1)
        .configs(connectorSettings)
        .topicName(pulsarTopicName)
        .build();
    try {
        admin.sources().createSource(requested, requested.getArchive());
    } catch (PulsarAdminException createFailure) {
        log.error("Failed to create a kafka source : {}",
            createFailure.getHttpError(), createFailure);
        throw createFailure;
    }

    // Read the config back purely for the log line below.
    final SourceConfig registered =
        admin.sources().getSource(tenantName, namespaceName, sourceName);
    log.info("Fetch the source config for {}/{}/{}: {}",
        tenantName, namespaceName, sourceName, registered);

    TestUtils.<Void, SourceStatus>retryUntil(
        null,
        ignored -> admin.sources().getSourceStatus(tenantName, namespaceName, sourceName),
        status -> {
            log.info("Get source status : {}", status);
            // Not ready until at least one instance is counted as running...
            if (status.getNumRunning() < 1) {
                return false;
            }
            // ...and every listed instance passes the per-instance check.
            for (SourceInstanceStatus instance : status.getInstances()) {
                final boolean instanceUp = assertSourceInstanceRunning(instance.getStatus());
                if (!instanceUp) {
                    return false;
                }
            }
            return true;
        }
    );
}
/**
 * Creates the example ExclamationFunction through the {@code pulsar-admin} CLI inside
 * the broker container, then polls its status until it reports running.
 *
 * <p>The CLI exit code is asserted to be 0. Polling goes through
 * {@code TestUtils.retryUntil} (presumably retrying until the predicate returns
 * {@code true}; confirm against that helper).
 *
 * @param tenantName        tenant that owns the function
 * @param namespaceName     namespace that owns the function
 * @param functionName      name to register the function under
 * @param inputPulsarTopic  topic passed to {@code --inputs}
 * @param outputPulsarTopic topic passed to {@code --output}
 * @throws Exception if executing the command or querying the function status fails
 */
protected void createExclamationFunction(String tenantName,
                                         String namespaceName,
                                         String functionName,
                                         String inputPulsarTopic,
                                         String outputPulsarTopic) throws Exception {
    final PulsarContainerServiceBase containers = (PulsarContainerServiceBase) pulsarService();

    final String[] createFunctionCmd = {
        "/pulsar/bin/pulsar-admin",
        "functions",
        "create",
        "--tenant", tenantName,
        "--namespace", namespaceName,
        "--name", functionName,
        "--inputs", inputPulsarTopic,
        "--output", outputPulsarTopic,
        "--classname",
        "org.apache.pulsar.functions.api.examples.ExclamationFunction",
        "--jar",
        "/pulsar/examples/api-examples.jar"
    };
    final ExecResult cliOutcome = containers.execCmd(PulsarComponent.BROKER, createFunctionCmd);
    assertEquals(0, cliOutcome.getExitCode());

    TestUtils.<Void, FunctionStatus>retryUntil(
        null,
        ignored -> admin.functions().getFunctionStatus(tenantName, namespaceName, functionName),
        status -> {
            log.info("Get function status : {}", status);
            // Not ready until at least one instance is counted as running...
            if (status.getNumRunning() < 1) {
                return false;
            }
            // ...and every listed instance passes the per-instance check.
            for (FunctionInstanceStatus instance : status.getInstances()) {
                final boolean instanceUp = assertFunctionInstanceRunning(instance.getStatus());
                if (!instanceUp) {
                    return false;
                }
            }
            return true;
        }
    );
}
Created
August 2, 2019 08:26
-
-
Save sijie/297e3de42f8184350e651f11db91408b to your computer and use it in GitHub Desktop.
Kafka source with a String function example
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment