Spring Boot Tutorial

Spring Boot - Software Setup and Configuration (STS/Eclipse/IntelliJ)

Prerequisite (Spring Core Concepts)

Spring Boot Core

Spring Boot with REST API

Spring Boot with Database and Data JPA

Spring Boot with Kafka

Spring Boot with AOP

Spring Boot - How to consume string messages using Apache Kafka

Consuming string messages from a Kafka topic using Spring Boot is quite straightforward. Here's a step-by-step guide:

1. Dependencies:

First, you need to add the required dependencies. If you're using Maven, add these dependencies to your pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. Configuration:

Update your application.properties or application.yml with the necessary Kafka configurations:

# Kafka properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

The above configuration sets the Kafka bootstrap servers (brokers) to localhost:9092, which is the default for a local Kafka instance. The group-id is used to identify the consumer group, which is useful for scaling your application and for ensuring message delivery semantics.

3. Consume the String message:

Next, you'll create a service that contains a Kafka listener method to consume string messages from a specified topic:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "string-topic", groupId = "my-group")
    public void consumeStringMessage(String message) {
        System.out.println("Consumed message: " + message);
    }
}

In this example, the method consumeStringMessage will be triggered each time a new message arrives on the string-topic Kafka topic.

4. Run Your Application:

With the above configurations and code in place, run your Spring Boot application. Ensure your Kafka server is running and that the topic string-topic exists (or adjust the topic name in your configuration accordingly).

When string messages are published to the string-topic, your Spring Boot application will consume and print them out.

Additional Notes:

  1. The above example uses default configurations and serializers. You can extend and customize as needed.
  2. Ensure that your Kafka server (broker) is running and accessible.
  3. It's a good idea to add error handling and manage exceptions, especially for cases where message processing might fail.
  4. If your consumer needs to process messages in batches, you can adjust the @KafkaListener settings and the method signature accordingly.
  5. You can use the @KafkaListener's concurrency attribute to define the number of threads that should be used for consumption if you need to scale the message processing.
  1. Consume string messages from Kafka topic in Spring Boot:

    • Description: Receive and process string messages from a Kafka topic in a Spring Boot application.
    • Code Example:
      @KafkaListener(topics = "yourTopic", groupId = "yourGroupId")
      public void consumeStringMessage(String message) {
          // Process the string message
      }
      
  2. Configuring Kafka consumer for string messages in Spring Boot:

    • Description: Configure the Kafka consumer properties and settings to handle string messages.
    • Code Example:
      spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
      spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
      
  3. Handling string payloads in KafkaListener with Spring Boot:

    • Description: Implement logic within the KafkaListener method to handle string payloads.
    • Code Example:
      @KafkaListener(topics = "yourTopic", groupId = "yourGroupId")
      public void handleStringPayload(String message) {
          // Your logic to handle the string payload
      }
      
  4. Using KafkaTemplate to produce string messages in Spring Boot:

    • Description: Utilize KafkaTemplate to produce string messages to a Kafka topic.
    • Code Example:
      @Autowired
      private KafkaTemplate<String, String> kafkaTemplate;
      
      public void produceStringMessage(String message) {
          kafkaTemplate.send("yourTopic", message);
      }
      
  5. Customizing string deserialization in Spring Boot Kafka consumer:

    • Description: Customize the string deserialization process, e.g., by providing a custom deserializer.
    • Code Example:
      @Bean
      public StringDeserializer stringDeserializer() {
          // Customize and return your StringDeserializer
          return new YourCustomStringDeserializer();
      }
      
  6. Kafka listener adapter for string messages in Spring Boot:

    • Description: Use Kafka listener adapter to further customize the message handling process for string messages.
    • Code Example:
      @KafkaListener(topics = "yourTopic", groupId = "yourGroupId", containerFactory = "yourContainerFactory")
      public void handleStringMessage(ConsumerRecord<String, String> record) {
          // Your handling logic for string messages
      }
      
  7. Error handling for string message consumption in Kafka consumer with Spring Boot:

    • Description: Implement error handling mechanisms for issues during string message consumption.
    • Code Example:
      @KafkaListener(topics = "yourTopic", groupId = "yourGroupId")
      public void handleStringMessageWithErrorHandling(String message, @Header(KafkaHeaders.ERROR_EXCEPTION) Exception exception) {
          // Your error handling logic for string message consumption
      }