Apache Kafka is a distributed event streaming platform. Spring Boot, through the Spring Kafka project, provides excellent integration support for working with Kafka. If you want to consume JSON messages from a Kafka topic, follow the steps below.
First, add the required dependencies to your project. If you are using Maven, add the following to your pom.xml:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Update your application.properties or application.yml with the necessary Kafka configuration:
# Kafka properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
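Spring Boot translates each of these entries into the corresponding Kafka client setting. As a rough illustration using only the JDK, the sketch below builds the equivalent map with the plain Kafka client keys (bootstrap.servers, group.id, and so on). The class name ConsumerSettings and its method are hypothetical and exist only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: shows which raw Kafka client keys the Spring Boot properties map to.
class ConsumerSettings {

    static Map<String, Object> rawConsumerProperties() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "localhost:9092");  // spring.kafka.bootstrap-servers
        props.put("group.id", "my-group");                  // spring.kafka.consumer.group-id
        props.put("auto.offset.reset", "earliest");         // spring.kafka.consumer.auto-offset-reset
        props.put("key.deserializer",                       // spring.kafka.consumer.key-deserializer
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",                     // spring.kafka.consumer.value-deserializer
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings in insertion order, one per line.
        rawConsumerProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The value earliest only matters when the consumer group has no committed offset yet; in that case consumption starts from the beginning of the topic.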
Suppose you're going to consume a JSON message representing a User:
public class User {
    private String name;
    private int age;

    // Getters, setters, constructors...
}
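JsonDeserializer relies on Jackson, which by default needs a no-argument constructor and accessors to populate the fields. A fuller version of the class, sketched under that assumption, could look like the following; the toString method is an addition for logging convenience and is not part of the original class.

```java
// Plain data class matching a JSON payload such as {"name":"Alice","age":30}.
// Declared package-private here to keep the sketch in one file; make it public in its own User.java.
class User {
    private String name;
    private int age;

    // No-argument constructor, used by Jackson when it creates the instance.
    public User() {
    }

    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{name='" + name + "', age=" + age + "}";
    }
}
```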
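To consume JSON messages, you'll need to configure the Kafka consumer to use the JsonDeserializer. The configuration class below sets the deserializers in code, so listeners that use its container factory receive User objects even though the properties file names StringDeserializer for values.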
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConfig {

    // Reuse the broker address from application.properties
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, User> userConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // The deserializer instances passed here are the ones the consumer uses
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(User.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userConsumerFactory());
        return factory;
    }
}
Create a Kafka listener to consume messages from the desired topic:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "user-topic", groupId = "my-group", containerFactory = "userKafkaListenerContainerFactory")
    public void consumeUser(User user) {
        System.out.println("Consumed user: " + user.getName());
    }
}
In this example, the method consumeUser is invoked each time a new message arrives on the user-topic Kafka topic.
With the above configuration and code in place, run your Spring Boot application. Ensure your Kafka server is running and that the topic user-topic exists (or adjust the topic name in your configuration accordingly).
Now, when JSON messages representing a User object are sent to user-topic, your Spring Boot application will consume them and print the consumed user's name.
Remember to handle exceptions and potential issues related to consuming messages in a real-world scenario, such as incorrect message formats or deserialization issues.
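One way to act on this advice is to validate each payload before processing it, since a JSON document with missing fields can still deserialize into a User whose name is null. The following is a minimal sketch using only the JDK; the class UserPayloadValidator, its method, and the age bounds are hypothetical choices for illustration, not part of Spring Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: collects problems with a deserialized payload instead of throwing.
class UserPayloadValidator {

    // Upper bound chosen for illustration only.
    private static final int MAX_AGE = 150;

    // Returns an empty list when the payload looks usable, otherwise one message per problem.
    static List<String> validate(String name, int age) {
        List<String> problems = new ArrayList<>();
        if (name == null || name.trim().isEmpty()) {
            problems.add("name is missing or blank");
        }
        if (age < 0 || age > MAX_AGE) {
            problems.add("age out of range: " + age);
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(validate("Alice", 30));
        System.out.println(validate(null, -1));
    }
}
```

Inside consumeUser, the listener could call UserPayloadValidator.validate(user.getName(), user.getAge()) and log or skip the record when the returned list is not empty.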
Consume JSON messages from Kafka topic in Spring Boot:
// With StringDeserializer as the value deserializer, the listener receives the raw JSON text
@KafkaListener(topics = "yourTopic", groupId = "yourGroupId")
public void consumeJsonMessage(String jsonMessage) {
    // Process the JSON message
}
Deserialize JSON messages in Kafka consumer with Spring Boot:
@KafkaListener(topics = "yourTopic", groupId = "yourGroupId")
public void consumeJsonMessage(@Payload YourObject yourObject) {
    // Process the deserialized object
}
Configuring Kafka consumer for JSON in Spring Boot:
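# In current Spring Kafka versions the class is named ErrorHandlingDeserializer
# (older releases called it ErrorHandlingDeserializer2).
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
# The delegate is passed through to the Kafka client as a plain consumer property
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer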
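The error-handling deserializer wraps a delegate so that a malformed record does not make the consumer fail on the same offset over and over. The following is a conceptual sketch of that wrapping idea only, written against the plain JDK; it is not Spring Kafka's actual implementation, and SafeDeserializer and Result are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Conceptual sketch: wrap a delegate deserializer so failures become values, not exceptions.
class SafeDeserializer<T> {

    // Holds either the deserialized value or the exception raised by the delegate.
    static final class Result<T> {
        final T value;
        final Exception error;

        Result(T value, Exception error) {
            this.value = value;
            this.error = error;
        }

        boolean failed() {
            return error != null;
        }
    }

    private final Function<byte[], T> delegate;

    SafeDeserializer(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    Result<T> deserialize(byte[] data) {
        try {
            return new Result<>(delegate.apply(data), null);
        } catch (RuntimeException e) {
            // Capture the failure so the caller can log or skip the record.
            return new Result<>(null, e);
        }
    }

    public static void main(String[] args) {
        // The delegate here parses an integer; a JSON parser would sit in the same place.
        SafeDeserializer<Integer> ages = new SafeDeserializer<>(
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)));
        System.out.println(ages.deserialize("42".getBytes(StandardCharsets.UTF_8)).value);
        System.out.println(ages.deserialize("oops".getBytes(StandardCharsets.UTF_8)).failed());
    }
}
```

The design choice is that a bad payload produces a result the caller can inspect, so processing can move on to the next record instead of stopping at the first failure.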
Handling JSON payloads in KafkaListener with Spring Boot:
@KafkaListener(topics = "yourTopic", groupId = "yourGroupId")
public void handleJsonPayload(YourObject yourObject) {
    // Your logic to handle the JSON payload
}
Using KafkaTemplate to produce JSON messages in Spring Boot:
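// Requires a JSON value serializer on the producer side, for example
// spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
@Autowired
private KafkaTemplate<String, YourObject> kafkaTemplate;

public void produceJsonMessage(YourObject yourObject) {
    kafkaTemplate.send("yourTopic", yourObject);
}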
Customizing JSON deserialization in Spring Boot Kafka consumer:
@Bean
public JsonDeserializer<YourObject> jsonDeserializer() {
    // Customize and return your JsonDeserializer.
    // YourCustomJsonDeserializer is a placeholder for your own JsonDeserializer subclass.
    return new YourCustomJsonDeserializer<>();
}
Kafka listener adapter for JSON messages in Spring Boot:
@KafkaListener(topics = "yourTopic", groupId = "yourGroupId", containerFactory = "yourContainerFactory")
public void handleJsonMessage(ConsumerRecord<String, YourObject> record) {
    // Your handling logic
}
Error handling for JSON deserialization in Kafka consumer with Spring Boot:
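// Deserialization failures never reach the listener. With ErrorHandlingDeserializer configured,
// the container passes them to its error handler (DefaultErrorHandler, Spring Kafka 2.8+).
@Bean
public DefaultErrorHandler errorHandler() {
    // Log and skip the failed record without retrying
    return new DefaultErrorHandler(
            (record, ex) -> System.err.println("Skipping offset " + record.offset() + ": " + ex.getMessage()),
            new FixedBackOff(0L, 0L));
}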