第一步:引入 Maven 依赖
<!-- Spring for Apache Kafka. No <version> is declared here; presumably it is
     supplied by the project's dependency management (e.g. the Spring Boot
     parent/BOM) - confirm for your build. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
第二步:新增配置文件
以下为大致结构,仅供参考(first、second 为自定义前缀,由第三步中的配置类通过 @ConfigurationProperties 绑定):
# Settings for two independent Kafka clusters. The "first" and "second" keys are
# custom prefixes (spring.kafka.first / spring.kafka.second); each subtree has
# the same shape as the regular spring.kafka.* properties.
# Values written as x / xx / xxxx are placeholders to be filled in.
spring:
  kafka:
    first:
      bootstrap-servers: xxx.xxx.xxx.xxx:xxxx
      producer:
        retries: x
        acks: -1
      consumer:
        enable-auto-commit: false
        group-id: first-consumer
      listener:
        ack-mode: xx
    second:
      bootstrap-servers: xxx.xxx.xxx.xxx:xxxx
      producer:
        batch-size: xxxx
        buffer-memory: xxxxxx
      consumer:
        auto-offset-reset: earliest
        group-id: second-consumer
      listener:
        concurrency: xx
        # Listeners on this cluster that take an Acknowledgment argument also
        # need a manual ack mode here (ack-mode: manual or manual_immediate).
第三步:新增配置类
第一个 Kafka 集群的配置类
@Configuration
public class FirstKafkaConfig {
@Primary
@ConfigurationProperties(prefix = "spring.kafka.first")
@Bean
public KafkaProperties firstKafkaProperties() {
return new KafkaProperties();
}
@Primary
@Bean
public KafkaTemplate<String, String> firstKafkaTemplate(
@Autowired @Qualifier("firstKafkaProperties") KafkaProperties firstKafkaProperties) {
return new KafkaTemplate<>(firstProducerFactory(firstKafkaProperties));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
firstKafkaListenerContainerFactory(@Autowired @Qualifier("firstKafkaProperties") KafkaProperties firstKafkaProperties) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(firstConsumerFactory(firstKafkaProperties));
return factory;
}
private ConsumerFactory<? super Integer, ? super String> firstConsumerFactory(KafkaProperties firstKafkaProperties) {
return new DefaultKafkaConsumerFactory<>(firstKafkaProperties.buildConsumerProperties());
}
private DefaultKafkaProducerFactory<String, String> firstProducerFactory(KafkaProperties firstKafkaProperties) {
return new DefaultKafkaProducerFactory<>(firstKafkaProperties.buildProducerProperties());
}
}
第二个 Kafka 集群的配置类
@Configuration
public class SecondKafkaConfig {
@ConfigurationProperties(prefix = "spring.kafka.second")
@Bean
public KafkaProperties secondKafkaProperties() {
return new KafkaProperties();
}
@Bean
public KafkaTemplate<String, String> secondKafkaTemplate(
@Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) {
return new KafkaTemplate<>(secondProducerFactory(secondKafkaProperties));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
secondKafkaListenerContainerFactory(@Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(secondConsumerFactory(secondKafkaProperties));
return factory;
}
private ConsumerFactory<? super Integer, ? super String> secondConsumerFactory(KafkaProperties secondKafkaProperties) {
return new DefaultKafkaConsumerFactory<>(secondKafkaProperties.buildConsumerProperties());
}
private DefaultKafkaProducerFactory<String, String> secondProducerFactory(KafkaProperties secondKafkaProperties) {
return new DefaultKafkaProducerFactory<>(secondKafkaProperties.buildProducerProperties());
}
}
第四步:使用
生产者用法
// Template for the first cluster. The bean name is given explicitly so the
// lookup does not silently depend on the field name.
@Resource(name = "firstKafkaTemplate")
private KafkaTemplate<String, String> firstKafkaTemplate;

// Template for the second cluster; it is not the primary template, so it has
// to be selected by bean name.
@Resource(name = "secondKafkaTemplate")
private KafkaTemplate<String, String> secondKafkaTemplate;
消费者用法
/**
 * Example listener that consumes from the second cluster through the
 * {@code secondKafkaListenerContainerFactory} container factory.
 *
 * <p>Because the method takes an {@link Acknowledgment}, the container factory
 * must run with a manual ack mode ({@code MANUAL} or {@code MANUAL_IMMEDIATE});
 * otherwise no acknowledgment object is available to pass in.
 *
 * @param record the record received from the topic
 * @param ack    handle used to commit the record's offset once it has been processed
 */
@KafkaListener(
        containerFactory = "secondKafkaListenerContainerFactory",
        topics = {"xxxx"},
        groupId = "second-consumer")
public void testConsumer(ConsumerRecord<?, ?> record, Acknowledgment ack) {
    // TODO: process the record here.

    // With manual acknowledgment the offset is only committed after this call;
    // without it the same records are delivered again after a restart or rebalance.
    ack.acknowledge();
}