详解 Java Stream流式处理 API

要求:JDK1.8+

循环遍历 - stream().forEach

stream().forEach 用的多线程方式,其调用线程池的时候必然会耗费更多的时间。但如果你在循环内要处理的事情很多,或者要循环调用远程接口or数据库的时候,无疑极大的提升了效率。

stream().forEach
1
2
books.stream().forEach(System.out::print);
books.stream().forEach(book -> book.setUrl("https://codeun.com"));

过滤 - stream().filter

1
2
3
List<Book> newList = books.stream().filter(book ->
book.getUrl().equals("http://codeun.com")
).collect(Collectors.toList());

排序 - stream().sorted

1
2
3
List<Book> sortList = books.stream().sorted(Comparator.comparing(Book::getPrice)).collect(Collectors.toList());
// 反转
List<Book> reversedList = books.stream().sorted(Comparator.comparing(Book::getPrice).reversed()).collect(Collectors.toList());

转换 - stream().map | stream().flatmap()

1
2
3
4
5
6
7
8
//输出 [j, a, v, a][p, y, t, h, o, n][g, o]
array.stream()
.map(word -> word.split(""))
.forEach(w -> System.out.print(Arrays.toString(w)));
// 输出 javapythongo
array.stream()
.flatMap(word -> Stream.of(word.split("")))
.forEach(System.out::print);

map的操作是处理完之后就返回一个数组对象,最后再转变成一个stream,所以在stream的foreach遍历的时候,就有三个stream对象

flatmap的操作则是深入到每一个元素,对每个元素数组处理后,汇集成一个新的stream

flatmapmap上的用法也有区别,flatmap需要再加一个Arrays.stream,而map不用。

归约 - stream().reduce

reduce方法用于对stream中元素进行聚合求值,最常见的用法就是将stream中一连串的值合成为单个值,比如为一个包含一系列数值的数组求和。

reduce方法有三个重载的用法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Optional<T> reduce(BinaryOperator<T> accumulator);

Example:
List<Integer> numList = Arrays.asList(1,2,3,4,5);
int result = numList.stream().reduce((a,b) -> a + b ).get();

/** Console
a=1, b=2
a=3, b=3
a=6, b=4
a=10, b=5
result: 15
*/
```

> 示例代码实现了元素累加,当表达式是第一次被执行,`a``b` 是`stream`中的第一、二个元素,之后`a`参数是表达式对上一次执行结果的缓存,对当次执行结果作为下一次执行的参数,而参数`b`则是依次为`stream`中每个元素。

```java
T reduce(T identity, BinaryOperator<T> accumulator);
Example:
List<Integer> numList = Arrays.asList(1,2,3,4,5);
int result = numList.stream().reduce(10,(a,b) -> a + b );
// result: 25

与上面方式的区别在于添加identity来指定一个自定义初始值,这样表达式第一次执行时,a就不是指向第一个元素了。
因为指定了初始值,因此即使stream为空,也不会出现返回结果为null的情况,返回结果就不需要Optional包装了。

1
2
3
4
5
<U> U reduce(U identity, BiFunction<U, ? super T, U>  accumulator, BinaryOperator<U> combiner);
Example:
List<Integer> numList = Arrays.asList(Integer.MAX_VALUE,Integer.MAX_VALUE);
long result = numList.stream().reduce(0L,(a,b) -> a + b, (a,b)-> 0L );
// result: 4294967294

第三种签名主要作用是类型归并,增加灵活度,前两种方式返回结果必须与元素类型相同,比如元素过大,返回结果超过int的最大值。
需要注意的是这个方法中的第三个参数:BinaryOperator<U>类型的表达式,在常规的stream中它并不会被执行到,但是不能给定为Null,不然会报错,只有在并行stream中,此表达式则会被执行到。

stream().distinct() 去重

distinct使用hashCode()equals()方法来获取不同的元素,所以自定义类中必须要实现hashCode()equals()方法。
如果我们想要按照对象的属性进行去重,我们可以通过其它方法来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) {
List<Book> list = new ArrayList<>();
{
list.add(new Book("Core Java", 200));
list.add(new Book("Core Java", 300));
list.add(new Book("Learning Freemarker", 150));
list.add(new Book("Spring MVC", 200));
list.add(new Book("Hibernate", 300));
}
list.stream().filter(distinctByKey(b -> b.getName()))
.forEach(b -> System.out.println(b.getName()+ "," + b.getPrice()));
}
private static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
Map<Object,Boolean> seen = new ConcurrentHashMap<>();
return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
}
}
// 输出:
// Core Java,200
// Learning Freemarker,150
// Spring MVC,200
// Hibernate,300

stream().count() 总数量

跟List接口的size一样,返回的都是这个集合流中元素的长度,不同的是,流是集合的一个高级工厂,中间操作是工厂里的每一道工序,count作用于我们对这个流操作完成后数量的和。

stream().anyMatch() | stream().allMatch() | stream().noneMatch()

1
2
3
4
List<String> array = Arrays.asList("java", "python", "go");
boolean anyMatch = array.stream().anyMatch(s -> s.equals("java")); // true
boolean allMatch = array.stream().allMatch(s -> s.equals("java")); // false
boolean noneMatch = array.stream().noneMatch(s -> s.equals("php")); // true

这三个方法,传入的都是Predicate的函数式接口
anyMatch表示,判断的条件里,任意一个元素成功,返回true
allMatch表示,判断条件里的元素,所有的都是,返回true
noneMatchallMatch相反,判断条件里的元素,所有的都不是,返回true

stream().limit() 和 stream().skip - 中间操作

1
2
3
4
5
6
7
8
9
10
11
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.filter(i -> i % 2 == 0)
.limit(2)
.forEach(System.out::println);
// 输出 2 4

Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.filter(i -> i % 2 == 0)
.skip(2)
.forEach(System.out::println);
// 输出 6 8 10

limit是一种短路操作,达到n之后会截断操作,而skip是跳过stream中的前n个元素,如果n大于stream的大小,则返回空stream,两者的n都不能为负值。
组合skip(offset)limit(limit)一起使用,可以达到分页效果,

stream().peek() - 中间操作

peek()map()有些相似,可以方便调试,帮助我们观察传递给每个操作的数据。同时,在修改流中的元素时,可以使用peek()而不是使用map(),但是不能修改不可变对象。

stream().collect() 收集

利用collect(Collectors.toList())是一个简单的收集操作,是对处理结果的封装,对应的还有toSet、toMap等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Example:
// 最大价格
books.stream().collect(Collectors.maxBy(Comparator.comparing(Book::getPrice)));
// 最小价格
books.stream().collect(Collectors.minBy(Comparator.comparing(Book::getPrice)));
// 价格总和
books.stream().collect(Collectors.summingInt(Book::getPrice));
// 平均价格
books.stream().collect(Collectors.averagingInt(Book::getPrice));
// 一次性得到元素个数、总和、均值、最大值、最小值,对应的还有summarizingLong、summarizingDouble
// 结果: IntSummaryStatistics { count=10, sum=95, min=3, average=9.5, max=13 }
IntSummaryStatistics statistics = books.stream().collect(Collectors.summarizingInt(Book::getPrice));
// 字符串拼接
//String names = books.stream().map(Book::getName).collect(Collectors.joining(", "));

启动并行流式处理 - parallelStream()

parallelStream就是一个并行执行的流,默认通过ForkJoinPool多线程执行任务,提高速度。
parallelStream本质上基于java7的Fork-Join框架实现,其默认的线程数为宿主机的内核数,使用parallelStream()注意要保证线程安全。
处理的过程是使用分治法(Divide-and-Conquer Algorithm),也就是将一个大任务切分成多个小任务,每个任务都是一个操作。

1
2
3
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream().forEach(System.out::println);
numbers.parallelStream().forEachOrdered(System.out::println);

使用parallelStream得到的顺序因为多线程执行的原因,可能是任意顺序,如果平行处理时,希望最后顺序是按照原来Stream的数据顺序,那可以调用forEachOrdered()。