package org.example;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SNCloudReadCommittedConsumer {
    public static void main(String[] args) {
        // Replace these configs with the values for your cluster
        String serverUrl = "SERVER-URL";
        String jwtToken = "YOUR-API-KEY";
        String token = "token:" + jwtToken;
        final String topicName = "test-transaction-topic";
        String namespace = "public/default";

        final Properties props = new Properties();
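        // Point the consumer at the cluster's Kafka endpoint and deserialize keys and values as strings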
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello-world");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
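        // Authenticate over SASL_SSL with the PLAIN mechanism: the namespace is used as the
        // username and the API key, prefixed with "token:", as the password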
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                namespace, token));
        // Set the isolation level to read_committed, which means only committed messages will be consumed.
        // To consume both committed and uncommitted messages, set it to read_uncommitted.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // Create a consumer and subscribe to the topic
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton(topicName));
        System.out.println("Consumer running");
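        // Poll in a loop; with read_committed, only records from committed transactions are returned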
        while (true) {
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            if (!records.isEmpty()) {
                records.forEach(record -> System.out.println("Received record: " + record.value() + " from "
                        + record.topic() + "-" + record.partition() + "@" + record.offset()));
            }
        }
    }
}