Interoperability with Pulsar

Pub/Sub messages between KSN and Apache Pulsar

The open-source KoP (Kafka on Pulsar) supports three entry formats: kafka, mixed_kafka, and pulsar. They differ in performance and in which clients can consume the stored messages.

  • kafka format: Provides the best performance, but a Pulsar consumer cannot consume these messages unless it uses a payload processor.
  • mixed_kafka format: Works similarly to the kafka format. It exists to support some non-official Kafka clients when encoding or decoding Kafka messages. Its performance is medium.
  • pulsar format: The default entryFormat in KoP. It encodes and decodes between the Kafka message format and the Pulsar message format, so its performance is the lowest of the three. The benefit is that both Kafka and Pulsar consumers can consume the messages from the Pulsar cluster.

KSN uses the pulsar format by default. It supports Kafka client to Kafka client, Pulsar client to Kafka client, and Kafka client to Pulsar client messaging, which means data is interoperable between KSN and Apache Pulsar. This format suits most scenarios in which performance is not the bottleneck.

KSN also supports setting the entry format per topic through topic properties. The property key is kafkaEntryFormat, and the value is kafka, mixed_kafka, or pulsar. If the property is not specified, the default is pulsar.
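The defaulting rule for the topic-level entry format can be sketched in plain Java. This is an illustrative helper only, not part of KSN or of any Pulsar API: the class name `EntryFormatResolver` and the method `resolve` are hypothetical, and only the property key `kafkaEntryFormat` and its three values come from the text. How KSN treats an unrecognized value is not stated here, so rejecting it is an assumption of this sketch.

```java
import java.util.Map;
import java.util.Set;

public class EntryFormatResolver {
    // Property key and allowed values as described in this document.
    static final String KEY = "kafkaEntryFormat";
    static final Set<String> ALLOWED = Set.of("kafka", "mixed_kafka", "pulsar");
    static final String DEFAULT_FORMAT = "pulsar";

    // Returns the entry format for a topic, given its topic properties.
    static String resolve(Map<String, String> topicProperties) {
        String value = topicProperties.get(KEY);
        if (value == null) {
            // Property not specified: fall back to the default format.
            return DEFAULT_FORMAT;
        }
        if (!ALLOWED.contains(value)) {
            // Assumption: this sketch rejects unknown values.
            throw new IllegalArgumentException("Unsupported " + KEY + ": " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of()));
        System.out.println(resolve(Map.of(KEY, "mixed_kafka")));
    }
}
```

Keeping the default in one place mirrors the rule that a topic without the property behaves like a pulsar-format topic.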

Interoperability of Pulsar and Kafka clients

Figure: Interoperability of Pulsar and Kafka clients (interoperability-of-pulsar-and-kafka-clients.png)

With the kafka or mixed_kafka entry format, Kafka clients can publish and subscribe directly. A Pulsar producer, however, cannot send messages to such a topic, because it cannot encode messages in the kafka format.

However, Pulsar client 2.9 introduced the message payload processor for Pulsar consumers, so messages from a Kafka producer can be consumed and decoded by Pulsar consumers running version 2.9 or later.

With the pulsar format, both Kafka and Pulsar clients can publish and subscribe between KSN and Apache Pulsar. The KSN broker converts the message format automatically, which lets you migrate from Kafka to Pulsar more flexibly.
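The compatibility rules described in this section can be summarized as a small decision table. The sketch below only restates those rules in plain Java; the class `InteropRules`, its enums, and its methods are hypothetical names chosen for illustration and do not exist in KSN, KoP, or the Pulsar client.

```java
public class InteropRules {
    enum EntryFormat { KAFKA, MIXED_KAFKA, PULSAR }
    enum ClientType { KAFKA, PULSAR }

    // Can a producer of this type write to a topic with this entry format?
    static boolean canProduce(EntryFormat format, ClientType producer) {
        // A Pulsar producer cannot encode kafka or mixed_kafka entries.
        return format == EntryFormat.PULSAR || producer == ClientType.KAFKA;
    }

    // Can a consumer of this type read from a topic with this entry format?
    static boolean canConsume(EntryFormat format, ClientType consumer,
                              boolean hasPayloadProcessor) {
        if (format == EntryFormat.PULSAR || consumer == ClientType.KAFKA) {
            return true;
        }
        // A Pulsar consumer on kafka or mixed_kafka entries needs a
        // message payload processor (Pulsar client 2.9 or later).
        return hasPayloadProcessor;
    }

    public static void main(String[] args) {
        for (EntryFormat format : EntryFormat.values()) {
            for (ClientType client : ClientType.values()) {
                System.out.printf(
                        "%s topic, %s client: produce=%b, consume(no processor)=%b%n",
                        format, client,
                        canProduce(format, client),
                        canConsume(format, client, false));
            }
        }
    }
}
```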

Steps to produce messages with a Kafka client and consume them with a Pulsar client

Requirements

  • Pulsar client version ≥ 2.9
  • Pulsar admin version ≥ 2.10

Steps

  1. Create a topic with the kafka format via the Pulsar admin CLI

    Note

    The mixed_kafka format follows the same steps; use the kafkaEntryFormat=mixed_kafka property instead.

    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<JWT Token> \
      topics create persistent://public/default/topic-with-kafka-format --metadata kafkaEntryFormat=kafka
    
    # Get the topic properties to check that the entry format was set successfully
    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<JWT Token> \
      topics get-properties persistent://public/default/topic-with-kafka-format
    
  2. Install client libraries

    <!-- Use to decode the Kafka format message -->
    <dependency>
        <groupId>io.streamnative.pulsar.handlers</groupId>
        <artifactId>kafka-payload-processor</artifactId>
        <version>3.1.0.4</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client</artifactId>
      <version>3.1.0</version>
    </dependency>
    
  3. Kafka Producer and Pulsar Consumer with Kafka format

    import io.streamnative.pulsar.handlers.kop.KafkaPayloadProcessor;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.pulsar.client.api.AuthenticationFactory;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.SubscriptionInitialPosition;
    
    import java.io.IOException;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    public class KafkaFormatKafkaProducePulsarConsume {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
            // Replace these values with those of your cluster
            final String kafkaServerUrl = "<Kafka Service Url>";
            final String pulsarServerUrl = "<Pulsar Service Http Url>";
            final String jwtToken = "<JWT Token>";
            final String token = "token:" + jwtToken;
    
            final String topicName = "topic-with-kafka-format";
            final String namespace = "public/default";
    
            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // 1. Configure token authentication, which maps to the SASL/PLAIN mechanism in Kafka
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config", String.format(
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                    namespace, token));
    
            // 2. Create a producer
            final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    
            // 3. Produce messages with Kafka producer
            for (int i = 0; i < 5; i++) {
                String value = "hello world";
                final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, value));
                final RecordMetadata recordMetadata = recordMetadataFuture.get();
                System.out.println("Send " + value + " to " + recordMetadata);
            }
            producer.close();
    
            // 4. Consume messages with Pulsar consumer
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl(pulsarServerUrl)
                    .authentication(AuthenticationFactory.token(jwtToken))
                    .build();
    
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName("test")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    // Set the Kafka payload processor to decode the kafka format message
                    .messagePayloadProcessor(new KafkaPayloadProcessor())
                    .subscribe();
    
            for (int i = 0; i < 5; i++) {
                Message<byte[]> msg = consumer.receive();
                consumer.acknowledge(msg);
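                // Decode explicitly as UTF-8, the default encoding of Kafka's StringSerializer
                System.out.println("Receive message "
                        + new String(msg.getData(), java.nio.charset.StandardCharsets.UTF_8));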
            }
    
            consumer.close();
            client.close();
        }
    }
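The token-to-SASL/PLAIN mapping used in the producer configuration can be isolated into a helper so that it is easy to reuse and to inspect. The following is a minimal sketch using only the JDK: the class `KsnSaslConfig` and the method `saslPlainProps` are hypothetical names, while the property keys and the JAAS string follow the example above (`security.protocol` is the string value of `CommonClientConfigs.SECURITY_PROTOCOL_CONFIG`).

```java
import java.util.Properties;

public class KsnSaslConfig {
    // Builds the SASL/PLAIN settings used by the Kafka clients in this document:
    // the username is the namespace and the password is "token:" + the JWT.
    static Properties saslPlainProps(String namespace, String jwtToken) {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"%s\" password=\"%s\";",
                namespace, "token:" + jwtToken));
        return props;
    }

    public static void main(String[] args) {
        Properties props = saslPlainProps("public/default", "<JWT Token>");
        // Print only non-secret settings; avoid logging the JAAS line in real code.
        System.out.println(props.getProperty("security.protocol"));
        System.out.println(props.getProperty("sasl.mechanism"));
    }
}
```

The returned properties can be merged into the producer or consumer `Properties` with `putAll` before the client is created.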
    

Steps to produce messages with a Pulsar client and consume them with a Kafka client

  1. Create a topic with the pulsar format via the Pulsar admin CLI

    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<JWT Token> \
      topics create-partitioned-topic persistent://public/default/topic-with-pulsar-format -p 3 --metadata kafkaEntryFormat=pulsar
    
    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<JWT Token> \
      topics get-properties persistent://public/default/topic-with-pulsar-format
    
  2. Install client libraries

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client</artifactId>
      <version>3.1.0</version>
    </dependency>
    
  3. Pulsar Producer and Kafka Consumer with Pulsar format

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.pulsar.client.api.AuthenticationFactory;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    
    import java.io.IOException;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    public class PulsarFormatPulsarProduceKafkaConsume {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
            // Replace these configs with your cluster
            final String kafkaServerUrl = "<Kafka Service Url>";
            final String pulsarServerUrl = "<Pulsar Service Http Url>";
            final String jwtToken = "<JWT Token>";
            final String token = "token:" + jwtToken;
    
            final String topicName = "topic-with-pulsar-format";
            final String namespace = "public/default";
    
            // 1. Create a Pulsar producer
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl(pulsarServerUrl)
                    .authentication(AuthenticationFactory.token(jwtToken))
                    .build();
    
            Producer<byte[]> producer = client.newProducer()
                    .topic(topicName)
                    .create();
    
            // 2. Produce messages with Pulsar producer
            for (int i = 0; i < 5; i++) {
                String message = "my-message-" + i;
                MessageId msgId = producer.send(message.getBytes());
                System.out.println("Publish " + message + " and message ID " + msgId);
            }
            producer.close();
            client.close();
    
            // 3. Configure the Kafka consumer
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    
            // 4. Configure token authentication for the consumer, which maps to the SASL/PLAIN mechanism in Kafka
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config", String.format(
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                    namespace, token));
    
            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singleton(topicName));
    
            // 5. Consume messages with Kafka consumer
            boolean running = true;
            while (running) {
                System.out.println("running");
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (!records.isEmpty()) {
                    records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                            + record.topic() + "-" + record.partition() + "@" + record.offset()));
                    running = false;
                }
            }
            consumer.close();
        }
    }
    

Interaction between KSN and Apache Pulsar transactions

KSN transactions cannot interact with Apache Pulsar transactions, and vice versa. One reason is that the two systems store transaction state in different places.

Schema

KSN and Apache Pulsar each have a schema registry that serves as a central repository for registered schema information, which lets producers and consumers coordinate the schema of a topic's messages through brokers. However, schemas cannot be shared between KSN and Apache Pulsar, because the two have very different API definitions and store schemas in different locations.

We plan to provide a unified schema registry in which KSN reuses the existing Pulsar schema registry, so that messages with schemas can be produced and consumed interchangeably with Pulsar and Kafka clients:

  • Pulsar consumers can consume messages with schema produced by Kafka producers.
  • Kafka consumers can consume messages with schema produced by Pulsar producers.