侧边栏壁纸
  • 累计撰写 307 篇文章
  • 累计阅读 104.3万

Spark学习笔记

TOTC
2018-08-11 / 735 阅读 / 正在检测是否收录...

Spark Streaming

KafkaUtils.createStream:PUSH,Kafka高级API,数据漏处理或者多处理状况,主题分区与RDD的分区不相关,可开启WAL(数据复制两次)。

KafkaUtils.createDirectStream:PULL,Kafka低级API,Kafka和RDD分区之间有一对一的映射关系,不会更新Zookeeper中的偏移量。

内存管理

spark on yarn (yarn-cluster)

Resource Manager接收到申请后在集群中选择一个Node Manager分配Container,并在Container中启动ApplicationMaster进程,在ApplicationMaster中初始化SparkContext,生成一系列task,ApplicationMaster向Resource Manager申请资源后通知Node Manager在获得的Container中启动Excutor进程,SparkContext分配task给Excutor,Excutor发送运行状态给Driver。

一个Container对应一个JVM进程,也就是一个executor,所以JVM的Heap Size取决于spark.executor.memory。

堆内存90%以上作为安全空间,如果内存大,可以调高95%。

缓存空间是60%(Heep 90%60%)(safetyFraction 和 memoryFraction) ,会负责存储 Persist、Unroll 以及 Broadcast 的数据。

Unroll序列化空间是20%(Heep 90%60%*20%)。

shuffle空间的安全比例是80%,spark.shuffle.memeoryFraction 0.2(Heep 80% 20%)。

Spark1.6之后 联合内存 加入Heap 4G

预留内存(Reserved Memory):系统预留内存,会用来存储Spark内部对象。默认是300M,Java Heap大小至少为*1.5=450M。

用户内存(User Memory):主要用于存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息。(Heap-300M)*25%=949M,每个 Executor 分配 1G 的数据就会OOM。

Spark Memory, (Heap-300M)*75%,各50%,动态占用机制。

Execution 内存:主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据。

Storage 内存:主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据。

Storage占用对方内存可能被淘汰,如果没有再磁盘存储会丢失(storage_level ),Execution占用对方内存只能等释放。

先借用,再溢写到磁盘,内存优先。

启用静态内存管理的方式是:

spark.memory.useLegacyMode true

RDD有多少分区就有多少task,因为一个task只能处理一个partition上的数据。

Spark算子分为transform、action,只有action算子才触发计算(延迟计算)如:countbykey、reduce、count、take(n)、foreach、collect。一个action算子提交一个job、一个job包含一个或者多个stage、stage是根据RDD宽依赖(一个RDD分到两个不同的子RDD)、窄依赖(一个子RDD可以依赖多个父RDD)划分的。Excuotor(包含一个或者多个task,每个task一个虚拟core),Excuotor、task数量可以在submit中设置。总task数量一般设置成总core数的2-3倍,因为有的task可能先执行完。

map一条记录变一条记录,function函数返回Object;faltmap一条记录变多条记录,function函数返回Iterable<Object>迭代器。

join操作时,可以将reduce join转换成map join,并广播大变量。

SHUFFLE

有hashshuffle、sortshuffle、钨丝shuffle,后者会进行排序,一个task一个文件,钨丝shuffle效果跟sort差不多, 使用了自己实现的一套内存管理机制,性能上有很大的提升。hashshuffle,map端写文件时每个task都会创建下一个stage总task数量的文件,可以设置合并,这样,每个excutor中的task就会公用一批文件,先往内存缓存写,再溢出到磁盘,调整大一些可以减少io次数,reduce端拉取文件时有buffer缓冲区,每次都只能拉取与buffer缓冲相同大小的数据。

调优

Spark优化还包括,设置kyro序列化方式 ,性能更高、调整RDD持久化内存比例、调整shuffle时reduce端拉取数据重试次数,等待时长(因为在JVM full gc时是stop the world,多尝试几次)、数据本地化等待时长。

32

评论

博主关闭了所有页面的评论