Spark Big Data Analysis with PySpark (Part 1)

Posted: 2023-02-01 08:15:23

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

SparkSession available as 'spark'
>>> lines=sc.textFile("README.md")
>>> lines=sc.textFile("file:///usr/inspur/2.5.0.0-1245/spark2/bin/README.md")
>>> lines.count()
8
>>> lines.collect()
[u'test spark2', u'RDD', u'zhangshuai_lc', u'123456', u'89', u'inspur hadoop', u'86+18615223050', u'100million']
>>> lines.first()
u'test spark2'
>>> sc
<pyspark.context.SparkContext object at 0x1aa3c10>
>>> lines
file:///usr/inspur/2.5.0.0-1245/spark2/bin/README.md MapPartitionsRDD[10] at textFile at NativeMethodAccessorImpl.java:0

>>> numlines=lines.map(lambda x: '23' in x)
>>> numlines.collect()
[False, False, False, True, False, False, True, False]
>>> numlines.first()
False
>>>
>>> lines.collect()
[u'test spark2', u'RDD', u'zhangshuai_lc', u'123456', u'89', u'inspur hadoop', u'86+18615223050', u'100million']
>>> numlines=lines.filter(lambda x: '23' in x) # filter: keep only lines containing '23'
>>> numlines.first()
u'123456'
>>> numlines.collect()
[u'123456', u'86+18615223050']
>>> numlines
PythonRDD[18] at collect at <stdin>:1
>>> numlines.count()
2
>>> lines1=sc.parallelize(['pandas','i like pandas']) # create an RDD from an in-memory collection
>>> lines1
ParallelCollectionRDD[20] at parallelize at PythonRDD.scala:475
>>> lines1.collect()
['pandas', 'i like pandas']
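# parallelize() is mainly a convenience for small tests; it also accepts an
# optional partition count. A minimal sketch, not from the original session
# (the output is what Spark should report):
>>> sc.parallelize(range(100), 4).getNumPartitions()
4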
>>> numlines.collect() # RDD operations split into transformations and actions
[u'123456', u'86+18615223050']
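# A quick sketch of that distinction, not from the original session: a
# transformation returns a new RDD immediately without touching the data,
# and only an action kicks off a job. Given the README contents above,
# exactly one line contains 'spark':
>>> t = lines.filter(lambda x: 'spark' in x) # transformation: returns at once
>>> t.count() # action: only here does a job actually run
1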

>>> alines=lines.filter(lambda x: 'a' in x)
>>> alines.collect()
[u'test spark2', u'zhangshuai_lc', u'inspur hadoop']

>>> num_a_lines=numlines.union(alines) # union of the two RDDs
>>> num_a_lines.collect()
[u'123456', u'86+18615223050', u'test spark2', u'zhangshuai_lc', u'inspur hadoop']
>>> num_a_lines.first()
u'123456'
>>> num_a_lines.take(3)
[u'123456', u'86+18615223050', u'test spark2']
>>> for item in num_a_lines.take(3): # print the first three elements
...     print item
...
123456
86+18615223050
test spark2
# RDDs are lazily evaluated: think of an RDD not as a container holding specific data, but as an instruction set, built up through transformations, describing how to compute that data
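# To see the laziness concretely, here is a sketch not from the original
# session (the path is made up, the error text abbreviated): even textFile()
# on a missing path succeeds, because nothing is read until an action runs.
>>> bad = sc.textFile("file:///no/such/path").map(lambda x: x.upper()) # no error yet
>>> bad.count() # only now does Spark read the input, and only now does it fail
Traceback (most recent call last):
  ...
Py4JJavaError: ... org.apache.hadoop.mapred.InvalidInputException: Input path does not exist ...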
>>> nums=sc.parallelize([1,2,3,4])
>>> squared=nums.map(lambda x: x*x).collect()
>>> for item in squared:
...     print '%d' % item
...
1
4
9
16

>>> lines=sc.parallelize(['hello world','inspu'])
>>> lines.first()
'hello world'
>>> words=lines.flatMap(lambda x: x.split(' '))
>>> words.collect()
['hello', 'world', 'inspu']
>>> words.first()
'hello'
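# For contrast, map() with the same split keeps each line's words in a
# nested list, while flatMap() above flattened them into one sequence:
>>> lines.map(lambda x: x.split(' ')).collect()
[['hello', 'world'], ['inspu']]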
>>> # cartesian(other) computes the Cartesian product of two RDDs
...
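# A minimal cartesian() sketch, not from the original session; note the
# result grows as len(a) * len(b), so it is expensive on large RDDs:
>>> a = sc.parallelize([1, 2])
>>> b = sc.parallelize(['x', 'y'])
>>> a.cartesian(b).collect()
[(1, 'x'), (1, 'y'), (2, 'x'), (2, 'y')]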
>>> nums=sc.parallelize([1,2,3,4])
>>> nums.mean()
2.5
>>> nums.collect()
[1, 2, 3, 4]
>>>
>>> rdd=sc.parallelize([1,2,3,3])
>>> rdd.countByValue()  # count how many times each value occurs
defaultdict(<type 'int'>, {1: 1, 2: 1, 3: 2})
>>> rdd.take(2)
[1, 2]
>>> rdd.top(2)
[3, 3]
>>> rdd.takeOrdered(2)
[1, 2]
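# top() takes the largest elements, takeOrdered() the smallest; takeOrdered()
# also accepts a key function, e.g. to order descending. A sketch with the
# same rdd, not from the original session:
>>> rdd.takeOrdered(2, key=lambda x: -x)
[3, 3]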
>>>
>>> pairs = lines.map(lambda x: (x.split(" ")[0], x))
>>> pairs.collect()
[('hello', 'hello world'), ('inspu', 'inspu')]
>>> # in Python, build a pair RDD by using the first word of each line as the key
...
>>> rdd=sc.parallelize({(1, 2), (3, 4), (3, 6)}) # a pair RDD of key-value tuples
>>> rdd.collect()
[(1, 2), (3, 4), (3, 6)]
>>>
>>> rdd.reduceByKey(lambda x, y: x + y).collect() # merge values that share a key
[(1, 2), (3, 10)]
>>>
>>> rdd.collect()
[(1, 2), (3, 4), (3, 6)]
>>> rdd.groupByKey().first()
(1, <pyspark.resultiterable.ResultIterable object at 0x1b09850>)
>>> rdd.groupByKey().count()
2
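# The ResultIterable is easier to inspect after mapValues(list); a sketch
# with the same rdd (element order within a group may vary):
>>> rdd.groupByKey().mapValues(list).collect()
[(1, [2]), (3, [4, 6])]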
>>>
>>> rdd.mapValues(lambda x: x+1).collect()  # transform the values only; keys are untouched
[(1, 3), (3, 5), (3, 7)]
>>> rdd.keys().collect()
[1, 3, 3]
>>> rdd.values().collect()
[2, 4, 6]
>>> other=sc.parallelize({(3,9)})
>>> other.collect()
[(3, 9)]
# pair-RDD subtraction: drop pairs whose key appears in the other RDD
>>> rdd.subtractByKey(other).collect()
[(1, 2)]
>>>
>>> other.collect()
[(3, 9)]
>>> rdd.collect()
[(1, 2), (3, 4), (3, 6)]
>>> rdd.join(other).collect() # inner join of the two pair RDDs
[(3, (4, 9)), (3, (6, 9))]
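# For comparison, leftOuterJoin() keeps keys missing from the right side and
# fills in None. A sketch with the same rdd and other; output order may vary:
>>> rdd.leftOuterJoin(other).collect()
[(1, (2, None)), (3, (4, 9)), (3, (6, 9))]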
>>> 
>>> pairs.collect()
[('hello', 'hello world'), ('inspu', 'inspu')]
>>> result = pairs.filter(lambda keyValue: len(keyValue[1]) < 10) # filter on the value part of each pair
>>> result.collect()
[('inspu', 'inspu')]
>>>
>>> pairs = lines.map(lambda x: (x.split(" ")[0], x)) # two ways to build a pair RDD: map an existing RDD...
>>> pairs.collect()
[('hello', 'hello world'), ('inspu', 'inspu')]
>>>
>>> rdd=sc.parallelize({(1, 2), (3, 4), (3, 6)}) # ...or parallelize key-value tuples directly
>>> rdd.collect()
[(1, 2), (3, 4), (3, 6)]
>>>
>>> rdd=sc.parallelize({('panda',0),('pink',3),('pirate',3),('panda',1),('pink',4)})
>>> rdd.collect()
[('panda', 1), ('pink', 3), ('pirate', 3), ('panda', 0), ('pink', 4)]
>>> rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect() # per key: sum the values and count the occurrences
[('pink', (7, 2)), ('panda', (1, 2)), ('pirate', (3, 1))]
>>> # breaking that merge down: mapValues first pairs each value with a count of 1
...
>>> rdd.mapValues(lambda x: (x,1)).collect()
[('panda', (1, 1)), ('pink', (3, 1)), ('pirate', (3, 1)), ('panda', (0, 1)), ('pink', (4, 1))]
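# The (sum, count) pairs are the usual building block for a per-key average;
# one more mapValues finishes it. A sketch whose values follow from the
# output above, not captured from the original session:
>>> sums = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
>>> sums.mapValues(lambda s: float(s[0]) / s[1]).collect()
[('pink', 3.5), ('panda', 0.5), ('pirate', 3.0)]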
>>> rdd=sc.parallelize([u'hello world', u'inspu', u'bj', u'zhanghuai_lc', u'google', u'machine learning', u'AI', u'what are U doing']) # a new RDD of sentences for the word-count example
>>> rdd.collect()
[u'hello world', u'inspu', u'bj', u'zhanghuai_lc', u'google', u'machine learning', u'AI', u'what are U doing']
>>> words = rdd.flatMap(lambda x: x.split(" "))
>>> words.count()
13
>>> 
>>> words.map(lambda x: (x, 1)).collect() # pair every word with a count of 1
[(u'hello', 1), (u'world', 1), (u'inspu', 1), (u'bj', 1), (u'zhanghuai_lc', 1), (u'google', 1), (u'machine', 1), (u'learning', 1), (u'AI', 1), (u'what', 1), (u'are', 1), (u'U', 1), (u'doing', 1)]
>>> result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y) # the classic word count: sum the 1s for identical words
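# Collecting finishes the word count; with the sample lines above every word
# is distinct, so each count is 1. A sketch, not from the original session
# (sorted() makes the driver-side output deterministic):
>>> sorted(result.collect())
[(u'AI', 1), (u'U', 1), (u'are', 1), (u'bj', 1), (u'doing', 1), (u'google', 1), (u'hello', 1), (u'inspu', 1), (u'learning', 1), (u'machine', 1), (u'what', 1), (u'world', 1), (u'zhanghuai_lc', 1)]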