Flink迷你集群及默认参数简单介绍:
Flink迷你集群是一个轻量级的本地集群,可用于在本地环境中快速开发和测试Flink应用程序。迷你集群不需要任何复杂的配置和管理,只需要提供一个简单的配置即可使用。在启动迷你集群时,必须指定Flink应用程序所需的资源,如TaskManager的数量、内存大小、并行度等。
迷你集群的默认参数通常由Flink框架的配置文件控制。以下是一些常见的默认值:
- singleRpcService: SHARED(默认情况下,所有的RPC服务都共享同一个线程池)
- numTaskManagers: 1(默认情况下使用一个TaskManager节点)
- commonBindAddress: null(默认情况下使用本地地址)
- taskmanager.memory.network.min: 64 mb(TaskManager网络内存的最小值)
- taskmanager.memory.network.max: 64 mb(TaskManager网络内存的最大值)
- taskmanager.memory.managed.size: 128 mb(TaskManager管理内存的默认大小)
- taskmanager.numberOfTaskSlots: 12(每个TaskManager节点的默认任务插槽数量)
- parallelism.default: 1(操作符默认并行度)
- execution.target: local(默认情况下在本地运行)
- execution.runtime-mode: AUTOMATIC(默认情况下自适应选择执行模式)
- taskmanager.cpu.cores: 1.7976931348623157E308(TaskManager可以使用的CPU核心数最大值)
- taskmanager.memory.task.heap.size: 9223372036854775807 bytes(TaskManager堆内存的最大大小)
- taskmanager.memory.task.off-heap.size: 9223372036854775807 bytes(TaskManager堆外内存的最大大小)
- rest.bind-port: 0(默认情况下,REST API将绑定到系统可用端口)
- rest.address: localhost(REST API的默认地址)。
这些参数可以在Flink框架的配置文件中进行修改。在使用迷你集群时,可以使用命令行选项或Java代码覆盖这些默认设置。其中,最常用的命令行选项是-D和-yT。-D选项可以在JVM启动参数中设置要覆盖的配置参数,-yT选项可以从外部文件中加载YAML格式的配置文件。
简单demo代码如下:
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class FlinkDemo {
public static void main(String[] args) throws Exception {
// 1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置运行模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2.加载数据源
DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++",
"java,scala,php", "java,scala", "java,c,c++,python,go");
// 3.数据转换
DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String element, Collector<String> out) throws Exception {
String[] wordArr = element.split(",");
for (String word : wordArr) {
out.collect(word);
}
}
});
//DataStream 下边为DataStream子类
SingleOutputStreamOperator<String> source = flatMap.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
return value.toUpperCase();
}
}).returns(String.class);
// 4.数据输出
source.print();
// 5.执行程序
env.execute("FlinkDemo");
}
}