Spring Boot - How to consume JSON messages using Apache Kafka

Apache Kafka is a distributed event streaming platform. The Spring for Apache Kafka project (spring-kafka) integrates it with Spring Boot through auto-configuration, listener containers, and the @KafkaListener annotation. To consume JSON messages from a Kafka topic and map them to Java objects, follow the steps below.

1. Dependencies:

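First, add the required dependencies to your project. If you are using Maven, add the following to your pom.xml. No version is needed when the project uses Spring Boot's dependency management, and spring-boot-starter-web brings in Jackson, which JsonDeserializer relies on: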

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. Configuration:

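Update your application.properties (or the equivalent keys in application.yml) with the necessary Kafka configuration. Note that the custom consumer factory in step 4 reads only spring.kafka.bootstrap-servers from this file (via @Value) and sets its own group id and deserializers in code; the other spring.kafka.consumer.* entries take effect when Spring Boot's auto-configured consumer factory is used instead: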

# Kafka properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3. Create a Model:

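Suppose you're going to consume a JSON message representing a User, for example {"name": "Alice", "age": 30}:

public class User {
    private String name;
    private int age;

    public User() { }  // no-argument constructor used by Jackson

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}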

4. Configure Kafka Consumer:

To consume JSON messages, you'll need to configure the Kafka consumer to use the JsonDeserializer.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, User> userConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // This factory does not read spring.kafka.consumer.*, so set the reset policy here as well
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // The deserializer instances passed below are used as-is, so no
        // *_DESERIALIZER_CLASS_CONFIG entries are needed in props
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(User.class));
    }

    // Referenced by name from @KafkaListener(containerFactory = "userKafkaListenerContainerFactory")
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userConsumerFactory());
        return factory;
    }
}

5. Consume the JSON message:

Create a Kafka listener to consume messages from the desired topic:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "user-topic", groupId = "my-group", containerFactory = "userKafkaListenerContainerFactory")
    public void consumeUser(User user) {
        System.out.println("Consumed user: " + user.getName());
    }
}

In this example, the method consumeUser will be triggered each time a new message arrives at the user-topic Kafka topic.
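Because consumeUser receives an already-deserialized object, any checks on field values belong in (or behind) that method. Jackson leaves properties that are missing from the JSON at their Java defaults (null for name, 0 for age), so a check along the following lines may be useful. This is only a sketch: UserEventHandler is a hypothetical class, not part of Spring Kafka, and the rules it applies (non-blank name, non-negative age) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical handler that holds the processing logic, independent of Kafka. */
class UserEventHandler {

    // Names of the users that passed validation, in arrival order
    private final List<String> accepted = new ArrayList<>();

    /**
     * Checks the fields of a consumed user and records the name if they look valid.
     * Returns true when the user was accepted, false when it was rejected.
     */
    boolean handle(String name, int age) {
        if (name == null || name.trim().isEmpty()) {
            return false; // "name" was absent from the JSON, or blank
        }
        if (age < 0) {
            return false; // assumed rule: ages cannot be negative
        }
        accepted.add(name);
        return true;
    }

    /** Read-only view of the accepted names. */
    List<String> acceptedNames() {
        return Collections.unmodifiableList(accepted);
    }
}
```

The listener could then call handler.handle(user.getName(), user.getAge()) and decide what to do with rejected messages. Keeping the logic in a plain class also means it can be unit-tested without a running broker.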

6. Run Your Application:

With the above configuration and code in place, run your Spring Boot application. Ensure your Kafka server is running and that the topic user-topic exists (or change the topic name in the @KafkaListener annotation to match an existing topic).

Now, when JSON messages representing a User object are sent to the user-topic, your Spring Boot application will consume them and print out the consumed user's name.

In a real-world application, handle failures explicitly. A message that is not valid JSON, or that does not match the User class, fails during deserialization, before the listener method is called, so a try/catch inside consumeUser cannot catch it.
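One common way to keep a malformed record from blocking the consumer is to wrap the JsonDeserializer in spring-kafka's ErrorHandlingDeserializer. The fragment below is a configuration sketch rather than a standalone program: it assumes spring-kafka 2.5 or later on the classpath and the User class from step 3, and SafeUserConsumerFactory is a hypothetical helper name.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

// Hypothetical helper: an alternative way to build the consumer factory from step 4
public final class SafeUserConsumerFactory {

    private SafeUserConsumerFactory() { }

    public static ConsumerFactory<String, User> create(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");

        // The delegate performs the actual JSON-to-User mapping
        JsonDeserializer<User> json = new JsonDeserializer<>(User.class);
        // The wrapper catches the delegate's exceptions, so a bad record is passed
        // to the container's error handler instead of failing inside poll()
        ErrorHandlingDeserializer<User> safeValue = new ErrorHandlingDeserializer<>(json);

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), safeValue);
    }
}
```

With this in place, userConsumerFactory() could simply return SafeUserConsumerFactory.create(bootstrapServers). In recent spring-kafka versions the default error handler then logs the failed record and continues with the next one, without retrying it.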

  1. Consume JSON messages from Kafka topic in Spring Boot:

    • Description: This involves creating a Kafka consumer in a Spring Boot application to receive JSON messages from a Kafka topic.
    • Code Example:
      @KafkaListener(topics = "yourTopic", groupId = "yourGroupId")
      public void consumeJsonMessage(String jsonMessage) {
          // Process the JSON message
      }
      
  2. Deserialize JSON messages in Kafka consumer with Spring Boot:

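    • Description: Deserialize the JSON messages received by the Kafka consumer into Java objects. This works when the consumer's value deserializer is a JsonDeserializer (or a JSON message converter is configured on the container factory).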
    • Code Example:
      @KafkaListener(topics = "yourTopic", groupId = "yourGroupId")
      public void consumeJsonMessage(@Payload YourObject yourObject) {
          // Process the deserialized object
      }
      
  3. Configuring Kafka consumer for JSON in Spring Boot:

    • Description: Configure the Kafka consumer properties and settings for handling JSON messages.
    • Code Example:
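      spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
      spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
      # com.example is a placeholder; use the package that contains YourObject
      spring.kafka.consumer.properties.spring.json.value.default.type=com.example.YourObject
      spring.kafka.consumer.properties.spring.json.trusted.packages=com.example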
      
  4. Handling JSON payloads in KafkaListener with Spring Boot:

    • Description: Implement logic within the KafkaListener method to handle the JSON payloads.
    • Code Example:
      @KafkaListener(topics = "yourTopic", groupId = "yourGroupId")
      public void handleJsonPayload(YourObject yourObject) {
          // Your logic to handle the JSON payload
      }
      
  5. Using KafkaTemplate to produce JSON messages in Spring Boot:

    • Description: Utilize KafkaTemplate to produce JSON messages to a Kafka topic.
    • Code Example:
      @Autowired
      private KafkaTemplate<String, YourObject> kafkaTemplate;
      
      public void produceJsonMessage(YourObject yourObject) {
          kafkaTemplate.send("yourTopic", yourObject);
      }
      
  6. Customizing JSON deserialization in Spring Boot Kafka consumer:

    • Description: Customize the JSON deserialization process, e.g., by providing custom deserializer configurations.
    • Code Example:
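      @Bean
      public JsonDeserializer<YourObject> jsonDeserializer() {
          // Fix the target type here; com.example is a placeholder package name
          JsonDeserializer<YourObject> deserializer = new JsonDeserializer<>(YourObject.class);
          deserializer.addTrustedPackages("com.example");
          return deserializer;
      }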
      
  7. Kafka listener adapter for JSON messages in Spring Boot:

    • Description: Accept the full ConsumerRecord in the listener method to access the key, partition, offset, and headers alongside the deserialized value.
    • Code Example:
      @KafkaListener(topics = "yourTopic", groupId = "yourGroupId", containerFactory = "yourContainerFactory")
      public void handleJsonMessage(ConsumerRecord<String, YourObject> record) {
          // Your handling logic
      }
      
  8. Error handling for JSON deserialization in Kafka consumer with Spring Boot:

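    • Description: Implement error handling mechanisms for JSON deserialization errors. A record that cannot be deserialized never reaches the listener method, so such failures are handled by the container's error handler, with ErrorHandlingDeserializer wrapping JsonDeserializer as in item 3.
    • Code Example:
      @Bean
      public DefaultErrorHandler errorHandler() {
          // Assumes spring-kafka 2.8+. Listener failures are retried twice, one second apart;
          // deserialization failures are not retried, they are logged and skipped.
          // On a custom container factory, register it with factory.setCommonErrorHandler(...).
          return new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
      }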