Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

>>> rdd=sc.parallelize({('panda',0),('pink',3),('pirate',3),('panda',1),('pink',4)})
>>> rdd.collect()
[('panda', 1), ('pink', 3), ('pirate', 3), ('panda', 0), ('pink', 4)]
>>>
>>> rdd.mapValues(lambda x: (x, 1)).collect()
[('panda', (1, 1)), ('pink', (3, 1)), ('pirate', (3, 1)), ('panda', (0, 1)), ('pink', (4, 1))]
>>> nums=rdd
>>> nums.collect()
[('panda', 1), ('pink', 3), ('pirate', 3), ('panda', 0), ('pink', 4)]
>>>
>>> sumCount = nums.combineByKey((lambda x: (x,1)),(lambda x, y: (x[0] + y, x[1] + 1)),(lambda x, y: (x[0] + y[0], x[1] + y[1])))
>>>
>>> nums.mapValues(lambda x: (x,1)).collect()  # a neat trick: pair each value with a count of 1
[('panda', (1, 1)), ('pink', (3, 1)), ('pirate', (3, 1)), ('panda', (0, 1)), ('pink', (4, 1))]
>>> nums.mapValues(lambda x: (x,1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect()
[('pink', (7, 2)), ('panda', (1, 2)), ('pirate', (3, 1))]
>>>
>>> data=[('a',3),('b',4),('a',1)]
>>> data
[('a', 3), ('b', 4), ('a', 1)]
>>>
>>> sc.parallelize(data).reduceByKey(lambda x,y: x+y).collect()  # combine values by key
[('b', 4), ('a', 4)]
>>> sc.parallelize(data).reduceByKey(lambda x,y: x+y,10).collect()  # with a custom level of parallelism
[('b', 4), ('a', 4)]
>>>
>>>
>>> rdd.collect()
[('panda', 1), ('pink', 3), ('pirate', 3), ('panda', 0), ('pink', 4)]
>>> rdd.sortByKey(ascending=True,numPartitions=None,keyfunc=lambda x:str(x)).collect()
[('panda', 1), ('panda', 0), ('pink', 3), ('pink', 4), ('pirate', 3)]  # sorted by key
>>>
>>>
>>>
>>> import csv
>>> import StringIO
>>> def loadRecord(line):
...     input=StringIO.StringIO(line)
...     reader=csv.DictReader(input,fieldnames=['name','favouriteAnimal'])
...     return reader.next()
...
>>> input=sc.textFile('file:///usr/ins//2.5.0.0-1245//spark2//bin//README.md').map(loadRecord)
>>> input.count()
>>> input.collect()
>>>
>>> # read whole CSV files in one go
>>> def loadRecords(fileNameContents):
...     input=StringIO.StringIO(fileNameContents[1])
...     reader=csv.DictReader(input,fieldnames=['name','favoriteAnimal'])
...     return reader
...
>>> fullFileData=sc.wholeTextFiles('file:///usr//ins//2.5.0.0-1245//spark2//bin//README.md').flatMap(loadRecords)
>>> fullFileData.count()
>>> fullFileData.collect()
>>>
>>>
>>> from pyspark.sql import HiveContext
>>> hiveCtx=HiveContext(sc)
>>> rows=hiveCtx.sql("select cust_id,item_id,date1,qty_sold from p_cust_item_month")
>>> rows.first()
Row(cust_id=u'110102206188', item_id=u'43010116', date1=u'201504', qty_sold=Decimal('5.00'))
>>> print rows.first().item_id
35260104
>>> rows
DataFrame[cust_id: string, item_id: string, date1: string, qty_sold: decimal(22,2)]
>>>
>>> rows.head().cust_id
u'110108202829'
>>> rows.head().item_id
u'53020107'
>>>
>>> rows.first()
Row(cust_id=u'110116101756', item_id=u'53010216', date1=u'201412', qty_sold=Decimal('10.00'))
>>>
>>> rows.show(5)
+------------+--------+------+--------+
|     cust_id| item_id| date1|qty_sold|
+------------+--------+------+--------+
|110105209584|53010202|201602|    5.00|
|110105102027|90010207|201602|    1.00|
|110106100592|11018820|201602|    5.00|
|110116102566|53020502|201602|   63.00|
|110111101915|11016808|201602|    8.00|
+------------+--------+------+--------+
only showing top 5 rows

>>>
>>> rows.select("item_id").show(5)
+--------+
| item_id|
+--------+
|22240111|
|53020502|
|35260123|
|33010107|
|53020411|
+--------+
only showing top 5 rows

>>>
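
The sumCount RDD built with combineByKey above is never used later in the session. A minimal sketch of how it could be turned into per-key averages; the final mapValues/collectAsMap step is an assumption, not part of the original log:

# Sketch (assumption): derive per-key averages from the (sum, count)
# pairs that combineByKey produced above; `sumCount` is the RDD
# defined in the transcript.
averages = sumCount.mapValues(lambda sc_pair: sc_pair[0] / float(sc_pair[1]))
print averages.collectAsMap()
# from the data shown, this should come out as
# {'panda': 0.5, 'pink': 3.5, 'pirate': 3.0}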
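
The reduceByKey call with a second argument of 10 asks for ten output partitions. One way to confirm that, using the standard RDD method getNumPartitions(); this check is not in the original log:

# Sketch (assumption): verify the partition count produced by the
# custom-parallelism reduceByKey call shown above.
summed = sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10)
print summed.getNumPartitions()   # should report 10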
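
For comparison with the RDD-based sum/count pattern, the same per-key aggregation can be expressed directly on the Hive-backed DataFrame. Aggregating qty_sold per item_id is only an illustration using the columns shown above, not a query from the original session:

# Sketch (assumption): aggregate qty_sold per item_id on the DataFrame
# returned by hiveCtx.sql(...), mirroring the RDD (sum, count) pattern.
from pyspark.sql import functions as F
qty_by_item = rows.groupBy("item_id").agg(F.sum("qty_sold").alias("total_qty"),
                                          F.count("*").alias("n_rows"))
qty_by_item.show(5)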