wenwentana 2020-07-17
Code:
import json

from pyspark.sql import SparkSession
from pyspark import SparkConf


def trans_form(data_tuple):
    """
    Convert the format of each record read from ES.
    :param data_tuple: (_id, document) tuple returned by the connector
    :return: the document part of the tuple
    """
    data = data_tuple[1]
    return data


def get_es_conf(es_host, es_port, index, type_, query_dic):
    query = {"query": {"match_all": {}}}
    if isinstance(query_dic, dict):
        query = json.dumps(query_dic)
    else:
        query = json.dumps(query)
    es_read_conf = {
        "es.nodes": es_host,
        "es.port": es_port,  # must be a string
        "es.resource": '{}/{}'.format(index, type_),
        "es.output.json": "true",
        "es.query": query
    }
    return es_read_conf


def read_data_from_es(sc, es_host, es_port, index, type_, query_dic):
    es_read_conf = get_es_conf(es_host, es_port, index, type_, query_dic)
    es_rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf
    )
    return es_rdd
if __name__ == '__main__':
    conf = SparkConf()
    spark = SparkSession.builder.config(conf=conf).appName('test').getOrCreate()
    sc = spark.sparkContext
    es_host = '127.0.0.1'
    es_port = '9200'
    index = 'test'
    type_name = 'result'
    query = {"query": {"match_all": {}}}
    es_rdd = read_data_from_es(sc, es_host, es_port, index, type_name, query)
    # Each record comes back as a tuple of the _id and the document; after the
    # format conversion, filtering out empty values leaves the data we want.
    hdd = es_rdd.map(lambda x: trans_form(x)).filter(lambda x: x)

The other part of the data first needs clustering and classification, and the resulting categories are stored in the clustering index of the ES cluster. The aggregation results from the data-processing layer are written to a designated ES index, and the data related to each aggregated topic is stored under a field of the corresponding document.
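Because the read configuration enables es.output.json, each element of hdd is a JSON string, so it can be parsed into a DataFrame before the clustering and classification step. A minimal sketch, assuming the spark session and the hdd variable from the listing above:

# Parse the JSON strings returned by the read step into a DataFrame so that the
# clustering / classification logic can work on structured columns.
df = spark.read.json(hdd)
df.printSchema()
df.show(5, truncate=False)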
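For writing the aggregated results back into ES, the same elasticsearch-hadoop connector can be used in the opposite direction through saveAsNewAPIHadoopFile. A rough sketch; the target index name 'cluster_result', the type name, and the document layout are assumptions for illustration, not part of the original code:

def write_data_to_es(rdd, es_host, es_port, index, type_):
    # `rdd` is expected to hold (None, dict) pairs, one dict per document to index.
    es_write_conf = {
        "es.nodes": es_host,
        "es.port": es_port,  # must be a string, same as for reading
        "es.resource": '{}/{}'.format(index, type_)
    }
    rdd.saveAsNewAPIHadoopFile(
        path='-',  # ignored by EsOutputFormat, but the argument is required
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_write_conf
    )

# Hypothetical usage: one document per aggregated topic, with the related data
# stored under a field of that document, as described above.
# result_rdd = sc.parallelize([(None, {"topic": "t1", "related_ids": ["id1", "id2"]})])
# write_data_to_es(result_rdd, es_host, es_port, 'cluster_result', 'result')

Note that both EsInputFormat and EsOutputFormat come from the elasticsearch-hadoop jar, which has to be on the Spark classpath (for example via --jars when submitting the job).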