PowerJob 应对庞大任务的锦囊妙计:MapReduce



MapReduce 概念介绍

需求背景

示例用法
在 PowerJob 中,MapReduce 不再是高高在上、难以触碰的概念。得益于强大的底层实现和优雅的 API 设计,开发者仅需要寥寥数行代码便可完成大型任务的分布式计算,具体示例如下。
对于有分布式计算需求的任务,我们需要继承特定的抽象类 MapReduceProcessor 来开启分布式计算能力,该接口要求开发者实现两个方法,分别是 process 和 reduce。前者负责任务的具体执行,后者负责汇集所有子任务得出具体的结果。同时,该抽象类默认提供两个可用方法:isRootTask 和 map。通过调用 isRootTask 方法可以判断出当前 Task 是否为根任务,如果是根任务,则进行任务的切分(PowerJob 支持任意级 map,并不只有在根任务才能切分任务),然后调用 map 方法分发子任务。

下面放一段简单的代码示例帮助大家理解。下面这段代码模拟了目前市面上主流的“静态分片”式分布式处理,即通过控制台指定分片数量和参数(比如分3片,分片参数为:1=a&2=b&3=c)来控制参与计算的机器数量和起始参数。虽然是“杀鸡焉用牛刀”的示例,不过还是能帮助大家很好理解 PowerJob MapReduce 处理器的强大之处!
首先,我们通过 context 的 getJobParams 方法获取控制台配置的参数,即分片参数 1=a&2=b&3=c。这个分片参数代表现在需要有 3 台机器参与执行,每台机器上子任务的起始参数分别为 a、b、c。因此,我们可以根据该规则创建子任务对象 SubTask,传入分片索引 index 和 分片参数 params。
完成子任务的切分后,即可调用 map 方法完成任务的分发。
分发后该子任务会再次进入 process 方法,只不过本次是以 SubTask 而不是 RootTask 的身份进入。我们可以通过 context.getSubTask() 方法获取之前 map 出去的对象,该方法的返回值是 Object,因此我们需要使用 Java instaneof 关键字判断类型(当然,如果没有多级 map,那么该对象只可能是 SubTask 类型,直接强转即可),如果该对象为 SubTask 类型,即进行了子任务处理阶段,开始编写子任务处理逻辑即可。
当所有子任务执行完毕后,PowerJob 会调用 reduce 方法,传入所有子任务的运行结果,便于开发者构建该任务的最终结果。
@Componentpublic class StaticSliceProcessor extends MapReduceProcessor {@Overridepublic ProcessResult process(TaskContext context) throws Exception {OmsLogger omsLogger = context.getOmsLogger();// root task 负责分发任务if (isRootTask()) {// 从控制台传递分片参数,假设格式为KV:1=a&2=b&3=cString jobParams = context.getJobParams();Map<String, String> paramsMap = Splitter.on("&").withKeyValueSeparator("=").split(jobParams);List<SubTask> subTasks = Lists.newLinkedList();paramsMap.forEach((k, v) -> subTasks.add(new SubTask(Integer.parseInt(k), v)));return map(subTasks, "SLICE_TASK");}Object subTask = context.getSubTask();if (subTask instanceof SubTask) {// 实际处理// 当然,如果觉得 subTask 还是很大,也可以继续分发哦return new ProcessResult(true, "subTask:" + ((SubTask) subTask).getIndex() + " process successfully");}return new ProcessResult(false, "UNKNOWN BUG");}@Overridepublic ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {// 按需求做一些统计工作... 不需要的话,直接使用 Map 处理器即可return new ProcessResult(true, "xxxx");}@Getter@NoArgsConstructor@AllArgsConstructorprivate static class SubTask {private int index;private String params;}}

原理实现
PowerJob 的 MapReduce 思想主要来源于《Schedulerx2.0 分布式计算原理&最佳实践》这篇文章。
出于功能职责的划分(powerjob-server 仅负责任务的调度和运维),整个 MapReduce 任务的计算由执行器 powerjob-worker 自主完成。
为了便于模型的设计和功能的划分,PowerJob 为执行器节点分配了 3 种角色,分别是 TaskTracker、ProcessorTracker 和 Processor。
TaskTracker 是每一个任务的主节点,充当集群中的 master 角色,因此每个任务每次只会产生一个 TaskTracker。它负责子任务的分发、状态监控和集群中各执行节点的健康检查,并定期将任务的运行时信息上报给 server。 ProcessorTracker 是每一个执行器节点中负责执行器管理的角色,每个任务在每个执行器节点(JVM 实例)上都会产生一个 ProcessorTracker。它负责管理执行器节点任务的执行,包括接受来自 TaskTracker 的任务、上报本机任务执行情况和执行状态等功能。 Processor 是每一个执行器节点中负责具体执行任务的角色,也就是真正的执行单元,每个任务在每个执行器节点都会生成若干个 Processor(没错!就是控制台“实例并发数”所决定的数量)。它接受来自 ProcessorTracker 派发的执行任务并完成计算。

当需要执行分布式任务时,powerjob-server 会根据集群中各个 worker 节点的内存占用、CPU 使用率和磁盘使用率进行健康度计算,得分最高的节点将作为本次任务的 master 节点,即承担 TaskTracker 的职责。TaskTracker 在接收到来自 server 的任务执行请求时被创建,并完成三个阶段的初始化:
首先需要初始化内嵌的 H2 数据库,用于存储所有子任务的派发情况和执行情况。 存储就位后,TaskTracker 会根据 server 派发下来的任务内容,构建根任务,并将其持久化到内嵌数据库。 最后 TaskTracker 会创建一系列定时任务,包括子任务定时派发、子任务执行状态检查、worker 健康度检查和任务整体执行状态上报。



更多精彩推荐
☞程序员删库被判 6 年,公司损失近亿,云原生时代如何打造安全防线?
☞曾是谷歌程序员,抛下百万年薪创业,4 年成就 7 亿用户,今身价百亿!
☞首次在手机端不牺牲准确率实现BERT实时推理,比TensorFlow-Lite快近8倍,每帧只需45ms
☞比特币背后的技术,是否已成为科技领军代表?
点分享 点点赞 点在看
关注公众号:拾黑(shiheibook)了解更多
[广告]赞助链接:
四季很好,只要有你,文娱排行榜:https://www.yaopaiming.com/
让资讯触达的更精准有趣:https://www.0xu.cn/
关注网络尖刀微信公众号随时掌握互联网精彩
赞助链接
排名
热点
搜索指数
- 1 中法友谊蕴山水 7904885
- 2 你以为的进口尖货 其实早已国产了 7808616
- 3 劲酒如何成了年轻女性的神仙水 7714072
- 4 盘点2025大国重器新突破 7617307
- 5 美国称将调整与中国经济关系 7520045
- 6 大雪吃三宝是指哪三宝 7426587
- 7 美军承认:击落美军战机 7334067
- 8 “两人挑一担 养活半栋楼” 7235406
- 9 黑龙江水库冰面下现13匹冰冻马 7141576
- 10 周末去哪玩?雪场“不打烊” 7041983











CSDN
