wwwjun 2019-10-20
大数据技术与架构点击右侧关注,大数据开发领域最强公众号!
暴走大数据点击右侧关注,暴走大数据!
作者:wwwzw
By 暴走大数据
场景描述:2019年10月16日,在荷兰阿姆斯特丹举行的 Spark+AI 欧洲峰会上,DataBricks 和 Linux 基金会联合宣布,开源项目 Delta Lake 正式成为 Linux 基金会的托管项目。
关键词:Delta Lake 数据湖
2019年10月16日,在荷兰阿姆斯特丹举行的 Spark+AI 欧洲峰会上,DataBricks 和 Linux 基金会联合宣布,开源项目 Delta Lake 正式成为 Linux 基金会的托管项目。
Delta Lake前世今生
2019年4月24日在美国旧金山召开的 Spark+AI Summit 2019 会上,Databricks 的联合创始人及 CEO Ali Ghodsi 宣布将 Databricks Runtime 里面的 Delta Lake 基于 Apache License 2.0 协议开源。
Delta Lake 是一个存储层,为 Apache Spark 和大数据 workloads 提供 ACID 事务能力,其通过写和快照隔离之间的乐观并发控制(optimistic concurrency control),在写入数据期间提供一致性的读取,从而为构建在 HDFS 和云存储上的数据湖(data lakes)带来可靠性。
Delta Lake 还提供内置数据版本控制,以便轻松回滚。目前 Delta Lake 项目地址为 https://delta.io/,代码维护地址 https://github.com/delta-io/delta。
Spark 做为一个计算引擎,应该无须质疑是当前大数据行业的领导者。而 Parquet 做为 Spark 的缺省数据存储格式,其实相当薄弱,缺少了太多关键特性,让Spark的用户不胜其扰,简直是Spark易用性的最大敌人!社区的抱怨可谓绵绵不绝,这种对于技术完美主义者,是无法容忍的!在这种背景下,Delta 开始了设计和实现。Databricks一年多前推出Delta之后,各位客户好评不断,但是只在有限的cloud上提供服务。这个实在无法满足那些大量部署Spark的整个社区!
于是乎,今年Spark Summit,使用Apache license 开源了!
Delta Lake的关键特性
现在很多公司内部数据架构中都存在数据湖,数据湖是一种大型数据存储库和处理引擎。它能够存储大量各种类型的数据,拥有强大的信息处理能力和处理几乎无限的并发任务或工作的能力,最早由 Pentaho 首席技术官詹姆斯迪克森在2011年的时候提出。虽然数据湖在数据范围方面迈出了一大步,但是也面临了很多问题,主要概括如下:
由于存在这些挑战,许多大数据项目无法实现其愿景,有时甚至完全失败。我们需要一种解决方案,使数据从业者能够利用他们现有的数据湖,同时确保数据质量。这就是 Delta Lake 产生的背景。Delta Lake 解决了上述问题,简化了数据湖构建。以下是 Delta Lake 提供的主要功能:
Delta Lake ACID 保证是建立在存储系统的原子性和持久性基础之上的。具体来说,该存储系统需要提供以下特性:
Delta Lake 仅在 HDFS 上提供所有这些保证。通过插件的方式加入 LogStore API 的自定义实现,可以使它与其他存储系统一起工作。
Delta Lake牛刀初试
官网提供了QuickStart方便我们快速学习。
创建一个Maven工程,加入以下依赖:
<dependency> <groupId>io.delta</groupId> <artifactId>delta-core_2.11</artifactId> <version>0.4.0</version> </dependency>
Create a table
创建一个 Delta 类型的表方法很简单,如下。
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; SparkSession spark = ... // create SparkSession Dataset<Row> data = data = spark.range(0, 5); data.write().format("delta").save("/tmp/delta-table");
然后我们到目录下看看:
➜ delta-table tree├── _delta_log│ └── 00000000000000000000.json├── part-00000-80eac632-e80e-4b63-ba0b-07e83667544c-c000.snappy.parquet├── part-00001-cfced55c-3129-4db2-9330-d72e03b9a1b2-c000.snappy.parquet├── part-00002-7cbfe8b0-a046-4ae8-91e8-5eb1c7bcedf7-c000.snappy.parquet└── part-00003-8cae5863-12f2-476e-9c1b-e29720a39b66-c000.snappy.parquet
从上面的结果可以看到,在创建 Delta 表的时候,生成了一个 json 文件,这个文件也是 Delta 的 transaction log,也就是事务日志,所以的事务相关操作都会记录到这个日志中,可以做 replay 使用,后面研究源码的时候会深入分析,和若干 parquet 文件(Delta 底层使用的文件格式)。
Update table data
Dataset<Row> data = data = spark.range(5, 10); data.write().format("delta").mode("overwrite").save("/tmp/delta-table");
Read Data
scala> val df = spark.read.format("delta").load("/tmp/delta-table")df: org.apache.spark.sql.DataFrame = [id: bigint]scala> df.show()+---+| id|+---+| 8|| 9|| 5|| 7|| 6|+---+
Conditional update without overwrite相当于upsert
import io.delta.tables.*; import org.apache.spark.sql.functions; import java.util.HashMap; DeltaTable deltaTable = DeltaTable.forPath("/tmp/delta-table"); // 所有偶数加100 deltaTable.update( functions.expr("id % 2 == 0"), new HashMap<String, Column>() {{ put("id", functions.expr("id + 100")); }} ); // 删除所有偶数 deltaTable.delete(condition = functions.expr("id % 2 == 0")); // 更新 Dataset<Row> newData = spark.range(0, 20).toDF(); deltaTable.as("oldData") .merge( newData.as("newData"), "oldData.id = newData.id") .whenMatched() .update( new HashMap<String, Column>() {{ put("id", functions.col("newData.id")); }}) .whenNotMatched() .insertExpr( new HashMap<String, Column>() {{ put("id", functions.col("newData.id")); }}) .execute(); deltaTable.toDF().show();
欢迎点赞+收藏+转发朋友圈素质三连
文章不错?点个【在看】吧!