- Java
- Tutorial
Build Producer
Create a directory for the Java files in this project:
mkdir -p src/main/java/io/streamnative/developer
Let's create the Java producer application by pasting the following code into a file src/main/java/io/streamnative/developer/ProducerExample.java
.
package io.streamnative.developer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Random;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.*;
import static org.apache.kafka.common.config.SaslConfigs.*;
public class ProducerExample {
public static void main(final String[] args) {
final Properties props = new Properties() {{
// User-specific properties that you must set
put(BOOTSTRAP_SERVERS_CONFIG, "<BOOTSTRAP SERVERS>");
put(SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='unused' password='token:<API KEY>';");
// Fixed properties
put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
put(ACKS_CONFIG, "all");
put(SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
put(SASL_MECHANISM, "PLAIN");
}};
final String topic = "purchases";
String[] users = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"};
String[] items = {"book", "alarm clock", "t-shirts", "gift card", "batteries"};
try (final Producer<String, String> producer = new KafkaProducer<>(props)) {
final Random rnd = new Random();
final int numMessages = 10;
for (int i = 0; i < numMessages; i++) {
String user = users[rnd.nextInt(users.length)];
String item = items[rnd.nextInt(items.length)];
producer.send(
new ProducerRecord<>(topic, user, item),
(event, ex) -> {
if (ex != null)
ex.printStackTrace();
else
System.out.printf("Produced event to topic %s: key = %-10s value = %s%n", topic, user, item);
});
}
System.out.printf("%s events were produced to topic %s%n", numMessages, topic);
}
}
}
Fill in the appropriate <BOOTSTRAP SERVERS>
endpoint and <API KEY>
in the BOOTSTRAP_SERVERS_CONFIG
and SASL_JAAS_CONFIG
properties where the client configuration props
object is created.
You can test the syntax before proceding by running the following command:
gradle build
And you should see the following output:
BUILD SUCCESSFUL in 1s