pyspark读取elasticsearch

wenwentana 2020-07-17

代码:

import json
from pyspark.sql import SparkSession
from pyspark import SparkConf


def trans_form(data_tuple):
    """
    对从es读取出来的每一条数据进行格式转换
    :param data_tuple:
    :return:
    """
    data = data_tuple[1]
    return data


def get_es_conf(es_hot, es_port, index, type_, query_dic):
    query = {"query": {"match_all": {}}}
    if isinstance(query_dic, dict):
        query = json.dumps(query_dic)
    else:
        query = json.dumps(query)

    es_read_conf = {
        "es.nodes": es_hot,
        "es.port": es_port,  # 必须是字符串类型
        "es.resource": ‘{}/{}‘.format(index, type_),
        "es.out.json": "yes",
        "es.query": query
    }
    return es_read_conf


def read_data_from_es(sc, es_hot, es_port, index, type_, query_dic):
    es_read_conf = get_es_conf(es_hot, es_port, index, type_, query_dic)
    es_rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf
    )
    return es_rdd


if __name__ == ‘__main__‘:
    conf = SparkConf()
    spark = SparkSession().builder.config(conf).appName(‘test‘).getOrCreate()
    sc = spark.SparkContext

    es_host = ‘127.0.0.1‘
    es_port = ‘9200‘
    index = ‘test‘
    type_name = ‘result‘
    query = {"query": {"match_all": {}}}
    es_rdd = read_data_from_es(sc, es_host, es_port, index, type_name, query)

    # 读取出来的是_id和数据组成的元组,转换格式之后过滤空值就是我们要的数据
    hdd = es_rdd.map(lambda x: trans_form(x)).filter(lambda x: x)

相关推荐