1. Spring Boot
  2. Tutorial

Build Producer

Create a directory for the Spring Boot application resource file:

mkdir -p src/main/resources

Paste the following configuration data into a file located at src/main/resources/application.yaml, substituting your cluster bootstrap servers endpoint and the API key and secret that you just created for the username and password fields, respectively, of the spring.kafka.properties.sasl.jaas.config value.

# Spring Boot configuration for the Kafka client used by this application.
spring:
  kafka:
    # Replace <BOOTSTRAP SERVERS> with your cluster bootstrap servers endpoint.
    bootstrap-servers: <BOOTSTRAP SERVERS>
    # Nested keys below form the spring.kafka.properties.* values
    # (for example spring.kafka.properties.sasl.jaas.config).
    properties:
      security:
        protocol: SASL_SSL
      sasl:
        jaas:
          # Replace the <API KEY> placeholder with your own credential.
          config: org.apache.kafka.common.security.plain.PlainLoginModule required username='unused' password='token:<API KEY>';
        mechanism: PLAIN
    # Producer: record keys and values both use StringSerializer.
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # Consumer: group id, offset reset setting, and StringDeserializer for keys and values.
    consumer:
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Create a directory for the Java files in this project:

mkdir -p src/main/java/examples

We will use the @SpringBootApplication annotation for ease of use, auto-configuration, and component scanning. Paste the following Java code into a file located at src/main/java/examples/SpringBootWithKafkaApplication.java.

package examples;

import examples.Producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.context.annotation.Bean;


/**
 * Command-line entry point for the Kafka example application.
 *
 * <p>The application runs without a web environment. Each command-line
 * argument is inspected in order:
 * <ul>
 *   <li>{@code --producer} sends a fixed list of sample messages through the
 *       injected {@link Producer};</li>
 *   <li>{@code --consumer} starts the listener container registered under the
 *       id {@code "myConsumer"};</li>
 *   <li>any other argument is ignored.</li>
 * </ul>
 */
@SpringBootApplication
public class SpringBootWithKafkaApplication {

    /** Id of the listener container that {@code --consumer} starts. */
    private static final String CONSUMER_LISTENER_ID = "myConsumer";

    /**
     * Argument pairs passed, in this order, to
     * {@code Producer.sendMessage(String, String)} when {@code --producer} is given.
     */
    private static final String[][] SAMPLE_MESSAGES = {
        {"awalther", "t-shirts"},
        {"htanaka", "t-shirts"},
        {"htanaka", "batteries"},
        {"eabara", "t-shirts"},
        {"htanaka", "t-shirts"},
        {"jsmith", "book"},
        {"awalther", "t-shirts"},
        {"jsmith", "batteries"},
        {"jsmith", "gift card"},
        {"eabara", "t-shirts"},
    };

    private final Producer producer;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    /**
     * Creates the application bean.
     *
     * @param producer the producer used to send the sample messages
     */
    @Autowired
    SpringBootWithKafkaApplication(Producer producer) {
        this.producer = producer;
    }

    /**
     * Starts the Spring application with the web environment disabled.
     *
     * @param args command-line arguments, forwarded to the Spring application
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringBootWithKafkaApplication.class);
        application.setWebApplicationType(WebApplicationType.NONE);
        application.run(args);
    }

    /**
     * Builds the runner that dispatches on the command-line arguments.
     *
     * <p>The method name is left unchanged because the default bean name is
     * derived from it.
     *
     * @return a runner handling {@code --producer} and {@code --consumer}
     */
    @Bean
    public CommandLineRunner CommandLineRunnerBean() {
        return (args) -> {
            for (String arg : args) {
                switch (arg) {
                    case "--producer":
                        sendSampleMessages();
                        break;
                    case "--consumer":
                        startConsumer();
                        break;
                    default:
                        // Unrecognized arguments are ignored.
                        break;
                }
            }
        };
    }

    /** Sends every entry of {@link #SAMPLE_MESSAGES} in declaration order. */
    private void sendSampleMessages() {
        for (String[] message : SAMPLE_MESSAGES) {
            this.producer.sendMessage(message[0], message[1]);
        }
    }

    /**
     * Starts the listener container registered under {@link #CONSUMER_LISTENER_ID}.
     *
     * @throws IllegalStateException if no listener container has that id
     */
    private void startConsumer() {
        MessageListenerContainer listenerContainer =
                kafkaListenerEndpointRegistry.getListenerContainer(CONSUMER_LISTENER_ID);
        // The registry lookup yields null for an unknown id; fail with a clear
        // message instead of a bare NullPointerException.
        if (listenerContainer == null) {
            throw new IllegalStateException(
                    "No Kafka listener container registered with id '" + CONSUMER_LISTENER_ID + "'");
        }
        listenerContainer.start();
    }

}

You can test the syntax before proceeding by running the following command:

gradle build

You should see output similar to the following (the reported build time will vary):

BUILD SUCCESSFUL in 1s
Previous
Create Topic