Spring Boot - Consume JSON Object From Kafka Topics

To consume JSON objects from Kafka topics in a Spring Boot application, add the Spring Kafka library and configure a Kafka listener whose value deserializer turns each JSON message into a Java object. Here's a step-by-step guide:

1. Add Dependencies:

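Include the required Spring Kafka and Jackson dependencies in your pom.xml. No versions are given because a project that inherits from spring-boot-starter-parent gets them from Spring Boot's dependency management: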

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-json</artifactId>
</dependency>

2. Configure application.properties or application.yml:

Add Kafka configuration details:

# Kafka properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

In the above configuration:

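  • bootstrap-servers is the address of your Kafka broker(s).
  • group-id is the consumer group ID; consumers that share a group ID divide the topic's partitions among themselves.
  • auto-offset-reset=earliest makes a group with no committed offset start reading from the beginning of the topic.
  • key-deserializer is Kafka's StringDeserializer, and value-deserializer is the JSON deserializer provided by Spring Kafka.
  • spring.json.trusted.packages=* lets JsonDeserializer create classes from any package; outside of a demo, list only the packages that hold your model classes.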
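
If you prefer application.yml, the settings above can be written as follows. This is a direct transcription of the same keys and values, with nothing new introduced; the quoted "*" is needed only because a bare asterisk has a special meaning in YAML.

```yaml
# Kafka properties (same values as the application.properties example)
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: myGroup
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"
```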

3. Create the JSON Object:

Suppose we're dealing with a User JSON object:

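public class User {
    private String name;
    private int age;

    // Getters, setters, and other boilerplate...

    // Gives the listener's log line a readable form instead of User@<hash>.
    @Override
    public String toString() {
        return "User{name='" + name + "', age=" + age + "}";
    }
}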

4. Set Up the Kafka Listener:

Create a Kafka listener to consume messages from a particular topic:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "userTopic", groupId = "myGroup", containerFactory = "userKafkaListenerFactory")
    public void consumeJson(User user) {
        System.out.println("Consumed user: " + user);
    }
}

5. Configure KafkaListenerFactory:

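Because the listener in step 4 names a containerFactory, we need to define a ConcurrentKafkaListenerContainerFactory bean called userKafkaListenerFactory whose consumer factory uses JsonDeserializer for User values. This consumer factory is built by hand, so it takes its settings from the map in the code rather than from application.properties.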

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConfig {

    // The bean name (the method name) must match the containerFactory value used in @KafkaListener.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userConsumerFactory());
        return factory;
    }

    // The deserializer instances passed to the constructor take precedence over the class entries in the map.
    public DefaultKafkaConsumerFactory<String, User> userConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
            ConsumerConfig.GROUP_ID_CONFIG, "myGroup",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class
        ), new StringDeserializer(), new JsonDeserializer<>(User.class));
    }
}

Now, whenever a new JSON message arrives on userTopic, the consumeJson method is invoked with the message already deserialized into a User object.

Remember that you need a running Kafka broker and that the topic userTopic should already exist. Also decide how malformed messages should be handled: a payload that cannot be deserialized into a User fails before consumeJson is ever called.
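
To make that flow concrete, here is a minimal, framework-free sketch of what happens to each record: the raw bytes are turned into a User, and the result is handed to the listener. It deliberately avoids Spring Kafka and Jackson so that it runs on a plain JDK. The ListenerFlowSketch class, its parseUser and dispatch helpers, and the regex-based parsing are all hypothetical stand-ins for illustration; the toy parser only understands a flat object with a string name and an integer age, whereas JsonDeserializer handles real JSON.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ListenerFlowSketch {

    // Same fields as the tutorial's User class, kept immutable for brevity.
    public static final class User {
        public final String name;
        public final int age;

        public User(String name, int age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return "User{name='" + name + "', age=" + age + "}";
        }
    }

    private static final Pattern NAME = Pattern.compile("\"name\"\\s*:\\s*\"([^\"]*)\"");
    private static final Pattern AGE = Pattern.compile("\"age\"\\s*:\\s*(-?\\d+)");

    // Toy stand-in for the deserializer: bytes -> User. Not a real JSON parser.
    public static User parseUser(byte[] payload) {
        String json = new String(payload, StandardCharsets.UTF_8);
        Matcher name = NAME.matcher(json);
        Matcher age = AGE.matcher(json);
        if (!name.find() || !age.find()) {
            throw new IllegalArgumentException("Not a User payload: " + json);
        }
        return new User(name.group(1), Integer.parseInt(age.group(1)));
    }

    // Stand-in for the listener container: deserialize first, then invoke the listener.
    public static void dispatch(byte[] payload, Consumer<User> listener) {
        listener.accept(parseUser(payload));
    }

    public static void main(String[] args) {
        byte[] record = "{\"name\":\"Alice\",\"age\":30}".getBytes(StandardCharsets.UTF_8);
        dispatch(record, user -> System.out.println("Consumed user: " + user));
    }
}
```

Note that dispatch never reaches the listener when parseUser throws, which mirrors the point above about malformed payloads failing before consumeJson is called.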

  1. Consume JSON from Kafka topic in Spring Boot example:

    • Set up a Kafka consumer to receive JSON messages.
    • Example:
      @KafkaListener(topics = "my-topic", groupId = "my-group")
      public void consumeJsonMessage(MyJsonModel jsonModel) {
          // Process the JSON message
      }
      
  2. Deserialize JSON messages in Kafka consumer with Spring Boot:

    • Once the consumer's value deserializer is set to JsonDeserializer, Spring Kafka converts each JSON message into a Java object such as the model below.
    • Example:
      public class MyJsonModel {
          // Model representing JSON structure
      }
      
  3. Configuring Kafka consumer for JSON in Spring Boot:

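    • Configure Kafka consumer properties in application.properties or application.yml.
    • Example (application.properties), wrapping JsonDeserializer in ErrorHandlingDeserializer (named ErrorHandlingDeserializer2 before Spring Kafka 2.5):
      spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
      spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer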
      
  4. Handling JSON payloads in KafkaListener with Spring Boot:

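    • Annotate the parameter with @Payload (org.springframework.messaging.handler.annotation.Payload) to mark it explicitly as the message payload, which matters once the method also takes header or metadata parameters.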
    • Example:
      @KafkaListener(topics = "my-topic", groupId = "my-group")
      public void consumeJsonMessage(@Payload MyJsonModel jsonModel) {
          // Process the JSON message
      }
      
  5. Using KafkaTemplate to produce JSON messages in Spring Boot:

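    • Use KafkaTemplate to send JSON messages to a Kafka topic; the producer's value serializer (spring.kafka.producer.value-serializer) must be Spring Kafka's JsonSerializer for the object to be written as JSON.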
    • Example:
      @Autowired
      private KafkaTemplate<String, MyJsonModel> kafkaTemplate;
      
      public void produceJsonMessage(MyJsonModel jsonModel) {
          kafkaTemplate.send("my-topic", jsonModel);
      }
      
  6. Customizing JSON deserialization in Spring Boot Kafka consumer:

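    • Customize deserialization by implementing Kafka's Deserializer interface (org.apache.kafka.common.serialization.Deserializer) and its deserialize(String topic, byte[] data) method, then naming the class as the consumer's value-deserializer.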
    • Example:
      public class CustomJsonDeserializer implements Deserializer<MyJsonModel> {
          // Custom deserialization logic
      }
      
  7. Kafka listener adapter for JSON messages in Spring Boot:

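    • Point @KafkaListener at a custom listener container factory through the containerFactory attribute; the bean name used here, jsonListenerContainerFactory, must match a ConcurrentKafkaListenerContainerFactory bean defined in your configuration.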
    • Example:
      @KafkaListener(topics = "my-topic", groupId = "my-group", containerFactory = "jsonListenerContainerFactory")
      public void consumeJsonMessage(MyJsonModel jsonModel) {
          // Process the JSON message
      }
      
  8. Error handling for JSON deserialization in Kafka consumer:

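    • Handle errors during deserialization by wrapping the real deserializer in ErrorHandlingDeserializer (named ErrorHandlingDeserializer2 before Spring Kafka 2.5), which catches the failure so that one malformed message does not block the consumer.
    • Example (application.properties):
      spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer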
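
The idea behind an error-handling deserializer can be sketched without Spring: wrap the real deserializer, catch whatever it throws, and hand the failure back as data so that processing can continue with the next record. The SafeDeserializerSketch class, its Result type, and the wrap method below are hypothetical names used only for illustration. They are not Spring Kafka's API, and Spring Kafka passes the failure on to the listener container rather than returning a result object like this one.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class SafeDeserializerSketch {

    // Holds either a deserialized value or the exception that prevented it.
    public static final class Result<T> {
        public final T value;
        public final Exception error;

        private Result(T value, Exception error) {
            this.value = value;
            this.error = error;
        }

        public boolean isOk() {
            return error == null;
        }
    }

    // Wraps a delegate so that a bad payload yields a failed Result instead of an exception.
    public static <T> Function<byte[], Result<T>> wrap(Function<byte[], T> delegate) {
        return payload -> {
            try {
                return new Result<>(delegate.apply(payload), null);
            } catch (RuntimeException e) {
                return new Result<>(null, e);
            }
        };
    }

    public static void main(String[] args) {
        // Delegate that only accepts integer payloads; it stands in for a JSON deserializer.
        Function<byte[], Integer> strict =
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
        Function<byte[], Result<Integer>> safe = wrap(strict);

        // The malformed middle record is reported and skipped; the third is still processed.
        for (String raw : new String[] {"42", "not-a-number", "7"}) {
            Result<Integer> r = safe.apply(raw.getBytes(StandardCharsets.UTF_8));
            System.out.println(r.isOk() ? "value=" + r.value : "skipped: " + r.error.getMessage());
        }
    }
}
```

The design point is that the wrapper, not the caller, owns the try/catch, so every consumer of the wrapped function gets the same behavior for poison-pill messages.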