大数据高级面试题

大数据高级面试题

Kafka的producer如何实现幂等性?

Producer 幂等性

Producer 的幂等性指的是当发送同一条消息时,数据在 Server 端只会被持久化一次,数据不丟不重,但是这里的幂等性是有条件的:

  • 只能保证 Producer 在单个会话内不丟不重,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重);
  • 幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性,当涉及多个 Topic-Partition 时,这中间的状态并没有同步。

幂等性要解决的问题

在 0.11.0 之前,Kafka 通过 Producer 端和 Server 端的相关配置可以做到数据不丢,也就是 at least once,但是在一些情况下,可能会导致数据重复,比如:网络请求延迟等导致的重试操作,在发送请求重试时 Server 端并不知道这条请求是否已经处理(没有记录之前的状态信息),所以就会有可能导致数据请求的重复发送,这是 Kafka 自身的机制(异常时请求重试机制)导致的数据重复。

对于大多数应用而言,数据保证不丢是可以满足其需求的,但是对于一些其他的应用场景(比如支付数据等),它们是要求精确计数的,这时候如果上游数据有重复,下游应用只能在消费数据时进行相应的去重操作,应用在去重时,最常用的手段就是根据唯一 id 键做 check 去重。

在这种场景下,因为上游生产导致的数据重复问题,会导致所有有精确计数需求的下游应用都需要做这种复杂的、重复的去重处理。试想一下:如果在发送时,系统就能保证 exactly once,这对下游将是多么大的解脱。这就是幂等性要解决的问题,主要是解决数据重复的问题,正如前面所述,数据重复问题,通用的解决方案就是加唯一 id,然后根据 id 判断数据是否重复,Producer 的幂等性也是这样实现的,这一小节就让我们看下 Kafka 的 Producer 如何保证数据的 exactly once 的。

幂等性的实现原理(at least once + 幂等 = exactly once)

Kafka Producer 在实现时有以下两个重要机制:

  • PID(Producer ID),用来标识每个 producer client;
  • sequence numbers,client 发送的每条消息都会带相应的 sequence number,Server 端就是根据这个值来判断数据是否重复。

每个 Producer 在初始化时都会被分配一个唯一的 PID,broker端会为producer每个Partition维护一个sequence number映射。sequence number时从0开始单调递增的。Producer 在发送数据时,将会给每条 msg 标识一个 sequence number,Server 也就是通过这个来验证数据是否重复。这里的 PID 是全局唯一的,Producer 故障后重新启动后会被分配一个新的 PID,这也是幂等性无法做到跨会话的一个原因。

新消息的sequence number - broker端维护的sequence number大1,说broker会接受处理这条消息。
新消息的sequence number比broker端维护的sequence number要小,说明时重复消息,broker可以将其直接丢弃
新消息的sequence number比broker端维护的sequence number要大过1,说明中间存在了丢数据的情况,那么会响应该情况,对应的Producer会抛出OutOfOrderSequenceException。

Producer如何开启幂等性

Properties.put(“enable.idempotence”, ture)

Properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。

Kafka的ISR和OSR的作用分别是什么?

在Kafka中,生产者和消费者只与leader 副本进行交互,而 follow 副本只负责消息的同步,很多时候 follower 副本中的消息相对 leader 副本而言会有一定的滞后。根据同步情况的不同,kafka将副本分为了以下几种集合:

  • AR ( Assigned Replicas ):分区中的所有副本统称为 AR
  • ISR(On-Sync Replicas):所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内〕组成
  • OSR (Out-of-Sync Replicas ):与 leader 副本同步滞后过多的副本(不包 leader 副本)组成

ISR 与 OSR并不是固定的:

leader 副本负责维护和跟踪 ISR 集合中所有 follower 的滞后状态, follower 副本落后太多或失效时, leader 副本会把它从 ISR 集合中剔除
如果 OSR 集合中有 follower 副本 ”追上“ leader 副本,那么 leader 副本它从 OSR 集合转移至 ISR 集合

Kafka生产者如何实现幂等性写入和事务?

Kafka事务 Transactions

Kafka 的事务处理,主要是允许应用可以把消费和生产的 batch 处理(涉及多个 Partition)在一个原子单元内完成,操作要么全部完成、要么全部失败。为了实现这种机制,我们需要应用能提供一个唯一 id,即使故障恢复后也不会改变,这个 id 就是 TransactionnalId(也叫 txn.id,后面会详细讲述),txn.id 可以跟内部的 PID 1:1 分配,它们不同的是 txn.id 是用户提供的,而 PID 是 Producer 内部自动生成的(并且故障恢复后这个 PID 会变化),有了 txn.id 这个机制,就可以实现多 partition、跨会话的 EOS 语义。

当用户使用 Kafka 的事务性时,Kafka 可以做到的保证:

跨会话的幂等性写入:即使中间故障,恢复后依然可以保持幂等性;
跨会话的事务恢复:如果一个应用实例挂了,启动的下一个实例依然可以保证上一个事务完成(commit 或者 abort);

跨多个 Topic-Partition 的幂等性写入,Kafka 可以保证跨多个 Topic-Partition 的数据要么全部写入成功,要么全部失败,不会出现中间状态。

Kafka 事务的主要特点如下:

  1. 原子性:Kafka 事务中的所有操作要么全部成功完成,要么全部回滚。这确保了消息的一致性。
  2. 事务性消息:Kafka 事务允许将多个消息作为一个事务进行发送。只有当事务中的所有消息都成功写入到 Kafka 集群中时,事务才会被提交。
  3. 跨分区和跨主题事务:Kafka 事务支持在多个分区和主题之间进行事务性操作。这使得可以在不同的分区和主题之间保持一致性。
  4. 事务协调器:Kafka 使用一个事务协调器来管理和协调事务的执行。事务协调器负责分配事务 ID、管理事务的状态和元数据,并确保事务的原子性。

Kafka消费者位置提交方式有哪些?分别什么场景下使用?

  1. 自动提交(Auto Commit):消费者通过设置enable.auto.committrue来启用自动提交消费位置的功能。在使用自动提交时,消费者会定期将当前消费的最新 offset 自动提交到 Kafka。该方式适用于简单的消费场景,不需要太精确的消费位置控制,且可以容忍一定程度的数据重复或丢失。
  2. 手动提交(Manual Commit):消费者通过显式调用commitSync()commitAsync()方法来手动提交消费位置。手动提交可以根据业务逻辑控制何时提交消费位置,并且可以指定具体的消费位置进行提交。这种方式适用于需要更精确的消费位置控制以及对数据的准确性要求较高的场景。

使用自动提交和手动提交的场景如下:

  • 自动提交适用场景:

  • 简单消费逻辑:对于简单的消费逻辑,不需要过多地控制消费位置,也不需要精确控制数据的准确性。

  • 较低的可靠性要求:可以容忍一定程度的数据重复或丢失,如日志收集等场景。

  • 手动提交适用场景:

  • 精确消费位置控制:需要精确控制消费位置,例如按照一定的条件进行批量消费、跳过某些消息等。

  • 较高的可靠性要求:对数据的准确性要求较高,不能容忍数据重复或丢失。

  • 批量提交消费位置:通过手动提交可以灵活控制提交消费位置的时机,可以根据实际情况进行批量提交。

需要注意的是,手动提交消费位置时需要注意提交的顺序和频率,以免引入不必要的延迟或增加系统负载。同时,手动提交也需要处理可能出现的提交失败或错误的情况,并进行相应的重试和异常处理。

Kafka消息丢失场景有哪些?如何避免?

Kafka 可能发生消息丢失的场景有以下几种:

  1. Producer 端丢失
  • 目前 Kafka Producer 是异步发送消息的,如果你的 Producer 客户端使用了 producer.send(msg) 方法来发送消息,方法会立即返回,但此时并不能代表消息已经发送成功了。
  • 如果消息再发送的过程中发生了网络抖动,那么消息可能没有传递到 Broker,那么消息可能会丢失。
  • 如果发送的消息本身不符合,如大小超过了 Broker 的承受能力等。
  1. Broker 端丢失
  • Leader Broker 宕机了,触发选举过程,集群选举了一个落后 Leader 太多的 Broker 作为 Leader,那么落后的那些消息就会丢失了。
  • Kafka 为了提升性能,使用页缓存(Page Cache)机制,将消息写入页缓存而非直接持久化至磁盘,采用了异步批量刷盘机制,也就是说,按照一定的消息量和时间间隔去刷盘,刷盘的动作由操作系统来调度的,如果刷盘之前,Broker 宕机了,重启后在页缓存的这部分消息则会丢失。
  1. Consumer 端丢失
  • Consumer 没有正确消费消息,就把位移提交了,导致 Kafka 认为该消息已经被消费了,从而导致消息丢失。
  • 场景1:获取到消息后直接提交位移了,然后再处理消息。这样在提交位移后,处理完消息前,如果程序挂掉,这部分消息就算是丢失了。
  • 多线程并发消费消息,且开启了自动提交,导致消费完成之前程序就自动提交了位移,如果程序挂掉也会出现消息丢失。
  1. 避免 Producer 端丢失
  • 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
  • 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
  1. 避免 Broker 端丢失
  • 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
  • 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
  • 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
  • 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
  • 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
  1. 避免 Consumer 端丢失
  • 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

Kafka参数调优的注意事项

  1. Broker 参数调优:
  • num.network.threads 和 num.io.threads:调整网络和I/O线程的数量,以适应集群的负载。
  • socket.send.buffer.bytes 和 socket.receive.buffer.bytes:调整TCP Socket的发送和接收缓冲区大小,以提高网络吞吐量。
  • log.segment.bytes:设置每个日志段的大小,以平衡磁盘空间和读写性能。
  • log.roll.hours 和 log.retention.hours:控制日志段的滚动和保留策略,以适应数据的存储和保留需求。
  1. Consumer 参数调优:
  • max.poll.records 和 fetch.max.bytes:调整消费者每次拉取的消息数量和总字节数,以提高拉取性能。
  • fetch.min.bytes 和 fetch.max.wait.ms:控制消费者的拉取策略,以平衡吞吐量和延迟。
  • enable.auto.commit 和 auto.commit.interval.ms:配置消费者的自动提交偏移量的策略,以确保消费的消息不会重复或丢失。
  1. Producer 参数调优:
  • acks:设置生产者的消息确认机制(0、1或all),以平衡消息的可靠性和性能。
  • batch.size 和 linger.ms:控制生产者的消息批量发送行为,以提高发送的吞吐量。
  • compression.type:设置消息的压缩算法,以减小网络传输和磁盘存储的开销。
  1. JVM 参数调优:
  • Xmx 和 Xms:调整Kafka Broker、Consumer和Producer的Java堆内存大小,以适应不同的负载和数据量。
  • XX:+UseG1GC:选择适合的垃圾回收器,以提高JVM性能和内存管理效率。
  1. 网络和硬件调优:
  • 网络带宽和延迟:确保Kafka集群的网络带宽和延迟满足实际需求。
  • 磁盘和存储:选择高速和可靠的磁盘存储,以确保数据的持久性和读写性能。

在调优 Kafka 参数时,需要根据实际情况进行测试和性能分析,以找到最优的参数配置。此外,还可以使用Kafka提供的监控工具和指标来实时监测集群的性能和健康状态,进一步进行调优和优化。

Kafka消费组重新平衡流程

  • 重平衡的作用
  • 让消费者分组内消费者消费哪些主题分区达成一致。重平衡需要借助Kafka Broker端的Coordinator组件,在Coordinator的帮助下完成消费者组的分区重新分配。
  • 触发重平衡的三个条件
  • 组内成员数量发生变化
  • 消费者组订阅主题的数量发生变化
  • 订阅主题的分区数量发生变化。
  • 重平衡的流程
  • 简约版
    • 加入组(JoinGroup):当消费者心跳包响应 REBALANCE_IN_PROGRESS 时,说明消费组正在重平衡,此时消费者会停止消费,并且发送请求加入消费组;
    • 同步更新分配方案:当 Coordinator 收到所有组内成员的加入组请求后,会选出一个consumer Leader,然后让consumer Leader进行分配,分配完后会将分配方案放入SyncGroup请求中发送会Coordinator,Coordinator根据分配方案发送给每个消费者。
  • 加入组
    • 当组内成员加入组时,它会向 coordinator 发送JoinGroup请求。
    • 在该请求中,每个成员都要将自己订阅的主题上报, 这样协调者就能收集到所有成员的订阅信息。
    • 一旦收集了全部成员的JoinGroup请求后, Coordinator 会从这些成员中选择一个担任这个消费者组的领导者。
    • 通常情况下,第一个发送JoinGroup请求的成员自动成为领导者。
    • 领导者消费者的任务是收集所有成员的订阅信息, 然后根据这些信息,制定具体的分区消费分配方案。
    • 选出领导者之后, Coordinator 会把消费者组订阅信息封装进JoinGroup请求的 响应体中, 然后发给领导者,由领导者统一做出分配方案后, 进入到下一步。
  • 等待领导者消费者
    • 领导者消费者(Leader Consumer)分配方案。
    • 领导者向 Coordinator 发送SyncGroup请求, 将刚刚做出的分配方案发给协调者。
    • 值得注意的是,其他成员也会向 Coordinator 发送SyncGroup请求, 只不过请求体中并没有实际的内容。
    • 这一步的主要目的是让 Coordinator 接收分配方案, 然后统一以 SyncGroup 响应的方式分发给所有成员, 这样组内所有成员就都知道自己该消费哪些分区了。
  • 重平衡的影响
  • Rebalance过程的表现有些类似JVM FGC的情况,期间整个应用都会阻塞,所有Consumer实例都会停止消费,等待Rebalance完成。
  • Rebalance过程中,所有Consumer实例都会参与重新分配。会导致TCP重新建立连接,是一个比较慢的操作,浪费资源。
  • Rebalance的耗时取决于Consumer Group下的实例数量,一旦实例数过多,耗时极长,会造成大量消费延迟。
  • 如何避免重平衡
  • 高峰期尽量避免对于kafka消费者组进行分区扩容操作,以免触发Rebalance流程。
  • 消费者里面的业务逻辑尽量轻量化,避免一些重的业务逻辑操作,触发消费者heartbeat超时,造成消费者下线,从而触发Rebalance流程。
  • 合理调整consumer端的相关参数:
    • a. session.timeout.ms:Consumer Group内实例的心跳超时时间,默认值是 10s。
    • b. heartbeat.interval.ms即心跳请求频率,频繁发送心跳请求会额外消耗带宽资源,但是能够更及时的触发Rebalance,默认值为 3s。
    • c. max.poll.interval.ms调用poll方法的时间间隔,默认值为 5min。期间没消费完poll回的消息,Coordinator会开启新一轮Rebalance。

Kafka消费者分区分配策略

  1. Range分配策略
  • Range分配策略是面向每个主题的,首先会对同一个主题里面的分区按照序号进行排序,并把消费者线程按照字母顺序进行排序。然后用分区数除以消费者线程数量来判断每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
  1. RoundRobin分配策略
  • 将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询算法逐个将分区以此分配给每个消费者。
    • 如果同一消费组内,所有的消费者订阅的消息都是相同的,那么 RoundRobin 策略的分区分配会是均匀的。
    • 如果同一消费者组内,所订阅的消息是不相同的,那么在执行分区分配的时候,就不是完全的轮询分配,有可能会导致分区分配的不均匀。如果某个消费者没有订阅消费组内的某个 topic,那么在分配分区的时候,此消费者将不会分配到这个 topic 的任何分区。
  1. Sticky分配策略
  2. 这种分配策略是在kafka的0.11.X版本才开始引入的,是目前最复杂也是最优秀的分配策略。
  3. 分配策略实现了两个目的
    1. 分区的分配要尽可能的均匀;
    2. 分区的分配尽可能的与上次分配的保持相同。

ClickHouse中的ReplicatedMergeTree是什么?有什么优点?

ReplicatedMergeTree引擎是MergeTree的派生引擎,它在MergeTree的基础上加入了分布式协同的能力,只有使用了ReplicatedMergeTree复制表系列引擎,才能应用副本的能力。或者用一种更为直接的方式理解,即使用ReplicatedMergeTree的数据表就是副本。

ReplicatedMergeTree的核心逻辑中,大量运用了ZooKeeper的能力,以实现多个ReplicatedMergeTree副本实例之间的协同,包括主副本选举、副本状态感知、操作日志分发、任务队列和BlockID去重判断等。在执行INSERT数据写入、MERGE分区和MUTATION操作的时候,都会涉及与ZooKeeper的通信。

优点:

  1. 高可靠性:ReplicatedMergeTree使用多个副本(Replica)存储数据,确保数据的冗余备份。当某个节点发生故障或不可用时,系统可以从其他副本中获取数据,保证数据的可靠性和可用性。
  2. 数据冗余和容错性:每个副本都是完整的数据副本,即使某些副本不可用,数据仍然可从其他副本中读取。该冗余机制提供了容错性,防止数据丢失和服务中断。
  3. 强一致性:ReplicatedMergeTree引擎采用分布式一致性协议,确保所有副本中的数据保持一致。这意味着数据更新和变更操作在所有副本上都会同步进行,保证了数据的强一致性。
  4. 数据合并和压缩:ReplicatedMergeTree通过合并相邻的数据块来减少磁盘空间使用。此外,它还支持数据压缩算法,可以进一步减小数据的存储空间,提高存储效率。
  5. 可扩展性:ReplicatedMergeTree引擎支持水平扩展,即向集群中添加更多的节点或副本,以增加数据处理能力和存储容量。这使得系统可以满足不断增长的数据量和负载需求。
  6. 实时性能:ReplicatedMergeTree引擎在处理大规模数据集时表现出色,能够快速执行聚合、过滤和排序等操作。它适用于需要实时分析和查询的场景,如日志分析和实时报表生成

ClickHouse的分布式查询流程是什么样的?

ClickHouse的分布式查询流程如下:

  1. 查询解析
    用户提交一个SQL查询请求,该请求被ClickHouse接收并解析。查询包括选择要从数据库中检索的数据、筛选条件、排序方式等等。

  2. 查询计划生成
    一旦查询被解析,ClickHouse会生成一个查询计划。这个计划决定了如何在分布式环境中执行查询,包括选择使用哪些分布式表、如何分割数据和并行执行查询。

  3. 分布式查询优化
    ClickHouse在生成查询计划后,会进行一系列优化操作,以提高查询性能。这可能包括重新排序操作、剪枝、推测执行等。

  4. 查询分发
    生成的查询计划告诉ClickHouse如何将查询分发到各个分布式节点上。这些节点通常是集群中的多个物理或虚拟机器,它们存储着数据的分片。

  5. 分片扫描
    一旦查询到达各个分布式节点,这些节点会扫描它们所负责的数据分片,以满足查询的条件。

  6. 局部计算
    分布式节点在本地执行计算,这可以包括过滤、聚合和计算表达式。每个节点只需要处理自己负责的数据分片。

  7. 局部结果汇总
    每个分布式节点生成部分查询结果。这些局部结果会被收集并汇总,以生成最终的查询结果。这通常涉及将各个节点的结果合并、合计、排序等操作。

  8. 查询结果返回
    最终的查询结果会被返回给用户,用户可以在客户端应用程序中处理这些结果。

整个过程中,ClickHouse利用其列式存储引擎、数据压缩技术和并行处理能力,以高效处理大规模数据集。该数据库系统还提供了许多高级功能,如分区表、合并树引擎等,以优化查询性能和数据管理。这使得ClickHouse在数据分析场景中非常受欢迎。

ClickHouse存储结构及优势

ClickHouse的存储结构和其优势是为了支持高性能大数据分析而设计的。以下是ClickHouse的存储结构和相关优势:

存储结构:

  1. 列式存储:ClickHouse采用列式存储,即将表中的每个列单独存储,而不是按行存储。这种存储方式使得数据压缩和检索效率更高。

  2. 分区表:ClickHouse支持分区表,允许您将数据按时间或其他列的值划分为不同的分区。这有助于加速查询和数据管理,特别是在大数据集上。

  3. MergeTree引擎:ClickHouse的核心存储引擎是MergeTree,它支持数据合并和剪枝操作,使其非常适合时间序列数据的存储和分析。

  4. 数据压缩:ClickHouse使用多种数据压缩算法,减小存储空间占用,并加速数据的读取。数据在存储时经过压缩,但在查询时仍然可以进行快速解压缩。

  5. 分布式存储:ClickHouse支持分布式存储,允许数据分布在多个节点上。这提高了可伸缩性和容错性。

  6. 合并树:ClickHouse使用合并树技术来加速查询,通过将数据在不同层次进行合并,减少了需要扫描的数据量。

优势:

  1. 高性能查询:ClickHouse的列式存储和压缩技术使其能够执行高性能的查询,特别适合于聚合、过滤和分析操作。合并树技术进一步提高了查询性能。

  2. 快速数据加载:ClickHouse可以高效地加载大量数据,尤其是在数据采用列式存储和分区表的情况下,数据加载速度很快。

  3. 可伸缩性:ClickHouse支持分布式架构,允许在需要时添加更多节点,以应对不断增长的数据量和查询负载。

  4. 低存储成本:由于数据压缩和列式存储,ClickHouse通常需要较少的存储空间,从而降低了存储成本。

  5. 容错性:ClickHouse的分布式架构具有高可用性和容错性,可以自动处理节点故障并保持数据一致性。

  6. 支持复杂查询:ClickHouse支持SQL查询语言,可以处理复杂的分析查询,包括聚合、连接、筛选等。

总的来说,ClickHouse是一种强大的列式数据库系统,特别适用于大规模数据分析。其存储结构和优势使其成为处理大数据集的理想选择,提供高性能、可伸缩性和容错性。

ClickHouse各种索引的区别和使用场景

ClickHouse支持不同类型的索引,每种索引都有其自身的优点和适用场景。以下是ClickHouse中一些常见的索引类型、其区别以及适用场景:

  1. Primary Key(主键索引)
  • 区别:主键索引是最基本的索引类型,用于唯一标识表中的每一行。它是一个列或列的组合,确保表中没有重复的主键值。

  • 使用场景:主键索引适用于需要快速查找特定行或执行合并操作的场景。它还用于确保表中数据的唯一性。

  1. MergeTree索引
  • 区别:MergeTree索引是ClickHouse的默认存储引擎,主要用于时间序列数据。它将数据按时间分区,并支持数据合并操作,以加速查询。

  • 使用场景:适用于时间序列数据,如日志、指标数据等。它支持按时间范围快速筛选数据。

  1. Bitmap索引
  • 区别:Bitmap索引是一种位图索引,它使用位图来表示行是否包含特定值。它通常用于高基数列(唯一值较多的列)。

  • 使用场景:Bitmap索引适用于高基数列,如性别、国家等,其中值的唯一性较高。它可以快速筛选包含特定值的行。

  1. Range索引
  • 区别:Range索引允许您指定列的范围,并且只存储该范围内的数据。它有助于减小索引的大小,提高查询性能。

  • 使用场景:适用于需要在特定范围内查询数据的情况,如按日期范围查询或按数值范围查询。

  1. Set索引
  • 区别:Set索引用于处理具有有限值集合的列,它将列中的不同值映射到一个有限的编号集合中,以节省存储空间。

  • 使用场景:适用于列中只有有限的不同值,如枚举值或特定状态的标志。

  1. Distributed索引
  • 区别:Distributed索引是ClickHouse中用于分布式查询的索引,它指导查询在分布式环境中执行。

  • 使用场景:适用于分布式查询,确保查询可以在集群中的多个节点上并行执行。

不同类型的索引可以根据具体的数据和查询需求进行选择。ClickHouse允许在表创建时选择合适的索引类型,以便更好地支持不同的查询和分析操作。索引的选择应该根据数据量、查询模式、数据分布和性能需求等因素来决定。

  1. 一级索引
  2. ClickHouse 的表使用主键索引,才能让数据查询有更好的性能,这是因为数据和索引会按主键进行排序存储,用主键索引查询数据可以很快地处理数据并返回结果。
  3. 二级索引/跳数索引
  4. min_max:存储每个块的索引表达式的最小值和最大值;
  5. set:可以理解为列出字段内所有出现的枚举值,可以设置取多少条;
  6. Bloom Filter:允许对集合成员进行高效的是否存在测试,但代价是有轻微的误报;

ClickHouse查询性能优化

  1. 单表查询(RBO、CBO)

prewhere替代where

prewhere 和 where 语句的作用相同,用来过滤数据。不同之处在于 prewhere 只支持*MergeTree族系列引擎的表,首先会读取指定的列数据,来判断数据过滤,等待数据过滤之后再读取 Select 声明的列字段来补全其余属性。

当查询列明显多于筛选列时使用 prewhere 可十倍提升查询性能,prewhere 会自动优化执行过滤阶段的数据读取方式,降低 IO 操作。

数据采样

通过采样运算可极大提升数据分析的性能,采样修饰符(SAMPLE)只有在 MergeTree engine 表中才有效,且在创建表时需要指定采样策略。

列裁剪与分区裁剪

数据量太大时应避免使用select *操作,查询的性能会与查询的字段大小和数量成反比,字段越少,消耗的 IO 资源越少,性能就会越高。分区裁剪就是只读取需要的分区,在过滤条件中指定。

order by结合where、limit

千万以上数据集进行 order by 查询时需要搭配 where 条件和 limit 语句一起使用。

避免构建虚拟列

如非必须,不要在结果集上构建虚拟列,虚拟列非常消耗资源浪费性能,可以考虑在前端进行处理,或者在表中构造实际字段进行额外存储。

uniqCombined替代distinct

uniqCombined 相比 distinct 性能可提升 10 倍以上,uniqCombined 底层采用类似 HyperLogLog 算法实现,能接收 2% 左右的数据误差,可直接使用这种去重方式提升查询性能。Count(distinct) 会使用 uniqExact 精确去重。

不建议在千万级不同数据上执行 distinct 去重查询,改为近似去重 uniqCombined。

  1. 多表优化

用IN代替JOIN

大小表JOIN

多表 join 时要满足小表在右的原则,右表关联时被加载到内存中与左表进行比较,ClickHouse 中无论是 Left join 、Right join 还是 Inner join 永远都是拿着右表中的每一条记录到左表中查找该记录是否存在,所以右表必须是小表。

注意谓词下推

  1. 其他优化

(1)关闭虚拟内存,物理内存和虚拟内存的数据交换,会导致查询变慢

(2)为每一个账户添加join_use_nulls配置,左表中的一条记录在右表中不存在,右表的相应字段会返回该字段相应数据类型的默认值,而不是标准SQL中的Null值

(3)对 ClickHouse 数据的增删改操作都会产生新的临时分区,会给 MergeTree 带来额外的合并任务。因此,数据变更操作不宜太频繁,这样会产生非常多的临时分区。一次操作的数据也不能太快。临时分区写入过快会导致 Merge 速度跟不上而报错。  官方一般建议一秒钟发起一次左右的写入操作,每次操作写入的数据量保持在 2W~5W 之间,具体根据服务器性能而定。

(4)分布式表使用GLOBAL,两张分布式表上的 IN 和 JOIN 之前必须加上GLOBAL关键字,右表只会在接收查询请求的那个节点查询一次,并将其分发到其他节点上。如果不加 GLOBAL 关键字的话,每个节点都会单独发起一次对右表的查询,而右表又是分布式表,就导致右表一共会被查询N^2次(N是该分布式表的分片数量),这就是查询放大,会带来很大开销。

  1. EXPLAIN命令查询执行计划进行优化

  2. 查找执行计划中的性能瓶颈,如全表扫描、文件排序等。

  3. 根据瓶颈,调整查询语句、创建或修改索引、优化表结构等。

  4. 重新执行查询,并比较执行计划和性能。

  5. 优化表连接和子查询

  • 尽量避免笛卡尔积连接,使用JOIN条件过滤无关记录。
  • 优先使用INNER JOIN,避免使用OUTER JOIN。
  • 将子查询替换为JOIN或EXISTS子句,提高性能。
  1. 合理使用聚合函数和窗口函数
  • 避免在大表上使用聚合函数,如COUNT()、SUM()等。
  • 使用窗口函数进行分组和排序操作,提高查询性能。
  1. 避免全表扫描和降低数据读取量
  • 尽量使用索引进行查询,避免全表扫描。
  • 使用WHERE子句过滤无关记录,减少数据读取量。
  1. 优化数据过滤和排序操作
  • 使用索引进行过滤和排序操作。
  • 避免在ORDER BY子句中使用函数和表达式。
  1. 使用分区和索引进行查询优化
  • 调整并发设置和内存限制:

    • 根据系统资源和查询需求,调整ClickHouse的并发设置,如max_threads参数。
    • 调整内存限制参数,如max_memory_usage,以保证查询能在限定的资源下高效运行。
  • 处理大数据量和复杂查询场景:

    • 对于大数据量查询,可以使用LIMIT子句分批查询,降低内存消耗。

    • 对于复杂查询,可以将查询拆分为多个简单查询,使用临时表或物化视图存储中间结果,降低查询复杂度。

  • SQL查询优化的最佳实践和常见问题解决方案:

    • 使用EXPLAIN命令查看查询执行计划,找到性能瓶颈。
    • 合理设计表结构、索引和分区,以提高查询性能。
    • 避免使用不必要的聚合函数、窗口函数和JOIN操作。
    • 避免全表扫描,尽量使用索引进行查询。
    • 使用WHERE子句过滤无关记录,降低数据读取量。
    • 调整ClickHouse的并发设置和内存限制,提高查询性能。
    • 对于大数据量和复杂查询场景,采用分批查询、拆分查询和使用临时表等策略降低查询复杂度。

Flink窗口机制有哪些?应用场景分别是什么?

  1. 滚动窗口(Tumbling Windows)
  • 滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。 比如说,如果你指定了滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建。
  • **适用场景:**适合做每个时间段的聚合计算,BI分析。例如统计某页面每分钟点击的pv。
  • 场景1:我们需要统计每一分钟中用户购买的商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被称为翻滚时间窗口(Tumbling Time Window)。
  1. 滑动窗口(Sliding Windows)
  • 与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。
  • **适用场景:**对最近一段时间段内进行统计(如某接口近几分钟的失败调用率)
  • 场景:比如:每隔3秒计算最近5秒内,每个基站的日志数量、每30秒计算一次最近一分钟用户购买的商品总数。
  1. 会话窗口(Session Windows)
  • 会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。
  • 会话窗口并没有固定的开始或结束时间,所以它的计算方法与滑动窗口和滚动窗口不同。在 Flink 内部,会话窗口的算子会为每一条数据创建一个窗口, 然后将距离不超过预设间隔的窗口合并。 想要让窗口可以被合并,会话窗口需要拥有支持合并的 TriggerWindow Function, 比如说 ReduceFunctionAggregateFunctionProcessWindowFunction
  • **适用场景:**在这种用户交互事件流中,我们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。
  • 场景一:如上图所示,就是需要计算每个用户在活跃期间总共购买的商品数量,如果用户30秒没有活动则视为会话断开(假设raw data stream是单个用户的购买行为流)。
  • 场景二:3秒内如果没有数据进入,则计算每个基站的日志数量
  • 场景三:比如音乐 app 听歌的场景,我们想统计一个用户在一个独立的 session 中听了多久的歌曲(如果超过15分钟没听歌,那么就是一个新的 session 了)
  1. 全局窗口(Global Windows)
  • 全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在你指定了自定义的 trigger 时有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。

Flink窗口函数的应用

  1. ReduceFunction 和AggregateFunction:
  • 应用示例:用于聚合窗口内的数据,例如计算窗口内元素的总和、平均值、最大值、最小值等。
  1. WindowFunction:
  • 应用示例:用于执行自定义操作,如在窗口内执行复杂计算或将窗口内的数据写入外部存储系统。
  1. ProcessWindowFunction:
  • 应用示例:适用于需要访问窗口内所有元素的场景,可以实现更灵活的操作,如窗口内的元素排序、分组、过滤等。
  1. FoldFunction:
  • 应用示例:用于在窗口内构建累积的结果,可以实现自定义的累积逻辑
  1. ApplyFunction:
  • 应用示例:用于将窗口内的数据传递给外部系统,如调用外部服务或发送数据到外部数据存储。
  1. CoProcessFunction (双流处理):
  • 应用示例:适用于处理多个输入流的情况,可以实现各种复杂的联合操作,如事件匹配数据关联等。
  1. WindowProcessFunction:
  • 应用示例:类似于 ProcessWindowFunction,但提供更丰富的上下文信息,可用于更复杂的处理逻辑,如窗口触发条件的动态调整
  1. KeyedProcessFunction:
  • 应用示例:适用于需要在键控窗口上执行自定义操作的场景,如处理超时事件、实现复杂的状态机等。
  1. SideOutput:
  • 应用示例:可用于将窗口内的部分数据输出到不同的侧输出流,用于实现分流和筛选操作。

Flink中数据倾斜原因和解决方案

  1. 数据倾斜的定位
  2. 定位反压
    1. Flink Web UI 自带的反压监控
    2. Flink Task Metrics
    3. 通过监控反压的信息,可以获取到数据处理瓶颈的 Subtask
  3. 确定数据倾斜
    1. Flink Web UI 自带Subtask 接收和发送的数据量
  4. 倾斜原因
  5. Flink 任务出现数据倾斜的直观表现是任务节点频繁出现反压,但是增加并行度后并不能解决问题;
  6. 部分节点出现 OOM 异常,是因为大量的数据集中在某个节点上,导致该节点内存被爆,任务失败重启。
  7. 场景:业务上有严重的数据热点、技术上大量使用了 KeyBy、GroupBy 等操作,错误的使用了分组 Key,人为产生数据热点
  • key 分布不均匀的无统计场景。例如上游数据分布不均匀,使用keyBy来打散数据。
  • 解决思路: 通过添加随机前缀,打散 key 的分布,使得数据不会集中在几个 Subtask。
  • key 分布不均匀的统计场景
  • 解决思路:聚合统计前,先进行预聚合,例如两阶段聚合(加盐局部聚合+去盐全局聚合)。

问题1: flink实时程序在线上环境上运行遇到一个很诡异的问题,flink使用eventtime读取kafka数据发现无法触发计算。—>事件时间倾斜

问题描述:

watermark的传递机制当并行执行的情况下,每次接受的水印发送的水印都是最小的,木桶效应。但是,当某个分区始终无数据的时候,就不会更新该分区的watermark值,那么窗口就一直不会被触发计算。这种现象在某些hash极端导致数据倾斜很普遍。

场景:Flink消费Kafka上下游并行度不一致导致的数据倾斜

解决方案:我们 Flink 消费 Kafka 的数据时,是推荐上下游并行度保持一致,即 Kafka 的分区数等于 Flink Consumer 的并行度。但是会有一种情况,为了加快数据的处理速度,来设置 Flink 消费者的并行度大于 Kafka 的分区数。如果你不做任何的设置则会导致部分 Flink Consumer 线程永远消费不到数据。需要设置 Flink 的 Redistributing,也就是数据重分配。Rebalance 分区策略,数据会以 round-robin 的方式对数据进行再次分区,可以全局负载均衡。Rescale 分区策略基于上下游的并行度,会将数据以循环的方式输出到下游的每个实例中。

方案二:

flink 1.11新增了支持watermark空闲检测WatermarkStrategy.withIdleness()方法允许用户在配置的时间内(即超时时间内)没有记录到达时将一个流标记为空闲,从而进一步支持 Flink 正确处理多个并发之间的事件时间倾斜的问题,并且避免了空闲的并发延迟整个系统的事件时间。通过将 Kafka 连接器迁移至新的接口(FLINK-17669),用户可以受益于针对单个并发的空闲检测。

Flink Watermark机制

Apache Flink中的Watermark机制是用于处理事件时间(event time)数据的一种重要机制,用于解决乱序事件和迟到事件等与时间相关的问题。Watermark是一种特殊类型的记录,它包含了一个时间戳,用于表示事件时间流中的时间进展。Watermark机制有助于Flink处理有序性和时序性的数据流。

以下是关于Flink Watermark机制的一些关键概念和作用:

  1. Watermark生成
  • Watermark是由数据源(如Kafka、Kinesis等)生成的,或者可以在Flink程序中手动创建。Watermark通常与事件时间字段的时间戳相关联,表示事件时间的进度。
  • Watermark会周期性地或基于数据的实际时间戳生成,并随着事件时间的增长而递增。
  1. Watermark传播
  • Watermark在数据流中随事件一起传播。Flink的操作符会接收Watermark,并将其传递到下游操作符。在传递过程中,Watermark的时间戳通常是逐步递增的。
  1. Watermark的作用
  • Watermark用于告知Flink何时认为特定时间戳之前的数据已经全部到达,即事件时间窗口可以安全地关闭和触发。
  • Watermark还用于处理迟到事件。当事件时间窗口已关闭时,Flink可以继续接收后续迟到的事件,并将它们分配到已关闭的窗口中。
  1. Watermark与窗口操作
  • 在窗口操作中,Flink会根据Watermark的时间戳来触发窗口计算。当Watermark达到窗口结束时间时,Flink将关闭窗口并进行计算。
  • Watermark也用于处理乱序事件。当Watermark到达某个时间戳时,Flink可以安全地关闭该时间戳之前的窗口。
  1. Watermark的延迟
  • Watermark的延迟是指从数据生成到Watermark生成的时间间隔。这个延迟是为了处理迟到事件和乱序事件,以确保数据流中的数据能够被正确处理。
  • 延迟的设置需要权衡,较长的延迟可以处理更多的迟到事件,但可能会导致延迟窗口关闭。较短的延迟可以更快地关闭窗口,但可能错过迟到事件。

Flink的Watermark机制有助于处理事件时间数据,确保数据在流处理中的正确处理和窗口操作的准确触发。它使Flink能够处理乱序事件、迟到事件以及具有时间维度的流处理需求。在Flink应用程序中,合理设置Watermark机制是至关重要的,以确保流处理的正确性和性能。

FlinkCEP复杂事件处理应用

概述:FlinkCEP是在Flink上层实现的复杂事件处理库。 它可以让你在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分。

  1. 模式API

  2. 单个模式

    一个模式可以是一个单例或者循环模式。单例模式只接受一个事件,循环模式可以接受多个事件。

  3. 组合模式

    单个模式连接起来组成一个完整的模式序列

    • 严格连续:期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件。
      • next(),指定严格连续
    • 松散连续: 忽略匹配的事件之间的不匹配的事件。
      • followedBy(),指定松散连续
    • 不确定的松散连续: 更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。
      • followedByAny(),指定不确定的松散连续。
  4. 模式组

  5. 检测模式

在指定了要寻找的模式后,该把它们应用到输入流上来发现可能的匹配了。为了在事件流上运行你的模式,需要创建一个PatternStream。 给定一个输入流input,一个模式pattern和一个可选的用来对使用事件时间时有同样时间戳或者同时到达的事件进行排序的比较器comparator

场景:假设您有一个网络服务器的访问日志,您想要检测一种情况:在过去5分钟内,某个IP地址(或一组IP地址)在不到1秒内发起了10次或更多的请求。您可以使用Flink CEP来实现此目标。

Flink SQL和Table API的区别是什么?

Flink本身是批流统一的处理框架,所以Table API和SQL,就是批流统一的上层处理API。

Table API是一套内嵌在Java和Scala语言中的查询API,它允许我们以非常直观的方式,组合来自一些关系运算符的查询(比如select、filter和join)。而对于Flink SQL,就是直接可以在代码中写SQL,来实现一些查询(Query)操作。Flink的SQL支持,基于实现了SQL标准的Apache Calcite(Apache开源SQL解析工具)。

SQL风格的查询语言

  • Flink SQL:Flink SQL是一种用于查询和处理数据的SQL语言,类似于传统关系型数据库的SQL。您可以编写SQL查询来操作数据流或批处理作业。这使得它对熟悉SQL的用户来说更加直观和易于使用。
  • Table API:Flink Table API是一种基于API的方式,使用类似SQL的查询表达式来操作数据。它允许以编程方式构建查询,而不是使用纯SQL。

编程模型

  • Flink SQL:Flink SQL更适合那些更习惯SQL查询语言的数据分析师和SQL开发人员,因为它更接近传统SQL。它通常用于简单的查询和数据分析任务。
  • Table API:Flink Table API提供了更多的编程灵活性,允许开发人员以编程方式构建查询,执行更复杂的数据操作,并进行更灵活的数据处理。

集成和扩展性

  • Flink SQL:Flink SQL具有更强的生态系统集成,可以轻松与外部系统进行连接,如Kafka、Elasticsearch、Hive等。
  • Table API:Table API提供更多的编程扩展性,可以用于自定义表函数和表源。

Flink Streaming如何处理迟到事件?

事件时间数据经常包含乱序事件或延迟到达的事件。处理迟到事件需要合理配置水印(Watermark)以及使用特定的窗口和触发策略。

  1. 配置Watermark生成
  • 在Flink中,Watermark用于表示事件时间的进度。Watermark会告知系统事件时间的最大进度,以便触发窗口操作。
  • 在数据流中,要定期生成Watermark并将其传递给Flink。水印的生成通常基于事件时间字段的时间戳。
  1. 窗口配置
  • 选择合适的窗口类型,例如滚动窗口、滑动窗口或会话窗口,以根据业务需求划分事件时间。
  • 设置窗口的大小和滑动间隔,这将影响窗口的划分。
  1. 窗口分配策略
  • 使用allowedLateness方法来为窗口分配策略设置最大允许的迟到时间。这将定义在触发窗口之后允许事件时间的延迟。
  • 对于迟到的事件,它们可以被分配给窗口,只要它们的事件时间小于窗口结束时间加上allowedLateness
  1. 迟到事件处理
  • 当窗口触发时,系统将根据窗口内的数据执行聚合操作。然而,迟到事件也可能已经到达。
  • 使用sideOutputLateData来将迟到事件流输出到另一个数据流,然后您可以对这些迟到事件执行自定义操作。侧输出
  • 迟到事件会通过sideOutputLateData方法输出到late-events数据流中,以供后续处理。正常事件则由MyWindowFunction进行处理。

通过合理配置Watermark和窗口,以及使用allowedLatenesssideOutputLateData,您可以有效地处理迟到事件并保持事件时间处理的正确性。

Flink与Spark Streaming的技术选型对比

一、设计理念

Spark的技术理念是使用微批来模拟流的计算,基于Micro-batch,数据流以时间为单位被切分为一个个批次,通过分布式数据集RDD进行批量处理,是一种伪实时。

Flink是基于事件驱动的,是面向流的处理框架, Flink基于每个事件一行一行地流式处理,是真正的流式计算. 另外他也可以基于流来模拟批进行计算实现批处理。

二、运行架构

Spark Streaming 运行时的角色(standalone 模式)主要有:

  • Master:主要负责整体集群资源的管理和应用程序调度;
  • Worker:负责单个节点的资源管理,driver 和 executor 的启动等;
  • Driver:用户入口程序执行的地方,即 SparkContext 执行的地方,主要是 DGA 生成、stage 划分、task 生成及调度;
  • Executor:负责执行 task,反馈执行状态和执行结果。

Flink 运行时的角色(standalone 模式)主要有:

  • Jobmanager: 协调分布式执行,他们调度任务、协调 checkpoints、协调故障恢复等。至少有一个 JobManager。高可用情况下可以启动多个 JobManager,其中一个选举为 leader,其余为 standby;
  • Taskmanager: 负责执行具体的 tasks、缓存、交换数据流,至少有一个 TaskManager;
  • Slot: 每个 task slot 代表 TaskManager 的一个固定部分资源,Slot 的个数代表着 taskmanager 可并行执行的 task 数。

三、任务调度

Spark Streaming 连续不断的生成微小的数据批次,构建有向无环图DAG,根据DAG中的action操作形成job,每个job有根据窄宽依赖生成多个stage。(流程:构建 DGA 图——划分 stage——生成 taskset——调度 task)

Flink 根据用户提交的代码生成 StreamGraph,经过优化生成 JobGraph,然后提交给 JobManager进行处理,JobManager 会根据 JobGraph 生成 ExecutionGraph,ExecutionGraph 是 Flink 调度最核心的数据结构,JobManager 根据 ExecutionGraph 对 Job 进行调度。

四、时间机制

Spark Streaming 支持的时间机制有限,只支持处理时间。使用processing time模拟event time必然会有误差, 如果产生数据堆积的话,误差则更明显。

flink支持三种时间机制:事件时间,注入时间,处理时间,同时支持 watermark 机制处理迟到的数据,说明Flink在处理乱序大实时数据的时候,更有优势。

Flink Savepoint和Checkpoint的区别

Apache Flink中的Savepoint和Checkpoint都是用于保证数据流处理的一致性和容错性的机制,但它们有不同的目的和使用场景,以下是它们的区别:

Checkpoint(检查点)和Savepoint(保存点):

  1. 目的
  • Checkpoint的主要目的是实现容错性。它用于将应用程序的状态定期保存到持久性存储中,以便在发生故障时能够从故障点之前的状态继续进行处理,确保数据不会丢失。
  • Savepoint的主要目的是应用程序的升级、降级、变更或迁移。它允许将应用程序的状态保存到持久性存储中,然后在新的应用程序版本上重新加载该状态。
  1. 触发方式
  • Checkpoint可以根据时间间隔或数据条数等规则定期触发,或者可以在代码中显式触发。
  • Savepoint通常是由用户手动触发的,而不是定期触发。用户可以根据需要创建Savepoint,以备将来的用途。
  1. 数据存储
  • Checkpoint的状态数据通常存储在分布式文件系统(如HDFS)或分布式存储系统中,以确保高可用性和可靠性。
  • Savepoint的状态数据通常存储在分布式文件系统或分布式存储系统中。
  1. 恢复操作
  • 当应用程序失败时,Flink可以使用最近成功的检查点进行恢复,从最近的一次检查点开始继续处理数据流。
  • Savepoint通常用于将状态从一个应用程序版本迁移到另一个应用程序版本,而不是简单的故障恢复。它允许在不同版本的应用程序之间共享状态。
  1. 开销
  • Checkpoint需要额外的存储和网络开销,因为它涉及将状态数据写入外部存储。
  • Savepoint也需要额外的存储和网络开销。但它通常不会在常规故障恢复中使用。

总的来说,Checkpoint和Savepoint都有状态保存和恢复的作用,但它们的使用场景和触发方式不同。Checkpoint用于实现容错性,由Flink自动触发,用于故障恢复。Savepoint由用户手动触发,用于应用程序升级、变更或迁移,它更多是一种版本迁移工具。在实际应用中,它们通常会一起使用,以保证数据的完整性和应用程序的可维护性。

Flink任务链和任务槽的作用是什么?

一、任务链

对于分布式执行,Flink 会将算子的 Subtasks 链接成 Tasks。每个 Task 由一个线程执行。将算子链接成 Task 形成任务链(task chain) 是个有用的优化:能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少延迟的同时提高整体的吞吐量。

二、任务槽

是 Flink 集群资源分配和管理的机制。每个任务槽相当于一个独立的资源池,可以分配给一个或多个任务执行。通过任务槽,可以对 Flink 集群内的资源进行合理的分配和管理,从而更好地利用硬件资源,提高任务的执行效率和吞吐量。

具体来说,一个 Flink 作业(Job)可以由多个 TaskManager 组成,并通过网络连接相互通信。每个 TaskManager 可以包含多个任务槽,每个任务槽可以执行一个或多个任务。任务链可以将多个算子连接在一起,让它们在同一个任务槽中运行。这样就能够避免将数据从一个任务槽发送到另一个任务槽的开销,从而提高处理效率。

总的来说,任务链和任务槽都是 Flink 的优化措施,可以帮助用户更好地利用硬件资源,提高 Flink 作业的执行效率和吞吐量。

Flink内存模型与内存优化

官网版本

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

一、Flink JVM 进程的进程总内存(Total Process Memory)

  • Flink 应用使用的内存(Flink 总内存 Total Flink Memory)
  • JVM 堆内存(Heap Memory)
  • 堆外内存(Off-Heap Memory)
    • 直接内存(Direct Memory)
    • 本地内存(Native Memory)
  • Flink 运行过程中的 JVM 使用的内存
  • 元数据区(JVM MetaSpace)
  • 日常消耗(JVM Overhead)

一、内存模型

  1. Flink总体内存主要包含 JobManager 内存模型和 TaskManager 内存模型。

img

  1. JobManager 内存模型

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • Flink 总内存 Total Flink Memory

  • #JobManager总进程内存

  • jobmanager.memory.process.size:4096m

  • JVM 堆内存(Heap Memory)

    • # 作业管理器的 JVM 堆内存大小
    • jobmanager.memory.heap.size:2048m
  • 堆外内存(Off-Heap Memory)

    • #作业管理器的堆外内存大小。
    • jobmanager.memory.off-heap.size:1536m
  1. TaskManager 内存模型

img

  • 总体内存
  • **Total Process Memory:**Flink Java 应用程序(包括用户代码)和 JVM 运行整个进程所消耗的总内存。
    • 总进程内存(Total Process Memory) = Flink 总内存 + JVM 元空间 + JVM 执行开销
  • **Total Flink Memory:**仅 Flink Java 应用程序消耗的内存,包括用户代码,但不包括 JVM 为其运行而分配的内存。
    • Flink 总内存 = Framework堆内外 + task 堆内外 + network + managed Memory
  • JVM Heap (JVM 堆上内存)
  • **Framework Heap :**框架堆内存
  • Task Heap : 任务堆内存
  • Off-Heap Mempry(JVM 堆外内存)
  • Managed memory: 托管内存
  • 由 Flink 管理的原生托管内存,保留用于排序、哈希表、中间结果缓存和 RocksDB 状态后端
  • **DirectMemory:**JVM 直接内存
    • **Framework Off-Heap Memory:**Flink 框架堆外内存。
      • TaskManager 本身所占用的对外内存,不计入Slot资源。
    • **Task Off-Heap :**Task 堆外内存。
      • 专用于Flink 框架的堆外直接(或本机)内存。
    • **Network Memory:**网络内存。
      • 网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区。
  • **JVM metaspace:**JVM 元空间。
  • Flink JVM 进程的元空间大小,默认为256MB。
  • **JVM Overhead :**JVM执行开销。
  • JVM 执行时自身所需要的内容,包括线程堆栈、IO、 编译缓存等所使用的内存,这是一个上限分级成分的的总进程内存。

二、内存优化

  1. 为 Standalone 配置内存
  • 建议为 Standalone 配置 Flink 总内存,设置 JobManager 和 TaskManager 的 flink.size 大小,声明为 Flink 本身提供了多少内存。
  1. 为 Containers(容器) 配置内存
  • 建议为容器化部署(Kubernetes或Yarn)配置总进程内存,设置 process.size 大小,它声明了总共应该分配多少内存给 Flink JVM 进程,并对应于请求容器的大小。
  1. 为 state backends(状态后端)配置内存
  • 为 state backends(状态后端)配置内存时,这仅与TaskManager相关。
  • 在部署 Flink 流应用程序时,所使用的状态后端类型将决定集群的最佳内存配置。
  • HashMap 状态后端
    • 运行无状态作业或使用 HashMapStateBackend 时,将托管内存设置为零。这将确保为 JVM 上的用户代码分配最大数量的堆内存。
  • RocksDB 状态后端
    • EmbeddedRocksDBStateBackend 使用本机内存。默认情况下,RocksDB 设置为将本机内存分配限制为托管内存的大小。因此,为你的状态保留足够的托管内存非常重要。如果禁用默认的 RocksDB 内存控制,RocksDB 分配的内存超过请求的容器大小(总进程内存)的限制,则可以在容器化部署中终止 TaskManager 。
  1. 为 batch Job(批处理作业)配置内存
  • Flink 的批处理操作符利用托管内存来更高效地运行。这样做时,可以直接对原始数据执行某些操作,而无需反序列化为 Java 对象。这意味着托管内存配置对应用程序的性能有实际影响。Flink 将尝试分配和使用 为批处理作业配置的尽可能多的托管内存,但不会超出其限制。这可以防止 OutOfMemoryError’s,因为 Flink 准确地知道它必须利用多少内存。如果托管内存不足,Flink 会优雅地溢出到磁盘。

Flink内存管理机制及其参数调优

Flink状态管理内部原理是什么?

一、Flink的State类型

  1. 托管状态(Managed State)由Flink管理的,Flink帮忙存储、恢复和优化。
  2. Keyed State
    1. Keyed State是KeyedStream上的状态。假如输入流按照id为Key进行了keyBy分组,形成一个KeyedStream,数据流中所有id为1的数据共享一个状态,可以访问和更新这个状态,以此类推,每个Key对应一个自己的状态。下图展示了Keyed State,因为一个算子子任务可以处理一到多个Key,算子子任务1处理了两种Key,两种Key分别对应自己的状态。
    2. ValueState<T>: 保存一个可以更新和检索的值。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。
    3. ListState<T>: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List<T>) 进行添加元素,通过 Iterable<T> get() 获得整个列表。还可以通过 update(List<T>) 覆盖当前的列表。
    4. ReducingState<T>: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。
    5. AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。
    6. MapState<UK, UV>: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries()keys()values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。
  3. Operator State
    1. Operator State可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。下图展示了Operator State,算子子任务1上的所有数据可以共享第一个Operator State,以此类推,每个算子子任务上的数据共享自己的状态。
  4. 广播状态 (Broadcast State)
    1. 广播状态是一种特殊的算子状态。引入它的目的在于支持一个流中的元素需要广播到所有下游任务的使用情形。在这些任务中广播状态用于保持所有子任务状态相同。 该状态接下来可在第二个处理记录的数据流中访问。可以设想包含了一系列用于处理其他流中元素规则的低吞吐量数据流,这个例子自然而然地运用了广播状态。
    2. 不同之处
      • 它具有 map 格式,
      • 它仅在一些特殊的算子中可用。这些算子的输入为一个广播数据流和非广播数据流,
      • 这类算子可以拥有不同命名的多个广播状态
  5. 原生状态(Raw State)开发者自己管理的,需要自己序列化。
  6. 区别
  7. 状态的数据结构
    1. Managed State支持了一系列常见的数据结构,如ValueState、ListState、MapState等。
    2. Raw State只支持字节,任何上层数据结构需要序列化为字节数组。使用时,需要用户自己序列化,以非常底层的字节数组形式存储,Flink并不知道存储的是什么样的数据结构。
  8. 具体的使用场景
    1. 绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State。
    2. Raw State是在已有算子和Managed State不够用时,用户自定义算子时使用。

二、状态算子的扩缩容

  1. 流应用的一个基本需求是根据输入速率的增加或减少而调整算子的并行性。有状态算子,调整并行度比较难。因为我们需要把状态重新分组,分配到与之前数量不等的并行任务上。
  2. Keyed State
  3. 带有键值分区状态的算子可以通过将键重新划分来进行任务的扩缩容。但是,为了提高效率,Flink 不会以键为单位来进行划分。相反,Flink 以键组作为单位来重新分配,每个键组里面包含了多个键。
  4. Operate State
  5. 带有算子列表状态的算子在扩缩容时会对列表中的条目进行重新分配。所有并行任务的列表项会被统一收集起来,并再均匀重新分配。如果列表项的数量少于算子的新并行度,一些任务将以空状态开始。
  6. 带有算子联合列表状态的算子会在扩缩容时把状态列表中的全部条目广播到全部任务中。然后,任务自己来选择使用哪些项和丢弃哪些项。
  7. 带有算子广播状态的算子在扩缩容时会把状态拷贝到全部新任务上。这样做是因为广播状态要确保所有任务具有相同的状态。在缩容的情况下,直接简单地停掉多余的任务即可。

三、状态后端

  1. 在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(State Backend)。状态后端主要负责管理本地状态的存储方式和位置。有状态的流计算是 Flink 的一大特点,状态本质上是数据,数据是需要维护的,例如数据库就是维护数据的一种解决方案。
  2. 后端分类
  3. Flink 1.13 以前
    1. MemoryStateBackend
      • 基于内存存储。将状态维护在 JVM 堆上的一个内部状态后端。
      • 建议在本地开发或调试时使用 MemoryStateBackend,因为它具有有限的状态大小。这意味着它在处理大规模状态时可能会出现性能问题或内存限制。
    2. FsStateBackend
      • 基于文件存储。配置通过 URL(type, address, path) 等文件系统完成。
      • FsStateBackend 非常适合处理大规模状态、长窗口或大型键值状态的 Apache Flink 有状态流处理作业。它专为高效地处理大量状态数据而设计。
      • 此外,FsStateBackend 确实是高可用性设置的理想选择。它确保将状态数据可靠地存储在文件系统中,以应对故障并提供容错性。同时,在 JobManager 的内存或 ZooKeeper 中存储的最小元数据进一步增强了高可用性能力。
      • 因此,当 Flink 应用程序需要强大的状态数据处理和保持高可用性时,通常建议使用 FsStateBackend。
    3. RocksDBStateBackend
      • 基于 RocksDB 存储。配置通过 URL(type, address, path) 等文件系统完成
      • RocksDBStateBackend 使用 RocksDB 数据库在本地磁盘上保存进行中的数据。
  4. Flink 1.13 以及以后
    1. HashMapStateBackend(Fsstatebackend和MemoryStatebackend整合)系统默认
      • HashMapStateBackend 把状态存放在内存里。
      • 具体实现
        • 哈希表状态后端在内部会直接把状态当作对象(Objects)保存在 Taskmanager 的 JVM 堆内存上。
        • 普通的状态,以及窗口中收集的数据和触发器,都会以键值对的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。
    2. EmbeddedRocksDBStateBackend(内嵌 RocksDB 状态后端)
      • RocksDB 是一种内嵌的 key-value 存储介质,可以把数据持久化到本地硬盘。
      • 配置 EmbeddedRocksDBStateBackend后,会将处理中的数据全部放入 RocksDB 数据库中,RocksDB 默认存储在 TaskManager 的本地数据目录里。
    3. 区别
      • HashMap 和 RocksDB 两种状态后端最大的区别,就在于本地状态存放在哪里。
      • HashMapStateBackend 是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。
      • EmbeddedRocksDBStateBackend 是硬盘存储,所以可以根据可用的磁盘空间进行扩展,所以它非常适合于海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比 HashMapStateBackend 慢一个数量级。
  5. 主要作用
  • Local State Management:本地状态管理;
    • State Management 的主要任务是确保状态的更新和访问。
  • Remote State Checkpointing:远程状态备份。
    • Flink 程序是分布式运行的,而 State 都是存储到各个节点上的,一旦 TaskManager 节点出现问题,就会导致 State 的丢失。State Backend 提供了 State Checkpointing 的功能,可以将 TaskManager 本地的 State 的备份到远程的存储介质上,可以是分布式的存储系统或者数据库。不同的 State Backends 备份的方式不同,会有效率高低的区别。
  1. 状态后端的主要作用为:在每一个 TaskManager 节点上存储和管理状态,以及将状态进行远程备份两个部分。

四、状态持久化

  1. Checkpoint
  • 有状态流应用中的检查点(Checkpoint)其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)。简单来讲,就是一次“存档”,将之前处理数据的进度进行保存。
  • 在一个流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的 ID 和状态。如果发生故障,Flink会使用最近一次成功保存的检查点来恢复应用的状态,并重新启动处理流程,就如同“读档”一样。
  1. Savepoint
  • Checkpoint 的主要目的是为意外失败的作业提供恢复机制(如 TaskManager/JobManager 进程挂了)。Checkpoint 由Flink 管理,即 Flink 创建、管理和删除。Checkpoint 无需用户交互。
  • Savepoint 的应用场景为升级 Flink 版本,调整用户逻辑,改变并行度,以及进行红蓝部署等。Savepoint 的用例是有计划的,需要手动备份和恢复,Savepoint 更多地关注可移植性。Savepoint 由用户管理,即用户创建、管理和删除。

五、状态 TTL

​ 在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用 .clear() 方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(Time-To-Live, TTL),当状态在内存中存在的时间超出这个值时,就将它清除。

Flink状态一致性和容错机制

一、容错机制

  1. Checkpoint
  • 有状态流应用中的检查点(Checkpoint)其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)。简单来讲,就是一次“存档”,将之前处理数据的进度进行保存。

  • 在一个流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的 ID 和状态。如果发生故障,Flink会使用最近一次成功保存的检查点来恢复应用的状态,并重新启动处理流程,就如同“读档”一样。

  • 保存时机

    • 随时保存(缺点:耗费资源)
    • 周期保存
    • 周期异步增量保存
  • 实现方式

    • 朴素算法

      • 总结下来就是数据源暂停接收数据,待数据流中的数据全部处理完毕,再把数据源偏移,键值状态,算子状态等写入检查点路径中,优点是实现简单,缺点是处理太慢。
    • Chandy-Lamport 算法

  1. Savepoint
  • Savepoint 作为实时任务的全局镜像,其在底层使用的代码和Checkpoint的代码是一样的
  • 触发方式
    • 使用 flink savepoint 命令触发 Savepoint,其是在程序运行期间触发 savepoint。
    • 使用 flink cancel -s 命令,取消作业时,并触发 Savepoint。
    • 使用 Rest API 触发 Savepoint,格式为:/jobs/:jobid /savepoints

二、状态一致性

  1. 处理语义
  2. At-Most-Once(特点:最多计算一次。缺点:有可能会有数据丢失)
    • At-Most-Once 是最简单的恢复方式,直接从失败处的下个数据开始恢复程序,之前的失败数据处理就不管了。可以保证数据或事件最多由应用程序中的所有算子处理一次。这意味着如果数据在被流应用程序完全处理之前发生丢失,则不会进行其他重试或者重新发送。
  3. At-Least-Once(特点:至少计算一次。缺点:有可能重复处理数据)
    • 应用程序中的所有算子都保证数据或事件至少被处理一次。这通常意味着如果事件在流应用程序完全处理之前丢失,则将从源头重放或重新传输事件。然而,由于事件是可以被重传的,因此一个事件有时会被处理多次(至少一次),至于有没有重复数据,不会关心,所以这种场景需要人工干预自己处理重复数据。
  4. Exactly-Once(保证每一条消息只被流处理系统处理一次。)
    • 通过这种机制,流应用程序中每个算子的所有状态都会定期做 Checkpoint。如果是在系统中的任何地方发生失败,每个算子的所有状态都回滚到最新的全局一致 Checkpoint 点。在回滚期间,将暂停所有处理,源也会重置为与最近 Checkpoint 相对应的正确偏移量。整个流应用程序基本上是回到最近一次的一致状态,然后程序可以从该状态重新启动。
  5. End-to-End Exactly-Once(端到端的精确一次)
    • Source:可重设数据的读取位置(如 Kafka)
    • Transformation:Checkpoint 机制(Asynchronous Barrier Snapshotting)
    • Sink:从故障恢复时,数据不会重复写入外部系统
      • 幂等写入:如 HBase、Redis 这样的 KV 数据库按 K 覆盖 V。关系型数据库可以在插入数据时检测是否有重复键,如果有重复键,则执行更新操作。
      • 事务写入:预写日志和二阶段提交。

Flink批流统一的意义及实现

Flink是一个分布式流处理框架,具有批处理和流处理的能力,并且支持将批处理和流处理统一起来,这也是Flink的一个重要特点。

批流统一的意义在于消除了传统上批处理和流处理之间的差异,使得开发人员可以使用相同的编程模型来处理批处理和流处理任务。这样可以带来以下好处:

  1. 简化开发:开发人员只需要学习和实现一种编程模型,就可以同时处理批处理和流处理任务,减少了学习成本和开发复杂度。

  2. 灵活性和实时性:通过将批处理和流处理统一起来,可以实现实时的流处理任务,同时也可以处理离线的批处理任务。这使得数据处理更加灵活,并且可以根据需求实时处理和分析数据。

  3. 资源利用率:批流统一可以将批处理和流处理作业一起提交到同一个执行引擎中执行,从而更好地利用计算资源。例如,在低峰期可以使用空闲资源执行批处理作业,而在高峰期可以动态分配资源进行流处理。

在Flink中,实现批流统一主要依靠以下机制:

  1. 时间语义:Flink使用事件时间和处理时间来处理流数据。事件时间是根据事件真实发生的时间来处理数据,适合于有序或无序事件流;而处理时间是根据数据到达处理节点的时间来处理数据。通过时间语义,Flink可以同时支持批处理和实时流处理任务。

  2. 状态管理:Flink提供了可靠的状态管理机制,用于在流处理和批处理任务中跟踪和维护状态。状态可以用于存储中间结果、缓存数据等,在批处理和流处理任务中都可以使用。

  3. 数据分区和窗口:Flink支持将流数据分为不同的分区,并提供了窗口机制来对数据进行分组和聚合操作。窗口可以基于事件时间或处理时间进行定义,从而支持批处理和流处理的需求。

总之,Flink批流统一的实现主要基于时间语义、状态管理和窗口机制等核心特性,使得开发人员可以以一种统一的方式处理批处理和流处理任务,提高开发效率和数据处理的灵活性。

Kafka和Flink内存参数配置的关系和优化方法

Flink 和 Kafka 都是流处理领域的重要技术。Flink 作为一个流处理引擎,可以和 Kafka 一起使用来实现各种数据处理和分析任务。

  1. Flink 调优建议
  • 并行度调整:Flink 的并行度是决定性能瓶颈的因素之一。可以根据任务的需求和计算资源来调整并行度。

  • 状态后端调整:Flink 支持多种状态后端,比如 RocksDB 和 MemoryStateBackend。不同的状态后端对性能影响很大,可以根据任务的需求和计算资源来选择合适的状态后端。

  • 内存管理调整:Flink 在运行时会占用一定的内存空间,需要合理分配内存资源,以免导致内存溢出等问题。可以通过调整 JVM 的堆内存和堆外内存大小来优化内存管理。

  • 算子链调整:Flink 的算子链可以影响任务的性能。可以通过将一些算子合并成一个算子链来提高性能。

  1. Kafka 调优建议
  • 分区数量调整:Kafka 的性能和分区数量相关,可以根据消息的大小和数量来适当调整分区数量。

  • 消费者数量调整:消费者的数量也会影响 Kafka 的性能。可以根据消费者的数量和计算资源来适当调整消费者数量。

  • 消息压缩设置:Kafka 支持多种消息压缩方式,可以根据消息的类型和大小来选择合适的消息压缩方式。

  • 批量拉取和提交设置:Kafka 支持批量拉取和提交消息,可以通过调整批量大小来优化性能。

总的来说,Flink 和 Kafka 的调优都需要根据具体情况进行调整。可以通过监控系统的性能指标,如 CPU 使用率、内存使用率、网络带宽等指标来优化调整。

(Flink kafka source的并行度需要和kafka topic的分区数一致。最大化利用kafka多分区topic的并行读取能力。)

Kafka&Flink&ClickHouse在联合场景中的优势

Flink读取Kafka数据下沉到Clickhouse

整体流程:

  1. 向kafka特定主题下导入json格式数据
  2. 编写Flink Kafka Comsumer消费主题下的数据
  3. 利用Flink算子对数据进行处理(etl)
  4. 将处理后的数据下沉到Clickhouse数据库中

Why Flink+ClickHouse

  • 指标实现 sql 化描述:分析师提出的指标基本都以 SQL 进行描述。
  • 指标的上下线互不影响:一个 Flink 任务消费 Topic,如果还需要其它指标,可以保证指标的上下线互不影响。
  • 数据可回溯,方便异常排查:当日活下降,需要回溯排查是哪些指标口径的逻辑问题,比如是报的数据差异或是数据流 Kafka 掉了,或者是因为用户没有上报某个指标导致日活下降,而 Flink 则无法进行回溯。
  • 计算快,一个周期内完成所有指标计算:需要在五分钟内将成百上千的所有维度的指标全部计算完成。
  • 支持实时流,分布式部署,运维简单:支持 Kafka 数据实时流。

Why ClickHouse so Fast

  1. ClickHouse 采用列式存储 +LZ4、ZSTD 数据压缩。
  2. 计算存储结合本地化+向量化执行。ClickHouse 计算存储本地化是指每一台计算机器存在本地 SSD 盘,只需要计算自己的数据,再进行节点合并。
  3. LSM merge tree+Index。将数据写入 ClickHouse 之后,会在后台开始一个线程将数据进行 merge,做 Index 索引。如建常见的 DT 索引和小时级数据索引,以提高查询性能。
  4. SIMD+LLVM 优化。SIMD 是单指令多数据集。
  5. SQL 语法及 UDF 完善。

优化性能。

总的来说,Flink 和 Kafka 的调优都需要根据具体情况进行调整。可以通过监控系统的性能指标,如 CPU 使用率、内存使用率、网络带宽等指标来优化调整。

(Flink kafka source的并行度需要和kafka topic的分区数一致。最大化利用kafka多分区topic的并行读取能力。)

Kafka&Flink&ClickHouse在联合场景中的优势

Flink读取Kafka数据下沉到Clickhouse

整体流程:

  1. 向kafka特定主题下导入json格式数据
  2. 编写Flink Kafka Comsumer消费主题下的数据
  3. 利用Flink算子对数据进行处理(etl)
  4. 将处理后的数据下沉到Clickhouse数据库中

Why Flink+ClickHouse

  • 指标实现 sql 化描述:分析师提出的指标基本都以 SQL 进行描述。
  • 指标的上下线互不影响:一个 Flink 任务消费 Topic,如果还需要其它指标,可以保证指标的上下线互不影响。
  • 数据可回溯,方便异常排查:当日活下降,需要回溯排查是哪些指标口径的逻辑问题,比如是报的数据差异或是数据流 Kafka 掉了,或者是因为用户没有上报某个指标导致日活下降,而 Flink 则无法进行回溯。
  • 计算快,一个周期内完成所有指标计算:需要在五分钟内将成百上千的所有维度的指标全部计算完成。
  • 支持实时流,分布式部署,运维简单:支持 Kafka 数据实时流。

Why ClickHouse so Fast

  1. ClickHouse 采用列式存储 +LZ4、ZSTD 数据压缩。
  2. 计算存储结合本地化+向量化执行。ClickHouse 计算存储本地化是指每一台计算机器存在本地 SSD 盘,只需要计算自己的数据,再进行节点合并。
  3. LSM merge tree+Index。将数据写入 ClickHouse 之后,会在后台开始一个线程将数据进行 merge,做 Index 索引。如建常见的 DT 索引和小时级数据索引,以提高查询性能。
  4. SIMD+LLVM 优化。SIMD 是单指令多数据集。
  5. SQL 语法及 UDF 完善。

总结:ClickHouse 对此有很大需求。在数据分析或者维度下拽时需要更高的特性,如时间窗口的一部分功能点。