Kafka复习(20):消费者拦截器的使用
一、定义消费者拦截器(只消费含"sister"的消息)
package com.cisdi.dsp.modules.metaAnalysis.rest;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
/**
 * Consumer interceptor that only lets through records whose value contains
 * the keyword {@code "sister"}; every other record is dropped before the
 * batch is handed back from {@code onConsume}.
 */
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {

    /** Substring a record value must contain for the record to be kept. */
    private static final String KEYWORD = "sister";

    /**
     * Filters the given batch, keeping only records whose value contains
     * {@link #KEYWORD}.
     *
     * <p>Records with a {@code null} value are dropped instead of triggering
     * a {@link NullPointerException}. Partitions left with no records after
     * filtering are omitted from the result, so the returned batch never
     * lists a partition for which it carries no records.
     *
     * @param records the batch to filter
     * @return a new batch containing only the matching records, grouped by
     *         their original partition
     */
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        Map<TopicPartition, List<ConsumerRecord<String, String>>> filtered = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> kept = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records.records(partition)) {
                String value = record.value();
                // A null value cannot contain the keyword; skip it rather than crash.
                if (value != null && value.contains(KEYWORD)) {
                    kept.add(record);
                }
            }
            if (!kept.isEmpty()) {
                filtered.put(partition, kept);
            }
        }
        return new ConsumerRecords<>(filtered);
    }

    /**
     * Prints the topic name and offset of every entry in the given map.
     *
     * @param offsets offsets keyed by topic partition
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((tp, meta) ->
                System.out.println("消费者拦截器:" + tp.topic() + ":" + meta.offset()));
    }

    /** No resources are held by this interceptor, so there is nothing to release. */
    @Override
    public void close() {
        // Intentionally empty.
    }

    /**
     * This interceptor takes no configuration; the supplied settings are ignored.
     *
     * @param configs configuration map (unused)
     */
    @Override
    public void configure(Map<String, ?> configs) {
        // Intentionally empty.
    }
}
二、定义消费者,配置消费者拦截器
package com.cisdi.dsp.modules.metaAnalysis.rest;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
 * Demo consumer that registers {@code MyConsumerInterceptor} and prints the
 * value of every record returned by {@code poll}.
 */
public class ConsumerInterceptorTest {

    /**
     * Builds the consumer configuration, subscribes to the topic and polls
     * forever, printing each record value to standard output.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        String topic = "testTopic2";
        String server = "xx.xx.xx.xx:9092";

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupTest4");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        // Stored as a String so every entry is a regular String property.
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());

        // try-with-resources closes the consumer if the poll loop exits with an exception.
        try (KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(properties)) {
            myConsumer.subscribe(Arrays.asList(topic));
            while (true) {
                ConsumerRecords<String, String> records = myConsumer.poll(Duration.ofMillis(2000));
                for (ConsumerRecord<String, String> consumerRecord : records) {
                    System.out.println(consumerRecord.value());
                }
                // Auto-commit is enabled above, so no explicit commit call is made here.
            }
        }
    }
}