lambda表达式是stream的基础,初学者建议先学习lambda表达式,http://www.zzvips.com/article/123637.html
1.初识stream
先来一个总纲:
东西就是这么多啦,stream是java8中加入的一个非常实用的功能,最初看时以为是io中的流(其实一点关系都没有),让我们先来看一个小例子感受一下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
@before
public void init() {
random = new random();
stulist = new arraylist<student>() {
{
for ( int i = 0 ; i < 100 ; i++) {
add( new student( "student" + i, random.nextint( 50 ) + 50 ));
}
}
};
}
public class student {
private string name;
private integer score;
//-----getters and setters-----
}
//1列出班上超过85分的学生姓名,并按照分数降序输出用户名字
@test
public void test1() {
list<string> studentlist = stulist.stream()
.filter(x->x.getscore()> 85 )
.sorted(comparator.comparing(student::getscore).reversed())
.map(student::getname)
.collect(collectors.tolist());
system.out.println(studentlist);
}
|
列出班上分数超过85分的学生姓名,并按照分数降序输出用户名字,在java8之前我们需要三个步骤:
1)新建一个list<student> newlist,在for循环中遍历stulist,将分数超过85分的学生装入新的集合中
2)对于新的集合newlist进行排序操作
3)遍历打印newlist
这三个步骤在java8中只需要两条语句,如果紧紧需要打印,不需要保存新生产list的话实际上只需要一条,是不是非常方便。
2.stream的特性
我们首先列出stream的如下三点特性,在之后我们会对照着详细说明
1.stream不存储数据
2.stream不改变源数据
3.stream的延迟执行特性
通常我们在数组或集合的基础上创建stream,stream不会专门存储数据,对stream的操作也不会影响到创建它的数组和集合,对于stream的聚合、消费或收集操作只能进行一次,再次操作会报错,如下代码:
1
2
3
4
5
6
|
@test
public void test1(){
stream<string> stream = stream.generate(()-> "user" ).limit( 20 );
stream.foreach(system.out::println);
stream.foreach(system.out::println);
}
|
程序在正常完成一次打印工作后报错。
stream的操作是延迟执行的,在列出班上超过85分的学生姓名例子中,在collect方法执行之前,filter、sorted、map方法还未执行,只有当collect方法执行时才会触发之前转换操作
看如下代码:
1
2
3
4
5
6
7
8
9
10
11
|
public boolean filter(student s) {
system.out.println( "begin compare" );
return s.getscore() > 85 ;
}
@test
public void test() {
stream<student> stream = stream.of(stuarr).filter( this ::filter);
system.out.println( "split-------------------------------------" );
list<student> studentlist = stream.collect(tolist());
}
|
我们将filter中的逻辑抽象成方法,在方法中加入打印逻辑,如果stream的转换操作是延迟执行的,那么split会先打印,否则后打印,代码运行结果为
可见stream的操作是延迟执行的。
tip:
当我们操作一个流的时候,并不会修改流底层的集合(即使集合是线程安全的),如果想要修改原有的集合,就无法定义流操作的输出。
由于stream的延迟执行特性,在聚合操作执行前修改数据源是允许的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
list<string> wordlist;
@before
public void init() {
wordlist = new arraylist<string>() {
{
add( "a" );
add( "b" );
add( "c" );
add( "d" );
add( "e" );
add( "f" );
add( "g" );
}
};
}
/**
* 延迟执行特性,在聚合操作之前都可以添加相应元素
*/
@test
public void test() {
stream<string> words = wordlist.stream();
wordlist.add( "end" );
long n = words.distinct().count();
system.out.println(n);
}
|
最后打印的结果是8
如下代码是错误的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
/**
* 延迟执行特性,会产生干扰
* nullpointexception
*/
@test
public void test2(){
stream<string> words1 = wordlist.stream();
words1.foreach(s -> {
system.out.println( "s->" +s);
if (s.length() < 4 ) {
system.out.println( "select->" +s);
wordlist.remove(s);
system.out.println(wordlist);
}
});
}
|
结果报空指针异常
3.创建stream
1)通过数组创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
/**
* 通过数组创建流
*/
@test
public void testarraystream(){
//1.通过arrays.stream
//1.1基本类型
int [] arr = new int []{ 1 , 2 , 34 , 5 };
intstream intstream = arrays.stream(arr);
//1.2引用类型
student[] studentarr = new student[]{ new student( "s1" , 29 ), new student( "s2" , 27 )};
stream<student> studentstream = arrays.stream(studentarr);
//2.通过stream.of
stream<integer> stream1 = stream.of( 1 , 2 , 34 , 5 , 65 );
//注意生成的是int[]的流
stream< int []> stream2 = stream.of(arr,arr);
stream2.foreach(system.out::println);
}
|
2)通过集合创建流
1
2
3
4
5
6
7
8
9
10
11
|
/**
* 通过集合创建流
*/
@test
public void testcollectionstream(){
list<string> strs = arrays.aslist( "11212" , "dfd" , "2323" , "dfhgf" );
//创建普通流
stream<string> stream = strs.stream();
//创建并行流
stream<string> stream1 = strs.parallelstream();
}
|
3)创建空的流
1
2
3
4
5
6
7
8
9
10
11
12
|
@test
public void testemptystream(){
//创建一个空的stream
stream<integer> stream = stream.empty();
}
4 )创建无限流
@test
public void testunlimitstream(){
//创建无限流,通过limit提取指定大小
stream.generate(()-> "number" + new random().nextint()).limit( 100 ).foreach(system.out::println);
stream.generate(()-> new student( "name" , 10 )).limit( 20 ).foreach(system.out::println);
}
|
5)创建规律的无限流
1
2
3
4
5
6
7
8
9
10
|
/**
* 产生规律的数据
*/
@test
public void testunlimitstream1(){
stream.iterate( 0 ,x->x+ 1 ).limit( 10 ).foreach(system.out::println);
stream.iterate( 0 ,x->x).limit( 10 ).foreach(system.out::println);
//stream.iterate(0,x->x).limit(10).foreach(system.out::println);与如下代码意思是一样的
stream.iterate( 0 , unaryoperator.identity()).limit( 10 ).foreach(system.out::println);
}
|
4.对stream的操作
1)最常使用
map:转换流,将一种类型的流转换为另外一种流
1
2
3
4
5
6
7
8
9
|
/**
* map把一种类型的流转换为另外一种类型的流
* 将string数组中字母转换为大写
*/
@test
public void testmap() {
string[] arr = new string[]{ "yes" , "yes" , "no" , "no" };
arrays.stream(arr).map(x -> x.tolowercase()).foreach(system.out::println);
}
|
filter:过滤流,过滤流中的元素
1
2
3
4
5
|
@test
public void testfilter(){
integer[] arr = new integer[]{ 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 };
arrays.stream(arr).filter(x->x> 3 &&x< 8 ).foreach(system.out::println);
}
|
flapmap:拆解流,将流中每一个元素拆解成一个流
1
2
3
4
5
6
7
8
9
10
11
|
/**
* flapmap:拆解流
*/
@test
public void testflapmap1() {
string[] arr1 = { "a" , "b" , "c" , "d" };
string[] arr2 = { "e" , "f" , "c" , "d" };
string[] arr3 = { "h" , "j" , "c" , "d" };
// stream.of(arr1, arr2, arr3).flatmap(x -> arrays.stream(x)).foreach(system.out::println);
stream.of(arr1, arr2, arr3).flatmap(arrays::stream).foreach(system.out::println);
}
|
sorted:对流进行排序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
string[] arr1 = { "abc" , "a" , "bc" , "abcd" };
/**
* comparator.comparing是一个键提取的功能
* 以下两个语句表示相同意义
*/
@test
public void testsorted1_(){
/**
* 按照字符长度排序
*/
arrays.stream(arr1).sorted((x,y)->{
if (x.length()>y.length())
return 1 ;
else if (x.length()<y.length())
return - 1 ;
else
return 0 ;
}).foreach(system.out::println);
arrays.stream(arr1).sorted(comparator.comparing(string::length)).foreach(system.out::println);
}
/**
* 倒序
* reversed(),java8泛型推导的问题,所以如果comparing里面是非方法引用的lambda表达式就没办法直接使用reversed()
* comparator.reverseorder():也是用于翻转顺序,用于比较对象(stream里面的类型必须是可比较的)
* comparator. naturalorder():返回一个自然排序比较器,用于比较对象(stream里面的类型必须是可比较的)
*/
@test
public void testsorted2_(){
arrays.stream(arr1).sorted(comparator.comparing(string::length).reversed()).foreach(system.out::println);
arrays.stream(arr1).sorted(comparator.reverseorder()).foreach(system.out::println);
arrays.stream(arr1).sorted(comparator.naturalorder()).foreach(system.out::println);
}
/**
* thencomparing
* 先按照首字母排序
* 之后按照string的长度排序
*/
@test
public void testsorted3_(){
arrays.stream(arr1).sorted(comparator.comparing( this ::com1).thencomparing(string::length)).foreach(system.out::println);
}
public char com1(string x){
return x.charat( 0 );
}
|
2)提取流和组合流
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
@before
public void init(){
arr1 = new string[]{ "a" , "b" , "c" , "d" };
arr2 = new string[]{ "d" , "e" , "f" , "g" };
arr3 = new string[]{ "i" , "j" , "k" , "l" };
}
/**
* limit,限制从流中获得前n个数据
*/
@test
public void testlimit(){
stream.iterate( 1 ,x->x+ 2 ).limit( 10 ).foreach(system.out::println);
}
/**
* skip,跳过前n个数据
*/
@test
public void testskip(){
// stream.of(arr1).skip(2).limit(2).foreach(system.out::println);
stream.iterate( 1 ,x->x+ 2 ).skip( 1 ).limit( 5 ).foreach(system.out::println);
}
/**
* 可以把两个stream合并成一个stream(合并的stream类型必须相同)
* 只能两两合并
*/
@test
public void testconcat(){
stream<string> stream1 = stream.of(arr1);
stream<string> stream2 = stream.of(arr2);
stream.concat(stream1,stream2).distinct().foreach(system.out::println);
}
|
3)聚合操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
@before
public void init(){
arr = new string[]{ "b" , "ab" , "abc" , "abcd" , "abcde" };
}
/**
* max、min
* 最大最小值
*/
@test
public void testmaxandmin(){
stream.of(arr).max(comparator.comparing(string::length)).ifpresent(system.out::println);
stream.of(arr).min(comparator.comparing(string::length)).ifpresent(system.out::println);
}
/**
* count
* 计算数量
*/
@test
public void testcount(){
long count = stream.of(arr).count();
system.out.println(count);
}
/**
* findfirst
* 查找第一个
*/
@test
public void testfindfirst(){
string str = stream.of(arr).parallel().filter(x->x.length()> 3 ).findfirst().orelse( "noghing" );
system.out.println(str);
}
/**
* findany
* 找到所有匹配的元素
* 对并行流十分有效
* 只要在任何片段发现了第一个匹配元素就会结束整个运算
*/
@test
public void testfindany(){
optional<string> optional = stream.of(arr).parallel().filter(x->x.length()> 3 ).findany();
optional.ifpresent(system.out::println);
}
/**
* anymatch
* 是否含有匹配元素
*/
@test
public void testanymatch(){
boolean aboolean = stream.of(arr).anymatch(x->x.startswith( "a" ));
system.out.println(aboolean);
}
@test
public void teststream1() {
optional<integer> optional = stream.of( 1 , 2 , 3 ).filter(x->x> 1 ).reduce((x,y)->x+y);
system.out.println(optional.get());
}
|
4)optional类型
通常聚合操作会返回一个optional类型,optional表示一个安全的指定结果类型,所谓的安全指的是避免直接调用返回类型的null值而造成空指针异常,调用optional.ifpresent()可以判断返回值是否为空,或者直接调用ifpresent(consumer<? super t> consumer)在结果部位空时进行消费操作;调用optional.get()获取返回值。通常的使用方式如下:
1
2
3
4
5
6
7
8
9
10
11
12
|
@test
public void testoptional() {
list<string> list = new arraylist<string>() {
{
add( "user1" );
add( "user2" );
}
};
optional<string> opt = optional.of( "andy with u" );
opt.ifpresent(list::add);
list.foreach(system.out::println);
}
|
使用optional可以在没有值时指定一个返回值,例如
1
2
3
4
5
6
7
8
9
10
|
@test
public void testoptional2() {
integer[] arr = new integer[]{ 4 , 5 , 6 , 7 , 8 , 9 };
integer result = stream.of(arr).filter(x->x> 9 ).max(comparator.naturalorder()).orelse(- 1 );
system.out.println(result);
integer result1 = stream.of(arr).filter(x->x> 9 ).max(comparator.naturalorder()).orelseget(()->- 1 );
system.out.println(result1);
integer result2 = stream.of(arr).filter(x->x> 9 ).max(comparator.naturalorder()).orelsethrow(runtimeexception:: new );
system.out.println(result2);
}
|
optional的创建
采用optional.empty()创建一个空的optional,使用optional.of()创建指定值的optional。同样也可以调用optional对象的map方法进行optional的转换,调用flatmap方法进行optional的迭代
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@test
public void teststream1() {
optional<student> studentoptional = optional.of( new student( "user1" , 21 ));
optional<string> optionalstr = studentoptional.map(student::getname);
system.out.println(optionalstr.get());
}
public static optional< double > inverse( double x) {
return x == 0 ? optional.empty() : optional.of( 1 / x);
}
public static optional< double > squareroot( double x) {
return x < 0 ? optional.empty() : optional.of(math.sqrt(x));
}
/**
* optional的迭代
*/
@test
public void teststream2() {
double x = 4d;
optional< double > result1 = inverse(x).flatmap(streamtest7::squareroot);
result1.ifpresent(system.out::println);
optional< double > result2 = optional.of( 4.0 ).flatmap(streamtest7::inverse).flatmap(streamtest7::squareroot);
result2.ifpresent(system.out::println);
}
|
5)收集结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
student[] students;
@before
public void init(){
students = new student[ 100 ];
for ( int i= 0 ;i< 30 ;i++){
student student = new student( "user" ,i);
students[i] = student;
}
for ( int i= 30 ;i< 60 ;i++){
student student = new student( "user" +i,i);
students[i] = student;
}
for ( int i= 60 ;i< 100 ;i++){
student student = new student( "user" +i,i);
students[i] = student;
}
}
@test
public void testcollect1(){
/**
* 生成list
*/
list<student> list = arrays.stream(students).collect(tolist());
list.foreach((x)-> system.out.println(x));
/**
* 生成set
*/
set<student> set = arrays.stream(students).collect(toset());
set.foreach((x)-> system.out.println(x));
/**
* 如果包含相同的key,则需要提供第三个参数,否则报错
*/
map<string,integer> map = arrays.stream(students).collect(tomap(student::getname,student::getscore,(s,a)->s+a));
map.foreach((x,y)-> system.out.println(x+ "->" +y));
}
/**
* 生成数组
*/
@test
public void testcollect2(){
student[] s = arrays.stream(students).toarray(student[]:: new );
for ( int i= 0 ;i<s.length;i++)
system.out.println(s[i]);
}
/**
* 指定生成的类型
*/
@test
public void testcollect3(){
hashset<student> s = arrays.stream(students).collect(tocollection(hashset:: new ));
s.foreach(system.out::println);
}
/**
* 统计
*/
@test
public void testcollect4(){
intsummarystatistics summarystatistics = arrays.stream(students).collect(collectors.summarizingint(student::getscore));
system.out.println( "getaverage->" +summarystatistics.getaverage());
system.out.println( "getmax->" +summarystatistics.getmax());
system.out.println( "getmin->" +summarystatistics.getmin());
system.out.println( "getcount->" +summarystatistics.getcount());
system.out.println( "getsum->" +summarystatistics.getsum());
}
|
6)分组和分片
分组和分片的意义是,将collect的结果集展示位map<key,val>的形式,通常的用法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
student[] students;
@before
public void init(){
students = new student[ 100 ];
for ( int i= 0 ;i< 30 ;i++){
student student = new student( "user1" ,i);
students[i] = student;
}
for ( int i= 30 ;i< 60 ;i++){
student student = new student( "user2" ,i);
students[i] = student;
}
for ( int i= 60 ;i< 100 ;i++){
student student = new student( "user3" ,i);
students[i] = student;
}
}
@test
public void testgroupby1(){
map<string,list<student>> map = arrays.stream(students).collect(groupingby(student::getname));
map.foreach((x,y)-> system.out.println(x+ "->" +y));
}
/**
* 如果只有两类,使用partitioningby会比groupingby更有效率
*/
@test
public void testpartitioningby(){
map< boolean ,list<student>> map = arrays.stream(students).collect(partitioningby(x->x.getscore()> 50 ));
map.foreach((x,y)-> system.out.println(x+ "->" +y));
}
/**
* downstream指定类型
*/
@test
public void testgroupby2(){
map<string,set<student>> map = arrays.stream(students).collect(groupingby(student::getname,toset()));
map.foreach((x,y)-> system.out.println(x+ "->" +y));
}
/**
* downstream 聚合操作
*/
@test
public void testgroupby3(){
/**
* counting
*/
map<string, long > map1 = arrays.stream(students).collect(groupingby(student::getname,counting()));
map1.foreach((x,y)-> system.out.println(x+ "->" +y));
/**
* summingint
*/
map<string,integer> map2 = arrays.stream(students).collect(groupingby(student::getname,summingint(student::getscore)));
map2.foreach((x,y)-> system.out.println(x+ "->" +y));
/**
* maxby
*/
map<string,optional<student>> map3 = arrays.stream(students).collect(groupingby(student::getname,maxby(comparator.comparing(student::getscore))));
map3.foreach((x,y)-> system.out.println(x+ "->" +y));
/**
* mapping
*/
map<string,set<integer>> map4 = arrays.stream(students).collect(groupingby(student::getname,mapping(student::getscore,toset())));
map4.foreach((x,y)-> system.out.println(x+ "->" +y));
}
|
5.原始类型流
在数据量比较大的情况下,将基本数据类型(int,double...)包装成相应对象流的做法是低效的,因此,我们也可以直接将数据初始化为原始类型流,在原始类型流上的操作与对象流类似,我们只需要记住两点
1.原始类型流的初始化
2.原始类型流与流对象的转换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
doublestream doublestream;
intstream intstream;
/**
* 原始类型流的初始化
*/
@before
public void teststream1(){
doublestream = doublestream.of( 0.1 , 0.2 , 0.3 , 0.8 );
intstream = intstream.of( 1 , 3 , 5 , 7 , 9 );
intstream stream1 = intstream.rangeclosed( 0 , 100 );
intstream stream2 = intstream.range( 0 , 100 );
}
/**
* 流与原始类型流的转换
*/
@test
public void teststream2(){
stream< double > stream = doublestream.boxed();
doublestream = stream.maptodouble( double :: new );
}
|
6.并行流
可以将普通顺序执行的流转变为并行流,只需要调用顺序流的parallel() 方法即可,如stream.iterate(1, x -> x + 1).limit(10).parallel()。
1) 并行流的执行顺序
我们调用peek方法来瞧瞧并行流和串行流的执行顺序,peek方法顾名思义,就是偷窥流内的数据,peek方法声明为stream<t> peek(consumer<? super t> action);加入打印程序可以观察到通过流内数据,见如下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public void peek1( int x) {
system.out.println(thread.currentthread().getname() + ":->peek1->" + x);
}
public void peek2( int x) {
system.out.println(thread.currentthread().getname() + ":->peek2->" + x);
}
public void peek3( int x) {
system.out.println(thread.currentthread().getname() + ":->final result->" + x);
}
/**
* peek,监控方法
* 串行流和并行流的执行顺序
*/
@org .junit.test
public void testpeek() {
stream<integer> stream = stream.iterate( 1 , x -> x + 1 ).limit( 10 );
stream.peek( this ::peek1).filter(x -> x > 5 )
.peek( this ::peek2).filter(x -> x < 8 )
.peek( this ::peek3)
.foreach(system.out::println);
}
@test
public void testpeekpal() {
stream<integer> stream = stream.iterate( 1 , x -> x + 1 ).limit( 10 ).parallel();
stream.peek( this ::peek1).filter(x -> x > 5 )
.peek( this ::peek2).filter(x -> x < 8 )
.peek( this ::peek3)
.foreach(system.out::println);
}
|
串行流打印结果如下:
并行流打印结果如下:
咋看不一定能看懂,我们用如下的图来解释
我们将stream.filter(x -> x > 5).filter(x -> x < 8).foreach(system.out::println)的过程想象成上图的管道,我们在管道上加入的peek相当于一个阀门,透过这个阀门查看流经的数据,
1)当我们使用顺序流时,数据按照源数据的顺序依次通过管道,当一个数据被filter过滤,或者经过整个管道而输出后,第二个数据才会开始重复这一过程
2)当我们使用并行流时,系统除了主线程外启动了七个线程(我的电脑是4核八线程)来执行处理任务,因此执行是无序的,但同一个线程内处理的数据是按顺序进行的。
2) sorted()、distinct()等对并行流的影响
sorted()、distinct()是元素相关方法,和整体的数据是有关系的,map,filter等方法和已经通过的元素是不相关的,不需要知道流里面有哪些元素 ,并行执行和sorted会不会产生冲突呢?
结论:1.并行流和排序是不冲突的,2.一个流是否是有序的,对于一些api可能会提高执行效率,对于另一些api可能会降低执行效率
3.如果想要输出的结果是有序的,对于并行的流需要使用foreachordered(foreach的输出效率更高)
我们做如下实验:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
/**
* 生成一亿条0-100之间的记录
*/
@before
public void init() {
random random = new random();
list = stream.generate(() -> random.nextint( 100 )).limit( 100000000 ).collect(tolist());
}
/**
* tip
*/
@org .junit.test
public void test1() {
long begin1 = system.currenttimemillis();
list.stream().filter(x->(x > 10 )).filter(x->x< 80 ).count();
long end1 = system.currenttimemillis();
system.out.println(end1-begin1);
list.stream().parallel().filter(x->(x > 10 )).filter(x->x< 80 ).count();
long end2 = system.currenttimemillis();
system.out.println(end2-end1);
long begin1_ = system.currenttimemillis();
list.stream().filter(x->(x > 10 )).filter(x->x< 80 ).distinct().sorted().count();
long end1_ = system.currenttimemillis();
system.out.println(end1-begin1);
list.stream().parallel().filter(x->(x > 10 )).filter(x->x< 80 ).distinct().sorted().count();
long end2_ = system.currenttimemillis();
system.out.println(end2_-end1_);
}
|
可见,对于串行流.distinct().sorted()方法对于运行时间没有影响,但是对于串行流,会使得运行时间大大增加,因此对于包含sorted、distinct()等与全局数据相关的操作,不推荐使用并行流。
7.stream vs spark rdd
最初看到stream的一个直观感受是和spark像,真的像
1
2
3
4
|
val count = sc.parallelize( 1 to num_samples).filter { _ =>
val x = math.random
val y = math.random
x*x + y*y < 1 }.count()println(s "pi is roughly ${4.0 * count / num_samples}" )
|
以上代码摘自spark官网,使用的是scala语言,一个最基础的word count代码,这里我们简单介绍一下spark,spark是当今最流行的基于内存的大数据处理框架,spark中的一个核心概念是rdd(弹性分布式数据集),将分布于不同处理器上的数据抽象成rdd,rdd上支持两种类型的操作1) transformation(变换)2) action(行动),对于rdd的transformation算子并不会立即执行,只有当使用了action算子后,才会触发。
总结
以上所示是小编给大家介绍的java8中的stream相关知识,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!
原文链接:http://www.cnblogs.com/andywithu/p/7404101.html