kafka配合ElasticStack技术栈的搭配使用

今日内容:
    - kafka生产环境调优;
    - kafka配合ElasticStack技术栈的搭配使用;
    - zookeeper集群部署;
    - zookeeper的ACL;
    - zookeeper的调优;
    - PB级别项目;
    - ES8集群搭建/elk; (待定...)
    


订阅1个的topic:
    老男孩: 10
    
多个不同的主题分配如下: (linux82-elk: 3)
    c1: 0 3 6 9
    c2: 1 4 7
    c3: 2 5 8


订阅多个的topic:
    老男孩: 10 t1
    Linux: 10 t2 
    Python: 10  t3
    
多个不同的主题分配如下: (linux82-elk: 3)
    c1: t1_0 ...  t2_0 ...  t3_0 ...
    c2: t1_1 ...  t2_1 ...
    c3: t1_2  ...  t2_2 ...
    
    
    
    

订阅1个的topic:
    老男孩: 10
    
多个不同的主题分配如下: (linux82-elk: 3)
    c1: 0 1 2
    c2: 3 4 5
    c3: 6 7 8 9
    
10 / 3 = 3  .. 1 ===> 
999 / 100 = 9 .. 99

JBOD ---> RAID 0
---> RAID

BOND ---> ...


注意的参数:
    log.dirs
    auto.create.topics.enable
    zookeeper.connect
    num.io.threads
    
    
ElasticStack集成kafka实战案例:
    1.创建topic
kafka-topics.sh --bootstrap-server 10.0.0.103:9092 --create --topic oldboyedu-linux82-kafka-000001

    2.使用filebeat收集日志到kafka集群:
cat > config/28-nginx-to-kafka.yaml <<EOF
filebeat.inputs:
- type: log
  paths:
    - /var/log/nginx/access.log*
  json.keys_under_root: true


output.kafka:
  # 写入kafka集群的地址
  hosts: 
  - 10.0.0.102:9092
  - 10.0.0.103:9092

  # 写入集群的topic
  topic: "oldboyedu-linux82-kafka-000001"
EOF


    3.使用logstash收集kafka日志
cat > config/18-kafka-to-es.conf <<EOF
input { 
   kafka {
     # 指定kafka的集群
     bootstrap_servers => "10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092"
     # 从哪个topic消费数据
     topics => ["oldboyedu-linux82-kafka-000001"]
     # 指定消费者组
     group_id => "oldboyedu-linux82-logstash"
   }


filter {
  json {
    source => "message"
    remove_field => ["tags","@version","ecs","agent","input","message"]
  }
  
  geoip {
     source => "clientip"
  }

  date {
     match => [
        "@oldboyedu-timestamp",
        "yyyy-MM-dd'T'HH:mm:ssZ"
     ]
  }

  useragent {
    source => "http_user_agent"
    target => "oldboyedu-linux82-useragent"
  }

  
}

output { 
  stdout {}

  elasticsearch {
      hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
      index => "oldboyedu-linux82-project-kafka"
      user => "elastic"
      password => "123456"
  }
}
EOF

    
    
zookeeper集群部署:
    1.创建zookeeper的数据目录
install -d /oldboyedu/data/zk
data_rsync.sh /oldboyedu/data/zk/


    2.修改单点zk的配置文件
vim /oldboyedu/softwares/zk/conf/zoo.cfg                          
...
# 定义最小单元的时间范围tick。
tickTime=2000
# 启动时最长等待tick数量。
initLimit=5
# 数据同步时最长等待的tick时间进行响应ACK
syncLimit=2
# 指定数据目录
dataDir=/oldboyedu/data/zk
# 监听端口
clientPort=2181
# 开启四字命令允许所有的节点访问。
4lw.commands.whitelist=*
# server.ID=A:B:C[:D]
# ID:
#    zk的唯一编号。
# A:
#    zk的主机地址。
# B:
#    leader的选举端口,是谁leader角色,就会监听该端口。
# C: 
#    数据通信端口。
# D:
#    可选配置,指定角色。
server.101=10.0.0.101:2888:3888
server.102=10.0.0.102:2888:3888
server.103=10.0.0.103:2888:3888

    3.同步数据
data_rsync.sh /oldboyedu/softwares/apache-zookeeper-3.8.0-bin/
    
    
    4.创建myid文件
for ((host_id=101;host_id<=103;host_id++)) do ssh 10.0.0.${host_id} "echo ${host_id} > /oldboyedu/data/zk/myid";done
    
    5.所有节点启动zk服务
zkServer.sh start
zkServer.sh  status

    
    6.链接方式
zkCli.sh -server 10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181

zookeeper.connect=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181/oldboyedu-linux82-kafka3.2.1
    
zookeeper的leader选举流程:    
    myid:
        唯一标识一个zookeeper节点。
    zxid:
        唯一事务的标识。用于记录唯一的写操作!
    
    选举流程:
        (1)相比较zxid,若zxid较大,则会成为新的leader;
        (2)如果zxid比较不出来,则比较myid,myid较大者会有限成为新的leader;
    
    
使用zkWeb管理zookeeper集群:
    1.运行zkWeb
java -jar zkWeb-v1.2.1.jar &>/dev/null &

    2.访问webUI
略。

临时znode:
    当前的会话退出时,znode会默认等待30秒后自动消失,等待时间是可以修改的哟。
    
持久的znode:
    不会随着客户端的退出而删除znode。


docker