RDD中的函数传递_大数据培训

RDD中的函数传递

在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要主要的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。下面我们看几个例子:

1 传递一个方法

1.创建一个类

class Search(query:String){

 

//过滤出包含字符串的数据

  def isMatch(s: String): Boolean = {

    s.contains(query)

  }

 

//过滤出包含字符串的RDD

  def getMatch1 (rdd: RDD[String]): RDD[String] = {

    rdd.filter(isMatch)

  }

 

  //过滤出包含字符串的RDD

  def getMatche2(rdd: RDD[String]): RDD[String] = {

    rdd.filter(x => x.contains(query))

  }

 

}

2.创建Spark主程序

object SeriTest {

 

  def main(args: Array[String]): Unit = {

 

    //1.初始化配置信息及SparkContext

    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")

    val sc = new SparkContext(sparkConf)

 

//2.创建一个RDD

    val rdd: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "atguigu"))

 

//3.创建一个Search对象

    val search = new Search()

 

//4.运用第一个过滤函数并打印结果

    val match1: RDD[String] = search.getMatche1(rdd)

    match1.collect().foreach(println)

    }

}

3.运行程序

大数据培训

4.问题说明

//过滤出包含字符串的RDD

  def getMatch1 (rdd: RDD[String]): RDD[String] = {

    rdd.filter(isMatch)

  }

在这个方法中所调用的方法isMatch()是定义在Search这个类中的,实际上调用的是this. isMatch(),this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端。

5.解决方案

使类继承scala.Serializable即可。

class Search() extends Serializable{...}

2 传递一个属性

1.创建Spark主程序

object TransmitTest {

 

  def main(args: Array[String]): Unit = {

 

    //1.初始化配置信息及SparkContext

    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")

    val sc = new SparkContext(sparkConf)

 

//2.创建一个RDD

    val rdd: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "atguigu"))

 

//3.创建一个Search对象

    val search = new Search()

 

//4.运用第一个过滤函数并打印结果

    val match1: RDD[String] = search.getMatche2(rdd)

    match1.collect().foreach(println)

    }

}

2.运行程序

大数据培训

3.问题说明

  //过滤出包含字符串的RDD

  def getMatche2(rdd: RDD[String]): RDD[String] = {

    rdd.filter(x => x.contains(query))

  }

在这个方法中所调用的方法query是定义在Search这个类中的字段,实际上调用的是this. query,this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端。

4.解决方案

1)使类继承scala.Serializable即可。

class Search() extends Serializable{...}

2)将类变量query赋值给局部变量

修改getMatche2为

  //过滤出包含字符串的RDD

  def getMatche2(rdd: RDD[String]): RDD[String] = {

    val query_ : String = this.query//将类变量赋值给局部变量

    rdd.filter(x => x.contains(query_))

  }

想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。