当前位置:首页 - Spark

pyspark读取hbase数据并由rdd转为dataframe

作者:高景洋 日期:2020-12-04 13:13:00 浏览次数:2290

1、Hbase中数据列并不是统一的

2、如果在列不统一的情况下,将数据通过spark从hbase读出后,直接转dataframe会报错

3、操作dataframe的方便性比操作rdd好很多,因此我们需要想办法,把字段不统一的rdd转换为dataframe

具体逻辑请看以下代码,亲测可用:



from pyspark import SparkContext,SparkConf

from pyspark.sql import SparkSession

import json

def deal_missing_dec(no_row_key_colnames):

    def deal_missing(x):

        result = {}

        for i in no_row_key_colnames:

            if i in x[1].keys():

                result[i]=x[1][i]

            else:

                result[i]='missing'

        return (x[0],result)

    return deal_missing

def deal_row(x):

    return [x[0]]+list(x[1].values())

def call_transfor(y1):

    y2 = [json.loads(i) for i in y1]

    fdc={}

    for i in y2:

        colname = i['qualifier']

        value = i['value']

        fdc[colname] = value

    return fdc

def rdd_to_df(hbase_rdd,sc):

    data_split = hbase_rdd.map(lambda x:(x[0],x[1].split('\n')))

    data_cols = data_split.map(lambda x:(x[0],call_transfor(x[1])))

    no_row_key_colnames = data_cols.map(lambda x:[i for i in x[1]]).take(2)[1]

    deal_missing = deal_missing_dec(no_row_key_colnames)

    no_missing = data_cols.map(deal_missing)

    spark = SparkSession(sc) # 没有实例化SparkSession 会报 AttributeError: 'PipelinedRDD' object has no attribute 'toDF'

    # 【】toDF方法是在SparkSession(SQLContext1.x中的构造函数)构造函数内部执行的猴子补丁,因此要使用它,必须首先创建一个SQLContext(或SparkSession)

    data = no_missing.map(deal_row).toDF(['row_key']+no_row_key_colnames)

    return data

if __name__ == '__main__':

    conf = SparkConf()

    sc = SparkContext(conf=conf)

    host = 'zk_host1:port,zk_host2:port,zk_host3:port'

    table = 'Collect:ProductIndex'

    conf = {"hbase.zookeeper.quorum": host,

            "hbase.mapreduce.inputtable": table,

            "hbase.mapreduce.scan.row.start": '005_',

            "hbase.mapreduce.scan.row.stop": '006_',

            #【】如果通过pyspark读hbase时,需要指定列,那就不要指定列簇参数,在指定列簇参数后,会忽略列参数,扫描全部列

            #"hbase.mapreduce.scan.column.family":"P",

            "hbase.mapreduce.scan.columns": "P:Url P:WebsiteID P:IsDeleted P:JobHistory"

            }

    keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"

    valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

    hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",

                                   "org.apache.hadoop.hbase.io.ImmutableBytesWritable",

                                   "org.apache.hadoop.hbase.client.Result", keyConverter=keyConv,

                                   valueConverter=valueConv, conf=conf)

    count = hbase_rdd.count()

    hbase_rdd.cache()

    output = hbase_rdd.collect()

    fdc = rdd_to_df(hbase_rdd,sc)

    fdc.show()

    # fdc.select('WebsiteID','Url').show()

    # fdc.groupby('WebsiteID').count().show()

    print(count)

    #for (k, v) in output:

    #    print(k, v)


本文永久性链接:
<a href="http://r4.com.cn/art166.aspx">pyspark读取hbase数据并由rdd转为dataframe</a>
当前header:Host: r4.com.cn X-Host1: r4.com.cn X-Host2: r4.com.cn X-Host3: 127.0.0.1:8080 X-Forwarded-For: 18.222.111.60 X-Real-Ip: 18.222.111.60 X-Domain: r4.com.cn X-Request: GET /art166.aspx HTTP/1.1 X-Request-Uri: /art166.aspx Connection: close Accept: */* User-Agent: Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; ClaudeBot/1.0; +claudebot@anthropic.com) Accept-Encoding: gzip, br, zstd, deflate