大数据项目数据初始化到ElasticSearch
数据初始化到ElasticSearch
1 启动ElasticSearch服务器(略)
2 将数据写入ElasticSearch
与上节类似,同样主要通过Spark SQL提供的write方法进行数据的分布式插入,实现storeDataInES方法:
def storeDataInES(movieDF:DataFrame)(implicit eSConfig: ESConfig): Unit = {
//新建一个配置
val settings:Settings = Settings.builder() .put("cluster.name",eSConfig.clustername).build()
//新建一个ES的客户端
val esClient = new PreBuiltTransportClient(settings)
//需要将TransportHosts添加到esClient中
val REGEX_HOST_PORT = "(.+):(\\d+)".reSConfig.transportHosts.split(",").foreach{
case REGEX_HOST_PORT(host:String,port:String) => {
esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host),port.toInt))
}
}
//需要清除掉ES中遗留的数据
if(esClient.admin().indices().exists(new IndicesExistsRequest(eSConfig.index)).actionGet().isExists){ esClient.admin().indices().delete(new DeleteIndexRequest(eSConfig.index))
}
esClient.admin().indices().create(new CreateIndexRequest(eSConfig.index))
//将数据写入到ES中
movieDF
.write
.option("es.nodes",eSConfig.httpHosts)
.option("es.http.timeout","100m")
.option("es.mapping.id","mid")
.mode("overwrite")
.format("org.elasticsearch.spark.sql")
.save(eSConfig.index+"/"+ES_MOVIE_INDEX)
}
想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。