使用 Flink Hudi 构建流式数据湖平台

作者阿里云代理 文章分类 分类:linux图文教程 阅读次数 已被围观 808

摘要:本文收拾自阿里巴巴技术专家陈玉兆 (玉兆)、阿里巴巴开发工程师刘大龙 (风离) 在 Flink Forward Asia 2021 的分享。首要内容包含:

  1. Apache Hudi 101
  2. Flink Hudi Integration
  3. Flink Hudi Use Case
  4. Apache Hudi Roadmap

FFA 2021 直播回放 & 讲演 PDF 下载

一、Apache Hudi 101

提到数据湖,大家都会有这样的疑问,什么是数据湖?为什么数据湖近两年热度很高?数据湖其实不是一个新的概念,最早的数据湖概念在 80 时代就现已提出,其时对数据湖的界说是原始数据层,能够寄存各种结构化、半结构化乃至非结构化的数据。像机器学习、实时剖析等很多场景都是在查询的时分确定数据的 Schema。

img

湖存储本钱低、灵活性高的特性,十分适用于做查询场景的中心化存储。伴随着近年来云服务的鼓起,尤其是方针存储的老练,越来越多的企业挑选在云上构建存储服务。数据湖的存算别离架构十分适合其时的云服务架构,经过快照阻隔的方法,供给根底的 acid 业务,一起支撑对接多种剖析引擎适配不同的查询场景,能够说湖存储在本钱和开放性上占了极大优势。

img

其时的湖存储现已开端承担数仓的功能,经过和核算引擎对接完成湖仓一体的架构。湖存储是一种 table format,在原有的 data format 根底上封装了 table 的高档语义。Hudi 从 2016 年开端将数据湖投入实践,其时是为了解决大数据场景下文件体系上的数据更新问题,Hudi 类 LSM 的 table format 其时在湖格局中是别出心裁的,对近实时更新比较友好,语义也相对完善。

Table format 是其时盛行的三种数据湖格局的根底特点,而 Hudi 从项目之初就一向朝着平台方向去演化,具有比较完善的数据治理和 table service,比方用户在写入的时分能够并发地优化文件的布局,metadata table 能够大幅优化写入时查询端的文件查找功率。

下面介绍一些 Hudi 的根底概念。

img

Timeline service 是 Hudi 业务层的中心笼统,Hudi 一切数据操作都是围绕着 timeline service 来展开的,每次操作经过 instant 笼统绑定一个特定的时刻戳,一连串的 instant 构成了 timeline service,每一个 instance 记录了对应的 action 和状况。经过 timeline service,Hudi 能够知道其时表操作的状况,经过一套文件体系视图的笼统结合 timeline service,能够对 table 其时的 reader 和 writer 暴露特定时刻戳下的文件布局视图。

img

file group 是 Hudi 在文件布局层的中心笼统,每一个 file group 相当于一个 bucket,经过文件巨细来来区分,它的每次写入行为都会发生一个新的版别,一个版别被笼统为一个 file slice,file slice 内部保护了相应版别的数据文件。当一个 file group 写入到规定的文件巨细的时分,就会切换一个新的 file group。

Hudi 在 file slice 的写入行为能够笼统成两种语义, copy on write 和 merge on read。

img

copy on write 每次都会写全量数据,新数据会和上一个 file slice 的数据 merge,然后再写一个新的 file slice,发生一个新的 bucket 的文件。

img

而 merge on read 则比较复杂一些,它的语义是追加写入,即每次只写增量数据,所以不会写新的 file slice。它首先会测验追加之前的 file slice,只有当该写入的 file slice 被归入压缩计划之后,才会切新的 file slice。

二、Flink Hudi Integration

img

Flink Hudi 的写入 pipeline 由几个算子构成。第一个算子担任将 table 层的 rowdata 转换成 Hudi 的音讯格局 HudiRecord。 接着经过一个 Bucket Assigner,它首要担任将现已转好的 HudiRecord 分配到特定的 file group 中,接着分好 file group 的 record 会流入 Writer 算子履行真实的文件写入。最终还有一个 coordinator,担任 Hudi table 层的 table service 调度以及新业务的发起和提交。此外,还有一些后台的整理角色担任整理老版别的数据。

img

其时的规划中,每一个 bucket assign task 都会持有一个 bucket assigner,它独立保护自己的一组 file group。在写入新数据或非更新 insert 数据的时分,bucket assign task 会扫描文件视图,优先将这一批新的数据写入到被判定为小 bucket 的 file group 里。

比方上图, file group 默认巨细是 120M,那么左图的 task1 会优先写到 file group1和 file group2,注意这儿不会写到 file group3,这是由于 file group3 现已有 100M 数据,关于比较挨近方针阈值的 bucket 不再写入能够防止过度写扩大。而右图中的 task2 会直接写一个新的 file group,不会去追加那些现已写的比较大的 file group 了。

img

接下来介绍 Flink Hudi 写流程的状况切换机制。 作业刚发动时,coordinator 会先测验去文件体系上新建这张表,假如其时表不存在,它就会去文件目录上写一些 meta 信息,也便是构建一个表。 收到一切 task 的初始化 meta 信息后,coordinator 会敞开一个新的 transaction,write task 看到 transaction 的发起后,就会解锁其时数据的 flush 行为。

Write Task 会先积累一批数据,这儿有两种 flush 策略,一种是其时的数据 buffer 达到了指定的巨细,就会把内存中的数据 flush 出去;另一种是当上游的 checkpoint barrier 抵达需求做快照的时分,会把一切内存中的数据 flush 到磁盘。每次 flush 数据之后都会把 meta 信息发送给 coordinator。coordinator 收到 checkpoint 的 success 事情后,会提交对应的业务,并且发起下一个新的业务。writer task 看到新业务后,又会解锁下一轮业务的写入。这样,整个写入流程就串起来了。

img

Flink Hudi Write 供给了十分丰富的写入场景。其时支撑对 log 数据类型的写入,即非更新的数据类型,一起支撑小文件兼并。别的关于 Hudi 的中心写入场景比方更新流、CDC 数据也都是 Hudi 重点支撑的。一起,Flink Hudi 还支撑历史数据的高功率批量导入,bucket insert 模式能够一次性将比方 Hive 中的离线数据或者数据库中的离线数据,经过批量查询的方法,高效导入 Hudi 格局中。别的,Flink Hudi 还供给了全量和增量的索引加载,用户能够一次性将批量数据高效导入湖格局,再经过对接流的写入程序,完成全量接增量的数据导入。

img

Flink Hudi read 端也支撑了十分丰富的查询视图,现在首要支撑的有全量读取、历史时刻 range 的增量读取以及流式读取。

img

上图是一段经过 Flink sql 写 Hudi 的比如,Hudi 支撑的 use case 十分丰富,也尽量简化了用户需求装备的参数。经过简略装备表 path、 并发以及 operation type,用户能够十分方便地将上游的数据写入到 Hudi 格局中。

三、Flink Hudi Use Case

下面介绍 Flink Hudi 的经典应用场景。

img

第一个经典场景是 DB 导入数据湖。现在 DB 数据导入数据湖有两种方法:能够经过 CDC connector 一次性将全量和增量数据导入到 Hudi 格局中;也能够经过消费 Kafka 上的 CDC changelog,经过 Flink 的 CDC format 将数据导入到 Hudi 格局。

img

第二个经典场景是流核算的 ETL (近实时的 olap 剖析)。经过对接上游流核算简略的一些 ETL,比方双流 join 或双流 join 接一个 agg,直接将改变流写入到 Hudi 格局中,然后下游的 read 端能够对接传统经典的 olap 引擎比方 presto、spark 来做端到端的近实时查询。

img

第三个经典场景和第二个有些相似, Hudi 支撑原生的 changelog,也便是支撑保存 Flink 核算中行级别的改变。基于这个才能,经过流读消费改变的方法,能够完成端到端的近实时的 ETL 出产。

img

未来,社区两个大版别首要的精力还是放在流读和流写方向,并且会加强流读的语义;别的在 catalog 和 metadata 方面会做自管理;咱们还会在近期推出一个 trino 原生的 connector 支撑,替代其时读 Hive 的方法,进步功率。

四、Apache Hudi Roadmap

下面是一个 MySql 到 Hudi 千表入湖的演示。

(刺进demo视频)

首先数据源这儿咱们准备了两个库,benchmark1 和 benchmark2,benchmark1 下面有 100 张表,benchmark2 下面有 1000 张表。由于千表入湖强依赖于 catalog,所以咱们首先要创立 catalog,关于数据源咱们要创立 MySql catalog,关于方针咱们要创立 Hudi catalog。MySql catalog 用于获取一切源表相关的信息,包含表结构、表的数据等。Hudi catalog 用于创立方针。

img

履行两条 sql 句子今后,两条 catalog 就创立成功了。

img

接下来到作业开发页面创立一个千表入湖的作业。只需求简略的 9 行 SQL,第一种语法是 create database as database,它的作用是把 MySql benchmark1 库下一切的表结构和表数据一键同步到 Hudi CDS demo 库,表的联系是一对一映射。第二条语法是 create table as table,它的作用是把 MySql benchmark2 库下一切匹配 sbtest. 正则表达式的表同步到 Hudi 的 DB1 下的 ctas_dema 表里面,是多对一的映射联系,会做分库分表的兼并。

接着咱们运行并上线,然后到作业运维的页面去发动作业,能够看到装备信息现已更新了,说明现已从头上线过。接着点击发动按钮,发动作业。然后就能够到作业总览页面检查作业相关的状况信息。

img

上图是作业的拓扑,十分复杂,有 1100 张源表和 101 张方针表。这儿咱们做了一些优化 —— source merge,把一切的表兼并到一个节点里,能够在增量 binlog 拉取阶段只拉取一次,减轻对 MySql 的压力。

img

接下来刷新 oss 页面,能够看到现已多了一个 cdas_demo 途径,进入 subtest1 途径,能够看到现已有元数据在写入,表明数据其实在写入过程中。

img

再到作业开发页面写一个简略的 SQL 查询某张表,来验证一下数据是否真的在写入。履行上图 SQL 句子,能够看到数据现已能够查询到,这些数据与刺进的数据是一致的。

咱们使用 catalog 供给的元数据才能,结合 CDS 和 CTS 语法,经过几行简略的 SQL,就能轻松完成几千张表的数据入湖,极大简化了数据入湖的流程,降低了开发运维的工作量。

本公司销售:阿里云新/老客户,只要购买阿里云,即可享受折上折优惠!>

我有话说: