- Spring Boot
- Tutorial
Build Producer
Create a directory for the Spring Boot application resource file:
mkdir -p src/main/resources
Paste the following configuration data into a file located at `src/main/resources/application.yaml`, substituting your cluster's bootstrap servers endpoint for `<BOOTSTRAP SERVERS>`, and the API key and secret that you just created for the username and password fields, respectively, of the `spring.kafka.properties.sasl.jaas.config` value.
# Spring Boot configuration for the Kafka example application.
# Everything below is bound under the `spring.kafka` prefix.
spring:
  kafka:
    # Replace <BOOTSTRAP SERVERS> with your cluster's bootstrap servers endpoint.
    bootstrap-servers: <BOOTSTRAP SERVERS>
    # Client properties shared by the producer and the consumer.
    properties:
      security:
        protocol: SASL_SSL
      sasl:
        jaas:
          # Replace the placeholder with your own credentials.
          # NOTE(review): only an <API KEY> placeholder appears here, while the
          # surrounding text mentions an API key and secret; confirm the
          # credential format your cluster expects.
          config: org.apache.kafka.common.security.plain.PlainLoginModule required username='unused' password='token:<API KEY>';
        mechanism: PLAIN
    # Producer settings: keys and values are serialized as strings.
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # Consumer settings: keys and values are deserialized as strings.
    consumer:
      group-id: group_id
      # Start from the earliest offset when the group has no committed offset.
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Create a directory for the Java files in this project:
mkdir -p src/main/java/examples
We will use the `@SpringBootApplication` annotation for ease of use, auto-configuration, and component scanning. Paste the following Java code into a file located at `src/main/java/examples/SpringBootWithKafkaApplication.java`.
package examples;
import examples.Producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.context.annotation.Bean;
/**
 * Entry point of the Spring Boot Kafka example.
 *
 * <p>The application is started without a web environment. After start-up, the
 * {@link CommandLineRunner} bean inspects each command-line argument:
 * <ul>
 *   <li>{@code --producer} sends a fixed set of sample messages through {@link Producer};</li>
 *   <li>{@code --consumer} starts the listener container registered under the id
 *       {@code myConsumer};</li>
 *   <li>any other argument is ignored.</li>
 * </ul>
 */
@SpringBootApplication
public class SpringBootWithKafkaApplication {

    /** Id of the listener container that {@code --consumer} starts. */
    private static final String CONSUMER_LISTENER_ID = "myConsumer";

    private final Producer producer;

    /**
     * Boots the Spring application with the web environment disabled.
     *
     * @param args command-line arguments, forwarded to Spring and to the runner bean
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringBootWithKafkaApplication.class);
        application.setWebApplicationType(WebApplicationType.NONE);
        application.run(args);
    }

    /**
     * Creates the runner that dispatches on the command-line arguments.
     *
     * <p>The method name is kept as-is because Spring derives the bean name from it.
     *
     * @return a runner that handles {@code --producer} and {@code --consumer}
     * @throws IllegalStateException (from the returned runner) if {@code --consumer} is given
     *     but no listener container with id {@code myConsumer} is registered
     */
    @Bean
    public CommandLineRunner CommandLineRunnerBean() {
        return (args) -> {
            for (String arg : args) {
                switch (arg) {
                    case "--producer":
                        sendSampleMessages();
                        break;
                    case "--consumer":
                        startConsumer();
                        break;
                    default:
                        // Unknown arguments are ignored on purpose.
                        break;
                }
            }
        };
    }

    /** Sends the ten hard-coded sample messages, in a fixed order. */
    private void sendSampleMessages() {
        this.producer.sendMessage("awalther", "t-shirts");
        this.producer.sendMessage("htanaka", "t-shirts");
        this.producer.sendMessage("htanaka", "batteries");
        this.producer.sendMessage("eabara", "t-shirts");
        this.producer.sendMessage("htanaka", "t-shirts");
        this.producer.sendMessage("jsmith", "book");
        this.producer.sendMessage("awalther", "t-shirts");
        this.producer.sendMessage("jsmith", "batteries");
        this.producer.sendMessage("jsmith", "gift card");
        this.producer.sendMessage("eabara", "t-shirts");
    }

    /** Looks up the consumer's listener container and starts it. */
    private void startConsumer() {
        MessageListenerContainer listenerContainer =
                kafkaListenerEndpointRegistry.getListenerContainer(CONSUMER_LISTENER_ID);
        // The registry returns null for an unknown id; fail with a clear message
        // instead of a bare NullPointerException.
        if (listenerContainer == null) {
            throw new IllegalStateException(
                    "No Kafka listener container registered with id '" + CONSUMER_LISTENER_ID + "'");
        }
        listenerContainer.start();
    }

    /**
     * Creates the application with the producer used for {@code --producer}.
     *
     * @param producer the component used to send the sample messages
     */
    @Autowired
    SpringBootWithKafkaApplication(Producer producer) {
        this.producer = producer;
    }

    /** Registry used to look up the listener container by id; injected by Spring. */
    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
}
You can test the syntax before proceeding by running the following command:
gradle build
And you should see the following output:
BUILD SUCCESSFUL in 1s