流的创建

空流

如果创建空流,则应使用 Stream.empty() 方法。

通常情况下创建空流的目的是避免返回 null:

public Stream<String> streamOf(List<String> list) {
    return list == null || list.isEmpty() ? Stream.empty() : list.stream();
}

从数组或集合创建流

我们可以从数组使用 stream() 或者 of() 方法来创建流:

public static <T> Stream<T> stream(T[] array)

返回以指定数组作为源的顺序Stream。

static <T> Stream<T> of(T... values)

返回其元素为指定值的顺序有序流。

Stream 的 of 方法内部还是使用 Arrays stream 方法

String[] arr = new String[]{"a", "b", "c"};
Stream<String> stream = Arrays.stream(arr);
stream = Stream.of("a", "b", "c");

在集合的接口上 java8 添加了默认的 stream() 方法允许使用任何集合作为元素源创建 Stream :

Stream<String> stream = list.stream();

如果想要多个流合并成单个流,可以使用流的 concat 方法:

可以在里面嵌套使用方法,但是注意 java8 的流不能复用。

Stream<Object> concatStream = Stream.concat(Stream.concat(collection1.stream(), collection2.stream()), collection3.stram());

静态工厂方法创建流

static IntStream range(int startInclusive, int endExclusive);

从 startInclusive(包括)到 endExclusive(不包括)的递增步长返回顺序排序的 IntStream。

static <T> Stream<T> iterate(T seed, UnaryOperator<T> f);

返回通过将函数 f 迭代应用于初始元素种子而生成的无限顺序有序流,生成由种子,f(seed),f(f(seed)) 等组成的流。

构建器、生成器、迭代器生成流

  • 构建器

    使用构建器时,应在语句的右侧部分另外指定所需类型,否则 build() 方法将创建 Stream 的实例:

    Stream<String> streamBuilder =
      Stream.<String>builder().add("a").add("b").add("c").build();
    
  • 生成器 - 可用于生成无限流

    generate() 方法接受 Supplier 以生成元素。由于结果流是无限的,开发人员应指定所需的大小,否则 generate() 方法将一直有效,直到达到内存限制:

    Stream<String> streamGenerated =
      Stream.generate(() -> "element").limit(10);
    
  • 迭代器 - 可用于生成无限流

    iterate() 方法有两个参数:种子和函数。 种子是流的第一个元素。通过将函数应用于第一元素来生成第二元素。通过在第二个元素上应用函数来生成第三个元素。

    因此,元素是:seed, f(seed), f(f(seed)), f(f(f(seed)))....

    public class Main {
      public static void main(String[] args) {
        Stream.iterate(2L, n  ->  n  + 1)
        .filter(Main::isOdd)
        .limit(5)
        .forEach(System.out::printf);
      }
      public static boolean isOdd(long number) {
        if (number % 2 == 0) {
          return false;
        }
        return true;
      }
    }
    

    代码返回以下执行结果:

    3 5 7 9 11

创建无尽流

我们已经知道了流可以分为中间和终止操作。我们也可以利用流的惰性话来创建一个无尽流。 我们创建一个从 0 开始每次加 2 的无限流,在调用终止操作前限制它的长度。 **在调用终止操作前执行 limit 操作是至关重要的。**不然程序将无限制的运行:

// given
Stream<Integer> infiniteStream = Stream.iterate(0, i -> i + 2);

// when
List<Integer> collect = infiniteStream
  .limit(10)
  .collect(Collectors.toList());

// then
assertEquals(collect, Arrays.asList(0, 2, 4, 6, 8, 10, 12, 14, 16, 18));

我们使用 iterate() 方法创建一个无限流。然后调用 limit() 转换和 collect() 终止操作。由于 Stream 的惰性,我们将拥有无限序列的前 10 个元素组成的集合。

创建自定义元素的无限流

假设我们想创建一个无限的随机 UUID 流。 使用 Stream API 实现此目的的第一步是创建这些随机值的供应商(Supplier):

Supplier<UUID> randomUUIDSupplier = UUID::randomUUID;

当我们定义好供应商时,我们可以使用 generate() 方法创建无限流:

Stream<UUID> infiniteStreamOfRandomUUID = Stream.generate(randomUUIDSupplier);

然后我们可以从流中获取一些元素。如果你希望程序在有限时间内完成,请记得使用 limit() 方法:

List<UUID> randomInts = infiniteStreamOfRandomUUID
.skip(10)
.limit(10)
.collect(Collectors.toList());

我们使用 skip() 转换来丢弃前 10 个结果并采用接下来的 10 个元素。我们可以通过将 Supplier 接口的函数传递给 Stream 上的 generate() 方法来创建任何自定义类型元素的无限流。

字符串流

String 也可以用作创建流的源。

借助 String 类的 chars() 方法。由于 JDK 中没有接口 CharStream,因此 IntStream 用于表示字符流。

IntStream streamOfChars = "abc".chars();

以下示例根据指定的 RegEx 将 String 拆分为子字符串:

Stream<String> streamOfString =
  Pattern.compile(", ").splitAsStream("a, b, c");

文件流

Java NIO 类 Files 允许通过 lines() 方法生成文本文件的 Stream 。文本的每一行都成为流的一个元素:

Path path = Paths.get("C:\\file.txt");
Stream<String> streamOfStrings = Files.lines(path);
Stream<String> streamWithCharset = 
  Files.lines(path, Charset.forName("UTF-8"));

可以将 Charset 指定为 lines() 方法的参数。

Java 8 流不能重用。

流管道,是源流,中间操作和终止操作组成的链。

每个流只能使用一个终止操作。

中间操作是惰性的。这意味着只有在执行终止操作时才会调用它们。

减少流大小的中间操作应放在应用于每个元素的操作之前。因此,在流管道的顶部保留skip(),filter(),distinct() 等方法。

流的归约

API 有许多终止操作,它们将流聚合到类型或原始类型,例如 count()max()min()sum(),但这些操作根据预定义的实现工作。如果开发人员需要定制 Stream 的归约机制呢?有两种方法可以实现这一点 - reduce()collect() 方法。

reduce 方法

这种方法有三种变体,它们的传参和返回类型不同。它们可以具有以下参数:

identity(标识) - 累加器的初始值或者如果流为空且没有任何可累积的默认值;

accumulator(累加器) - 一个指定元素聚合逻辑的函数。累加器为每个减少步骤创建一个新值。

combiner(结合器) - 聚合累加器结果的函数。仅在并行模式下从不同线程归约累加器结果。

int reducedParams = Stream.of(1, 2, 3)
  .reduce(10, (a, b) -> a + b, (a, b) -> {
     log.info("combiner was called");
     return a + b;
  });

相当于:

U result = identity;
for (T element : this stream)
    result = accumulator.apply(result, element)
return result;

结果与未使用结合器中的结果相同,都为 16 (10 + 1 + 2 +3),并且没有日志,这意味着没有调用该组合器。要使组合器工作,流应该是并行的:

int reducedParallel = Arrays.asList(1, 2, 3).parallelStream()
    .reduce(10, (a, b) -> a + b, (a, b) -> {
       log.info("combiner was called");
       return a + b;
    });

这里的结果是不同的(36),并且组合器被调用两次。操作并行进行。首先(10 + 1 = 11; 10 + 2 = 12; 10 + 3 = 13;)。现在组合器可以合并这三个结果。它需要两次迭代(12 + 13 = 25; 25 + 11 = 36)。

collect 方法

参考 java8-stream 学习

Collector toMap 方法,可以允许将流转换为 Map 类型,并提供了 key 相同时的合并方法:

toMap(keyMapper, valueMapper, mergeFunction)

并行流

在 Java 8 之前,并行化很复杂。 ExecutorServiceForkJoin 的出现简化了开发人员的生活,但是他们仍然应该记住如何创建一个特定的执行器,如何运行它,使用 fork/join 框架,您必须指定问题的细分方式(分区)等等。Java 8 引入了一种函数风格的新方式来实现并行机制。

在使用集合的应用程序中实现并行性的一个难点是集合不是线程安全的,这意味着多线程无法在不引入线程干扰内存一致性错误的情况下操作集合。 Collections 框架提供了同步包装器,它将自动同步添加到任意集合,使其成为线程安全的。 但是,同步会引入线程争用。 您希望避免线程争用,因为它会阻止线程并行运行。 聚合操作和并行流使您可以实现与非线程安全集合的并行性,前提是您在操作集合时不要修改集合。

API 允许创建并行流,以并行模式执行操作。当流的源是 Collection 或数组时,可以在parallelStream() 方法的帮助下实现:

Stream<Product> streamOfCollection = productList.parallelStream();
boolean isParallel = streamOfCollection.isParallel();
boolean bigPrice = streamOfCollection
  .map(product -> product.getPrice() * 12)
  .anyMatch(price -> price > 200);

如果流的源不同于 Collection 或数组,则应使用 parallel() 方法:

IntStream intStreamParallel = IntStream.range(1, 150).parallel();
boolean isParallel = intStreamParallel.isParallel();

内部,Stream API 自动使用 ForkJoin 框架并行执行操作。默认情况下,将使用公共线程池,并且没有办法(至少现在)为它分配一些自定义线程池。

并行流内部使用了默认的 ForkJoinPool,它默认的线程数量就是你的处理器数量,这个值是由 Runtime.getRuntime().availableProcessors() 得到的。 但是你可以通过系统属性 java.util.concurrent.ForkJoinPool.common.parallelism 来改变线程池大小,如下所示: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12"); 这是一个全局设置,因此它将印象代码中所有的并行流。反过来说,目前还无法专为某个并行流指定这个值。一般而言,让 ForkJoinPool 的大小等于处理器数量是个不错的默认值,除非你有很好的理由,否则我们强烈建议不要修改它。

在并行模式下使用流时,避免阻塞操作并在任务需要相同的执行时间时使用并行模式(如果一个任务比另一个任务持续时间长,则可能会减慢整个应用程序的工作流程)。

可以使用 sequential() 方法将并行模式的流转换回顺序模式:

IntStream intStreamSequential = intStreamParallel.sequential();
boolean isParallel = intStreamSequential.isParallel();

⚠️ 并行流的顺序是由流来源的 Spliterator 的属性决定的,比如 ListLinkedHashSet 实现的 Spliterator 就包含 Spliterator.ORDERED 属性,我们可以通过调用 Stream.unordered() 无序处理以加快并行速度。

Stream forEach 在并行流下不保证顺序,但是我们可以使用 forEachOrdered 使具有顺序的流顺序输出。

新的流式写法

Do-While

假设在我们的代码里有这样一个简单的 do..while 循环:

int i = 0;
while (i < 10) {
    System.out.println(i);
    i++;
}

当我们想要实现类似于标准 do-while 循环的功能时,我们需要使用 limit() 方法:

Stream<Integer> integers = Stream
  .iterate(0, i -> i + 1);
integers
  .limit(10)
  .forEach(System.out::println);

我们用较少的代码实现了相同功能,但是使用 limit() 函数并没有 doWhile 那样容易阅读。

if/else

在 forEach() 中使用 if/else

首先,让我们创建一个 Integer 数组,然后在 Integer 流的 forEach() 方法中使用传统的 if / else 逻辑:

 List<Integer> ints = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

 ints.stream()
    .forEach(i -> {
        if (i.intValue() % 2 == 0) {
            Assert.assertTrue(i.intValue() % 2 == 0);
        } else {
            Assert.assertTrue(i.intValue() % 2 != 0);
        }
    });

我们的 forEach 方法包含 if-else 逻辑,它使用 Java 模运算符验证 Integer 是奇数还是偶数。

使用 filter 作为 if/else

其次,让我们看一下使用 Stream filter() 方法的更优雅的实现:

Stream<Integer> evenIntegers = ints.stream()
    .filter(i -> i.intValue() % 2 == 0);
Stream<Integer> oddIntegers = ints.stream()
    .filter(i -> i.intValue() % 2 != 0);
 
evenIntegers.forEach(i -> Assert.assertTrue(i.intValue() % 2 == 0));
oddIntegers.forEach(i -> Assert.assertTrue(i.intValue() % 2 != 0));

上面我们使用 Stream filter() 方法实现了 if / else 逻辑,将整数列表分成两个 Streams,一个用于偶数整数,另一个用于奇数整数。

使用 partitioningBy 分割为两部分

我们也可以使用约减操作的 partitioningBy 分割将结果分割成两部分:

Map<Boolean, List<Integer>> integerMap = 
  ints.stream().collect(Collectors.partitioningBy(i -> i.intValue() % 2 == 0);

这样我们会得到 true 的 偶数数组和 false 的奇数数组。