Java 8 提供了一个新的 API(称为”流“,Stream),它支持大数据的并行操作,其思路和在数据库查询语言中的思路相似 – 用更高级的方式表达想要的东西,而由”实现“(在这里是 Streams 库)来选择最佳低级执行机制。这样就可以避免用 synchronized 来编写代码,这一行代码不仅容器出错,而且在多核 CPU 上执行所需的成本比想象要高1

思想

是函数式编程(functional programming)的一种 Java 实现

强调将计算过程分解成可复用的函数,主要使用 map 方法和 reduce 方法组合而成的 MapReduce 算法,最好的实现 Apache Hadoop

Streams 和 Collections 的不同

  • 不储存元素。 Stream 不是储存元素的数据结构;相反的,它通过管道对源就像数据结构、数组、构造方法、IO 流的元素进行操作。
  • 纯粹的方法,也叫无副作用 (no side effect)。 Stream 上的操作会产生结果,但不会修改其来源。例如,过滤从集合获取的流会生成一个没有过滤元素的新 Stream,而不是从源集合中删除元素。
  • 惰性化。 许多流操作(例如过滤,映射或重复移除)可以被惰性化实现,从而为优化提供机会。流操作分为中间 (intermediate) 操作和终结 (terminal) 操作,中间操作总是惰性的。
  • 可能没有限制。 尽管集合的大小有限,但流不需要。诸如 limit(n) 或 findFirst() 之类的短路操作可以允许无限流上的计算在有限的时间内完成。
  • 一次性。 流的元素在流的生命周期中仅访问过一次。像 Iterator 一样,必须生成一个新的流来重新访问源的相同元素。

集合和流,它们有不同的关注点,集合主要关注集合的有效管理和访问。 相反,流不直接提供访问和操作元素的手段,而是关注于声明性地描述它们的来源和将在该来源上进行的计算操作。如果流操作没有你想要的功能,你可以使用 iterator() 或 spliterator() 来遍历操作。

外部迭代器是客户端主动调用迭代的,内部迭代器是由内部迭代器自身迭代的。枚举,增强 for 循环还是使用 iterator() 来进行遍历,它们同属于外部遍历器,java8 集合的 forEach 和 stream 的 forEach() 属于内部遍历器,流的内部遍历器可以使用到流的并行(parallel)特性,从而加快速度。

流操作分为中间操作(intermediate operations)和终结操作(terminal operations),结合形成流管道。流管道由源(例如集合,数组,生成器函数或 I/O 通道)组成; 随后是零个或多个中间操作,例如 Stream.filterStream.map 和诸如 Stream.forEachStream.reduce 之类的终结操作。

常用操作

操作类型返回类型使用的类型/函数式接口函数描述符
filter中间StreamPredicateT -> boolean
distinct中间 (有状态 - 无界)Stream
skip中间 (有状态 - 有界)Streamlong
limit中间 (有状态 - 有界)Streamlong
map中间StreamFunction<T, R>T -> R
flatMap中间StreamFunction<T, Stream>T -> Stream
sorted中间 (有状态 - 无界)StreamComparator(T, T) -> int
anyMatch终端booleanPredicateT -> boolean
noneMatch终端booleanPredicateT -> boolean
allMatch终端booleanPredicateT -> boolean
findAny终端Optional
findFirst终端Optional
forEach终端voidConsumerT -> void
collect终端RCollector<T, A, R>
reduce终端 (有状态 - 有界)OptionalBinaryOperator(T, T) -> T
count终端long

Intermediate operations(中间操作)

中间操作返回一个新的流(Stream)。

他们总是惰性的,执行诸如 filter() 之类的中间操作实际上并不执行任何过滤,而是创建一个新的流,该流在遍历时包含与给定谓词相匹配的初始流的元素。在管道的终结操作被执行时对源的流水遍历才会开始。

中间操作进一步分为无状态和有状态操作。无状态操作(如 filtermap)在处理新元素时不会保留先前看到的元素的状态 – 每个元素可以独立于其他元素上的操作进行处理。有状态的操作(如 distinctsorted)可能会在处理新元素时结合之前看到的元素的状态。

有状态的操作可能需要在生成结果之前处理整个输入。例如,只有在查看了流的所有元素之后,才能对排序流产生任何结果。因此,在并行计算中,一些包含有状态中间操作的管道可能需要对数据进行多次传递,或者可能需要缓存重要数据。只包含无状态中间操作的流水线可以一次处理,无论是顺序处理还是并行处理,只需最少的数据缓冲。

常用的一些操作:

map

map

返回由给定函数作用于此流的元素后产生的结果组成的流。  给定函数应为无干涉,无状态的操作作用于每个元素。不然对于并发流后面的操作结果可能不会很准确。

  • 无干涉 无干涉主要是指在流操作期间不去修改源流

 + 无状态 无状态是指我们在处理时不产生中间状态,操作不依赖之前的状态。

filter

过滤是按照一定的规则对流中的元素进行检查,将符合条件的元素抽取到新的流中的操作。

filter

distinct

distinct

distinct 保证输出的流中包含唯一的元素,它是通过 Object.equals(Object) 来检查是否包含相同的元素。它是一个有状态的中间操作。

在并行流中对无序数组去重效率更高,对于有序数组可以使用 unordered() 无序检索提高速度,或者使用sequential()来实现串行。

相反有序数组更适合使用串行流。

peek

peek

peek 产生一个和原流相同的流,并在遍历流的过程中去消费每个元素。

使用 peek 的主要目的是 “看,不要动”。 此方法主要用于支持调试,您希望在元素流经管道中的某个点时看到这些元素:请谨慎使用此方法作为副作用,因为它有可能会修改源流。

flatMap

flatmap

返回一个流,该流包含将原流的每个元素替换为映射函数应用于每个元素而生成的映射流的内容的结果。每个映射流都将其内容放入此流后关闭。(如果映射流为空,则使用空流代替) 简而言之,扁平化多个流为一个新流。

flatMap() 操作具有对流的元素应用一对多转换,然后将生成的元素展平为新流的效果。

example

orders 是采购订单流,并且每个采购订单都包含一系列采购列,则以下内容会生成包含所有订单中的所有采购列的流:

orders.flatMap(order -> order.getLineItems().stream())...

如果 path 是文件的路径,那么下面的内容会生成包含在该文件中的单词流:

    Stream<String> lines = Files.lines(path, StandardCharsets.UTF_8);
    Stream<String> words = lines.flatMap(line -> Stream.of(line.split(" +")));

上面 flatmap 中的映射函数使用的正则比较简单,具体单词划分的正则不是这样。

limit

limit

对流进行截断操作,获取其前 N 个元素,如果原流中包含的元素个数小于 N,那就获取其所有的元素。

count

返回此流中元素的数量。这是一个简写,相当于:

    return mapToLong(e -> 1L).sum();

Terminal operations(终结操作)

终结操作返回确定类型的结果

Stream.forEachIntStream.sum,可能会遍历流以产生结果或副作用(side-effect)。终结操作执行后,流管道被视为消耗,并不能再使用;如果你需要再次遍历相同的数据源,则必须返回到数据源以获取新的流。在几乎所有情况下,终结操作都非常急切,在返回之前完成数据源的遍历和管道的处理。只有终结操作 iterator()spliterator() 不是。

副作用(side-effect)

副作用可能会违反无状态要求和对线程安全产生危害。 许多计算可能会产生副作用,但是可以更安全有效地表达,而不会产生副作用,例如使用 归约 而不是 可变累加器。少量流操作(例如 forEach()peek())只能通过副作用操作,应该小心使用。比如我们在对流操做以期望得到想要的结果,而无意修改了原始流,便产生了副作用。

常见的副作用包括:

  • 修改任何外部变量或对象属性(例如,全局变量或流作用域外部的变量)
  • 打印日志
  • 文件读写
  • 网络请求
  • 触发外部事件
  • 调用其它由副作用的方法
  • 等等

归约(Reduction operations)

归约操作(也称为折叠)将一系列输入元素,通过重复应用组合操作(例如查找一组数字的和或最大值)或将元素累加到列表中来将它们组合为单个汇总结果。

流类具有多种形式的通用归约操作,称为 reduce()collect(),以及多个专用简化形式,如 sum()max()count()

规约方法的优势与并行化

相比于逐步迭代求和,使用 reduce 的好处在于,这里的迭代被内部迭代抽象掉了,这让内部实现得以选择并行执行 reduce 操作。而迭代式求和要更新共享变量 sum,这不是那么容易并行化的。如果你加入了同步,很可能会发现线程竞争抵消了并行本应带来的性能提升,这种计算的并行化需要另一种办法:将输入分块,分块求和,最后再合并起来。参见:ForkJoin 框架 但现在重要的是要认识到,可变的累加器模式对于并行化来说是思路一条。需要一种新的模式,这正是 reduce 所提供的。

int sum = numbers.parallelStream().reduce(0, Integer::sum);

但要并行执行这段代码也要付出一定代价:传递给 reduce 的 Lambda 不能更改状态(如实例变量),而且操作作必须满足结合律才可以按任意顺序执行。

可变归约(Mutable reduction)

可变归约操作将输入元素累加到可变结果容器中,例如 CollectionStringBuilder,因为它处理流中的元素。

可变归约操作称为 collect(),因为它将所需结果一起收集到结果容器(如集合)中。 收集操作需要三个功能:构造结果容器的新实例的供应者函数,将输入元素并入结果容器的累加器函数以及将一个结果容器的内容合并到另一个结果容器的组合函数。

  • 供应器 (supplier())
  • 累加器 (accumulator())
  • 组合器 (combiner())
  • 修整器 (finisher()) 可省略

归约 (reduce) 方法旨在把两个值结合起来生成一个新值,它是一个不可变的规约。与此相反,collect 方法的设计就是要改变容器,从而累加要输出的结果。错误的使用归约方法可能导致并行工作不正常。

<R> R collect(Supplier<R> supplier,
            BiConsumer<R, ? super T> accumulator,
            BiConsumer<R, R> combiner);

例如下面的代码:

ArrayList<String> strings = new ArrayList<>();
for (T element : stream) {
   strings.add(element.toString());
} 

我们可以写成:

ArrayList<String> strings = stream.collect(() -> new ArrayList<>(),
                                        (c, e) -> c.add(e.toString()),
                                        (c1, c2) -> c1.addAll(c2));

简写作:

List<String> strings = stream.map(Object::toString)
                                  .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);

在这里,我们的供应器是 ArrayList 构造函数,累加器将字符串化的元素添加到 ArrayList,组合器只是简单地使用 addAll 将字符串从一个容器复制到另一个容器中。 collect 的供应器,累加器和组合器三个方面紧密耦合。我们可以使用抽象的 Collector 来包含三个方面,上面的代码可以重写为:

List<String> strings = stream.map(Object::toString)
                                  .collect(Collectors.toList());
Collector 收集器接口

使用收集器对流的元素执行可变归约操作。收集器封装了用作收集参数的函数 (供应器,累加器,组合器),从而允许重用收集策略和组合收集操作,例如多级分组或分区。

收集器(Collectors)实现类

Collectors 是 Collector 接口的实现类,它里面包含了常用的收集策略。 常见的策略以静态工厂方法的形式提供,包括:

  • 将元素累加到集合中; toList toMap toSet toCollection
  • 使用 StringBuilder 连接字符串:joining
  • 计算关于总和,最小值,最大值或平均值等元素的摘要信息;
    • 求和 counting() collectingAndThen
    • 汇总 summarizingDouble summingDouble
    • 最大值、最小值 maxBy minBy
    • 平均值 averagingDouble averagingInt
  • 计算“数据透视表”摘要,例如“卖方最大价值交易”等。
    • 分组 groupingBy
    • 分割 partitioningBy

例子:

// 将用户姓名归约成一个集合
List<String> list = people.stream().map(Person::getName).collect(Collectors.toList());
 
// 把用户姓名归约到 TreeSet 集中
Set<String> set = people.stream().map(Person::getName).collect(Collectors.toCollection(TreeSet::new));

// 将对象转换为字符串并将它们用逗号分隔连接起来
String joined = things.stream()
        .map(Object::toString)
        .collect(Collectors.joining(", "));

// 计算输入员工的工资总和 
int total = employees.stream()
                .collect(Collectors.summingInt(Employee::getSalary)));

// 输入员工按部门分组
Map<Department, List<Employee>> byDept = employees.stream()
            .collect(Collectors.groupingBy(Employee::getDepartment));

// 按部门分组计算工资总额 
Map<Department, Integer> totalByDept = employees.stream()
        .collect(Collectors.groupingBy(Employee::getDepartment, Collectors.summingInt(Employee::getSalary)));

// 把学生分成及格和不及格数组
Map<Boolean, List<Student>> passingFailing = students.stream()
                .collect(Collectors.partitioningBy(s -> s.getGrade() >= PASS_THRESHOLD));

  1. 多核 CPU 的每个处理器内核都有独立的高速缓存。加锁需要这些高速缓存同步运行,然而这又需要在内核间进行较慢的缓存一致性协议通信。 ↩︎