Integrating Apache Kafka with Spring Boot is straightforward with the Spring Kafka project (spring-kafka). Below is a step-by-step guide to creating a basic Spring Boot Kafka consumer.
First, add the necessary dependencies to your Maven pom.xml:
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
Inside your application.properties or application.yml, add the Kafka configuration (shown here in properties syntax):
    # Kafka
    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.consumer.group-id=myGroup
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Create a Kafka consumer service that listens to a topic:
    package com.example.demo.service;

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;

    @Service
    public class KafkaConsumerService {

        // Invoked for every record published to "myTopic"
        @KafkaListener(topics = "myTopic", groupId = "myGroup")
        public void consumeMessage(String message) {
            System.out.println("Consumed message: " + message);
        }
    }
Your main Spring Boot application class can remain unchanged:
    package com.example.demo;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class KafkaConsumerDemoApplication {

        public static void main(String[] args) {
            SpringApplication.run(KafkaConsumerDemoApplication.class, args);
        }
    }
Make sure your Kafka server is running. If you're running Kafka locally, it should be accessible at localhost:9092.
Start your Spring Boot application.
Send a message to the Kafka topic "myTopic" using Kafka's console producer (kafka-console-producer.sh) or another tool.
Observe the console of your Spring Boot application. You should see the message "Consumed message: [YOUR_MESSAGE]".
With these steps, you have a basic Spring Boot Kafka Consumer set up. The consumer will listen for messages on the "myTopic" topic and print them to the console. You can extend this to process the consumed messages as per your business requirements.
Configuring Kafka consumer in Spring Boot:
    # Application properties
    spring.kafka.consumer.bootstrap-servers=your-kafka-server
    spring.kafka.consumer.group-id=your-group-id
Deserializing messages with Kafka consumer in Spring Boot:
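    // Consumer configuration
    // consumerConfigs() is a helper you supply; it returns the Map<String, Object> of consumer properties.
    @Bean
    public ConsumerFactory<String, YourMessageClass> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(),
                new StringDeserializer(),                       // keys are plain strings
                new JsonDeserializer<>(YourMessageClass.class)); // values are JSON mapped to YourMessageClass
    }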
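The factory above calls consumerConfigs(), which the snippet does not define. A minimal sketch of such a helper, using only the JDK and Kafka's plain string property names, might look like the following. The class name ConsumerConfigSketch, the broker address, and the group id are illustrative assumptions; in a real project the method would typically live in the same @Configuration class as consumerFactory().

```java
import java.util.HashMap;
import java.util.Map;

class ConsumerConfigSketch {

    // Builds the property map a consumer factory would receive.
    // Keys are Kafka's standard consumer property names; values are placeholders.
    static Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("group.id", "your-group-id");           // assumed group id
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        consumerConfigs().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The deserializers are left out of the map on purpose, because the factory above receives them as constructor arguments.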
Handling different data formats in Kafka consumer with Spring Boot:
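    // Consumer configuration for Avro.
    // Assumes Confluent's kafka-avro-serializer dependency and a Schema Registry; the URL below is a placeholder.
    @Bean
    public ConsumerFactory<String, SpecificRecord> avroConsumerFactory() {
        Map<String, Object> props = new HashMap<>(consumerConfigs());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "http://localhost:8081"); // assumed registry address
        props.put("specific.avro.reader", true); // return generated SpecificRecord classes
        return new DefaultKafkaConsumerFactory<>(props);
    }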
Error handling in Spring Boot Kafka consumer:
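    // Kafka listener with error handling
    // KafkaHeaders.RECEIVED_KEY is the Spring Kafka 3.x name; on 2.x use KafkaHeaders.RECEIVED_MESSAGE_KEY.
    @KafkaListener(topics = "your-topic", groupId = "your-group-id")
    public void listen(YourMessageClass message,
                       @Header(KafkaHeaders.RECEIVED_KEY) String key) {
        try {
            // Process the message
        } catch (Exception e) {
            // Handle the exception (log, retry, or forward the record elsewhere).
            // An exception swallowed here makes the record count as processed.
        }
    }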
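What "handle the exception" means depends on the application. One common approach is to retry a bounded number of times and then park the message for later inspection. The sketch below shows that idea with the JDK only; RetryingHandler, its in-memory deadLetters list, and the attempt count are hypothetical stand-ins, not Spring Kafka API. Spring Kafka also provides container-level error handlers such as DefaultErrorHandler, which may be a better fit than hand-written retry logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class RetryingHandler {

    private final int maxAttempts;
    private final List<String> deadLetters = new ArrayList<>();

    RetryingHandler(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Runs the processor up to maxAttempts times and returns true on the first success.
    // After the final failure the message is kept in deadLetters instead of being dropped.
    boolean handle(String message, Consumer<String> processor) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                processor.accept(message);
                return true;
            } catch (RuntimeException e) {
                System.err.println("Attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        deadLetters.add(message);
        return false;
    }

    List<String> deadLetters() {
        return deadLetters;
    }
}
```

Inside a listener, the body of the try block would become a call such as handler.handle(payload, this::process), where process is whatever business method the application defines.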
Grouping and partitioning in Kafka consumer with Spring Boot:
    # Consumer group and listener concurrency
    spring.kafka.consumer.group-id=your-group-id
    # Number of listener threads; threads beyond the topic's partition count stay idle
    spring.kafka.listener.concurrency=3
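How much a concurrency setting helps depends on how many partitions the topic has, since each partition is read by only one consumer in a group. The sketch below is a simplified, library-independent model of range-style assignment for a single topic, not Kafka's actual assignor code; the class and method names are hypothetical, and consumerCount is assumed to be greater than zero.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionSpread {

    // Splits partitions 0..partitionCount-1 across the consumers in contiguous ranges.
    // The first (partitionCount % consumerCount) consumers receive one extra partition.
    static List<List<Integer>> assign(int partitionCount, int consumerCount) {
        List<List<Integer>> result = new ArrayList<>();
        int base = partitionCount / consumerCount;
        int extra = partitionCount % consumerCount;
        int next = 0;
        for (int c = 0; c < consumerCount; c++) {
            int size = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 3)); // [[0, 1], [2, 3], [4, 5]]
        System.out.println(assign(2, 3)); // [[0], [1], []]
    }
}
```

In the second call the third consumer owns nothing, which is the situation a concurrency of 3 would produce on a topic with only two partitions.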
Customizing Kafka consumer properties in Spring Boot:
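    # Customizing consumer properties
    # Where to start reading when the group has no committed offset
    spring.kafka.consumer.auto-offset-reset=earliest
    # Upper bound on records returned by a single poll
    spring.kafka.consumer.max-poll-records=100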
Asynchronous and synchronous Kafka message consumption in Spring Boot:
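    // Listener methods are invoked synchronously on the container's consumer thread.
    // For asynchronous processing, hand the message off to another thread from inside the method.
    @KafkaListener(topics = "your-topic", groupId = "your-group-id")
    public void listenAsync(YourMessageClass message) {
        // Hand off to an executor here for asynchronous processing
    }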
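A minimal sketch of handing work off from the listener thread to a worker pool, using only the JDK, could look like this. AsyncHandOff, the pool size of 4, and the upper-casing "business logic" are assumptions made for illustration; the listener annotation is omitted so the example runs without Kafka.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncHandOff {

    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    // Would be called on the listener thread; returns at once with a future for the result.
    CompletableFuture<String> listenAsync(String message) {
        return CompletableFuture.supplyAsync(() -> process(message), workers);
    }

    // Stand-in for slow business logic.
    private String process(String message) {
        return message.toUpperCase();
    }

    void shutdown() {
        workers.shutdown();
    }

    public static void main(String[] args) {
        AsyncHandOff handler = new AsyncHandOff();
        System.out.println(handler.listenAsync("hello").join()); // prints HELLO
        handler.shutdown();
    }
}
```

One trade-off to keep in mind: once the listener method returns, the container may commit the record's offset before the background work has finished, so a crash at that point can lose the message. Synchronous processing avoids this at the cost of throughput.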
Integration testing for Kafka consumer in Spring Boot:
    // Kafka consumer integration test
    @SpringBootTest
    public class KafkaConsumerIntegrationTest {
        // Test methods
    }
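Because a listener receives messages on a different thread from the test, test methods usually need a way to wait for delivery. A common pattern is a CountDownLatch that the consumer releases when a message arrives. The sketch below shows only that waiting pattern with the JDK; LatchedConsumer is a hypothetical class, and a plain thread stands in for the broker. In a real integration test the message would come from a test broker, for example the embedded one provided by the spring-kafka-test module.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchedConsumer {

    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String payload;

    // Plays the role of the @KafkaListener method: record the message, then release waiters.
    void consumeMessage(String message) {
        payload = message;
        latch.countDown();
    }

    // Blocks the calling (test) thread until a message arrives or the timeout expires.
    boolean awaitMessage(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    String payload() {
        return payload;
    }

    public static void main(String[] args) {
        LatchedConsumer consumer = new LatchedConsumer();
        new Thread(() -> consumer.consumeMessage("hello")).start(); // simulates delivery
        boolean received = consumer.awaitMessage(5, TimeUnit.SECONDS);
        System.out.println(received + " " + consumer.payload());
    }
}
```

A test method would then assert that awaitMessage returned true and that payload() matches what was sent.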