// Note: for code simplicity, this example does not check the return value of
// most rd_kafka_* APIs (e.g. rd_kafka_conf_set); production code should.
#include <librdkafka/rdkafka.h>
#include <stdatomic.h>
#include <string.h>
/**
 * Delivery report callback registered on the producer configuration.
 *
 * On successful delivery, prints the message payload and its offset and
 * increments the counter passed through `opaque`. On failure, prints the
 * error to stderr and leaves the counter untouched.
 *
 * @param rk      Producer handle (unused).
 * @param msg     The message whose delivery status is being reported.
 * @param opaque  Pointer to the atomic_int counting successful deliveries.
 */
static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque) {
    (void)rk;

    if (msg->err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        fprintf(stderr, "Received error: %s\n", rd_kafka_err2str(msg->err));
        return;
    }

    // The payload is a length-delimited buffer, not a C string: bound the
    // print by msg->len instead of relying on a NUL terminator.
    printf("Sent message %.*s to %lld\n", (int)msg->len,
           msg->payload ? (const char *)msg->payload : "", (long long)msg->offset);
    atomic_fetch_add((atomic_int *)opaque, 1);
}
int main(int argc, char *argv[]) {
const char *bootstrap_servers = "<bootstrap-servers>";
const char *topic = "my-topic";
const char *username = "public"; // the tenant name
const char *token = "token:<api-key>";
const char *group = "my-group";
char errstr[1024];
rd_kafka_conf_t *consumer_conf = rd_kafka_conf_new();
rd_kafka_conf_set(consumer_conf, "bootstrap.servers", bootstrap_servers, errstr, sizeof(errstr));
rd_kafka_conf_set(consumer_conf, "group.id", group, errstr, sizeof(errstr));
rd_kafka_conf_set(consumer_conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr));
rd_kafka_conf_set(consumer_conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr));
rd_kafka_conf_set(consumer_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr));
rd_kafka_conf_set(consumer_conf, "sasl.username", username, errstr, sizeof(errstr));
rd_kafka_conf_set(consumer_conf, "sasl.password", token, errstr, sizeof(errstr));
// Note: Ursa does not support read_committed for now
rd_kafka_conf_set(consumer_conf, "isolation.level", "read_uncommitted", errstr, sizeof(errstr));
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, consumer_conf, errstr, sizeof(errstr));
if (!consumer) {
fprintf(stderr, "Failed to create consumer: %s\n", errstr);
return 1;
}
rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA);
rd_kafka_subscribe(consumer, topics);
rd_kafka_topic_partition_list_destroy(topics);
rd_kafka_conf_t *producer_conf = rd_kafka_conf_new();
rd_kafka_conf_set(producer_conf, "bootstrap.servers", bootstrap_servers, errstr, sizeof(errstr));
rd_kafka_conf_set(producer_conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr));
rd_kafka_conf_set(producer_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr));
rd_kafka_conf_set(producer_conf, "sasl.username", username, errstr, sizeof(errstr));
rd_kafka_conf_set(producer_conf, "sasl.password", token, errstr, sizeof(errstr));
rd_kafka_conf_set_dr_msg_cb(producer_conf, dr_msg_cb);
atomic_int count = ATOMIC_VAR_INIT(0);
rd_kafka_conf_set_opaque(producer_conf, &count);
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, producer_conf, errstr, sizeof(errstr));
if (!producer) {
fprintf(stderr, "Failed to create producer: %s\n", errstr);
return 1;
}
const int num_messages = 10;
for (int i = 0; i < 10; i++) {
char value[128];
snprintf(value, sizeof(value), "msg-%d", i);
rd_kafka_producev(producer, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE(&value[0], strlen(value)), RD_KAFKA_V_OPAQUE(NULL),
RD_KAFKA_V_END);
}
// Poll until the message is sent
int num_produce_done;
while ((num_produce_done = atomic_load(&count)) < num_messages) {
rd_kafka_poll(producer, 1);
}
printf("%d messages are loaded\n", num_messages);
for (int i = 0; i < num_messages;) {
rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 10);
if (!msg) continue;
printf("Received msg %s from %s-%d@%lu\n", (char *)msg->payload, topic, msg->partition,
(unsigned long)msg->offset);
i++;
rd_kafka_message_destroy(msg);
}
rd_kafka_destroy(producer);
rd_kafka_consumer_close(consumer);
rd_kafka_destroy(consumer);
return 0;
}