Interoperability between Apache Kafka and Apache Pulsar
Produce and consume messages between Kafka and Pulsar
StreamNative supports three data entry formats: `kafka`, `mixed_kafka`, and `pulsar`. Each format has distinct characteristics:

- `kafka` format: This provides the best performance; however, a Pulsar consumer cannot consume it unless a payload processor is employed.
- `mixed_kafka` format: The `mixed_kafka` format functions similarly to the `kafka` format. It supports some non-official Kafka clients for encoding or decoding Kafka messages. The performance is moderate.
- `pulsar` format: This is the default data entry format on StreamNative Cloud. It supports interoperability between Kafka and Pulsar clients, including Kafka-client-to-Kafka-client and Pulsar-client-to-Kafka-client interactions, and vice versa. This means data is interoperable between Apache Kafka and Apache Pulsar. It is suitable for most scenarios where performance is not a critical consideration.
StreamNative Cloud also supports specifying the entry format on a per-topic basis. The entry format can be set through topic properties by using `bin/pulsar-admin topics update-properties`. The configuration key is `kafkaEntryFormat`, and the possible values are `kafka`, `mixed_kafka`, or `pulsar`. The default value is `pulsar` if not specified.
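For example, the following command is a minimal sketch of switching an existing topic to the `kafka` entry format (the topic name `my-topic` is a placeholder; replace the URL and API key with your cluster's values):

```bash
# Set the kafkaEntryFormat property on an existing topic
bin/pulsar-admin \
  --admin-url <Pulsar Service Http Url> \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params token:<API Key> \
  topics update-properties persistent://public/default/my-topic \
  --property kafkaEntryFormat=kafka
```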
Interoperability between Pulsar and Kafka clients
With the `kafka` or `mixed_kafka` entry format, Kafka clients can produce and consume messages directly and freely. However, Pulsar producers SHOULD NOT produce messages to these topics because they are unable to encode messages into a format consumable by Kafka clients.
However, since version 2.9 of the Pulsar client, we have introduced a message payload processor for Pulsar consumers. This means that messages produced by Kafka producers can now be consumed and decoded by Pulsar consumers.
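As a minimal sketch (the service URL, topic, and subscription names are placeholders; a complete walk-through follows below), enabling the processor is a single call on the consumer builder:

```java
import io.streamnative.pulsar.handlers.kop.KafkaPayloadProcessor;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

public class KafkaPayloadProcessorSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("<Pulsar Service Url>")
                .build();
        // The payload processor (Pulsar client >= 2.9) decodes entries that
        // were written by Kafka producers in the kafka or mixed_kafka format
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("topic-with-kafka-format")
                .subscriptionName("test")
                .messagePayloadProcessor(new KafkaPayloadProcessor())
                .subscribe();
        Message<byte[]> msg = consumer.receive();
        System.out.println("Received: " + new String(msg.getData()));
        consumer.acknowledge(msg);
        consumer.close();
        client.close();
    }
}
```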
The `pulsar` format allows messages to be freely produced and consumed between Kafka and Pulsar clients. The message format conversion is handled automatically by the broker, enabling more flexible use of either Kafka clients or Pulsar clients.
Produce messages with a Kafka client and consume them with a Pulsar client
Requirements
- Pulsar client version ≥ 2.9
- Pulsar admin version ≥ 2.10
Steps
Create a topic with the `kafka` format via the Pulsar admin CLI
Note

The `mixed_kafka` format can be created by following the same steps, using the `kafkaEntryFormat=mixed_kafka` property instead.

```bash
bin/pulsar-admin \
  --admin-url <Pulsar Service Http Url> \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params token:<API Key> \
  topics create persistent://public/default/topic-with-kafka-format \
  --metadata kafkaEntryFormat=kafka

# Get the properties to check whether the entry format was set successfully
bin/pulsar-admin \
  --admin-url <Pulsar Service Http Url> \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params token:<API Key> \
  topics get-properties persistent://public/default/topic-with-kafka-format
```
Install client libraries
```xml
<!-- Used to decode Kafka-format messages -->
<dependency>
  <groupId>io.streamnative.pulsar.handlers</groupId>
  <artifactId>kafka-payload-processor</artifactId>
  <version>3.1.0.4</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>3.1.0</version>
</dependency>
```
Kafka producer and Pulsar consumer with the `kafka` format
```java
import io.streamnative.pulsar.handlers.kop.KafkaPayloadProcessor;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaFormatKafkaProducePulsarConsume {

    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        // Replace these configs with your cluster
        final String kafkaServerUrl = "<Kafka Service Url>";
        final String pulsarServerUrl = "<Pulsar Service Http Url>";
        final String jwtToken = "<API Key>";
        final String token = "token:" + jwtToken;
        final String topicName = "topic-with-kafka-format";
        final String namespace = "public/default";

        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 1. Configure token authentication, which is equivalent to the SASL/PLAIN mechanism in Kafka
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                namespace, token));

        // 2. Create a Kafka producer
        final KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 3. Produce messages with the Kafka producer
        for (int i = 0; i < 5; i++) {
            String value = "hello world";
            final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, value));
            final RecordMetadata recordMetadata = recordMetadataFuture.get();
            System.out.println("Send " + value + " to " + recordMetadata);
        }
        producer.close();

        // 4. Consume messages with a Pulsar consumer
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(pulsarServerUrl)
                .authentication(AuthenticationFactory.token(jwtToken))
                .build();
        Consumer<byte[]> consumer = client.newConsumer()
                .topic(topicName)
                .subscriptionName("test")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                // Set the Kafka payload processor to decode kafka-format messages
                .messagePayloadProcessor(new KafkaPayloadProcessor())
                .subscribe();
        for (int i = 0; i < 5; i++) {
            Message<byte[]> msg = consumer.receive();
            consumer.acknowledge(msg);
            System.out.println("Receive message " + new String(msg.getData()));
        }
        consumer.close();
        client.close();
    }
}
```
Produce messages with a Pulsar client and consume them with a Kafka client
Create a topic with the `pulsar` format via the Pulsar admin CLI
```bash
bin/pulsar-admin \
  --admin-url <Pulsar Service Http Url> \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params token:<API Key> \
  topics create-partitioned-topic persistent://public/default/topic-with-pulsar-format -p 3 \
  --metadata kafkaEntryFormat=pulsar

bin/pulsar-admin \
  --admin-url <Pulsar Service Http Url> \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params token:<API Key> \
  topics get-properties persistent://public/default/topic-with-pulsar-format
```
Install client libraries
```xml
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>3.1.0</version>
</dependency>
```
Pulsar producer and Kafka consumer with the `pulsar` format
```java
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class PulsarFormatPulsarProduceKafkaConsume {

    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        // Replace these configs with your cluster
        final String kafkaServerUrl = "<Kafka Service Url>";
        final String pulsarServerUrl = "<Pulsar Service Http Url>";
        final String jwtToken = "<API Key>";
        final String token = "token:" + jwtToken;
        final String topicName = "topic-with-pulsar-format";
        final String namespace = "public/default";

        // 1. Create a Pulsar producer
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(pulsarServerUrl)
                .authentication(AuthenticationFactory.token(jwtToken))
                .build();
        Producer<byte[]> producer = client.newProducer()
                .topic(topicName)
                .create();

        // 2. Produce messages with the Pulsar producer
        for (int i = 0; i < 5; i++) {
            String message = "my-message-" + i;
            MessageId msgId = producer.send(message.getBytes());
            System.out.println("Publish " + message + " and message ID " + msgId);
        }
        producer.close();
        client.close();

        // 3. Configure the Kafka consumer
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // 4. Configure token authentication, which is equivalent to the SASL/PLAIN mechanism in Kafka
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                namespace, token));

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton(topicName));

        // 5. Consume messages with the Kafka consumer
        boolean running = true;
        while (running) {
            System.out.println("running");
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            if (!records.isEmpty()) {
                records.forEach(record -> System.out.println("Receive record: " + record.value()
                        + " from " + record.topic() + "-" + record.partition() + "@" + record.offset()));
                running = false;
            }
        }
        consumer.close();
    }
}
```
Interoperability between Kafka and Pulsar Transactions
Currently, Kafka and Pulsar transactions cannot interoperate. You must choose one or the other because they use different mechanisms to store transactional state.
Interoperability between Kafka and Pulsar Schema
StreamNative Cloud supports both the Kafka and Pulsar schema registries as central repositories to store registered schema information, which enables producers and consumers to coordinate the schema of a topic's messages through brokers. However, Kafka schemas and Pulsar schemas cannot be used simultaneously due to their differing API definitions and schema storage locations.
We also plan to deliver a unified schema registry that supports both Kafka and Pulsar schemas. This will allow messages with schemas to be produced and consumed interchangeably by Pulsar and Kafka clients:
- Pulsar consumers will be able to consume messages with schema produced by Kafka producers.
- Kafka consumers will be able to consume messages with schema produced by Pulsar producers.