The Kafka sink connector pulls messages from Pulsar topics and persists the messages to Kafka topics. For more information about connectors, see Connector Overview.
This document describes how to create a Kafka sink connector and get it up and running.
Quick start
Prerequisites
The prerequisites for connecting a Kafka sink connector to external systems include:
Apache Kafka: Ensure you have a running Kafka instance. You can follow the official Kafka Quickstart guide to set up a Kafka instance if you don't have one already.
1. Create a connector
The following command shows how to use pulsarctl to create a builtin connector. If you want to create a non-builtin connector, replace --sink-type kafka with --archive /path/to/pulsar-io-kafka.nar. You can find the button to download the nar package at the beginning of the document.
For StreamNative Cloud users
If you are a StreamNative Cloud user, you need to set up your environment first.
pulsarctl sinks create \
--sink-type kafka \
--name kafka-sink \
--tenant public \
--namespace default \
--inputs "Your topic name" \
--parallelism 1 \
--sink-config \
'{
"bootstrapServers": "localhost:9092",
"topic": "kafka-topic-name",
"acks": "1"
}'
The --sink-config value is a JSON string that holds the minimum configuration required to start this connector. Substitute the relevant parameters with your own. To configure more parameters, see Configuration Properties for reference.
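If you generate the --sink-config value from code rather than typing it by hand, a small helper can catch a missing property before the connector is created. The following is a minimal sketch using only the Java standard library. The class name SinkConfigJson and its toJson method are hypothetical and are not part of pulsarctl or the connector. It assumes that bootstrapServers, topic, and acks are the required properties (as the Configuration Properties table states) and that acks is passed as a string holding one of the values Kafka producers accept.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of pulsarctl or the connector.
// It only covers string-valued properties such as the three required ones.
final class SinkConfigJson {

    // Properties the Configuration Properties table marks as required.
    private static final List<String> REQUIRED = List.of("bootstrapServers", "topic", "acks");

    // Values a Kafka producer accepts for its acks setting.
    private static final Set<String> VALID_ACKS = Set.of("0", "1", "all", "-1");

    // Validates the map and renders it as a flat JSON object of string values.
    static String toJson(Map<String, String> config) {
        for (String key : REQUIRED) {
            String value = config.get(key);
            if (value == null || value.isEmpty()) {
                throw new IllegalArgumentException("Missing required property: " + key);
            }
        }
        if (!VALID_ACKS.contains(config.get("acks"))) {
            throw new IllegalArgumentException("Unsupported acks value: " + config.get("acks"));
        }
        return config.entrySet().stream()
                .map(e -> quote(e.getKey()) + ": " + quote(e.getValue()))
                .collect(Collectors.joining(", ", "{", "}"));
    }

    // Escapes backslashes and double quotes, then wraps the text in quotes.
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the insertion order in the rendered JSON.
        Map<String, String> config = new LinkedHashMap<>();
        config.put("bootstrapServers", "localhost:9092");
        config.put("topic", "kafka-topic-name");
        config.put("acks", "1");
        System.out.println(toJson(config));
    }
}
```

The printed string can be passed as the argument of --sink-config. Numeric properties such as maxRequestSize are outside the scope of this sketch.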
Note
You can also choose to use a variety of other tools to create a connector:
- pulsar-admin: The command arguments for pulsar-admin are similar to those of pulsarctl. You can find an example in the StreamNative Cloud documentation.
- RestAPI: You can find an example in the StreamNative Cloud documentation.
- Terraform: You can find an example in the StreamNative Cloud documentation.
- Function Mesh: The docker image can be found at the beginning of the document.
2. Send messages to the topic
Note
If your connector is created on StreamNative Cloud, you need to authenticate your clients. See Build applications using Pulsar clients for more information.
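import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

// Connect to the Pulsar cluster that hosts the sink's input topic.
PulsarClient client = PulsarClient.builder()
.serviceUrl("{{Your Pulsar URL}}")
.build();
// Create a producer on the topic passed to --inputs when creating the sink.
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("{{Your topic name}}")
.create();
String message = "hello kafka";
// send() blocks until the broker acknowledges the message.
MessageId msgID = producer.send(message);
System.out.println("Publish " + message + " and message ID " + msgID);
producer.flush();
// Release the producer and the client connection.
producer.close();
client.close();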
You can also send the message using the command line:
$ bin/pulsar-client produce pulsar-topic-name --messages "hello kafka"
3. Check the data on the Kafka topic
You can consume the data from the Kafka topic using the following command:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-topic-name --from-beginning
If everything is set up correctly, you should see the message "hello kafka" in the Kafka consumer.
Configuration Properties
This table outlines the properties of a Kafka sink connector.
Name | Type | Required | Default | Description |
---|---|---|---|---|
bootstrapServers | String | true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
acks | String | true | " " (empty string) | The number of acknowledgments that the producer requires the leader to receive before a request completes. <br/>This controls the durability of the sent records. |
batchsize | long | false | 16384L | The batch size that a Kafka producer attempts to batch records together before sending them to brokers. |
maxRequestSize | long | false | 1048576L | The maximum size of a Kafka request in bytes. |
topic | String | true | " " (empty string) | The Kafka topic which receives messages from Pulsar. |
keyDeserializationClass | String | false | org.apache.kafka.common.serialization.StringSerializer | The serializer class for Kafka producers to serialize keys. |
valueDeserializationClass | String | false | org.apache.kafka.common.serialization.ByteArraySerializer | The serializer class for Kafka producers to serialize values.<br/><br/>The serializer is set by a specific implementation of KafkaAbstractSink. |
producerConfigProperties | Map | false | " " (empty string) | The producer configuration properties to be passed to producers. <br/><br/>Note: other properties specified in the connector configuration file take precedence over this configuration. |
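
The precedence note for producerConfigProperties can be illustrated with a short sketch. It assumes the connector first copies producerConfigProperties into the producer settings and then overlays the dedicated properties (bootstrapServers, acks, batchsize), which is what the note implies. The class and method names are hypothetical. The keys bootstrap.servers, acks, batch.size, and linger.ms are standard Kafka producer settings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration of the precedence rule; not the connector's source code.
final class ProducerPropsPrecedence {

    // Builds the effective producer settings from the pass-through map and
    // the dedicated connector properties.
    static Properties effectiveProps(Map<String, Object> producerConfigProperties,
                                     String bootstrapServers, String acks, long batchSize) {
        Properties props = new Properties();
        // Step 1: start from the pass-through producer properties.
        props.putAll(producerConfigProperties);
        // Step 2: dedicated connector properties overwrite any conflicting key.
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", acks);
        props.put("batch.size", String.valueOf(batchSize));
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> extra = new HashMap<>();
        extra.put("acks", "all");    // conflicts with the dedicated acks property
        extra.put("linger.ms", "5"); // has no dedicated property, so it passes through

        Properties props = effectiveProps(extra, "localhost:9092", "1", 16384L);
        System.out.println(props.getProperty("acks"));      // prints "1"
        System.out.println(props.getProperty("linger.ms")); // prints "5"
    }
}
```

Under this assumption, producerConfigProperties is best reserved for settings that have no dedicated connector property, since a conflicting key is silently overridden.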