Spring Boot Tutorial

Spring Boot - Software Setup and Configuration (STS/Eclipse/IntelliJ)

Prerequisite (Spring Core Concepts)

Spring Boot Core

Spring Boot with REST API

Spring Boot with Database and Data JPA

Spring Boot with Kafka

Spring Boot with AOP

Spring Boot - How to publish JSON messages on Apache Kafka

Publishing JSON messages to Apache Kafka using Spring Boot involves a few steps, mainly setting up the necessary configurations and using Kafka templates to send the messages. Here's a step-by-step guide:

1. Dependencies:

First, add the required dependencies. If you're using Maven, add these to your pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. Configuration:

Update your application.properties or application.yml with the necessary Kafka configurations:

# Kafka properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

The above configuration sets the Kafka bootstrap servers to localhost:9092 and specifies serializers for both key and value. The key will be serialized as a string, and the value (our JSON message) will use Spring Kafka's JsonSerializer.

3. Model:

Suppose you want to publish a JSON message representing a User. Here's a simple model:

public class User {
    private String name;
    private int age;

    // Getters, setters, constructors...
}

4. Kafka Producer:

Now, you'll need a service or component to produce (send) messages to Kafka:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void sendUser(User user) {
        kafkaTemplate.send("user-topic", user);
    }
}

In this example, the sendUser method will send a User object to the user-topic Kafka topic.

5. Use the Producer:

To use the producer, you can inject KafkaProducerService into another Spring component, like a REST controller, and call the sendUser method:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class UserController {

    @Autowired
    private KafkaProducerService producerService;

    @PostMapping("/users")
    public String createUser(@RequestBody User user) {
        producerService.sendUser(user);
        return "User sent to Kafka!";
    }
}

In the above code, a POST request to /users will send the provided user data to the user-topic in Kafka.

6. Run Your Application:

Start your Spring Boot application, ensuring your Kafka server is running and that the topic user-topic exists (or adjust the topic name in your configuration accordingly). When you send a POST request to /users with a valid user JSON, the message will be produced to the Kafka topic.

Additional Notes:

  1. Always add error handling and manage exceptions, especially for potential issues related to publishing messages.
  2. Adjust configurations, topics, and serializers as needed depending on your use case and Kafka setup.
  3. For complex scenarios, consider enhancing the Kafka producer settings for things like retries, acks, etc.
  1. Publish JSON messages to Kafka topic in Spring Boot:

    • Description: Produce and send JSON messages to a Kafka topic in a Spring Boot application.
    • Code Example:
      @Autowired
      private KafkaTemplate<String, YourObject> kafkaTemplate;
      
      public void publishJsonMessage(YourObject yourObject) {
          kafkaTemplate.send("yourTopic", yourObject);
      }
      
  2. Serialize JSON messages with KafkaTemplate in Spring Boot:

    • Description: Serialize Java objects to JSON format when using KafkaTemplate for message production.
    • Code Example:
      @Configuration
      public class KafkaConfig {
          @Bean
          public KafkaTemplate<String, YourObject> kafkaTemplate() {
              return new KafkaTemplate<>(producerFactory(), new StringSerializer(), new JsonSerializer<>());
          }
      }
      
  3. Configuring Kafka producer for JSON in Spring Boot:

    • Description: Configure the Kafka producer properties and settings for handling JSON messages.
    • Code Example:
      spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
      spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
      
  4. Using custom serializers for JSON in Spring Boot Kafka producer:

    • Description: Implement and use custom serializers for more control over the JSON serialization process.
    • Code Example:
      public class YourCustomJsonSerializer extends JsonSerializer<YourObject> {
          // Implement your custom serialization logic
      }
      
  5. Error handling for JSON message production in Kafka with Spring Boot:

    • Description: Implement error handling mechanisms for issues during JSON message production.
    • Code Example:
      @Autowired
      private KafkaTemplate<String, YourObject> kafkaTemplate;
      
      public void publishJsonMessageWithErrorHandler(YourObject yourObject) {
          ListenableFuture<SendResult<String, YourObject>> future = kafkaTemplate.send("yourTopic", yourObject);
          future.addCallback(new ListenableFutureCallback<>() {
              @Override
              public void onSuccess(SendResult<String, YourObject> result) {
                  // Success handling logic
              }
      
              @Override
              public void onFailure(Throwable ex) {
                  // Error handling logic
              }
          });
      }
      
  6. Asynchronous and synchronous JSON message publishing in Spring Boot:

    • Description: Choose between asynchronous and synchronous methods for publishing JSON messages.
    • Code Example (Asynchronous):
      @Autowired
      private KafkaTemplate<String, YourObject> kafkaTemplate;
      
      public void publishJsonMessageAsync(YourObject yourObject) {
          ListenableFuture<SendResult<String, YourObject>> future = kafkaTemplate.send("yourTopic", yourObject);
          future.addCallback(new ListenableFutureCallback<>() {
              // Callback methods
          });
      }
      
      Code Example (Synchronous):
      @Autowired
      private KafkaTemplate<String, YourObject> kafkaTemplate;
      
      public void publishJsonMessageSync(YourObject yourObject) {
          kafkaTemplate.send("yourTopic", yourObject).get();
      }
      
  7. Integration of JSON schema with Kafka in Spring Boot:

    • Description: Integrate JSON schema to enforce a structure for JSON messages in Kafka.
    • Code Example:
      @Value("${spring.kafka.properties.schema.registry.url}")
      private String schemaRegistryUrl;
      
      @Bean
      public KafkaTemplate<String, YourObject> kafkaTemplate() {
          return new KafkaTemplate<>(producerFactory(), new StringSerializer(), new JsonSerializer<>(schemaRegistryUrl));
      }
      
  8. Monitoring and logging JSON message production in Kafka with Spring Boot:

    • Description: Implement monitoring and logging for better visibility into JSON message production.
    • Code Example (Logging):
      @Autowired
      private KafkaTemplate<String, YourObject> kafkaTemplate;
      
      public void publishJsonMessageWithLogging(YourObject yourObject) {
          log.info("Publishing JSON message: {}", yourObject);
          kafkaTemplate.send("yourTopic", yourObject);
      }