To consume JSON objects from Kafka topics in a Spring Boot application, you'll need to integrate the Spring Kafka library and properly set up your Kafka listener to deserialize JSON messages. Here's a step-by-step guide:
Include the required Spring Kafka and Jackson dependencies in your pom.xml:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-json</artifactId>
    </dependency>
In application.properties (or with the equivalent keys in application.yml), add the Kafka configuration details:

    # Kafka properties
    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.consumer.group-id=myGroup
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
    spring.kafka.consumer.properties.spring.json.trusted.packages=*
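In the above configuration:

- bootstrap-servers is the address of your Kafka broker(s).
- group-id represents the consumer group ID.
- value-deserializer is set to the JSON deserializer provided by Spring Kafka.
- spring.json.trusted.packages=* lets the deserializer create classes from any package; outside of a demo, restrict it to the package that holds your model classes.

Suppose we're dealing with a User JSON object: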

    public class User {
        private String name;
        private int age;

        // Getters, setters, and other boilerplate...
    }
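
For reference, the elided boilerplate could look like the sketch below. It assumes the conventional bean shape that Jackson populates by default (a no-argument constructor plus setters). The two-argument constructor and the toString format are illustrative choices, not something the tutorial prescribes. The class is declared without `public` only so the snippet compiles in any file; in a project it would normally be `public` in its own User.java.

```java
// Sketch of the complete POJO behind the "other boilerplate" comment.
class User {
    private String name;
    private int age;

    // No-argument constructor, the conventional entry point for Jackson.
    public User() {
    }

    // Convenience constructor for tests and producers (illustrative).
    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    // Without this override, printing a User shows only the class name and hash code.
    @Override
    public String toString() {
        return "User{name='" + name + "', age=" + age + "}";
    }
}
```

With a toString override like this, a log line such as "Consumed user: " + user prints the field values instead of an object identity.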
Create a Kafka listener to consume messages from a particular topic:

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;

    @Service
    public class KafkaConsumerService {

        @KafkaListener(topics = "userTopic", groupId = "myGroup", containerFactory = "userKafkaListenerFactory")
        public void consumeJson(User user) {
            System.out.println("Consumed user: " + user);
        }
    }
Because we're consuming JSON messages, we need to configure a ConcurrentKafkaListenerContainerFactory to use the JsonDeserializer.

    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.support.serializer.JsonDeserializer;

    @Configuration
    public class KafkaConfig {

        // The bean name matches the containerFactory attribute used by the listener.
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, User> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(userConsumerFactory());
            return factory;
        }

        // The deserializer instances passed here are the ones the consumer actually uses.
        public DefaultKafkaConsumerFactory<String, User> userConsumerFactory() {
            return new DefaultKafkaConsumerFactory<>(
                    Map.of(
                            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                            ConsumerConfig.GROUP_ID_CONFIG, "myGroup",
                            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class
                    ),
                    new StringDeserializer(),
                    new JsonDeserializer<>(User.class));
        }
    }

Now, whenever a new JSON message arrives in the userTopic, the consumeJson method will be invoked, and the JSON message will be deserialized into a User object.

Remember, you need a running Kafka instance, and the topic userTopic should already exist. Ensure proper error handling and any other configurations specific to your use case.

Consume JSON from Kafka topic in Spring Boot example:

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consumeJsonMessage(MyJsonModel jsonModel) {
        // Process the JSON message
    }

Deserialize JSON messages in Kafka consumer with Spring Boot:

    public class MyJsonModel {
        // Model representing JSON structure
    }
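
Configuring Kafka consumer for JSON in Spring Boot:

Set the consumer's deserializers in application.properties or application.yml. Example (application.properties), using ErrorHandlingDeserializer, which replaces the ErrorHandlingDeserializer2 class deprecated in Spring Kafka 2.5 and needs a delegate that performs the actual JSON deserialization:

    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer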

Handling JSON payloads in KafkaListener with Spring Boot:

Use @Payload to access the payload of a Kafka message.

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consumeJsonMessage(@Payload MyJsonModel jsonModel) {
        // Process the JSON message
    }

Using KafkaTemplate to produce JSON messages in Spring Boot:

Use KafkaTemplate to send JSON messages to a Kafka topic.

    @Autowired
    private KafkaTemplate<String, MyJsonModel> kafkaTemplate;

    public void produceJsonMessage(MyJsonModel jsonModel) {
        kafkaTemplate.send("my-topic", jsonModel);
    }

Customizing JSON deserialization in Spring Boot Kafka consumer:

    public class CustomJsonDeserializer implements Deserializer<MyJsonModel> {
        // Custom deserialization logic
    }
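
As a rough illustration of what "custom deserialization logic" might contain, the sketch below decodes the record bytes as UTF-8 and hands the text to a parser function. To stay free of external dependencies it declares a local stand-in interface with the same `deserialize(String topic, byte[] data)` method as Kafka's `org.apache.kafka.common.serialization.Deserializer`. `ParsingDeserializer` and its parser argument are hypothetical names introduced for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Local stand-in for the single abstract method of Kafka's Deserializer<T>,
// declared here only so the sketch compiles without kafka-clients.
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Hypothetical custom deserializer: UTF-8 decode, then delegate to a parser.
class ParsingDeserializer<T> implements Deserializer<T> {
    private final Function<String, T> parser;

    ParsingDeserializer(Function<String, T> parser) {
        this.parser = parser;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        // A null payload (for example a tombstone record) has nothing to parse.
        if (data == null) {
            return null;
        }
        String json = new String(data, StandardCharsets.UTF_8);
        try {
            return parser.apply(json);
        } catch (RuntimeException e) {
            // Include the topic so a failure can be traced to its source.
            throw new IllegalArgumentException(
                    "Cannot deserialize record from topic " + topic, e);
        }
    }
}
```

In a real consumer the class would implement Kafka's own interface, and the parser would typically wrap a Jackson call such as `objectMapper.readValue(json, MyJsonModel.class)`, converting its checked exception into an unchecked one.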
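
Kafka listener adapter for JSON messages in Spring Boot:

Use @KafkaListener with a custom listener container factory. The containerFactory attribute names the factory bean to use, so a bean called jsonListenerContainerFactory must be defined in your configuration.

    @KafkaListener(topics = "my-topic", groupId = "my-group", containerFactory = "jsonListenerContainerFactory")
    public void consumeJsonMessage(MyJsonModel jsonModel) {
        // Process the JSON message
    }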
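
Error handling for JSON deserialization in Kafka consumer:

Handle deserialization errors with ErrorHandlingDeserializer (named ErrorHandlingDeserializer2 in older Spring Kafka releases and deprecated under that name since 2.5). It wraps the real deserializer so that a malformed record is reported as a failure instead of repeatedly breaking the consumer. Example (application.properties):

    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer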
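
The idea behind an error-handling deserializer can be sketched in plain Java: a wrapper calls a delegate, catches whatever it throws, and returns the failure together with the raw bytes rather than letting the exception escape. This is a conceptual sketch only, not Spring Kafka's implementation; `Outcome` and `SafeDeserializer` are hypothetical names.

```java
import java.util.function.Function;

// Result of one attempt: either a value, or the error plus the raw bytes.
final class Outcome<T> {
    final T value;
    final RuntimeException error;
    final byte[] rawData;

    private Outcome(T value, RuntimeException error, byte[] rawData) {
        this.value = value;
        this.error = error;
        this.rawData = rawData;
    }

    static <T> Outcome<T> ok(T value) {
        return new Outcome<>(value, null, null);
    }

    static <T> Outcome<T> failed(RuntimeException error, byte[] rawData) {
        return new Outcome<>(null, error, rawData);
    }

    boolean isFailure() {
        return error != null;
    }
}

// Wraps a delegate so that a malformed record never throws to the caller.
class SafeDeserializer<T> {
    private final Function<byte[], T> delegate;

    SafeDeserializer(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    Outcome<T> deserialize(byte[] data) {
        try {
            return Outcome.ok(delegate.apply(data));
        } catch (RuntimeException e) {
            // Keep the original bytes so the bad record can be logged or re-routed.
            return Outcome.failed(e, data);
        }
    }
}
```

A caller can then check `isFailure()` for each record and decide whether to skip it, log it, or forward the raw bytes elsewhere, which is roughly the role an error handler plays on the consumer side.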