dinky+flink+doris实时架构全流程demo

一、版本

doris:doris-1.2.3-rc02
flink:flink1.4.6
dinky:0.7.2
jdk:1.8.0_191
mysql:5.7

二、安装doris

官网下载地址:https://archive.apache.org/dist/doris/1.2/1.2.3-rc02/

 

#doris单机部署

#创建doris目录
mkdir /opt/module/doris
tar zxvf apache-doris-fe-1.2.3-bin-x86_64.tar.xz -C /opt/module/doris
tar zxvf apache-doris-be-1.2.3-bin-x86_64.tar.xz -C /opt/module/doris

#修改be、fe目录名称
cd /opt/module/doris/
mv apache-doris-be-1.2.3-bin-x86_64 doris_be
mv apache-doris-fe-1.2.3-bin-x86_64 doris_fe


#配置FE

#FE 配置文件 conf/fe.conf,这里我们主要修改两个参数:priority_networks 及 meta_dir

meta_dir = /opt/module/doris/doris-fe/doris-meta
priority_networks=192.168.20.0/24

#端口根据自己需求调整(以防端口冲突)

http_port = 18030
rpc_port = 19020
query_port = 19030
edit_log_port = 19010

#启动FE
/opt/module/doris/doris-fe/bin/start_fe.sh --daemon

#查看 FE 运行状态
#你可以通过下面的命令来检查 Doris 是否启动成功
curl http://127.0.0.1:18030/api/bootstrap
返回如下代表成功:
{"msg":"success","code":0,"data":{"replayedJournalId":0,"queryPort":0,"rpcPort":0,"version":""},"count":0}


#你也可以通过 Doris FE 提供的Web UI 来检查,在浏览器里输入地址
#默认用户 root 进行登录,密码是空
http:// fe_ip:18030

 #配置 BE

#修改 BE 配置文件 conf/be.conf ,这里我们主要修改两个参数:priority_networks 及 storage_root

vim /opt/module/doris/doris-be/conf/be.conf
#修改如下内容
priority_networks=192.168.20.0/24
storage_root_path = /opt/module/doris/doris-be/storage

#端口根据自己需求调整(以防端口冲突)
be_port = 19060
webserver_port = 18040
heartbeat_service_port = 19050
brpc_port = 18060

#启动FE
/opt/module/doris/doris-be/bin/start_be.sh --daemon

#添加 BE 节点到集群
#通过MySQL 客户端连接到 FE 之后执行下面的 SQL,将 BE 添加到集群中

mysql -uroot -P19030 -h127.0.0.1

mysql>ALTER SYSTEM ADD BACKEND "be_host_ip:19050";
mysql>SHOW BACKENDSG

#Alive : true表示节点运行正常

 三、安装Flink

#下载地址:Index of /dist/flink/flink-1.14.6

#Flink单机部署

#解压:
tar -zxvf flink-1.14.6-bin-scala_2.12.tgz -C /opt/module


#配置flink
vim /opt/module/flink-1.14.6/conf/flink-conf.yaml
#修改配置如下:
rest.bind-address: 0.0.0.0


#配置环境变量
vim /etc/profile
#FLINK_HOME
export FLINK_HOME=/opt/module/flink-1.14.6
export PATH=$PATH:$FLINK_HOME/bin

#相关依赖包下载

#基础依赖包下载:
https://download.csdn.net/download/qq_41060328/87818060
#其余依赖按需自行下载
https://mvnrepository.com/

#将flink自带lib目录备份并用下载的目录替换
mv /opt/module/flink-1.14.6/lib /opt/module/flink-1.14.6/lib_bak
mv flink_lib/ /opt/module/flink-1.14.6/lib

#启动

/opt/module/flink-1.14.6/bin/start-cluster.sh

#访问

flink默认的web ui界面的端口为8081
浏览器访问:http://ip:8081

 四、安装Dinky

#下载

http://www.dlink.top/download/dinky-0.7.2

 #Dinky单机部署

#解压
tar zxvf dlink-release-0.7.2.tar.gz -C /opt/module/

#将解压文件修改为dinky


#Mysql创建数据库,root用户登陆
mysql>create database dinky;

mysql>grant all privileges on dinky.* to 'dinky'@'%' identified by '密码' with grant option;
mysql>flush privileges;

#dinky用户登陆Mysql
mysql -h xx.xx.xx.xx -udinky -p密码
#初始化数据
mysql>use dinky;
mysql> source /opt/module/dinky/sql/dinky.sql

#配置dinky

#修改 Dinky 连接 mysql 的配置文件。
cd /opt/module/dinky/config/
vim application.yml


spring:
  datasource:
    url: jdbc:mysql://xx.xx.xx.xx:3306/dinky?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
    username: dinky
    password: xxxx
#如果mysql是8.0版本。需要改为com.mysql.cj.jdbc.Driver
    driver-class-name: com.mysql.jdbc.Driver

#添加依赖

依赖下载:https://download.csdn.net/download/qq_41060328/87817727

备份opt/module/dinky/plugins/flink1.14,将下载后的文件上传至/opt/module/dinky/plugins

#启动:

sh /opt/module/dinky/auto.sh start 1.14

浏览器访问 ip:8888

五、Dinky+Flink+Doris构建流计算

数据流程:

dinky添加flink集群

 

 

mysql建表

-- Mysql学生表
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student`  (
  `sid` int(11) NOT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`sid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

INSERT INTO `student` VALUES (1, '小红');
INSERT INTO `student` VALUES (2, '小黑');
INSERT INTO `student` VALUES (3, '小黄');


-- Mysql成绩表
DROP TABLE IF EXISTS `score`;
CREATE TABLE `score`  (
  `cid` int(11) NOT NULL,
  `sid` int(11) NULL DEFAULT NULL,
  `cls` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `score` int(11) NULL DEFAULT NULL,
  PRIMARY KEY (`cid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

INSERT INTO `score` VALUES (1, 1, 'chinese', 90);
INSERT INTO `score` VALUES (2, 1, 'math', 95);
INSERT INTO `score` VALUES (3, 1, 'english', 93);
INSERT INTO `score` VALUES (4, 2, 'chinese', 92);
INSERT INTO `score` VALUES (5, 2, 'math', 75);
INSERT INTO `score` VALUES (6, 2, 'english', 80);
INSERT INTO `score` VALUES (7, 3, 'chinese', 100);
INSERT INTO `score` VALUES (8, 3, 'math', 60);

 doris建表

-- Doris学生成绩宽表
CREATE TABLE scoreinfo
(
    cid INT,
    sid INT,
    name VARCHAR(32),
    cls VARCHAR(32),
    score INT
)
UNIQUE KEY(cid)
DISTRIBUTED BY HASH(cid) BUCKETS 10
PROPERTIES("replication_num" = "1");

配置作业:

DROP TABLE IF EXISTS student;
CREATE TABLE student (
    sid INT,
    name STRING,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'xx.xx.xx.xx',
'port' = '3306',
'username' = 'root',
'password' = 'xxxxxx',
'database-name' = 'flink_test',
'table-name' = 'student'
);

DROP TABLE IF EXISTS score;
CREATE TABLE score (
    cid INT,
    sid INT,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'xx.xx.xx.xx',
'port' = '3306',
'username' = 'root',
'password' = 'xxxxxx',
'database-name' = 'flink_test',
'table-name' = 'score');

-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';
DROP TABLE IF EXISTS scoreinfo;
CREATE TABLE scoreinfo (
    cid INT,
    sid INT,
    name STRING,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (       
    'connector' = 'doris',
    'fenodes' = 'xx.xx.xx.xx:18030' ,
    'table.identifier' = 'flink_test.scoreinfo',
    'username' = 'root',
    'password'='xxxxxx',
    'sink.label-prefix' = 'doris_label'
  
);
insert into scoreinfo
select a.cid,a.sid,b.name,a.cls,a.score from score a left join student b on a.sid = b.sid;

运行后在flink端查看任务状态

 doris结果验证

增量测试

在 Mysql 中执行新增语句:

INSERT INTO `score` VALUES (9, 3, 'english', 100);

变动测试

在 Mysql 中执行新增语句:

update score set score = 100 where cid = 1