Spark如何对源端数据做切分?

天天见闻 天天见闻 2022-08-27 科技 阅读: 207
摘要: 简介: 典型的Spark作业读取位于OSS的Parquet外表时,源端的并发度(task/partition)如何确定?特别是在做TPCH测试时有一些疑问,如源端扫描文件的并发度是如何确定的?是否一个parquet文件对应一个partition?多个parquet文件对应一个partition?还是一个parquet文件对应多个partition?本文将从源码角度进行分析进而解答这些疑问。 引言 典型的Spark作业读取位于OSS的Parquet外表时,源端的并发度(task/partition)如何确定?特别是在做TPCH测试时有一些疑问,如源端扫描文件的并发度是如何确定的?是否一个parquet文件对应一个partition?多个parquet文件对应一个partition?还是一个parquet文件对应多个partition?本文将从源码角度进行分析进而解答这些疑问。

简介: 典型的Spark作业读取位于OSS的Parquet外表时,源端的并发度(task/partition)如何确定?特别是在做TPCH测试时有一些疑问,如源端扫描文件的并发度是如何确定的?是否一个parquet文件对应一个partition?多个parquet文件对应一个partition?还是一个parquet文件对应多个partition?本文将从源码角度进行分析进而解答这些疑问。

引言

典型的Spark作业读取位于OSS的Parquet外表时,源端的并发度(task/partition)如何确定?特别是在做TPCH测试时有一些疑问,如源端扫描文件的并发度是如何确定的?是否一个parquet文件对应一个partition?多个parquet文件对应一个partition?还是一个parquet文件对应多个partition?本文将从源码角度进行分析进而解答这些疑问。

分析

数据源读取对应的物理执行节点为FileSourceScanExec,读取数据代码块如下

lazy val inputRDD: RDD[InternalRow] = {

val readFile: (PartitionedFile) = Iterator[InternalRow] =

relation.fileFormat.buildReaderWithPartitionValues(

sparkSession = relation.sparkSession,

dataSchema = relation.dataSchema,

partitionSchema = relation.partitionSchema,

requiredSchema = requiredSchema,

filters = pushedDownFilters,

options = relation.options,

hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

val readRDD = if (bucketedScan) {

createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,

relation)

} else {

createReadRDD(readFile, dynamicallySelectedPartitions, relation)

sendDriverMetrics

readRDD

主要关注非bucket的处理,对于非bucket的扫描调用createReadRDD方法定义如下

* Create an RDD for non-bucketed reads.

* The bucketed variant of this function is [[createBucketedReadRDD]].

* @param readFile a function to read each (part of a) file.

* @param selectedPartitions Hive-style partition that are part of the read.

* @param fsRelation [[HadoopFsRelation]] associated with the read.

private def createReadRDD(

readFile: (PartitionedFile) = Iterator[InternalRow],

selectedPartitions: Array[PartitionDirectory],

fsRelation: HadoopFsRelation): RDD[InternalRow] = {

// 文件打开开销,每次打开文件最少需要读取的字节

val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes

// 最大切分分片大小

val maxSplitBytes =

FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)

logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +

s"open cost is considered as scanning $openCostInBytes bytes.")

// Filter files with bucket pruning if possible

val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled

val shouldProcess: Path = Boolean = optionalBucketSet match {

case Some(bucketSet) if bucketingEnabled =

// Do not prune the file if bucket file name is invalid

filePath = BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)

case _ =

_ = true

// 对分区下文件进行切分并按照从大到小进行排序

val splitFiles = selectedPartitions.flatMap { partition =

partition.files.flatMap { file =

// getPath is very expensive so we only want to call it once in this block:

val filePath = file.getPath

if (shouldProcess(filePath)) {

// 文件是否可split,parquet/orc/avro均可被split

val isSplitable = relation.fileFormat.isSplitable(

relation.sparkSession, relation.options, filePath)

// 切分文件

PartitionedFileUtil.splitFiles(

sparkSession = relation.sparkSession,

file = file,

filePath = filePath,

isSplitable = isSplitable,

maxSplitBytes = maxSplitBytes,

partitionValues = partition.values

} else {

Seq.empty

}.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

val partitions =

FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

new FileScanRDD(fsRelation.sparkSession, readFile, partitions)

可以看到确定最大切分分片大小maxSplitBytes对于后续切分为多少个文件非常重要,其核心逻辑如下

def maxSplitBytes(

sparkSession: SparkSession,

selectedPartitions: Seq[PartitionDirectory]): Long = {

// 读取文件时打包成最大的partition大小,默认为128MB,对应一个block大小

val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes

// 打开每个文件的开销,默认为4MB

val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes

// 建议的(不保证)最小分割文件分区数,默认未设置,从leafNodeDefaultParallelism获取

// 代码逻辑调用链 SparkSession#leafNodeDefaultParallelism - SparkContext#defaultParallelism

// - TaskSchedulerImpl#defaultParallelism - CoarseGrainedSchedulerBackend#defaultParallelism

// - 总共多少核max(executor core总和, 2),最少为2

val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum

.getOrElse(sparkSession.leafNodeDefaultParallelism)

// 总共读取的大小

val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum

// 单core读取的大小

val bytesPerCore = totalBytes / minPartitionNum

// 计算大小,不会超过设置的128MB

Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

对于PartitionedFileUtil#splitFiles,其核心逻辑如下,较为简单,直接按照最大切分大小切分大文件来进行分片

def splitFiles(

sparkSession: SparkSession,

file: FileStatus,

filePath: Path,

isSplitable: Boolean,

maxSplitBytes: Long,

partitionValues: InternalRow): Seq[PartitionedFile] = {

if (isSplitable) {

// 切分为多个分片

(0L until file.getLen by maxSplitBytes).map { offset =

val remaining = file.getLen - offset

val size = if (remaining maxSplitBytes) maxSplitBytes else remaining

val hosts = getBlockHosts(getBlockLocations(file), offset, size)

PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)

} else {

Seq(getPartitionedFile(file, filePath, partitionValues))

在获取到Seq[PartitionedFile]列表后,还并没有完成对文件的切分,还需要调用FilePartition#getFilePartitions做最后的处理,方法核心逻辑如下

def getFilePartitions(

sparkSession: SparkSession,

partitionedFiles: Seq[PartitionedFile],

maxSplitBytes: Long): Seq[FilePartition] = {

val partitions = new ArrayBuffer[FilePartition]

val currentFiles = new ArrayBuffer[PartitionedFile]

var currentSize = 0L

/** Close the current partition and move to the next. */

def closePartition: Unit = {

if (currentFiles.nonEmpty) {

// Copy to a new Array.

// 重新生成一个新的PartitionFile

val newPartition = FilePartition(partitions.size, currentFiles.toArray)

partitions += newPartition

currentFiles.clear

currentSize = 0

// 打开文件开销,默认为4MB

val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes

// Assign files to partitions using "Next Fit Decreasing"

partitionedFiles.foreach { file =

if (currentSize + file.length maxSplitBytes) {

// 如果累加的文件大小大于的最大切分大小,则关闭该分区,表示完成一个Task读取的数据切分

closePartition

// Add the given file to the current partition.

currentSize += file.length + openCostInBytes

currentFiles += file

// 最后关闭一次分区,文件可能较小

closePartition

partitions.toSeq

可以看到经过这一步后,会把一些小文件做合并,生成maxSplitBytes大小的PartitionFile,这样可以避免拉起太多task读取太多小的文件。

生成的FileScanRDD(new FileScanRDD(fsRelation.sparkSession, readFile, partitions))的并发度为partitions的长度,也即最后Spark生成的Task个数

override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray

整体流程图如下图所示

拆分、合并过程如下图所示

实战

对于TPCH 10G生成的customer parquet表

共8个Parquet文件,总文件大小为113.918MB

Spark作业配置如下,executor只有1core

conf spark.driver.resourceSpec=small;

conf spark.executor.instances=1;

conf spark.executor.resourceSpec=small;

conf spark.app.name=Spark SQL Test;

conf spark.adb.connectors=oss;

use tpcd;

select * from customer order by C_CUSTKEY desc limit 100;

根据前面的公式计算

defaultMaxSplitBytes = 128MB

openCostInBytes = 4MB

minPartitionNum = max(1, 2) = 2

totalBytes = 113.918 + 8 * 4MB = 145.918MB

bytesPerCore = 145.918MB / 2 = 72.959MB

maxSplitBytes = 72.959MB = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

得到maxSplitBytes为72.959MB,从日志中也可看到对应大小

经过排序后的文件顺序为(00000, 00001, 00002, 00003, 00004, 00006, 00005, 00007),再次经过合并后得到3个FilePartitioned,分别对应

FilePartitioned 1: 00000, 00001, 00002

FilePartitioned 2: 00003, 00004, 00006

FilePartitioned 3: 00005, 00007

即总共会生成3个Task

从Spark UI查看确实生成3个Task

从日志查看也是生成3个Task

变更Spark作业配置,5个executor共10core

conf spark.driver.resourceSpec=small;

conf spark.executor.instances=5;

conf spark.executor.resourceSpec=medium;

conf spark.app.name=Spark SQL Test;

conf spark.adb.connectors=oss;

use tpcd;

select * from customer order by C_CUSTKEY desc limit 100;

根据前面的公式计算

defaultMaxSplitBytes = 128MB

openCostInBytes = 4MB

minPartitionNum = max(10, 2) = 10

totalBytes = 113.918 + 8 * 4MB = 145.918MB

bytesPerCore = 145.918MB / 10 = 14.5918MB

maxSplitBytes = 14.5918MB = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

查看日志

此时可以看到14.5918MB会对源文件进行切分,会对00001, 00002,00003,00004,00005,00006进行切分,切分成两份,00007由于小于14.5918MB,因此不会进行切分,经过PartitionedFileUtil#splitFiles后,总共存在7 * 2 + 1 = 15个PartitionedFile

00000(0 - 14.5918MB), 00000(14.5918MB - 15.698MB)

00001(0 - 14.5918MB), 00001(14.5918MB - 15.632MB)

00002(0 - 14.5918MB), 00002(14.5918MB - 15.629MB)

00003(0 - 14.5918MB), 00003(14.5918MB - 15.624MB)

00004(0 - 14.5918MB), 00004(14.5918MB - 15.617MB)

00005(0 - 14.5918MB), 00005(14.5918MB - 15.536MB)

00006(0 - 14.5918MB), 00006(14.5918MB - 15.539MB)

00007(0 - 4.634MB)

经过排序后得到如下以及合并后得到10个FilePartitioned,分别对应

FilePartitioned 1: 00000(0 - 14.5918MB)

FilePartitioned 2: 00001(0 - 14.5918MB)

FilePartitioned 3: 00002(0 - 14.5918MB)

FilePartitioned 4: 00003(0 - 14.5918MB)

FilePartitioned 5: 00004(0 - 14.5918MB)

FilePartitioned 6: 00005(0 - 14.5918MB)

FilePartitioned 7: 00006(0 - 14.5918MB)

FilePartitioned 8: 00007(0 - 4.634MB),00000(14.5918MB - 15.698MB)

FilePartitioned 9: 00001(14.5918MB - 15.632MB),00002(14.5918MB - 15.629MB),00003(14.5918MB - 15.624MB)

FilePartitioned 10: 00004(14.5918MB - 15.617MB),00005(14.5918MB - 15.536MB),00006(14.5918MB - 15.539MB)

即总共会生成10个Task

通过Spark UI也可查看到生成了10个Task

查看日志,000004(14.5918MB - 15.617MB),00005(14.5918MB - 15.536MB),00006(14.5918MB - 15.539MB)在同一个Task中

00007(0 - 4.634MB),00000(14.5918MB - 15.698MB)

00001(14.5918MB - 15.632MB),00002(14.5918MB - 15.629MB),00003(14.5918MB - 15.624MB)在同一个Task中

总结

通过源码可知Spark对于源端Partition切分,会考虑到分区下所有文件大小以及打开每个文件的开销,同时会涉及对大文件的切分以及小文件的合并,最后得到一个相对合理的Partition。

原文链接:/

本文为阿里云原创内容,未经允许不得。

其他相关
德国对谷歌数据处理条款提出异议:未给予用户充分选择权

德国对谷歌数据处理条款提出异议:未给予用户充分选择权

作者: 天天见闻 时间:2023-01-12 阅读: 153
天天科技讯 北京时间1月11日下午消息,据报道,德国反垄断机构“联邦卡特尔局”(FCO)今日表示,已对谷歌的数据处理条款提出异议,并预计该公司将做出相应调整。 联邦卡特尔局在一份声明中称,该机构已于12月23日向谷歌母公司Alphabet、谷歌爱尔兰有限公司和谷歌德国有限公司发出了一份初步的法律评估。 联邦卡特尔局表示,谷歌目前并没有向用户提供充分的选择权,让他们决定是否同意,以及在多大程度上同意谷歌处理其数据。...
全新TECNO SPARK 9 Pro 运动版完美演绎格调、速度与激情

全新TECNO SPARK 9 Pro 运动版完美演绎格调、速度与激情

作者: 天天见闻 时间:2022-10-18 阅读: 180
传音旗下中高端品牌TECNO近日推出全新的SPARK 9 Pro 运动版智能手机。这款手机由宝马集团旗下全球创意咨询公司Designworks设计,从颜色、材质和饰面对SPARK 9 Pro 运动版进行了创意灵感创作,将其经典的汽车设计标志元素和奢华格调,与SPARK的新潮时尚属性相融合,打造了一款属于年轻人的标志性“掌上明机”,全新演绎年轻人喜欢的格调、速度和激情。SPARK 9 Pro运动版的三角形纹理,也让它从寻常的大功率手机变成可与一流豪华轿车媲美的轻潮科技代表。...
网络数据处理安全要求

网络数据处理安全要求

作者: 天天见闻 时间:2022-08-27 阅读: 329
本文件规定了网络运营者开展网络数据收集、存储、使用、加工、传输、提供、公开等数据处理的安全技术与管理要求。本文件适用于网络运营者规范网络数据处理,以及监管部门、第三方评估机构对网络数据处理进行监督管理和评估。应建立数据安全管理责任和评价考核制度,制定数据安全保护计划,开展安全风险评估,及时处置安全事件,组织开展教育培训。...
字节跳动开源自研 Shuffle 框架——Cloud Shuffle Service

字节跳动开源自研 Shuffle 框架——Cloud Shuffle Service

作者: 天天见闻 时间:2022-08-27 阅读: 204
8 月 25 日,字节跳动宣布, 正式开源 Cloud Shuffle Service。 Cloud Shuffle Service(以下简称 CSS) 是字节自研的通用 Remote Shuffle Service 框架,支持 Spark/FlinkBatch/MapReduce 等计算引擎,提供了相比原生方案 稳定性更好、性能更高、更弹性的数据 Shuffle 能力,同时也 为存算分离 / 在离线混部等场景提供了 Remote Shuffle 解决方案。 目前,CSS 已在 Github 上开源,欢迎感兴趣的同学一起参与共建!...
电脑卡怎么办简单步骤 电脑慢怎么处理详细解决办法介绍

电脑卡怎么办简单步骤 电脑慢怎么处理详细解决办法介绍

作者: 天天见闻 时间:2022-02-23 阅读: 785
最近DIY市场对我们消费者不太友好,除开上年开始就贵的飞起的内存不谈,今年显卡涨价也涨的飞起,幸好CPU和SSD价格回落不少,让我们的装机成本不会高的离谱。但是许多消费者可能还是囊中羞涩,希望等到市场价格更合理的时候再装机,此时我们可能要继续面对那又卡又慢的旧电脑了。既然如此,今天笔者就介绍一些花小钱就能能让旧电脑告别卡顿的小技巧,让你的旧电脑新年也有新气象。...
字母的起源之 拉丁字母

字母的起源之 拉丁字母

作者: 天天见闻 时间:2022-02-24 阅读: 597
因为罗马人最初寓居于台伯河Tiber岸的拉丁姆Latium地区,所以他们所使用的字母被称为拉丁Latin字母,其语言也被称为拉丁语。罗马的军队和官吏将拉丁字母带到他们所征服的土地上,并推行拉丁语以取代当地的语言。拉丁字母也因为罗马帝国的影响,被欧洲各地纷纷采用。拉丁字母现在已经成为了世界上使用最广的官方文字,如今使用拉丁字母作为母语文字的语言在全世界约有30亿人。...
我来说两句

年度爆文