Spark中scala parallelizee函数和makeRDD函数的区别

主题信息(必填)
主题描述(最多限制在50个字符)
申请人信息(必填)
申请信息已提交审核,请注意查收邮件,我们会尽快给您反馈。
如有疑问,请联系
CSDN &《程序员》研发主编,投稿&纠错等事宜请致邮
你只管努力,剩下的交给时光!
如今的编程是一场程序员和上帝的竞赛,程序员要开发出更大更好、傻瓜都会用到软件。而上帝在努力创造出更大更傻的傻瓜。目前为止,上帝是赢的。个人网站:。个人QQ群:、
个人大数据技术博客:
Spark是基于内存的分布式计算引擎,以处理的高效和稳定著称。然而在实际的应用开发过程中,开发者还是会遇到种种问题,其中一大类就是和性能相关。在本文中,笔者将结合自身实践,谈谈如何尽可能地提高应用程序性能。分布式计算引擎在调优方面有四个主要关注方向,分别是CPU、内存、网络开销和I/O,其具体调优目标如下:
提高CPU利用率。
降低网络开销。
减少I/O操作。
第1章 数据倾斜数据倾斜意味着某一个或某几个Partition中的数据量特别的大,这意味着完成针对这几个Partition的计算需要耗费相当长的时间。如果大量数据集中到某一个Partition,那么这个Partition在计算的时候就会成为瓶颈。图1是Spark应用程序执行并发的示意图,在Spark中,同一个应用程序的不同Stage是串行执行的,而同一Stage中的不同Task可以并发执行,Task数目由Partition数来决定,如果某一个Partition的数据量特别大,则相应的task完成时间会特别长,由此导致接下来的Stage无法开始,整个Job完成的时间就会非常长。要避免数据倾斜的出现,一种方法就是选择合适的key,或者是自己定义相关的partitioner。在Spark中Block使用了ByteBuffer来存储数据,而ByteBuffer能够存储的最大数据量不超过2GB。如果某一个key有大量的数据,那么在调用cache或persist函数时就会碰到spark-1476这个异常。下面列出的这些API会导致Shuffle操作,是数据倾斜可能发生的关键点所在
groupByKey
reduceByKey
aggregateByKey
repartition
10. repartitionAndSortWithinPartitions
图1: Spark任务并发模型
def rdd: RDD[T]
// TODO View bounds are deprecated, should use context bounds
// Might need to change ClassManifest for ClassTag in spark 1.0.0
case class DemoPairRDD[K &% Ordered[K] : ClassManifest, V: ClassManifest](
rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
// Here we use a single Long to try to ensure the sort is balanced,
// but for really large dataset, we may want to consider
// using a tuple of many Longs or even a GUID
def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
rdd.map(kv =& ((kv._1, Random.nextLong()), kv._2)).sortByKey()
.grouped(numPartitions).map(t =& (t._1._1, t._2))
case class DemoRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
def grouped(size: Int): RDD[T] = {
// TODO Version where withIndex is cached
val withIndex = rdd.mapPartitions(_.zipWithIndex)
val startValues =
withIndex.mapPartitionsWithIndex((i, iter) =&
Iterator((i, iter.toIterable.last))).toArray().toList
.sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L)
withIndex.mapPartitionsWithIndex((i, iter) =& iter.map {
case (value, index) =& (startValues(i) + index.toLong, value)
.partitionBy(new Partitioner {
def numPartitions: Int = size
def getPartition(key: Any): Int =
(key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
.map(_._2)
}定义隐式的转换
implicit def toDemoRDD[T: ClassManifest](rdd: RDD[T]): DemoRDD[T] =
new DemoRDD[T](rdd)
implicit def toDemoPairRDD[K &% Ordered[K] : ClassManifest, V: ClassManifest](
rdd: RDD[(K, V)]): DemoPairRDD[K, V] = DemoPairRDD(rdd)
implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd
}在spark-shell中就可以使用了import RDDConversions._
yourRdd.grouped(5)第2章
减少网络通信开销Spark的Shuffle过程非常消耗资源,Shuffle过程意味着在相应的计算节点,要先将计算结果存储到磁盘,后续的Stage需要将上一个Stage的结果再次读入。数据的写入和读取意味着Disk I/O操作,与内存操作相比,Disk I/O操作是非常低效的。使用iostat来查看disk i/o的使用情况,disk i/o操作频繁一般会伴随着cpu load很高。如果数据和计算节点都在同一台机器上,那么可以避免网络开销,否则还要加上相应的网络开销。 使用iftop来查看网络带宽使用情况,看哪几个节点之间有大量的网络传输。
图2是Spark节点间数据传输的示意图,Spark Task的计算函数是通过Akka通道由Driver发送到Executor上,而Shuffle的数据则是通过Netty网络接口来实现。由于Akka通道中参数spark.akka.framesize决定了能够传输消息的最大值,所以应该避免在Spark Task中引入超大的局部变量。图2: Spark节点间的数据传输第1节 选择合适的并发数为了提高Spark应用程序的效率,尽可能的提升CPU的利用率。并发数应该是可用CPU物理核数的两倍。在这里,并发数过低,CPU得不到充分的利用,并发数过大,由于spark是每一个task都要分发到计算结点,所以任务启动的开销会上升。并发数的修改,通过配置参数来改变spark.default.parallelism,如果是sql的话,可能通过修改spark.sql.shuffle.partitions来修改。第1项
Repartition vs. Coalescerepartition和coalesce都能实现数据分区的动态调整,但需要注意的是repartition会导致shuffle操作,而coalesce不会。第2节
reduceByKey vs. groupBygroupBy操作应该尽可能的避免,第一是有可能造成大量的网络开销,第二是可能导致OOM。以WordCount为例来演示reduceByKey和groupBy的差异reduceByKey
sc.textFile(“README.md”).map(l=&l.split(“,”)).map(w=&(w,1)).reduceByKey(_ + _)
图3:reduceByKey的Shuffle过程Shuffle过程如图2所示groupByKey
sc.textFile(“README.md”).map(l=&l.split(“,”)).map(w=&(w,1)).groupByKey.map(r=&(r._1,r._2.sum))
图4:groupByKey的Shuffle过程建议: 尽可能使用reduceByKey, aggregateByKey, foldByKey和combineByKey
假设有一RDD如下所示,求每个key的均值val data = sc.parallelize( List((0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)) )方法一:reduceByKeydata.map(r=&(r._1, (r.2,1))).reduceByKey((a,b)=&(a._1 + b._1, a._2 + b._2)).map(r=&(r._1,(r._2._1/r._2._2)).foreach(println)方法二:combineByKeybineByKey(value=&(value,1),
(x:(Double, Int), value:Double)=& (x._1+value, x._2 + 1),
(x:(Double,Int), y:(Double, Int))=&(x._1 + y._1, x._2 + y._2))第3节
BroadcastHashJoin vs. ShuffleHashJoin在Join过程中,经常会遇到大表和小表的join. 为了提高效率可以使用BroadcastHashJoin, 预先将小表的内容广播到各个Executor, 这样将避免针对小表的Shuffle过程,从而极大的提高运行效率。其实BroadCastHashJoin核心就是利用了BroadCast函数,如果理解清楚broadcast的优点,就能比较好的明白BroadcastHashJoin的优势所在。以下是一个简单使用broadcast的示例程序。val lst = 1 to 100 toList
val exampleRDD = sc.makeRDD(1 to 20 toSeq, 2)
val broadcastLst = sc.broadcast(lst)
exampleRDD.filter(i=&broadcastLst.valuecontains(i)).collect.foreach(println)第4节
map vs. mapPartitions有时需要将计算结果存储到外部数据库,势必会建立到外部数据库的连接。应该尽可能的让更多的元素共享同一个数据连接而不是每一个元素的处理时都去建立数据库连接。
在这种情况下,mapPartitions和foreachPartitons将比map操作高效的多。第5节
数据就地读取移动计算的开销远远低于移动数据的开销。Spark中每个Task都需要相应的输入数据,因此输入数据的位置对于Task的性能变得很重要。按照数据获取的速度来区分,由快到慢分别是:1.PROCESS_LOCAL
2.NODE_LOCAL
3.RACK_LOCALSpark在Task执行的时候会尽优先考虑最快的数据获取方式,如果想尽可能的在更多的机器上启动Task,那么可以通过调低spark.locality.wait的值来实现, 默认值是3s。除了HDFS,Spark能够支持的数据源越来越多,如Cassandra, HBase,MongoDB等知名的NoSQL数据库,随着Elasticsearch的日渐兴起,spark和elasticsearch组合起来提供高速的查询解决方案也成为一种有益的尝试。上述提到的外部数据源面临的一个相同问题就是如何让spark快速读取其中的数据, 尽可能的将计算结点和数据结点部署在一起是达到该目标的基本方法,比如在部署Hadoop集群的时候,可以将HDFS的DataNode和Spark Worker共享一台机器。以cassandra为例,如果Spark的部署和Cassandra的机器有部分重叠,那么在读取Cassandra中数据的时候,通过调低spark.locality.wait就可以在没有部署Cassandra的机器上启动Spark Task。对于Cassandra, 可以在部署Cassandra的机器上部署Spark Worker,需要注意的是Cassandra的compaction操作会极大的消耗CPU,因此在为Spark Worker配置CPU核数时,需要将这些因素综合在一起进行考虑。这一部分的代码逻辑可以参考源码TaskSetManager::addPendingTaskprivate def addPendingTask(index: Int, readding: Boolean = false) {
def addTo(list: ArrayBuffer[Int]) {
if (!readding || !list.contains(index)) {
list += index
for (loc &- tasks(index).preferredLocations) {
loc match {
case e: ExecutorCacheTaskLocation =&
addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
case e: HDFSCacheTaskLocation =& {
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) =& {
for (e &- set) {
addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
case None =& logDebug(s"Pending task $index has a cached location at ${e.host} " +
", but there are no executors alive there.")
case _ =& Unit
addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
for (rack &- sched.getRackForHost(loc.host)) {
addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
if (tasks(index).preferredLocations == Nil) {
addTo(pendingTasksWithNoPrefs)
if (!readding) {
allPendingTasks += index
}如果准备让spark支持新的存储源,进而开发相应的RDD,与位置相关的部分就是自定义getPreferredLocations函数,以elasticsearch-hadoop中的EsRDD为例,其代码实现如下。override def getPreferredLocations(split: Partition): Seq[String] = {
val esSplit = split.asInstanceOf[EsPartition]
val ip = esSplit.esPartition.nodeIp
if (ip != null) Seq(ip) else Nil
序列化使用好的序列化算法能够提高运行速度,同时能够减少内存的使用。Spark在Shuffle的时候要将数据先存储到磁盘中,存储的内容是经过序列化的。序列化的过程牵涉到两大基本考虑的因素,一是序列化的速度,二是序列化后内容所占用的大小。kryoSerializer与默认的javaSerializer相比,在序列化速度和序列化结果的大小方面都具有极大的优势。所以建议在应用程序配置中使用KryoSerializer.spark.serializer
org.apache.spark.serializer.KryoSerializer默认的cache没有对缓存的对象进行序列化,使用的StorageLevel是MEMORY_ONLY,这意味着要占用比较大的内存。可以通过指定persist中的参数来对缓存内容进行序列化。exampleRDD.persist(MEMORY_ONLY_SER)需要特别指出的是persist函数是等到job执行的时候才会将数据缓存起来,属于延迟执行; 而unpersist函数则是立即执行,缓存会被立即清除。
作者简介:许鹏, 《Apache Spark源码剖析》作者,关注于大数据实时搜索和实时流数据处理,对elasticsearch, storm及drools多有研究,现就职于携程。Spark算子使用示例 - CSDN博客
Spark算子使用示例
1. 算子分类
从大方向来说,Spark 算子大致可以分为以下两类
Transformation:操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。
Action:会触发 Spark 提交作业(Job),并将数据输出 Spark系统。
从小方向来说,Spark 算子大致可以分为以下三类:
Value数据类型的Transformation算子。
Key-Value数据类型的Transfromation算子。
Action算子
1.1 Value数据类型的Transformation算子
输入分区与输出分区一对一型
map、flatMap、mapPartitions、glom
输入分区与输出分区多对一型
union、cartesian
输入分区与输出分区多对多型
输出分区为输入分区子集型
filter、distinct、subtract、sample、takeSample
cache、persist
1.2 Key-Value数据类型的Transfromation算子
输入分区与输出分区一对一
combineByKey、reduceByKey、partitionBy
两个RDD聚集
join、leftOutJoin、rightOutJoin
1.3 Action算子
saveAsTextFile、saveAsObjectFile
Scala集合和数据类型
collect、collectAsMap、reduceByKeyLocally、lookup、count、top、reduce、fold、aggregate
2. Transformation
2.1.1 概述
语法(scala):
def map[U: ClassTag](f: T =& U): RDD[U]
将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素
2.1.2 Java示例
* map和foreach算子:
1. 循环map调用元的每一个元素;
2. 执行call函数, 并返回.
private static void map() {
SparkConf conf = new SparkConf().setAppName(JavaOperatorDemo.class.getSimpleName())
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List&String& datas = Arrays.asList(
"{'id':1,'name':'xl1','pwd':'xl123','sex':2}",
"{'id':2,'name':'xl2','pwd':'xl123','sex':1}",
"{'id':3,'name':'xl3','pwd':'xl123','sex':2}");
JavaRDD&String& datasRDD = sc.parallelize(datas);
JavaRDD&User& mapRDD = datasRDD.map(
new Function&String, User&() {
public User call(String v) throws Exception {
Gson gson = new Gson();
return gson.fromJson(v, User.class);
mapRDD.foreach(new VoidFunction&User&() {
public void call(User user) throws Exception {
System.out.println("id: " + user.id
+ " name: " + user.name
+ " pwd: " + user.pwd
+ " sex:" + user.sex);
sc.close();
id: 1 name: xl1 pwd: xl123 sex:2
id: 2 name: xl2 pwd: xl123 sex:1
id: 3 name: xl3 pwd: xl123 sex:2
2.1.3 Scala示例
private def map() {
val conf = new SparkConf().setAppName(ScalaOperatorDemo.getClass.getSimpleName).setMaster("local")
val sc = new SparkContext(conf)
val datas: Array[String] = Array(
"{'id':1,'name':'xl1','pwd':'xl123','sex':2}",
"{'id':2,'name':'xl2','pwd':'xl123','sex':1}",
"{'id':3,'name':'xl3','pwd':'xl123','sex':2}")
sc.parallelize(datas)
.map(v =& {
new Gson().fromJson(v, classOf[User])
.foreach(user =& {
println("id: " + user.id
+ " name: " + user.name
+ " pwd: " + user.pwd
+ " sex:" + user.sex)
2.2 filter
2.2.1 概述
语法(scala):
def filter(f: T =& Boolean): RDD[T]
对元素进行过滤,对每个元素应用f函数,返回值为true的元素在RDD中保留,返回为false的将过滤掉
2.2.2 Java示例
static void filter() {
SparkConf conf = new SparkConf().setAppName(JavaOperatorDemo.class.getSimpleName())
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List&Integer& datas = Arrays.asList(1, 2, 3, 7, 4, 5, 8);
JavaRDD&Integer& rddData = sc.parallelize(datas);
JavaRDD&Integer& filterRDD = rddData.filter(
new Function&Integer, Boolean&() {
public Boolean call(Integer v) throws Exception {
return v &= 3;
filterRDD.foreach(
new VoidFunction&Integer&() {
public void call(Integer integer) throws Exception {
System.out.println(integer);
sc.close();
2.2.3 Scala示例
def filter {
val conf = new SparkConf().setAppName(ScalaOperatorDemo.getClass.getSimpleName).setMaster("local")
val sc = new SparkContext(conf)
val datas = Array(1, 2, 3, 7, 4, 5, 8)
sc.parallelize(datas)
.filter(v =& v &= 3)
.foreach(println)
2.3 flatMap
2.3.1 简述
语法(scala):
def flatMap[U: ClassTag](f: T =& TraversableOnce[U]): RDD[U]
与map类似,但每个输入的RDD成员可以产生0或多个输出成员
2.3.2 Java示例
static void flatMap() {
SparkConf conf = new SparkConf().setAppName(JavaOperatorDemo.class.getSimpleName())
.setMaster("local")
JavaSparkContext sc = new JavaSparkContext(conf)
List&String& data = Arrays.asList(
"aa,bb,cc",
"cxf,spring,struts2",
"java,C++,javaScript")
JavaRDD&String& rddData = sc.parallelize(data)
JavaRDD&String& flatMapData = rddData.flatMap(
v -& Arrays.asList(v.split(",")).iterator()
new FlatMapFunction&String, String&() {
public Iterator&String& call(String t) throws Exception {
List&String& list= Arrays.asList(t.split(","))
return list.iterator()
flatMapData.foreach(v -& System.out.println(v))
sc.close()
javaScript
2.3.3 Scala示例
sc.parallelize(datas)
.flatMap(line =& line.split(","))
.foreach(println)
2.4 mapPartitions
2.4.1 概述
语法(scala):
def mapPartitions[U: ClassTag](
f: Iterator[T] =& Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
与Map类似,但map中的func作用的是RDD中的每个元素,而mapPartitions中的func作用的对象是RDD的一整个分区。所以func的类型是Iterator&T& =& Iterator&U&,其中T是输入RDD元素的类型。preservesPartitioning表示是否保留输入函数的partitioner,默认false。
2.4.2 Java示例
static void mapPartitions() {
SparkConf conf = new SparkConf().setAppName(JavaOperatorDemo.class.getSimpleName())
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List&String& names = Arrays.asList("张三1", "李四1", "王五1", "张三2", "李四2",
"王五2", "张三3", "李四3", "王五3", "张三4");
JavaRDD&String& namesRDD = sc.parallelize(names, 3);
JavaRDD&String& mapPartitionsRDD = namesRDD.mapPartitions(
new FlatMapFunction&Iterator&String&, String&() {
int count = 0;
public Iterator&String& call(Iterator&String& stringIterator) throws Exception {
List&String& list = new ArrayList&String&();
while (stringIterator.hasNext()) {
list.add("分区索引:" + count++ + "\t" + stringIterator.next());
return list.iterator();
List&String& result = mapPartitionsRDD.collect();
result.forEach(System.out);
sc.close();
分区索引:0
分区索引:1
分区索引:2
分区索引:0
分区索引:1
分区索引:2
分区索引:0
分区索引:1
分区索引:2
分区索引:3
2.4.3 Scala示例
sc.parallelize(datas, 3)
.mapPartitions(
val result = ArrayBuffer[String]()
while (n.hasNext) {
result.append(n.next())
result.iterator
.foreach(println)
2.5 mapPartitionsWithIndex
2.5.1 概述
语法(scala):
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) =& Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
与mapPartitions类似,但输入会多提供一个整数表示分区的编号,所以func的类型是(Int, Iterator&T&) =& Iterator&R&,多了一个Int
2.5.2 Java示例
private static void mapPartitionsWithIndex() {
SparkConf conf = new SparkConf().setAppName(JavaOperatorDemo.class.getSimpleName())
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List&String& names = Arrays.asList("张三1", "李四1", "王五1", "张三2", "李四2",
"王五2", "张三3", "李四3", "王五3", "张三4");
JavaRDD&String& namesRDD = sc.parallelize(names, 3);
JavaRDD&String& mapPartitionsWithIndexRDD = namesRDD.mapPartitionsWithIndex(
new Function2&Integer, Iterator&String&, Iterator&String&&() {
private static final long serialVersionUID = 1L;
public Iterator&String& call(Integer v1, Iterator&String& v2) throws Exception {
List&String& list = new ArrayList&String&();
while (v2.hasNext()) {
list.add("分区索引:" + v1 + "\t" + v2.next());
return list.iterator();
List&String& result = mapPartitionsWithIndexRDD.collect();
result.forEach(System.out);
sc.close();
分区索引:0
分区索引:0
分区索引:0
分区索引:1
分区索引:1
分区索引:1
分区索引:2
分区索引:2
分区索引:2
分区索引:2
2.5.3 Scala示例
sc.parallelize(datas, 3)
.mapPartitionsWithIndex(
(m, n) =& {
val result = ArrayBuffer[String]()
while (n.hasNext) {
result.append("分区索引:" + m + "\t" + n.next())
result.iterator
.foreach(println)
2.6 sample
2.6.1 概述
语法(scala):
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
对RDD进行抽样,其中参数withReplacement为true时表示抽样之后还放回,可以被多次抽样,false表示不放回;fraction表示抽样比例;seed为随机数种子,比如当前时间戳
2.6.2 Java示例
static void sample() {
SparkConf conf = new SparkConf().setAppName(JavaOperatorDemo.class.getSimpleName())
.setMaster("local")
JavaSparkContext sc = new JavaSparkContext(conf)
List&Integer& datas = Arrays.asList(1, 2, 3, 7, 4, 5, 8)
JavaRDD&Integer& dataRDD = sc.parallelize(datas)
JavaRDD&Integer& sampleRDD = dataRDD.sample(false, 0.5, System.currentTimeMillis())
sampleRDD.foreach(v -& System.out.println(v))
sc.close()
2.6.3 Scala示例
sc.parallelize(datas)
.sample(withReplacement = false, 0.5, System.currentTimeMillis)
.foreach(println)
2.7.1 概述
语法(scala):
def union(other: RDD[T]): RDD[T]
合并两个RDD,不去重,要求两个RDD中的元素类型一致
2.7.2 Java示例
static void union() {
SparkConf conf = new SparkConf().setAppName(JavaOperatorDemo.class.getSimpleName())
.setMaster("local")
JavaSparkContext sc = new JavaSparkContext(conf)
List&String& datas1 = Arrays.asList("张三", "李四")
List&String& datas2 = Arrays.asList("tom", "gim")
JavaRDD&String& data1RDD = sc.parallelize(datas1)
JavaRDD&String& data2RDD = sc.parallelize(datas2)
JavaRDD&String& unionRDD = data1RDD
.union(data2RDD)
unionRDD.foreach(v -& System.out.println(v))
sc.close()
2.7.3 Scala示例
// sc.parallelize(datas1)
.union(sc.parallelize(datas2))
.foreach(println)
(sc.parallelize(datas1) ++ sc.parallelize(datas2))
.foreach(println)
2.8 intersection
2.8.1 概述
语法(scala):
def intersection(other: RDD[T]): RDD[T]
返回两个RDD的交集
2.8.2 Java示例
static void intersection(JavaSparkContext sc) {
List&String& datas1 = Arrays.asList("张三", "李四", "tom")
List&String& datas2 = Arrays.asList("tom", "gim")
sc.parallelize(datas1)
.intersection(sc.parallelize(datas2))
.foreach(v -& System.out.println(v))
2.8.3 Scala示例
sc.parallelize(datas1)
.intersection(sc.parallelize(datas2))
.foreach(println)
2.9 distinct
2.9.1 概述
语法(scala):
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
def distinct(): RDD[T]
对原RDD进行去重操作,返回RDD中没有重复的成员
2.9.2 Java示例
static void distinct(JavaSparkContext sc) {
List&String& datas = Arrays.asList("张三", "李四", "tom", "张三")
sc.parallelize(datas)
.distinct()
.foreach(v -& System.out.println(v))
2.9.3 Scala示例
sc.parallelize(datas)
.distinct()
.foreach(println)
2.10 groupByKey
2.10.1 概述
语法(scala):
def groupBy[K](f: T =& K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](
f: T =& K,
numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T =& K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
: RDD[(K, Iterable[T])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(): RDD[(K, Iterable[V])]
对&key, value&结构的RDD进行类似RMDB的group by聚合操作,具有相同key的RDD成员的value会被聚合在一起,返回的RDD的结构是(key, Iterator&value&)
2.10.2 Java示例
static void groupBy(JavaSparkContext sc) {
List&Integer& datas = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9)
sc.parallelize(datas)
.groupBy(new Function&Integer, Object&() {
public Object call(Integer v1) throws Exception {
return (v1 % 2 == 0) ? "偶数" : "奇数"
.collect()
.forEach(System.out::println)
List&String& datas2 = Arrays.asList("dog", "tiger", "lion", "cat", "spider", "eagle")
sc.parallelize(datas2)
.keyBy(v1 -& v1.length())
.groupByKey()
.collect()
.forEach(System.out::println)
(奇数,[1, 3, 5, 7, 9])
(偶数,[2, 4, 6, 8])
(4,[lion])
(6,[spider])
(3,[dog, cat])
(5,[tiger, eagle])
2.10.3 Scala示例
def groupBy(sc: SparkContext): Unit = {
sc.parallelize(1 to 9, 3)
.groupBy(x =& {
if (x % 2 == 0) "偶数"
else "奇数"
.collect()
.foreach(println)
val datas2 = Array("dog", "tiger", "lion", "cat", "spider", "eagle")
sc.parallelize(datas2)
.keyBy(_.length)
.groupByKey()
.collect()
.foreach(println)
2.11 reduceByKey
2.11.1 概述
语法(scala):
def reduceByKey(partitioner: Partitioner, func: (V, V) =& V): RDD[(K, V)]
def reduceByKey(func: (V, V) =& V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(func: (V, V) =& V): RDD[(K, V)]
对&key, value&结构的RDD进行聚合,对具有相同key的value调用func来进行reduce操作,func的类型必须是(V, V) =& V
2.11.2 Java示例
static void reduceByKey(JavaSparkContext sc) {
JavaRDD&String& lines = sc.textFile("file:///Users/zhangws/opt/spark-2.0.1-bin-hadoop2.6/README.md");
JavaRDD&String& wordsRDD = lines.flatMap(new FlatMapFunction&String, String&() {
private static final long serialVersionUID = 1L;
public Iterator&String& call(String line) throws Exception {
List&String& words = Arrays.asList(line.split(" "));
return words.iterator();
JavaPairRDD&String, Integer& wordsCount = wordsRDD.mapToPair(new PairFunction&String, String, Integer&() {
private static final long serialVersionUID = 1L;
public Tuple2&String, Integer& call(String word) throws Exception {
return new Tuple2&String, Integer&(word, 1);
JavaPairRDD&String, Integer& resultRDD = wordsCount.reduceByKey(new Function2&Integer, Integer, Integer&() {
private static final long serialVersionUID = 1L;
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
resultRDD.foreach(new VoidFunction&Tuple2&String, Integer&&() {
private static final long serialVersionUID = 1L;
public void call(Tuple2&String, Integer& t) throws Exception {
System.out.println(t._1 + "\t" + t._2());
sc.close();
2.11.3 Scala示例
val textFile = sc.textFile("file:///home/zkpk/spark-2.0.1/README.md")
val words = textFile.flatMap(line =& line.split(" "))
val wordPairs = words.map(word =& (word, 1))
val wordCounts = wordPairs.reduceByKey((a, b) =& a + b)
println("wordCounts: ")
wordCounts.collect().foreach(println)
2.12 aggregateByKey
2.12.1 概述
语法(java):
& JavaPairRDD& aggregateByKey(U zeroValue,
Partitioner partitioner,
Function2& seqFunc,
Function2& combFunc)
& JavaPairRDD& aggregateByKey(U zeroValue,
int numPartitions,
Function2& seqFunc,
Function2& combFunc)
& JavaPairRDD& aggregateByKey(U zeroValue,
Function2& seqFunc,
Function2& combFunc)
aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值得类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey函数最终返回的类型还是Pair RDD,对应的结果是Key和聚合好的值;而aggregate函数直接返回非RDD的结果。
zeroValue:表示在每个分区中第一次拿到key值时,用于创建一个返回类型的函数,这个函数最终会被包装成先生成一个返回类型,然后通过调用seqOp函数,把第一个key对应的value添加到这个类型U的变量中。
seqOp:这个用于把迭代分区中key对应的值添加到zeroValue创建的U类型实例中。
combOp:这个用于合并每个分区中聚合过来的两个U类型的值。
2.12.2 Java示例
static void aggregateByKey(JavaSparkContext sc) {
List&Tuple2&Integer, Integer&& datas = new ArrayList&&();
datas.add(new Tuple2&&(1, 3));
datas.add(new Tuple2&&(1, 2));
datas.add(new Tuple2&&(1, 4));
datas.add(new Tuple2&&(2, 3));
sc.parallelizePairs(datas, 2)
.aggregateByKey(
new Function2&Integer, Integer, Integer&() {
public Integer call(Integer v1, Integer v2) throws Exception {
System.out.println("seq: " + v1 + "\t" + v2);
return Math.max(v1, v2);
new Function2&Integer, Integer, Integer&() {
public Integer call(Integer v1, Integer v2) throws Exception {
System.out.println("comb: " + v1 + "\t" + v2);
return v1 + v2;
.collect()
.forEach(System.out);
2.12.3 Scala示例
def aggregateByKey(sc: SparkContext): Unit = {
// 合并在同一个partition中的值,a的数据类型为zeroValue的数据类型,b的数据类型为原value的数据类型
def seq(a:Int, b:Int): Int = {
println("seq: " + a + "\t" + b)
math.max(a, b)
// 合并在不同partition中的值,a,b的数据类型为zeroValue的数据类型
def comb(a:Int, b:Int): Int = {
println("comb: " + a + "\t" + b)
// 数据拆分成两个分区
// 分区一数据: (1,3) (1,2)
// 分区二数据: (1,4) (2,3)
// zeroValue 中立值,定义返回value的类型,并参与运算
// seqOp 用来在一个partition中合并值的
// 分区一相同key的数据进行合并
(1,3)开始和中位值合并为3
(1,2)再次合并为3
// 分区二相同key的数据进行合并
(1,4)开始和中位值合并为4
(2,3)开始和中位值合并为3
// comb 用来在不同partition中合并值的
// 将两个分区的结果进行合并
// key为1的, 两个分区都有, 合并为(1,7)
// key为2的, 只有一个分区有, 不需要合并(2,3)
sc.parallelize(List((1, 3), (1, 2), (1, 4), (2, 3)), 2)
.aggregateByKey(0)(seq, comb)
.collect()
.foreach(println)
2.13 sortByKey
2.13.1 概述
语法(java):
JavaRDD& sortBy(Function& f,
boolean ascending,
int numPartitions)
JavaPairRDD& sortByKey()
JavaPairRDD& sortByKey(boolean ascending)
JavaPairRDD& sortByKey(boolean ascending,
int numPartitions)
JavaPairRDD& sortByKey(parator& comp)
JavaPairRDD& sortByKey(parator& comp,
boolean ascending)
JavaPairRDD& sortByKey(parator& comp,
boolean ascending,
int numPartitions)
对&key, value&结构的RDD进行升序或降序排列
comp:排序时的比较运算方式。
ascending:false降序;true升序。
2.13.2 Java示例
static void sortByKey(JavaSparkContext sc) {
List&Integer& datas = Arrays.asList(60, 70, 80, 55, 45, 75);
sc.parallelize(datas)
.sortBy(new Function&Integer, Object&() {
public Object call(Integer v1) throws Exception {
return v1;
}, true, 1)
.foreach(v -& System.out.println(v));
sc.parallelize(datas)
.sortBy((Integer v1) -& v1, false, 1)
.foreach(v -& System.out.println(v));
List&Tuple2&Integer, Integer&& datas2 = new ArrayList&&();
datas2.add(new Tuple2&&(3, 3));
datas2.add(new Tuple2&&(2, 2));
datas2.add(new Tuple2&&(1, 4));
datas2.add(new Tuple2&&(2, 3));
sc.parallelizePairs(datas2)
.sortByKey(false)
.foreach(v -& System.out.println(v));
2.13.3 Scala示例
def sortByKey(sc: SparkContext) : Unit = {
sc.parallelize(Array(60, 70, 80, 55, 45, 75))
.sortBy(v =& v, false)
.foreach(println)
sc.parallelize(List((3, 3), (2, 2), (1, 4), (2, 3)))
.sortByKey(true)
.foreach(println)
2.14.1 概述
语法(java):
JavaPairRDD&K,scala.Tuple2&V,W&& join(JavaPairRDD&K,W& other)
JavaPairRDD&K,scala.Tuple2&V,W&& join(
JavaPairRDD&K,W& other,
int numPartitions)
JavaPairRDD&K,scala.Tuple2&V,W&& join(
JavaPairRDD&K,W& other,
Partitioner partitioner)
对&K, V&和&K, W&进行join操作,返回(K, (V, W))外连接函数为leftOuterJoin、rightOuterJoin和fullOuterJoin
2.14.2 Java示例
static void join(JavaSparkContext sc) {
List&Tuple2&Integer, String&& products = new ArrayList&&();
products.add(new Tuple2&&(1, "苹果"));
products.add(new Tuple2&&(2, "梨"));
products.add(new Tuple2&&(3, "香蕉"));
products.add(new Tuple2&&(4, "石榴"));
List&Tuple2&Integer, Integer&& counts = new ArrayList&&();
counts.add(new Tuple2&&(1, 7));
counts.add(new Tuple2&&(2, 3));
counts.add(new Tuple2&&(3, 8));
counts.add(new Tuple2&&(4, 3));
counts.add(new Tuple2&&(5, 9));
JavaPairRDD&Integer, String& productsRDD = sc.parallelizePairs(products);
JavaPairRDD&Integer, Integer& countsRDD = sc.parallelizePairs(counts);
productsRDD.join(countsRDD)
.foreach(v -& System.out.println(v));
(4,(石榴,3))
(1,(苹果,7))
(3,(香蕉,8))
(2,(梨,3))
2.14.3 Scala示例
sc.parallelize(List((1, "苹果"), (2, "梨"), (3, "香蕉"), (4, "石榴")))
.join(sc.parallelize(List((1, 7), (2, 3), (3, 8), (4, 3), (5, 9))))
.foreach(println)
2.15 cogroup
2.15.1 概述
语法(java):
JavaPairRDD&Iterable&V&,Iterable&&& cogroup(JavaPairRDD& other,
Partitioner partitioner)
JavaPairRDD&Iterable&V&,Iterable&,Iterable&&& cogroup(JavaPairRDD& other1,
JavaPairRDD& other2,
Partitioner partitioner)
JavaPairRDD&Iterable&V&,Iterable&,Iterable&,Iterable&&& cogroup(JavaPairRDD& other1,
JavaPairRDD& other2,
JavaPairRDD& other3,
Partitioner partitioner)
JavaPairRDD&Iterable&V&,Iterable&&& cogroup(JavaPairRDD& other)
JavaPairRDD&Iterable&V&,Iterable&,Iterable&&& cogroup(JavaPairRDD& other1,
JavaPairRDD& other2)
JavaPairRDD&Iterable&V&,Iterable&,Iterable&,Iterable&&& cogroup(JavaPairRDD& other1,
JavaPairRDD& other2,
JavaPairRDD& other3)
JavaPairRDD&Iterable&V&,Iterable&&& cogroup(JavaPairRDD& other,
int numPartitions)
JavaPairRDD&Iterable&V&,Iterable&,Iterable&&& cogroup(JavaPairRDD& other1,
JavaPairRDD& other2,
int numPartitions)
JavaPairRDD&Iterable&V&,Iterable&,Iterable&,Iterable&&& cogroup(JavaPairRDD& other1,
JavaPairRDD& other2,
JavaPairRDD& other3,
int numPartitions)
cogroup:对多个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并。
2.15.2 Java示例
static void cogroup(JavaSparkContext sc) {
List&Tuple2&Integer, String&& datas1 = new ArrayList&&();
datas1.add(new Tuple2&&(1, "苹果"));
datas1.add(new Tuple2&&(2, "梨"));
datas1.add(new Tuple2&&(3, "香蕉"));
datas1.add(new Tuple2&&(4, "石榴"));
List&Tuple2&Integer, Integer&& datas2 = new ArrayList&&();
datas2.add(new Tuple2&&(1, 7));
datas2.add(new Tuple2&&(2, 3));
datas2.add(new Tuple2&&(3, 8));
datas2.add(new Tuple2&&(4, 3));
List&Tuple2&Integer, String&& datas3 = new ArrayList&&();
datas3.add(new Tuple2&&(1, "7"));
datas3.add(new Tuple2&&(2, "3"));
datas3.add(new Tuple2&&(3, "8"));
datas3.add(new Tuple2&&(4, "3"));
datas3.add(new Tuple2&&(4, "4"));
datas3.add(new Tuple2&&(4, "5"));
datas3.add(new Tuple2&&(4, "6"));
sc.parallelizePairs(datas1)
.cogroup(sc.parallelizePairs(datas2),
sc.parallelizePairs(datas3))
.foreach(v -& System.out.println(v));
(4,([石榴],[3],[3, 4, 5, 6]))
(1,([苹果],[7],[7]))
(3,([香蕉],[8],[8]))
(2,([梨],[3],[3]))
2.15.3 Scala示例
def cogroup(sc: SparkContext): Unit = {
val datas1 = List((1, "苹果"),
(2, "梨"),
(3, "香蕉"),
(4, "石榴"))
val datas2 = List((1, 7),
val datas3 = List((1, "7"),
sc.parallelize(datas1)
.cogroup(sc.parallelize(datas2),
sc.parallelize(datas3))
.foreach(println)
(4,(CompactBuffer(石榴),CompactBuffer(3),CompactBuffer(3, 4, 5, 6)))
(1,(CompactBuffer(苹果),CompactBuffer(7),CompactBuffer(7)))
(3,(CompactBuffer(香蕉),CompactBuffer(8),CompactBuffer(8)))
(2,(CompactBuffer(梨),CompactBuffer(3),CompactBuffer(3)))
2.16 cartesian
2.16.1 概述
语法(java):
static & JavaPairRDD& cartesian(JavaRDDLike& other)
两个RDD进行笛卡尔积合并
2.16.2 Java示例
static void cartesian(JavaSparkContext sc) {
List&String& names = Arrays.asList("张三", "李四", "王五");
List&Integer& scores = Arrays.asList(60, 70, 80);
JavaRDD&String& namesRDD = sc.parallelize(names);
JavaRDD&Integer& scoreRDD = sc.parallelize(scores);
JavaPairRDD&String, Integer& cartesianRDD = namesRDD.cartesian(scoreRDD);
cartesianRDD.foreach(new VoidFunction&Tuple2&String, Integer&&() {
private static final long serialVersionUID = 1L;
public void call(Tuple2&String, Integer& t) throws Exception {
System.out.println(t._1 + "\t" + t._2());
2.16.3 Scala示例
namesRDD.cartesian(scoreRDD)
.foreach(println)
2.17.1 概述
语法(java):
JavaRDD&String& pipe(String command)
JavaRDD&String& pipe(java.util.List&String& command)
JavaRDD&String& pipe(java.util.List&String& command,
java.util.Map&String,String& env)
JavaRDD&String& pipe(java.util.List&String& command,
java.util.Map&String,String& env,
boolean separateWorkingDir,
int bufferSize)
static JavaRDD&String& pipe(java.util.List&String& command,
java.util.Map&String,String& env,
boolean separateWorkingDir,
int bufferSize,
String encoding)
执行cmd命令,创建RDD
2.17.2 Java示例
static void pipe(JavaSparkContext sc) {
List&String& datas = Arrays.asList("hi", "hello", "how", "are", "you")
sc.parallelize(datas)
.pipe("/Users/zhangws/echo.sh")
.collect()
.forEach(System.out::println)
2.17.3 Scala示例
echo.sh内容
#!/bin/bash
echo "Running shell script"
while read LINE; do
RESULT=${RESULT}" "${LINE}
echo ${RESULT} & /Users/zhangws/out123.txt
def pipe(sc: SparkContext): Unit = {
val data = List("hi", "hello", "how", "are", "you")
sc.makeRDD(data)
.pipe("/Users/zhangws/echo.sh")
.collect()
.foreach(println)
# out123.txt
hi hello how are you
Running shell script
2.18 coalesce
2.18.1 概述
语法(java):
JavaRDD&T& coalesce(int numPartitions)
JavaRDD&T& coalesce(int numPartitions,
boolean shuffle)
JavaPairRDD&K,V& coalesce(int numPartitions)
JavaPairRDD&K,V& coalesce(int numPartitions,
boolean shuffle)
用于将RDD进行重分区,使用HashPartitioner。且该RDD的分区个数等于numPartitions个数。如果shuffle设置为true,则会进行shuffle。
2.18.2 Java示例
static void coalesce(JavaSparkContext sc) {
List&String& datas = Arrays.asList("hi", "hello", "how", "are", "you")
JavaRDD&String& datasRDD = sc.parallelize(datas, 4)
System.out.println("RDD的分区数: " + datasRDD.partitions().size())
JavaRDD&String& datasRDD2 = datasRDD.coalesce(2)
System.out.println("RDD的分区数: " + datasRDD2.partitions().size())
RDD的分区数: 4
RDD的分区数: 2
2.18.3 Scala示例
def coalesce(sc: SparkContext): Unit = {
val datas = List("hi", "hello", "how", "are", "you")
val datasRDD = sc.parallelize(datas, 4)
println("RDD的分区数: " + datasRDD.partitions.length)
val datasRDD2 = datasRDD.coalesce(2)
println("RDD的分区数: " + datasRDD2.partitions.length)
2.19 repartition
2.19.1 概述
语法(java):
JavaRDD&T& repartition(int numPartitions)
JavaPairRDD&K,V& repartition(int numPartitions)
该函数其实就是coalesce函数第二个参数为true的实现
2.20 repartitionAndSortWithinPartitions
2.20.1 概述
语法(java):
JavaPairRDD& repartitionAndSortWithinPartitions(Partitioner partitioner)
JavaPairRDD& repartitionAndSortWithinPartitions(Partitioner partitioner,
parator& comp)
根据给定的Partitioner重新分区,并且每个分区内根据comp实现排序。
2.20.2 Java示例
static void repartitionAndSortWithinPartitions(JavaSparkContext sc) {
List&String& datas = new ArrayList&&();
Random random = new Random(1);
for (int i = 0; i & 10; i++) {
for (int j = 0; j & 100; j++) {
datas.add(String.format("product%02d,url%03d", random.nextInt(10), random.nextInt(100)));
JavaRDD&String& datasRDD = sc.parallelize(datas);
JavaPairRDD&String, String& pairRDD = datasRDD.mapToPair((String v) -& {
String[] values = v.split(",");
return new Tuple2&&(values[0], values[1]);
JavaPairRDD&String, String& partSortRDD = pairRDD.repartitionAndSortWithinPartitions(
new Partitioner() {
public int numPartitions() {
return 10;
public int getPartition(Object key) {
return Integer.valueOf(((String) key).substring(7));
partSortRDD.collect()
.forEach(System.out::println);
(product00,url099)
(product00,url006)
(product00,url088)
(product09,url004)
(product09,url021)
(product09,url036)
2.20.3 Scala示例
def repartitionAndSortWithinPartitions(sc: SparkContext): Unit = {
def partitionFunc(key:String): Int = {
key.substring(7).toInt
val datas = new Array[String](1000)
val random = new Random(1)
for (i &- 0 until 10; j &- 0 until 100) {
val index: Int = i * 100 + j
datas(index) = "product" + random.nextInt(10) + ",url" + random.nextInt(100)
val datasRDD = sc.parallelize(datas)
val pairRDD = datasRDD.map(line =& (line, 1))
.reduceByKey((a, b) =& a + b)
.foreach(println)
pairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
override def numPartitions: Int = 10
override def getPartition(key: Any): Int = {
val str = String.valueOf(key)
str.substring(7, str.indexOf(',')).toInt
}).foreach(println)
3.1 reduce
3.1.1 概述
语法(java):
static T reduce(Function2&T,T,T& f)
对RDD成员使用func进行reduce操作,func接受两个参数,合并之后只返回一个值。reduce操作的返回结果只有一个值。需要注意的是,func会并发执行
3.1.2 Scala示例
def reduce(sc: SparkContext): Unit = {
println(sc.parallelize(1 to 10)
.reduce((x, y) =& x + y))
3.2 collect
3.2.1 概述
语法(java):
static java.util.List&T& collect()
将RDD读取至Driver程序,类型是Array,一般要求RDD不要太大。
3.3.1 概述
语法(java):
static long count()
返回RDD的成员数量
3.3.2 Scala示例
def count(sc: SparkContext): Unit = {
println(sc.parallelize(1 to 10)
3.4.1 概述
语法(java):
static T first()
返回RDD的第一个成员,等价于take(1)
3.4.2 Scala示例
def first(sc: SparkContext): Unit = {
println(sc.parallelize(1 to 10)
3.5.1 概述
语法(java):
static java.util.List&T& take(int num)
返回RDD前n个成员
3.5.2 Scala示例
def take(sc: SparkContext): Unit = {
sc.parallelize(1 to 10)
.take(2).foreach(println)
3.6 takeSample
3.6.1 概述
语法(java):
static java.util.List&T& takeSample(boolean withReplacement,
long seed)
和sample用法相同,只不第二个参数换成了个数。返回也不是RDD,而是collect。
3.6.2 Scala示例
def takeSample(sc: SparkContext): Unit = {
sc.parallelize(1 to 10)
.takeSample(withReplacement = false, 3, 1)
.foreach(println)
3.7 takeOrdered
3.7.1 概述
语法(java):
java.util.List&T& takeOrdered(int num)
java.util.List&T& takeOrdered(int num,
java.util.Comparator&T& comp)
用于从RDD中,按照默认(升序)或指定排序规则,返回前num个元素。
3.7.2 Scala示例
def takeOrdered(sc: SparkContext): Unit = {
sc.parallelize(Array(5,6,2,1,7,8))
.takeOrdered(3)(new Ordering[Int](){
override def compare(x: Int, y: Int): Int = y.compareTo(x)
.foreach(println)
3.8 saveAsTextFile
3.8.1 概述
语法(java):
void saveAsTextFile(String path)
void saveAsTextFile(String path,
Class&? extends org.apache.hadoop.io.compress.CompressionCodec& codec)
将RDD转换为文本内容并保存至路径path下,可能有多个文件(和partition数有关)。路径path可以是本地路径或HDFS地址,转换方法是对RDD成员调用toString函数
3.8.2 Scala示例
def saveAsTextFile(sc: SparkContext): Unit = {
sc.parallelize(Array(5,6,2,1,7,8))
.saveAsTextFile("/Users/zhangws/Documents/test")
/Users/zhangws/Documents/test目录下
part-00000
// part-00000文件内容
3.9 saveAsSequenceFile
3.9.1 概述
语法(java):
def saveAsSequenceFile(path: String, codec: Option[Class[_ &: CompressionCodec]] = None): Unit
与saveAsTextFile类似,但以SequenceFile格式保存,成员类型必须实现Writeable接口或可以被隐式转换为Writable类型(比如基本Scala类型Int、String等)
3.10 saveAsObjectFile
3.10.1 概述
语法(java):
static void saveAsObjectFile(String path)
用于将RDD中的元素序列化成对象,存储到文件中。对于HDFS,默认采用SequenceFile保存。
3.11 countByKey
3.11.1 概述
语法(java):
java.util.Map&K,Long& countByKey()
仅适用于(K, V)类型,对key计数,返回(K, Int)
3.11.2 Scala示例
def reduce(sc: SparkContext): Unit = {
println(sc.parallelize(Array(("A", 1), ("B", 6), ("A", 2), ("C", 1), ("A", 7), ("A", 8)))
.countByKey())
Map(B -& 1, A -& 4, C -& 1)
3.12 foreach
3.12.1 概述
语法(java):
static void foreach(VoidFunction&T& f)
对RDD中的每个成员执行func,没有返回值,常用于更新计数器或输出数据至外部存储系统。这里需要注意变量的作用域
3.12.2 Java示例
forEach(System.out::println)
forEach(v -& System.out.println(v))
3.12.3 Scala示例
foreach(println)
本文已收录于以下专栏:
相关文章推荐
一、向量范数 
令x为向量:( x1,x2,…,xn)T 
常用向量范数有3种
  1-范数:║x║1=│x1│+│x2│+…+│xn│ 
  2-范数:║x║2=(│x1│^2+│x2│^2+…+│...
原文出处:    http://blog.csdn.net/zouxy09/article/details/
今天我们聊聊机器学习中出现的非常频繁的问题:过拟合与规则...
package cn.spark.study.coreimport org.apache.spark.SparkConf
import org.apache.spark.SparkContextob...
spark rdd详解及开发优化
今天客户出现了一个问题:一个字段存在数据库中时,显示字段过长。经过查看log,发现数据库中,该字段的类型是varchar2(30Byte),修改成 varchar2(30Char)时,问题解决了。  ...
在开发完Spark作业之后,就该为作业配置合适的资源了。Spark的资源参数,基本都可以在spark-submit命令中作为参数设置。很多Spark初学者,通常不知道该设置哪些必要的参数,以及如何设置...
spark算子大致上可分三大类算子:
  1、Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Value型的数据。
  2、Key-Valu...
JDK本身提供了很多方便的JVM性能调优监控工具,除了集成式的VisualVM和jConsole外,还有jps、jstack、jmap、jhat、jstat等小巧的工具,本博客希望能起抛砖引玉之用,让...
前言  这段时间在研究android平台上的开源项目——StandupTimer,这是由jwood所设计的一个较为简单android应用,用于控制会议时间,类似秒表倒计时。PreferenceActi...
Spark的算子的分类
   从大方向来说,Spark 算子大致可以分为以下两类:
     1)Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理。
他的最新文章
讲师:宋宝华
讲师:何宇健
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)}

我要回帖

更多关于 sc.parallelize作用 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信