- Build Applications
- Kafka Clients
Connect to your cluster using the Kafka C client (rdkafka)
Note
This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce
and consume
permissions to a namespace for the target topic.
This document describes how to connect to your StreamNative cluster using the Kafka C client using API Keys authentication.
Before you begin
Note
- Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
- The password for different utilities as
kcat
will be equal totoken:<API KEY>
.
You can follow the instructions to create an API key for the service account you choose to use.
Steps
Install the Kafka C client. See https://github.com/confluentinc/librdkafka?tab=readme-ov-file#installation
Build a C application to produce and consume messages.
// Note: for code simplicity, this example does not check the return value of // the rd_kafka_* APIs. #include <librdkafka/rdkafka.h> #include <stdatomic.h> #include <string.h> static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque) { if (msg->err == RD_KAFKA_RESP_ERR_NO_ERROR) { printf("Sent message %s to %lu\n", (char *)msg->payload, (unsigned long)msg->offset); atomic_fetch_add((atomic_int *)opaque, 1); } else { fprintf(stderr, "Received error: %s\n", rd_kafka_err2str(msg->err)); } } int main(int argc, char *argv[]) { const char *bootstrap_servers = "<bootstrap-servers>"; const char *topic = "my-topic"; const char *username = "public"; // the tenant name const char *token = "token:<api-key>"; const char *group = "my-group"; char errstr[1024]; rd_kafka_conf_t *consumer_conf = rd_kafka_conf_new(); rd_kafka_conf_set(consumer_conf, "bootstrap.servers", bootstrap_servers, errstr, sizeof(errstr)); rd_kafka_conf_set(consumer_conf, "group.id", group, errstr, sizeof(errstr)); rd_kafka_conf_set(consumer_conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr)); rd_kafka_conf_set(consumer_conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)); rd_kafka_conf_set(consumer_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)); rd_kafka_conf_set(consumer_conf, "sasl.username", username, errstr, sizeof(errstr)); rd_kafka_conf_set(consumer_conf, "sasl.password", token, errstr, sizeof(errstr)); rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, consumer_conf, errstr, sizeof(errstr)); if (!consumer) { fprintf(stderr, "Failed to create consumer: %s\n", errstr); return 1; } rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA); rd_kafka_subscribe(consumer, topics); rd_kafka_topic_partition_list_destroy(topics); rd_kafka_conf_t *producer_conf = rd_kafka_conf_new(); rd_kafka_conf_set(producer_conf, "bootstrap.servers", bootstrap_servers, errstr, sizeof(errstr)); rd_kafka_conf_set(producer_conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)); rd_kafka_conf_set(producer_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)); rd_kafka_conf_set(producer_conf, "sasl.username", username, errstr, sizeof(errstr)); rd_kafka_conf_set(producer_conf, "sasl.password", token, errstr, sizeof(errstr)); rd_kafka_conf_set_dr_msg_cb(producer_conf, dr_msg_cb); atomic_int count = ATOMIC_VAR_INIT(0); rd_kafka_conf_set_opaque(producer_conf, &count); rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, producer_conf, errstr, sizeof(errstr)); if (!producer) { fprintf(stderr, "Failed to create producer: %s\n", errstr); return 1; } const int num_messages = 10; for (int i = 0; i < 10; i++) { char value[128]; snprintf(value, sizeof(value), "msg-%d", i); rd_kafka_producev(producer, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(&value[0], strlen(value)), RD_KAFKA_V_OPAQUE(NULL), RD_KAFKA_V_END); } // Poll until the message is sent int num_produce_done; while ((num_produce_done = atomic_load(&count)) < num_messages) { rd_kafka_poll(producer, 1); } printf("%d messages are loaded\n", num_messages); for (int i = 0; i < num_messages;) { rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 10); if (!msg) continue; printf("Received msg %s from %s-%d@%lu\n", (char *)msg->payload, topic, msg->partition, (unsigned long)msg->offset); i++; rd_kafka_message_destroy(msg); } rd_kafka_destroy(producer); rd_kafka_consumer_close(consumer); rd_kafka_destroy(consumer); return 0; }
<bootstrap-servers>
: the Kafka service URL of your StreamNative cluster.<api-key>
: an API key of your service account.
The code example uses C11 standard, so you need to compile it like:
gcc main.c -std=c11 -lrdkafka
Run the Go application and you should see the following output:
Sent message msg-0 to 0 Sent message msg-1 to 1 Sent message msg-2 to 2 Sent message msg-3 to 3 Sent message msg-4 to 4 Sent message msg-5 to 5 Sent message msg-6 to 6 Sent message msg-7 to 7 Sent message msg-8 to 8 Sent message msg-9 to 9 10 messages are loaded Received msg msg-0 from my-topic-0@0 Received msg msg-1 from my-topic-0@1 Received msg msg-2 from my-topic-0@2 Received msg msg-3 from my-topic-0@3 Received msg msg-4 from my-topic-0@4 Received msg msg-5 from my-topic-0@5 Received msg msg-6 from my-topic-0@6 Received msg msg-7 from my-topic-0@7 Received msg msg-8 from my-topic-0@8 Received msg msg-9 from my-topic-0@9