Spring Boot Tutorial

Spring Boot - Software Setup and Configuration (STS/Eclipse/IntelliJ)

Prerequisite (Spring Core Concepts)

Spring Boot Core

Spring Boot with REST API

Spring Boot with Database and Data JPA

Spring Boot with Kafka

Spring Boot with AOP

Spring Boot Kafka Consumer Example

Integrating Apache Kafka with Spring Boot can be straightforward using the Spring Kafka project. Below is a step-by-step guide to create a basic Spring Boot Kafka Consumer.

1. Dependencies:

Firstly, add the necessary dependencies to your Maven pom.xml:

<!-- Spring Boot Starter Web -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. Configuration:

Inside your application.properties or application.yml, add the Kafka configurations:

# Kafka
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3. Kafka Consumer:

Create a Kafka consumer service that listens to a topic:

package com.example.demo.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "myTopic", groupId = "myGroup")
    public void consumeMessage(String message) {
        System.out.println("Consumed message: " + message);
    }
}

4. Main Application:

Your main Spring Boot application class can remain unchanged:

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaConsumerDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerDemoApplication.class, args);
    }
}

5. Running the Application:

  1. Make sure your Kafka server is running. If you're running Kafka locally, it should be accessible at localhost:9092.

  2. Start your Spring Boot application.

  3. Send a message to the Kafka topic "myTopic" using Kafka's producer or another tool.

  4. Observe the console of your Spring Boot application. You should see the message "Consumed message: [YOUR_MESSAGE]".

With these steps, you have a basic Spring Boot Kafka Consumer set up. The consumer will listen for messages on the "myTopic" topic and print them to the console. You can extend this to process the consumed messages as per your business requirements.

  1. Configuring Kafka consumer in Spring Boot:

    • Description: Setting up a Kafka consumer in a Spring Boot application involves configuring the necessary properties and dependencies.
    • Code:
      // Application properties
      spring.kafka.consumer.bootstrap-servers=your-kafka-server
      spring.kafka.consumer.group-id=your-group-id
      
  2. Deserializing messages with Kafka consumer in Spring Boot:

    • Description: Deserialization is crucial to convert Kafka messages into usable objects in your Spring Boot application.
    • Code:
      // Consumer configuration
      @Bean
      public ConsumerFactory<String, YourMessageClass> consumerFactory() {
          return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                  new JsonDeserializer<>(YourMessageClass.class));
      }
      
  3. Handling different data formats in Kafka consumer with Spring Boot:

    • Description: Exploring options to handle various data formats like JSON, Avro, or others in a Kafka consumer.
    • Code:
      // Consumer configuration for Avro
      @Bean
      public ConsumerFactory<String, SpecificRecord> avroConsumerFactory() {
          // configure Avro deserializer
      }
      
  4. Error handling in Spring Boot Kafka consumer:

    • Description: Implementing error handling mechanisms to deal with exceptions and errors during Kafka message consumption.
    • Code:
      // Kafka listener with error handling
      @KafkaListener(topics = "your-topic", groupId = "your-group-id")
      public void listen(YourMessageClass message, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
          try {
              // Process the message
          } catch (Exception e) {
              // Handle the exception
          }
      }
      
  5. Grouping and partitioning in Kafka consumer with Spring Boot:

    • Description: Understanding how Kafka consumers can be grouped and messages can be partitioned for parallel processing.
    • Code:
      // Consumer configuration with group and partitions
      spring.kafka.consumer.group-id=your-group-id
      spring.kafka.listener.concurrency=3
      
  6. Customizing Kafka consumer properties in Spring Boot:

    • Description: Fine-tuning Kafka consumer properties to match the specific requirements of your application.
    • Code:
      // Customizing consumer properties
      spring.kafka.consumer.auto-offset-reset=earliest
      spring.kafka.consumer.max-poll-records=100
      
  7. Asynchronous and synchronous Kafka message consumption in Spring Boot:

    • Description: Exploring both asynchronous and synchronous approaches to consume Kafka messages based on application needs.
    • Code:
      // Asynchronous Kafka listener
      @KafkaListener(topics = "your-topic", groupId = "your-group-id")
      public void listenAsync(YourMessageClass message) {
          // Asynchronous processing
      }
      
  8. Integration testing for Kafka consumer in Spring Boot:

    • Description: Writing integration tests to ensure the correct functioning of the Kafka consumer in a Spring Boot application.
    • Code:
      // Kafka consumer integration test
      @SpringBootTest
      public class KafkaConsumerIntegrationTest {
          // Test methods
      }