Kafka for Big Data: Implementing Kafka Operations in Java
一、在java中配置pom
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>2.8.0</version>
    </dependency>
</dependencies>
2. Producer Methods
(1) Producer
Messages produced from Java can be pulled by any consumer of the topic; start a console consumer in an Xshell session to watch them arrive:
[root@kb144 config]# kafka-console-consumer.sh --bootstrap-server 192.168.153.144:9092 --topic kb22
package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Scanner;

/**
 * Produce messages from Java; consume them in the Xshell session above.
 */
public class MyProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Producer configuration
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
        // Key serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Value serializer
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        /**
         * acks acknowledgment levels:
         *   0   - do not wait for any acknowledgment
         *   1   - the partition leader acknowledges after writing locally
         *   all - acknowledge only after all in-sync replicas have the record
         */
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        Scanner scanner = new Scanner(System.in);
        while (true) {
            System.out.println("Enter a message for Kafka:");
            String msg = scanner.next();
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("kb22", msg);
            producer.send(record);
        }
    }
}
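The loop above fires producer.send(record) and ignores the result, so a failed send goes unnoticed. send also accepts a callback that runs once the broker acknowledges the record (or the request fails). The class below is a hypothetical variant of MyProducer sketching that overload; it reuses the broker address and topic from above, and the class name is made up for illustration.

package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Sketch: same configuration as MyProducer, but send() takes a callback
 * so each acknowledgment (or failure) is printed per record.
 */
public class MyProducerWithCallback {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.ACKS_CONFIG, "1");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("kb22", "hello with callback");
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace(); // the send failed
                } else {
                    System.out.println("sent to partition " + metadata.partition()
                            + " at offset " + metadata.offset());
                }
            });
        } // try-with-resources closes the producer, flushing buffered records
    }
}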
(2) Multithreaded Producer
Running producers on multiple threads is a common technique for raising the concurrency and throughput of message production. Spreading the send work across threads makes better use of system resources and speeds up delivery.
package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MyProducer2 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) { // i: one producer task per pool thread
            // Submit the Runnable directly; wrapping it in a Thread object is unnecessary
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    Properties properties = new Properties();
                    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
                    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                    properties.put(ProducerConfig.ACKS_CONFIG, "0");
                    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
                    // j: message index within this thread
                    for (int j = 0; j < 100; j++) {
                        String msg = Thread.currentThread().getName() + " " + j;
                        System.out.println(msg);
                        ProducerRecord<String, String> re = new ProducerRecord<String, String>("kb22", msg);
                        producer.send(re);
                    }
                    producer.close(); // flush buffered records before the task ends
                }
            });
        }
        executorService.shutdown();
        try {
            // Wait for all tasks to finish instead of spinning in a busy loop
            executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("game over");
    }
}
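MyProducer2 builds a separate KafkaProducer per task, which costs one TCP connection and one send buffer per thread. KafkaProducer is documented as thread safe, so a single shared instance is usually the cheaper design. A minimal sketch of that variant under the same broker address and topic (MySharedProducer is a hypothetical class name):

package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Sketch: one shared KafkaProducer (thread safe) used by ten tasks,
 * avoiding the per-thread connections and buffers of MyProducer2.
 */
public class MySharedProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.ACKS_CONFIG, "1");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            executorService.execute(() -> {
                for (int j = 0; j < 100; j++) {
                    String msg = Thread.currentThread().getName() + " " + j;
                    producer.send(new ProducerRecord<>("kb22", msg));
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        producer.close(); // flush everything once all tasks are done
    }
}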
3. Consumer Methods
(1) Consumer
Implementing a consumer in Java:
package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Whether offsets are committed automatically after a poll (this is how Kafka
        // records that the app has already seen a message):
        // false = commit manually, true = commit automatically
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        /**
         * auto.offset.reset, used when the group has no committed offset yet:
         *   earliest - start from the first record in the topic
         *   latest   - start from newly arriving records only
         *   none     - throw an exception
         * Offsets are tracked per consumer group (group.id).
         */
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP3");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // After creating the consumer, subscribe to the topic to consume
        consumer.subscribe(Collections.singleton("kb22"));
        while (true) {
            Duration mills = Duration.ofMillis(100);
            ConsumerRecords<String, String> records = consumer.poll(mills);
            for (ConsumerRecord<String, String> record : records) {
                String topic = record.topic();
                int partition = record.partition();
                long offset = record.offset();
                String key = record.key();
                String value = record.value();
                long timestamp = record.timestamp();
                System.out.println("topic:" + topic + "\tpartition:" + partition
                        + "\toffset:" + offset + "\tkey:" + key
                        + "\tvalue:" + value + "\ttimestamp:" + timestamp);
            }
            // consumer.commitAsync(); // manual commit, left commented out here
        }
    }
}
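With enable.auto.commit set to false and the commit call left commented out, GROUP3 never records its progress, so every run starts again from the beginning of the topic. Below is a sketch of the poll loop with the manual commit re-enabled, as a drop-in replacement for the while loop above (it reuses the consumer configured there):

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value()); // process the record
            }
            if (!records.isEmpty()) {
                consumer.commitSync(); // blocks until the broker has stored the offsets
            }
        }

commitSync blocks until the offsets are safely stored; commitAsync returns immediately but does not retry on failure, which is why it is usually paired with a final commitSync on shutdown.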
(2) Multiple Consumers in One Group
Unlike KafkaProducer, KafkaConsumer is not thread safe, so the simulation below gives each thread its own consumer instance. All three consumers join group GROUP3, so Kafka divides the partitions of kb22 among them.
package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyConsumerThread {
    // Simulate several consumers reading the same topic concurrently
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    Properties properties = new Properties();
                    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
                    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                    // false = commit offsets manually, true = commit automatically
                    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
                    // earliest/latest/none: where to start when the group has no committed offset
                    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP3");
                    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
                    consumer.subscribe(Collections.singleton("kb22"));
                    while (true) {
                        // poll fetches the next batch of records
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                        for (ConsumerRecord<String, String> record : records) {
                            String topic = record.topic();
                            int partition = record.partition();
                            long offset = record.offset();
                            String key = record.key();
                            String value = record.value();
                            long timestamp = record.timestamp();
                            String name = Thread.currentThread().getName();
                            System.out.println("name:" + name
                                    + "\ttopic:" + topic
                                    + "\tpartition:" + partition
                                    + "\toffset:" + offset
                                    + "\tkey:" + key
                                    + "\tvalue:" + value
                                    + "\ttimestamp:" + timestamp
                            );
                        }
                    }
                }
            }).start();
        }
    }
}
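To verify how Kafka splits the partitions of kb22 among the three threads, the subscribe call can take a rebalance listener. A sketch of that variant of the subscribe line above (it additionally needs imports for org.apache.kafka.clients.consumer.ConsumerRebalanceListener, org.apache.kafka.common.TopicPartition, and java.util.Collection):

                    // Log partition assignments per thread via a rebalance listener
                    consumer.subscribe(Collections.singleton("kb22"), new ConsumerRebalanceListener() {
                        @Override
                        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                            System.out.println(Thread.currentThread().getName() + " assigned: " + partitions);
                        }
                        @Override
                        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                            System.out.println(Thread.currentThread().getName() + " revoked: " + partitions);
                        }
                    });

A partition is consumed by at most one member of a group, so if kb22 has fewer than three partitions, at least one thread will sit idle.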