Reading HBase data with PySpark and converting the RDD to a DataFrame
1. The columns stored in HBase are not uniform: different rows can carry different sets of columns.
2. If you read such data from HBase with Spark and convert the RDD to a DataFrame directly, the conversion errors out (see the small sketch after this list).
3. DataFrames are much more convenient to work with than RDDs, so we need a way to turn an RDD whose records have inconsistent fields into a DataFrame.
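A minimal sketch of the problem (the rows and column names below are made up for illustration, not taken from the table used later): when rows carry different column sets, flattening each row into a list produces rows of different lengths, and toDF cannot fit them all under one schema.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Two rows with different column sets, as they might look after parsing HBase cells.
ragged = sc.parallelize([
    ('row1', {'Url': 'http://a.example', 'WebsiteID': '1'}),
    ('row2', {'Url': 'http://b.example', 'WebsiteID': '2', 'IsDeleted': '0'}),
])

# The flattened rows have 3 and 4 elements respectively, so the conversion
# fails with a length/field mismatch once the rows are actually evaluated:
# ragged.map(lambda x: [x[0]] + list(x[1].values())).toDF(['row_key', 'Url', 'WebsiteID']).show()

# The approach taken below: fix one column list, fill absent columns with a
# placeholder ('missing'), and only then call toDF.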
The full logic is in the code below, which I have tested and confirmed working:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import json

def deal_missing_dec(no_row_key_colnames):
    # Returns a closure that fills a placeholder for any column a given record
    # lacks, so every record ends up with the same set of fields.
    def deal_missing(x):
        result = {}
        for i in no_row_key_colnames:
            if i in x[1]:
                result[i] = x[1][i]
            else:
                result[i] = 'missing'
        return (x[0], result)
    return deal_missing

def deal_row(x):
    # Flatten (row_key, {col: value, ...}) into [row_key, value1, value2, ...].
    return [x[0]] + list(x[1].values())

def call_transfor(y1):
    # Each element of y1 is a JSON string describing one HBase cell;
    # collect them into a {qualifier: value} dict.
    y2 = [json.loads(i) for i in y1]
    fdc = {}
    for i in y2:
        colname = i['qualifier']
        value = i['value']
        fdc[colname] = value
    return fdc

def rdd_to_df(hbase_rdd, sc):
    # The converter joins the cells of one row with '\n', one JSON string per cell.
    data_split = hbase_rdd.map(lambda x: (x[0], x[1].split('\n')))
    data_cols = data_split.map(lambda x: (x[0], call_transfor(x[1])))
    # Use the column names of the second record as the DataFrame schema;
    # columns that only appear in other records are dropped.
    no_row_key_colnames = data_cols.map(lambda x: [i for i in x[1]]).take(2)[1]
    deal_missing = deal_missing_dec(no_row_key_colnames)
    no_missing = data_cols.map(deal_missing)
    # Without an active SparkSession, toDF raises
    # AttributeError: 'PipelinedRDD' object has no attribute 'toDF'.
    # Pitfall: toDF is monkey-patched onto RDDs inside the SparkSession
    # constructor (SQLContext in 1.x), so a SQLContext or SparkSession must be
    # created before toDF can be used.
    spark = SparkSession(sc)
    data = no_missing.map(deal_row).toDF(['row_key'] + no_row_key_colnames)
    return data
if __name__ == '__main__':
    conf = SparkConf()
    sc = SparkContext(conf=conf)
    host = 'zk_host1:port,zk_host2:port,zk_host3:port'
    table = 'Collect:ProductIndex'
    # Hadoop/HBase input configuration for the scan.
    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapreduce.inputtable": table,
            "hbase.mapreduce.scan.row.start": '005_',
            "hbase.mapreduce.scan.row.stop": '006_',
            # Pitfall: if you need to read specific columns through PySpark, do
            # not set the column-family parameter as well; once the column family
            # is set, the column list is ignored and every column is scanned.
            # "hbase.mapreduce.scan.column.family": "P",
            "hbase.mapreduce.scan.columns": "P:Url P:WebsiteID P:IsDeleted P:JobHistory"
            }
    keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
    hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
                                   "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                                   "org.apache.hadoop.hbase.client.Result",
                                   keyConverter=keyConv,
                                   valueConverter=valueConv,
                                   conf=conf)
    # Cache before the first action so later operations reuse the scanned data.
    hbase_rdd.cache()
    count = hbase_rdd.count()
    output = hbase_rdd.collect()  # only needed for the debug loop at the end
    fdc = rdd_to_df(hbase_rdd, sc)
    fdc.show()
    # fdc.select('WebsiteID', 'Url').show()
    # fdc.groupby('WebsiteID').count().show()
    print(count)
    # for (k, v) in output:
    #     print(k, v)
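For reference, the value strings this job parses come from HBaseResultToStringConverter: each cell of a row is rendered as a JSON object containing at least 'qualifier' and 'value' fields, and the cells of one row are joined with newlines, which is why rdd_to_df first splits on '\n' and then json.loads every piece. The sample string below is made up for illustration; extra fields in the JSON may vary with the converter version.

import json

# Made-up example of the converter output for one row: one JSON object per cell.
sample_value = '\n'.join([
    json.dumps({"qualifier": "Url", "value": "http://example.com/item/1"}),
    json.dumps({"qualifier": "WebsiteID", "value": "42"}),
])

# This is exactly what call_transfor in the listing above does with one value:
cells = [json.loads(line) for line in sample_value.split('\n')]
row_dict = {c['qualifier']: c['value'] for c in cells}
print(row_dict)  # {'Url': 'http://example.com/item/1', 'WebsiteID': '42'}

One more note: rdd_to_df derives its column list from a single record (take(2)[1]), so columns that only appear in other rows are silently dropped. If that matters for your table, a more thorough but more expensive alternative (my suggestion, not part of the original code) is to take the distinct union of keys over the whole RDD, e.g. data_cols.flatMap(lambda x: list(x[1].keys())).distinct().collect().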