Spark Big Data Analysis with pyspark (Part 2)

Date: 2023-02-01 08:10:41
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
>>> rdd=sc.parallelize({('panda',0),('pink',3),('pirate',3),('panda',1),('pink',4)})
>>> rdd.collect()
[('panda', 1), ('pink', 3), ('pirate', 3), ('panda', 0), ('pink', 4)]
>>> 
>>> rdd.mapValues(lambda x: (x, 1)).collect()
[('panda', (1, 1)), ('pink', (3, 1)), ('pirate', (3, 1)), ('panda', (0, 1)), ('pink', (4, 1))]
>>> nums=rdd
>>> nums.collect()
[('panda', 1), ('pink', 3), ('pirate', 3), ('panda', 0), ('pink', 4)]
>>>
>>> sumCount = nums.combineByKey((lambda x: (x,1)),(lambda x, y: (x[0] + y, x[1] + 1)),(lambda x, y: (x[0] + y[0], x[1] + y[1])))  # createCombiner, mergeValue, mergeCombiners
>>>
>>> nums.mapValues(lambda x: (x,1)).collect()  # a neat trick: pair each value with a count of 1
[('panda', (1, 1)), ('pink', (3, 1)), ('pirate', (3, 1)), ('panda', (0, 1)), ('pink', (4, 1))]
>>> nums.mapValues(lambda x: (x,1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect()
[('pink', (7, 2)), ('panda', (1, 2)), ('pirate', (3, 1))]
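Both sumCount above and the reduceByKey result hold a (sum, count) pair per key, so one more mapValues turns them into per-key averages. A minimal sketch continuing the same data (not part of the original session; output ordering may vary):
>>> # average = sum / count for each key
>>> sumCount.mapValues(lambda v: float(v[0]) / v[1]).collect()
>>> nums.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).mapValues(lambda v: float(v[0]) / v[1]).collect()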
>>>
>>> data=[('a',3),('b',4),('a',1)]
>>> data
[('a', 3), ('b', 4), ('a', 1)]
>>> 
>>> sc.parallelize(data).reduceByKey(lambda x,y: x+y).collect()  # merge values by key
[('b', 4), ('a', 4)]
>>> sc.parallelize(data).reduceByKey(lambda x,y: x+y,10).collect()  # custom parallelism: 10 partitions
[('b', 4), ('a', 4)]
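The second argument to reduceByKey sets the number of partitions of the result; getNumPartitions() can confirm it. A quick check, not taken from the original session:
>>> sc.parallelize(data).reduceByKey(lambda x,y: x+y,10).getNumPartitions()  # expected to report 10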
>>>
>>>
>>> rdd.collect()
[('panda', 1), ('pink', 3), ('pirate', 3), ('panda', 0), ('pink', 4)]
>>> rdd.sortByKey(ascending=True,numPartitions=None,keyfunc=lambda x:str(x)).collect()  # sort by key
[('panda', 1), ('panda', 0), ('pink', 3), ('pink', 4), ('pirate', 3)]
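sortByKey only orders by key; sorting by value, or in descending order, can be done with sortBy and the ascending flag. A sketch on the same rdd, not part of the original session:
>>> rdd.sortByKey(ascending=False).collect()  # keys in reverse order
>>> rdd.sortBy(lambda kv: kv[1], ascending=False).collect()  # sort by value instead of key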
>>>
>>>
>>>
>>> import csv
>>> import StringIO
>>> def loadRecord(line):  # parse a single CSV line into a dict
...     input=StringIO.StringIO(line)
...     reader=csv.DictReader(input,fieldnames=['name','favouriteAnimal'])
...     return reader.next()
>>> input=sc.textFile('file:///usr/ins//2.5.0.0-1245//spark2//bin//README.md').map(loadRecord)  # parse each line as a CSV record
>>> input.count()
>>> input.collect()
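loadRecord above is Python 2 code (the StringIO module and reader.next()); under Python 3 the same idea would use io.StringIO and next(reader). A sketch assuming Python 3:
>>> import csv
>>> import io
>>> def loadRecord(line):
...     input = io.StringIO(line)
...     reader = csv.DictReader(input, fieldnames=['name', 'favouriteAnimal'])
...     return next(reader)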
>>>
>>> # read the whole CSV file at once (one file per record)
>>> def loadRecords(fileNameContents):  # fileNameContents = (filename, whole-file content) from wholeTextFiles
...     input=StringIO.StringIO(fileNameContents[1])
...     reader=csv.DictReader(input,fieldnames=['name','favoriteAnimal'])
...     return reader
>>> fullFileData=sc.wholeTextFiles('file:///usr//ins//2.5.0.0-1245//spark2//bin//README.md').flatMap(loadRecords)
>>> fullFileData.count()
>>> fullFileData.collect()
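Going the other direction, each partition's records can be serialized back to CSV text and saved; a minimal sketch in the same Python 2 style (the output path is made up):
>>> def writeRecords(records):
...     output = StringIO.StringIO()
...     writer = csv.DictWriter(output, fieldnames=['name', 'favoriteAnimal'])
...     for record in records:
...         writer.writerow(record)
...     return [output.getvalue()]
>>> fullFileData.mapPartitions(writeRecords).saveAsTextFile('file:///tmp/csv_out')  # hypothetical output path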
>>>
>>>
>>> from pyspark.sql import HiveContext
>>> hiveCtx=HiveContext(sc)
>>> rows=hiveCtx.sql("select cust_id,item_id,date1,qty_sold from p_cust_item_month")
>>> rows.first()
Row(cust_id=u'110102206188', item_id=u'43010116', date1=u'201504', qty_sold=Decimal('5.00'))
>>> print rows.first().item_id
35260104
>>> rows
DataFrame[cust_id: string, item_id: string, date1: string, qty_sold: decimal(22,2)]
>>>
>>> rows.head().cust_id
u'110108202829'
>>> rows.head().item_id
u'53020107'
>>>
>>> rows.first()
Row(cust_id=u'110116101756', item_id=u'53010216', date1=u'201412', qty_sold=Decimal('10.00'))
>>>
>>> rows.show(5)
+------------+--------+------+--------+
|     cust_id| item_id| date1|qty_sold|
+------------+--------+------+--------+
|110105209584|53010202|201602|    5.00|
|110105102027|90010207|201602|    1.00|
|110106100592|11018820|201602|    5.00|
|110116102566|53020502|201602|   63.00|
|110111101915|11016808|201602|    8.00|
+------------+--------+------+--------+
only showing top 5 rows
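The same HiveContext can also run aggregate queries; a sketch against the same table, with column names taken from the schema above (not from the original session):
>>> hiveCtx.sql("select item_id, sum(qty_sold) as total_qty from p_cust_item_month group by item_id").show(5)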
>>>
>>> rows.select("item_id").show(5)
+--------+
| item_id|
+--------+
|22240111|
|53020502|
|35260123|
|33010107|
|53020411|
+--------+
only showing top 5 rows
>>>
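The same filtering and projection can be expressed with the DataFrame API instead of SQL; a sketch on the rows DataFrame above (the qty_sold threshold is made up):
>>> rows.filter(rows.qty_sold > 10).select("cust_id", "item_id", "qty_sold").show(5)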