利用谷歌云Pub/Sub 实现多任务并行分发处理方案

背景

目前老梁团队负责的Global Data Integration Platform每天有大量文件需要从来自不同地区的上游下载文件并进行处理后再发送到不同下游。老梁的数据集成平台集群有6个服务器节点,老梁希望所有机器的资源都能利用上,提升大量文件并行处理能力,并且不同机器节点的任务必须不能重复,否则可能造成文件下载或处理失败。

原有的服务是使用Quarz集群,通过定时调度去下载,但是Quartz调度框架虽然本身支持负载均衡,但是其Cluster每个节点都不是均衡分配任务,假如某一节点具有竞争资源优势,有机会一直持有任务,导致其他节点空闲下来,服务器可能某天资源消耗过大而导致宕机,这并不是老梁想要的效果。后来也尝试使用生产者消费者模型,通过F5负载均衡+API通知+异步回调方式后,服务多节点并行处理能力有所增强,但由于使用Http方式进行通信导致服务之间存在直接依赖,当消费者服务进行重启或者停机,存在生产者API通知失败的可能,需要做额外的补偿处理。如下图所示:

生产者消费者模型:

解决思路

目前老梁公司已经完成了谷歌云和公司机房的网络搭建,并且公司的自有数据中心跟谷歌云可以直接通过谷歌的Dedicated Interconnect服务,也就是可以通过专线直接进行连接。虽然老梁的数据集成平台还部署在自有数据中心,但相对于文件下载的时间和速度损耗,谷歌云上的服务通过专线进行通信所带来的性能损耗几乎可以忽略(大约几百毫秒),老梁公司的架构战略方向是优先使用云组件,减少On-Premise部署。最后老梁选择采用谷歌云Pub/Sub服务作为事件消息服务,利用Pub/Sub高可用、使用简单并天然支持多消息并行传输的特性,来对现有的数据集成平台进行改造。

Pub/Sub介绍:
Pub/Sub 是一种设计为高度可靠且可伸缩的异步消息传递服务。该服务以十多年来许多 Google 产品都在依赖的核心 Google 基础架构组件为基础而构建。其实可以理解成云上的Kafka。官网:https://cloud.google.com/pubsub/architecture?hl=zh-cn

  • Pub/Sub 是一种可扩缩的异步消息传递服务,可将生成消息的服务与处理这些消息的服务分离开来。
  • Pub/Sub 允许服务异步通信,延迟时间大约为 100 毫秒。
  • Pub/Sub 用于流式分析和数据集成流水线,以注入和分发数据。无论是作为用于消息整合的消息传递中间件,还是作为并行处理任务的队列,它都非常有效。
  • 通过 Pub/Sub,您可以创建事件提供方和使用方的系统,称为发布者和订阅者。发布者通过广播事件而不是同步远程过程调用 (RPC) 与订阅者异步通信。
  • 发布者将事件发送到 Pub/Sub 服务,而不考虑如何或何时处理这些事件。然后,Pub/Sub 会将事件传送到对其做出响应的所有服务。在通过 RPC 进行通信的系统中,发布商必须等待订阅者接收数据。但是,Pub/Sub 中的异步集成可以提高整个系统的灵活性和稳健性。

** 基于Pub/Sub改造后的模型: **
各个消费者节点所拿到的事件都不会重复

大概实施方案

这里只使用模拟场景展示大概思路,具体细节还需要根据各自项目进行优化。

注意事项:

  1. 首先你要创建你应用要使用的TopicSubscription,这里需要注意的是SubscriptionACK截止时间建议设置大点,否者假如你消费者如果消费事件所消耗的时间>ACK截止时间,Pub/Sub将会对消息进行重发,这时候会存在重复事件消息。也就是说,你要确保你的消费节点能在ACK截止时间之前处理好事件并且响应ACKPub/SUb
  2. 建议你服务使用Pull方式从Pub/SubSubscription拉取消息,因为这样可以在你Consumer代码里自由配置你请求所需要的参数,例如setMaxMessages方法可以让你自由定义你每次拉取多少事件,更好地基于你服务器的能力去配置,并且也可以避免在做负载均衡的时候某些机器节点所拿到的任务事件太多导致服务器节点的资源没办法充分利用。

  3. 使用Pub/Sub的自定义Event(事件)必须要自定义一个唯一标识,这样可以在Consumer逻辑加上幂等控制,否则当刚好消费者没有及时处理事件而Pub/Sub因为消费者ACK超时进行补偿重发,这可能会因为重复处理事件给业务带来严重后果。GCP Pub/Sub采用的是至少一次投递的策略,也就是可能对同一消息投递多次,虽然实际应用中不常见,以下官方文档说明了会重复投递的情况,通常就是上面所说的ACK超时导致的

完成流程

这里只截取小部分文件下载的流程作为示范,其他类似需要并行处理的任务都可以参考。

  1. File Watch Dog 从上游远程服务器基于File Pattern去监测有没有新文件
  2. File Watch Dog 把监测到的新文件信息组装成事件分别推送到GCP Pub/Sub Topic
  3. File Process Engine所有节点并行从GCP Pub/Sub Subscription拉取任务,分别拉到不同的事件消息
  4. File Process Engine所有节点分别基于事件消息里的DatafeedId去配置中心查找该Datafeed的连接信息
  5. File Process Engine所有节点分别去上游远程服务器下载自己接收到的事件对应的文件

简单测试

这里使用官方提供的示例代码,简单测试下发布多个消息,看看消费者代码是否会重复消费相同事件。
参考示例:https://cloud.google.com/pubsub/docs/pull#java

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeAsyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeAsyncExample(projectId, subscriptionId);
  }

  public static void subscribeAsyncExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Received MessageId: " + message.getMessageId()+"Data: " + message.getData().toStringUtf8());
          consumer.ack();
          System.out.println("Message has been acknowledge")
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

这里我在Topic发布了5条带有序号的消息,分别是:test:1test:2test:3test:4test:5,然后开了三个进程去监听Subscription,看看会不会每个进程会不会出现重复的消息

进程1

进程2

进程3