1. Build Applications
  2. Kafka Clients

Connect to your cluster using the Kafka C client (rdkafka)

Note

This QuickStart assumes that you have created a Pulsar cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.

This document describes how to connect to your Pulsar cluster using the Kafka Go client through Token authentication.

Before you begin

Note

  • Before getting the token of a service account, verify that the service account is authorized as a superuser or an admin of the tenants and namespaces.
  • A token has a system-defined Time-To-Live (TTL) of 7 days. Before a token expires, ensure that you generate a new token for your service account.
  • The password for different utilities as kcat will be equal to token:TOKEN
  • Get the JWT token.

    1. On the left navigation pane, click Service Accounts.

    2. In the row of the service account you want to use, in the Token column, click Generate new token, then click the Copy icon to copy the token to your clipboard.

  • Get the service URL of your Pulsar cluster.

    1. On the left navigation pane, in the Admin area, click Pulsar Clusters.
    2. Select the Details tab, and in the Access Points area, click Copy at the end of the row of the Kafka Service URL (TCP).

Steps

  1. Install the Kafka C client. See https://github.com/confluentinc/librdkafka?tab=readme-ov-file#installation

  2. Build a C application to produce and consume messages.

    // Note: for code simplicity, this example does not check the return value of
    // the rd_kafka_* APIs.
    #include <librdkafka/rdkafka.h>
    #include <stdatomic.h>
    #include <string.h>
    
    static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque) {
      if (msg->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
        printf("Sent message %s to %lu\n", (char *)msg->payload, (unsigned long)msg->offset);
        atomic_fetch_add((atomic_int *)opaque, 1);
      } else {
        fprintf(stderr, "Received error: %s\n", rd_kafka_err2str(msg->err));
      }
    }
    
    int main(int argc, char *argv[]) {
      const char *bootstrap_servers = "<bootstrap-servers>";
      const char *topic = "my-topic";
      const char *username = "public";  // the tenant name
      const char *token = "token:<jwt-token>";
      const char *group = "my-group";
      char errstr[1024];
    
      rd_kafka_conf_t *consumer_conf = rd_kafka_conf_new();
      rd_kafka_conf_set(consumer_conf, "bootstrap.servers", bootstrap_servers, errstr, sizeof(errstr));
      rd_kafka_conf_set(consumer_conf, "group.id", group, errstr, sizeof(errstr));
      rd_kafka_conf_set(consumer_conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr));
      rd_kafka_conf_set(consumer_conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr));
      rd_kafka_conf_set(consumer_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr));
      rd_kafka_conf_set(consumer_conf, "sasl.username", username, errstr, sizeof(errstr));
      rd_kafka_conf_set(consumer_conf, "sasl.password", token, errstr, sizeof(errstr));
    
      rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, consumer_conf, errstr, sizeof(errstr));
      if (!consumer) {
        fprintf(stderr, "Failed to create consumer: %s\n", errstr);
        return 1;
      }
    
      rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
      rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA);
      rd_kafka_subscribe(consumer, topics);
      rd_kafka_topic_partition_list_destroy(topics);
    
      rd_kafka_conf_t *producer_conf = rd_kafka_conf_new();
      rd_kafka_conf_set(producer_conf, "bootstrap.servers", bootstrap_servers, errstr, sizeof(errstr));
      rd_kafka_conf_set(producer_conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr));
      rd_kafka_conf_set(producer_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr));
      rd_kafka_conf_set(producer_conf, "sasl.username", username, errstr, sizeof(errstr));
      rd_kafka_conf_set(producer_conf, "sasl.password", token, errstr, sizeof(errstr));
    
      rd_kafka_conf_set_dr_msg_cb(producer_conf, dr_msg_cb);
      atomic_int count = ATOMIC_VAR_INIT(0);
      rd_kafka_conf_set_opaque(producer_conf, &count);
    
      rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, producer_conf, errstr, sizeof(errstr));
      if (!producer) {
        fprintf(stderr, "Failed to create producer: %s\n", errstr);
        return 1;
      }
    
      const int num_messages = 10;
      for (int i = 0; i < 10; i++) {
        char value[128];
        snprintf(value, sizeof(value), "msg-%d", i);
        rd_kafka_producev(producer, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                          RD_KAFKA_V_VALUE(&value[0], strlen(value)), RD_KAFKA_V_OPAQUE(NULL),
                          RD_KAFKA_V_END);
      }
      // Poll until the message is sent
      int num_produce_done;
      while ((num_produce_done = atomic_load(&count)) < num_messages) {
        rd_kafka_poll(producer, 1);
      }
      printf("%d messages are loaded\n", num_messages);
    
      for (int i = 0; i < num_messages;) {
        rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 10);
        if (!msg) continue;
    
        printf("Received msg %s from %s-%d@%lu\n", (char *)msg->payload, topic, msg->partition,
               (unsigned long)msg->offset);
        i++;
        rd_kafka_message_destroy(msg);
      }
    
      rd_kafka_destroy(producer);
      rd_kafka_consumer_close(consumer);
      rd_kafka_destroy(consumer);
      return 0;
    }
    
    • bootstrap-servers: the Kafka service URL of your Pulsar cluster.
    • jwt-token: the JWT token of your service account.

    The code example uses C11 standard, so you need to compile it like:

    gcc main.c -std=c11 -lrdkafka
    
  3. Run the Go application and you should see the following output:

    Sent message msg-0 to 0
    Sent message msg-1 to 1
    Sent message msg-2 to 2
    Sent message msg-3 to 3
    Sent message msg-4 to 4
    Sent message msg-5 to 5
    Sent message msg-6 to 6
    Sent message msg-7 to 7
    Sent message msg-8 to 8
    Sent message msg-9 to 9
    10 messages are loaded
    Received msg msg-0 from my-topic-0@0
    Received msg msg-1 from my-topic-0@1
    Received msg msg-2 from my-topic-0@2
    Received msg msg-3 from my-topic-0@3
    Received msg msg-4 from my-topic-0@4
    Received msg msg-5 from my-topic-0@5
    Received msg msg-6 from my-topic-0@6
    Received msg msg-7 from my-topic-0@7
    Received msg msg-8 from my-topic-0@8
    Received msg msg-9 from my-topic-0@9
    
Previous
Kafka Java