Spring Boot Tutorial
Spring Boot - Software Setup and Configuration (STS/Eclipse/IntelliJ)
Prerequisite (Spring Core Concepts)
Spring Boot Core
Spring Boot with REST API
Spring Boot with Database and Data JPA
Spring Boot with Kafka
Spring Boot with AOP
Creating and configuring topics in Apache Kafka through a Spring Boot application can be done using the AdminClient API provided by the Kafka library. Here's a guide on how to set this up:
First, add the Spring Kafka dependency to your pom.xml
:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
In your application.properties
or application.yml
, add the Kafka broker details:
spring.kafka.bootstrap-servers=localhost:9092
Define a KafkaAdmin
bean to be used by the Kafka AdminClient API:
@Configuration public class KafkaTopicConfig { @Value(value = "${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public KafkaAdmin kafkaAdmin() { Map<String, Object> configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); return new KafkaAdmin(configs); } }
Define the topics you want to create as beans:
@Bean public NewTopic topic1() { return NewTopic.builder() .name("topic1") .partitions(1) .replicas(1) // Ensure you have enough brokers to support the replica count .build(); } @Bean public NewTopic topic2() { return NewTopic.builder() .name("topic2") .partitions(2) .replicas(1) .build(); }
You can also add custom configurations for your topics, such as cleanup policies, retention time, etc.:
@Bean public NewTopic topicWithConfigs() { Map<String, String> configs = new HashMap<>(); configs.put("cleanup.policy", "compact"); configs.put("retention.ms", "1200000"); return NewTopic.builder() .name("configTopic") .partitions(1) .replicas(1) .configs(configs) .build(); }
When the Spring Boot application starts, it will create the defined topics with the specified configurations.
Replica Count: Ensure you have enough Kafka brokers to satisfy the number of replicas you've specified for your topic. If you have only one broker and specify more than one replica, topic creation will fail.
Idempotence: The defined configuration is idempotent. If the topic already exists with the same configurations, it won't be changed. However, if there's a difference in configurations, Spring might not be able to alter an existing topic. For altering existing topics, you might have to use the Kafka AdminClient directly or use the Kafka command-line utilities.
Kafka Version: The ability to create topics automatically using the AdminClient API is available in relatively newer versions of Kafka. Ensure you're using a supported version.
By following this guide, you should be able to create and configure Kafka topics directly from your Spring Boot application.
Configuring Kafka topics in Spring Boot application.properties:
application.properties
or application.yml
.application.properties
):spring.kafka.producer.topic=my-topic spring.kafka.consumer.topic=my-topic
Programmatically creating Kafka topics in Spring Boot:
@Bean public KafkaAdmin kafkaAdmin() { Map<String, Object> configs = new HashMap<>(); return new KafkaAdmin(configs); }
Setting replication factors for Kafka topics in Spring Boot:
spring.kafka.producer.properties.replication.factor=3 spring.kafka.consumer.properties.replication.factor=3
Configuring partitioning for Kafka topics in Spring Boot:
spring.kafka.producer.properties.partitions=5 spring.kafka.consumer.properties.partitions=5
Managing topic configurations with KafkaTemplate in Spring Boot:
@Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String message) { kafkaTemplate.send("my-topic", message); }
Topic creation and configuration with KafkaAdmin in Spring Boot:
@Bean public NewTopic myTopic() { return new NewTopic("my-topic", 3, (short) 2); }
Creating and configuring multiple Kafka topics in Spring Boot:
NewTopic
beans to create and configure multiple Kafka topics.@Bean public NewTopic topic1() { return new NewTopic("topic-1", 3, (short) 2); } @Bean public NewTopic topic2() { return new NewTopic("topic-2", 5, (short) 3); }
Handling topic configuration changes in a Spring Boot Kafka application:
NewTopic
beans.@Autowired private KafkaAdmin kafkaAdmin; public void updateTopicConfigurations() { NewTopic newTopic = new NewTopic("my-topic", 5, (short) 3); kafkaAdmin.createOrModifyTopics(Collections.singletonList(newTopic)); }