Interoperability with Pulsar
Pub/Sub messages between KSN and Apache Pulsar
The open-source KoP supports three entry formats: kafka, mixed_kafka, and pulsar. The formats differ as follows.
- kafka format: It provides the best performance, but a Pulsar consumer cannot consume the messages unless it uses a payload processor.
- mixed_kafka format: It works similarly to the kafka format and is used to support some non-official Kafka clients when encoding or decoding Kafka messages. Its performance is medium.
- pulsar format: It is the default entryFormat in KoP. It converts between the Kafka message format and the Pulsar message format, so its performance is the lowest of the three. The benefit is that both Kafka and Pulsar consumers can consume the messages from the Pulsar cluster.
KSN uses the pulsar format by default. This format supports Kafka client to Kafka client, Pulsar client to Kafka client, and Kafka client to Pulsar client messaging, so data is interoperable between KSN and Apache Pulsar. It suits most scenarios as long as performance is not the bottleneck.
KSN also supports a topic-level entry format, which is specified through topic properties. The config key is kafkaEntryFormat, and the value is kafka, mixed_kafka, or pulsar. If the property is not specified, the default is pulsar.
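The lookup described above can be sketched in a few lines of Java. This is an illustration only: the class EntryFormatResolver and its methods are hypothetical and not part of KSN or KoP. The sketch assumes values are matched exactly as written and rejects unknown values; how the broker actually treats an unrecognized value is not stated here.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not a KSN API): resolves the effective entry format of a topic. */
final class EntryFormatResolver {
    // Property key and values as documented above.
    static final String KEY = "kafkaEntryFormat";
    static final String DEFAULT_FORMAT = "pulsar";
    static final Set<String> VALID = Set.of("kafka", "mixed_kafka", "pulsar");

    /** Returns the configured entry format, or the default when the property is absent. */
    static String resolve(Map<String, String> topicProperties) {
        String value = topicProperties.get(KEY);
        if (value == null) {
            return DEFAULT_FORMAT;
        }
        // Assumption: unknown values are rejected; the real broker behavior may differ.
        if (!VALID.contains(value)) {
            throw new IllegalArgumentException("Unknown " + KEY + ": " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of()));             // prints "pulsar"
        System.out.println(resolve(Map.of(KEY, "kafka"))); // prints "kafka"
    }
}
```

The map passed to resolve stands in for the topic properties that the admin CLI returns for a topic.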
Interoperability of Pulsar and Kafka clients
With the kafka or mixed_kafka entry format, Kafka clients can publish and subscribe directly, but a Pulsar producer cannot send messages to KSN because it is not able to encode messages in the kafka format.
However, Pulsar client 2.9 introduced the message payload processor for Pulsar consumers, so messages from a Kafka producer can be consumed and decoded by Pulsar consumers.
With the pulsar format, messages can be published and consumed between KSN and Apache Pulsar in both directions. The KSN broker converts the message format automatically, allowing you to migrate from Kafka to Pulsar more flexibly.
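The rules in this section can be condensed into a small compatibility model. The following is a sketch for reasoning about client combinations, not a KSN API: the class EntryFormatCompatibility, its enums, and its method names are all hypothetical, and the hasPayloadProcessor flag stands for a Pulsar consumer (client 2.9 or later) configured with a payload processor.

```java
/** Hypothetical model of the interoperability rules described in this section. */
final class EntryFormatCompatibility {
    enum EntryFormat { KAFKA, MIXED_KAFKA, PULSAR }
    enum ClientType { KAFKA, PULSAR }

    /** A Pulsar producer can only write to topics that use the pulsar entry format. */
    static boolean canProduce(EntryFormat format, ClientType producer) {
        return format == EntryFormat.PULSAR || producer == ClientType.KAFKA;
    }

    /**
     * Kafka consumers can read every format. A Pulsar consumer can read the
     * kafka and mixed_kafka formats only when it uses a payload processor.
     */
    static boolean canConsume(EntryFormat format, ClientType consumer, boolean hasPayloadProcessor) {
        if (format == EntryFormat.PULSAR || consumer == ClientType.KAFKA) {
            return true;
        }
        return hasPayloadProcessor;
    }

    public static void main(String[] args) {
        // Print, for each format, whether each client type can produce to it.
        for (EntryFormat format : EntryFormat.values()) {
            for (ClientType client : ClientType.values()) {
                System.out.println(format + " / " + client + " producer: " + canProduce(format, client));
            }
        }
    }
}
```

For example, canConsume(EntryFormat.KAFKA, ClientType.PULSAR, false) is false in this model, which matches the statement that a plain Pulsar consumer cannot decode kafka-format entries.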
Steps to Produce Messages with Kafka and Consume Messages with Pulsar Client
Requirements
- Pulsar Client Version ≥ 2.9
- Pulsar Admin Version ≥ 2.10
Steps
Create topic with kafka format via Pulsar admin CLI
Note
The mixed_kafka format follows the same steps; use the kafkaEntryFormat=mixed_kafka property instead.

    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<JWT Token> \
      topics create persistent://public/default/topic-with-kafka-format --metadata kafkaEntryFormat=kafka

    # Get the topic properties to check that the entry format was set successfully
    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<JWT Token> \
      topics get-properties persistent://public/default/topic-with-kafka-format
Install client libraries

    <!-- Used to decode the Kafka format message -->
    <dependency>
      <groupId>io.streamnative.pulsar.handlers</groupId>
      <artifactId>kafka-payload-processor</artifactId>
      <version>3.1.0.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client</artifactId>
      <version>3.1.0</version>
    </dependency>
Kafka Producer and Pulsar Consumer with Kafka format
    import io.streamnative.pulsar.handlers.kop.KafkaPayloadProcessor;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.pulsar.client.api.AuthenticationFactory;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.SubscriptionInitialPosition;

    import java.io.IOException;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    public class KafkaFormatKafkaProducePulsarConsume {

        public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
            // Replace these configs with your cluster
            final String kafkaServerUrl = "<Kafka Service Url>";
            final String pulsarServerUrl = "<Pulsar Service Http Url>";
            final String jwtToken = "<JWT Token>";
            final String token = "token:" + jwtToken;

            final String topicName = "persistent://public/default/topic-with-kafka-format";
            final String namespace = "public/default";

            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            // 1. Configure token authentication, which is equivalent to the SASL/PLAIN mechanism in Kafka
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config", String.format(
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                    namespace, token));

            // 2. Create a Kafka producer
            final KafkaProducer<String, String> producer = new KafkaProducer<>(props);

            // 3. Produce messages with the Kafka producer
            for (int i = 0; i < 5; i++) {
                String value = "hello world";
                final Future<RecordMetadata> recordMetadataFuture =
                        producer.send(new ProducerRecord<>(topicName, value));
                final RecordMetadata recordMetadata = recordMetadataFuture.get();
                System.out.println("Send " + value + " to " + recordMetadata);
            }
            producer.close();

            // 4. Consume messages with a Pulsar consumer
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl(pulsarServerUrl)
                    .authentication(AuthenticationFactory.token(jwtToken))
                    .build();

            Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName("test")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    // Set the Kafka payload processor to decode the kafka format message
                    .messagePayloadProcessor(new KafkaPayloadProcessor())
                    .subscribe();

            for (int i = 0; i < 5; i++) {
                Message<byte[]> msg = consumer.receive();
                consumer.acknowledge(msg);
                System.out.println("Receive message " + new String(msg.getData()));
            }

            consumer.close();
            client.close();
        }
    }
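The SASL/PLAIN settings in the example above follow a fixed pattern: the username is the namespace and the password is the JWT token prefixed with "token:". A minimal sketch of factoring that pattern into a reusable helper is shown below. The class KsnSaslConfig and its method name are hypothetical, and the literal key "security.protocol" is the value of CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, written out so the sketch needs no Kafka dependency.

```java
import java.util.Properties;

/** Hypothetical helper (not a KSN API): builds the SASL/PLAIN settings used by the Kafka clients. */
final class KsnSaslConfig {

    /** Returns client properties that authenticate with a JWT token over SASL_SSL. */
    static Properties saslPlainProperties(String namespace, String jwtToken) {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // Username is the namespace; password is the token with the "token:" prefix.
        props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"%s\" password=\"%s\";",
                namespace, "token:" + jwtToken));
        return props;
    }

    public static void main(String[] args) {
        Properties props = saslPlainProperties("public/default", "<JWT Token>");
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

The returned properties can be merged into producer or consumer settings with props.putAll(...) before the bootstrap servers and serializers are added.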
Steps to Produce Messages with Pulsar and Consume Messages with Kafka Client
Create topic with pulsar format via Pulsar admin CLI
    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<JWT Token> \
      topics create-partitioned-topic persistent://public/default/topic-with-pulsar-format -p 3 --metadata kafkaEntryFormat=pulsar

    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<JWT Token> \
      topics get-properties persistent://public/default/topic-with-pulsar-format
Install client libraries

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client</artifactId>
      <version>3.1.0</version>
    </dependency>
Pulsar Producer and Kafka Consumer with Pulsar format
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.pulsar.client.api.AuthenticationFactory;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;

    import java.io.IOException;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class PulsarFormatPulsarProduceKafkaConsume {

        public static void main(String[] args) throws IOException {
            // Replace these configs with your cluster
            final String kafkaServerUrl = "<Kafka Service Url>";
            final String pulsarServerUrl = "<Pulsar Service Http Url>";
            final String jwtToken = "<JWT Token>";
            final String token = "token:" + jwtToken;

            final String topicName = "persistent://public/default/topic-with-pulsar-format";
            final String namespace = "public/default";

            // 1. Create a Pulsar producer
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl(pulsarServerUrl)
                    .authentication(AuthenticationFactory.token(jwtToken))
                    .build();

            Producer<byte[]> producer = client.newProducer()
                    .topic(topicName)
                    .create();

            // 2. Produce messages with the Pulsar producer
            for (int i = 0; i < 5; i++) {
                String message = "my-message-" + i;
                MessageId msgId = producer.send(message.getBytes());
                System.out.println("Publish " + message + " and message ID " + msgId);
            }
            producer.close();
            client.close();

            // 3. Configure the Kafka consumer
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

            // 4. Configure token authentication, which is equivalent to the SASL/PLAIN mechanism in Kafka
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config", String.format(
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                    namespace, token));

            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singleton(topicName));

            // 5. Consume messages with the Kafka consumer; stop after the first non-empty batch
            boolean running = true;
            while (running) {
                System.out.println("running");
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (!records.isEmpty()) {
                    records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                            + record.topic() + "-" + record.partition() + "@" + record.offset()));
                    running = false;
                }
            }
            consumer.close();
        }
    }
Interaction between KSN and Apache Pulsar Transactions
KSN transactions can't interact with Apache Pulsar transactions, and vice versa. One reason is that the two systems store the transaction state in different places.
Schema
KSN and Apache Pulsar both have a schema registry that acts as a central repository for registered schema information, which enables producers and consumers to coordinate the schema of a topic's messages through brokers. However, schemas cannot be shared between KSN and Apache Pulsar, because the two have very different API definitions and store schemas in different locations.
We also plan to provide a unified schema registry, in which KSN reuses the existing Pulsar schema registry, so that messages with schemas can be produced and consumed interchangeably with Pulsar and Kafka clients:
- Pulsar consumers can consume messages with schema produced by Kafka producers.
- Kafka consumers can consume messages with schema produced by Pulsar producers.