Stream 流系列

参考:

学习研究心得体会

为何要提出流的概念?

Lambda 表达式使流的实现成为了可能。

在Stream以前,我们查询一个集合中的某个或某类元素,我们会使用常规for循环,再在循环体内,进行if、else判断,依次或多次,最终得到我们的结果。

int longest = 0;
for(String str : strings){
    if(str.startsWith("A")){// 1. filter(), 保留以A开头的字符串
        int len = str.length();// 2. mapToInt(), 转换成长度
        longest = Math.max(len, longest);// 3. max(), 保留最长的长度
    }
}

使用流后:

int longestStringLengthStartingWithA
        = strings.stream()
              .filter(s -> s.startsWith("A"))
              .mapToInt(String::length)
              .max();

前后对比可以看出:

  • filter 对应 if 判断

  • mapToInt 对应 str.length

  • max 对应 Max.max

  • 每一个操作,都会产生一个流,链式调用产生一个双向链表流。

  • 流的每一步操作实际就是将原本的功能代码,转为Lambda表达式,传递给流,让流来帮我们进行计算,并返回相应的结果。

效果图:

图片.png
Figure 1. 图片.png

结论(个人看法):在一堆数据中,进行筛选或其他操作,可多步,最终得到想要的结果。而每一步操作都可视为模板代码,例如if判断,去重这些。而流的创建就是为了简化、消除这些模板代码。

一个完整的链路流包含3个部分:源头 —> 处理过程(多个) —> 结果

Stream 和 Pipeline 的关系:

Pipleline为Stream的具体实现,一个完整的链路流,包含多个Pipleline,也即多个Stream组成。

实例分析

需求:从一群学生中找到符合要求的学生名字集合

    public static void main(String[] args) {
        List<Student> objects = new ArrayList<>();
    // 学号id、年龄age、性别sex、姓名name
        objects.add(new Student(2, 15, true, "小明"));
        objects.add(new Student(1, 16, false, "小红"));
        objects.add(new Student(5, 15, false, "小芳"));
        objects.add(new Student(4, 16, false, "小花"));
        objects.add(new Student(6, 15, false, "小爱"));
        objects.add(new Student(3, 18, true, "小新"));

        Stream<Student> stream1 = objects.stream();
        Stream<Student> stream2 = stream1.skip(2);
        Stream<Student> stream3 = stream2.filter(e -> e.getAge() > 15);
        Stream<Student> stream4 = stream3.sorted(Comparator.comparing(Student::getAge));
        Stream<String> stream5 = stream4.map(Student::getName);
        Stream<String> stream6 = stream5.distinct();
        List<String> collect = stream6.collect(Collectors.toList());
    }

这里没有采用链式调用,而是常规定义出各个操作时的流,便于debug查看元素组成。

如何生成双向链表流:

  1. 当debug走过stream1流时,查看源码可知,底层创建了一个ReferencePipelin$Head对象流:

图片.png
Figure 2. 图片.png
  1. 当debug走过stream2流时,查看源码可知,底层使用SliceOps创建了一个ReferencePipeline.StatelessOp流:

图片.png
Figure 3. 图片.png
  1. 当debug走过stream3流时,底层创建了一个ReferencePipeline.StatefulOp流

图片.png
Figure 4. 图片.png
  1. 当debug走过stream4流时,查看源码可知,底层使用SortedOps创建了一个ReferencePipeline.StatelessOp流:

图片.png
Figure 5. 图片.png
  1. 当debug走过stream5流时,查看源码可知,底层使用SortedOps创建了一个ReferencePipeline.StatefulOp流:

图片.png
Figure 6. 图片.png
  1. 当debug走过stream6流时,查看源码可知,底层使用SortedOps创建了一个ReferencePipeline.StatelessOp流:

图片.png
Figure 7. 图片.png
  1. 当debug走过collect时:

图片.png
Figure 8. 图片.png

每走一步,可以单步进入方法内部,会发现每一个stage操作中,只包含创建对象。在collect之前,完全是在创建各种流,并且流承上启下,其中关键字段为previousStage、nextStage、depth以及linkedOrConsumed。当使用上一个流产生新的流时,就会将上一个流赋值给下一个流的previousStage,并且linkedOrConsued设置为true,可无限调用,直到一个结束动作(例如collect)。

  • 其结构就是一个双向链表,previousStage和nextStage连接前后流,Head只有nextStage,无previousStage。

  • depth标示这个元素处于index位置(从0开始)。在结束操作中,会根据这个depth循环遍历每一个节点。

  • linkedOrConsumed,查看其源码,可知如果流被引用后,这个流就不能再次使用了。原因很简单,这个流被使用了,处于某个链表结构中,如果再被其他的流引用,可能会导致流的改变,从而改变原来链表的结果,因此,流一旦被使用就不能被修改了。

最终结构如下: 图片.png

如何处理数据

目前为止,我们知道了,一个完整的流包含3个过程,Head流、中间操作stage(多个)以及一个结束操作,并且创建流链表没做任何业务处理,可知结束操作一定会通过创建的链表流来进行一系列的操作,下面就具体分析下`collect`,看看这个方法做了什么?

先看参数:Collectors.toList()

    public static <T>
    Collector<T, ?, List<T>> toList() {
        return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                                   (left, right) -> { left.addAll(right); return left; },
                                   CH_ID);
    }

创建了一个CollectorImpl对象,有参构造器由四个参数组成:

  • Supplier<A> supplier

  • BiConsumer<A, T> accumulator

  • BinaryOperator<A> combiner

  • Set<Characteristics> characteristics

接着进入collect方法中:

    @Override
    @SuppressWarnings("unchecked")
    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            container = collector.supplier().get();
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        }
        else {
            container = evaluate(ReduceOps.makeRef(collector));
        }
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }
  • 这里 collector 参数即为Collectors.toList()创建的对象,由此可知泛型参数 ?super P_OUT 指元素类型(这里时String),A 代表一个List,R代表

  • A container:

  • 第一个if:当时平行流时,处理。 todo:如何处理?

  • else: 串行处理流,本例就是这里进行处理,可知调用了一个evaluate方法。

  • return: todo

ReduceOps.makeRef(collector):

实质创建了 ReduceOp 匿名对象:

    public static <T, I> TerminalOp<T, I>
    makeRef(Collector<? super T, I, ?> collector) {
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        BiConsumer<I, ? super T> accumulator = collector.accumulator();
        BinaryOperator<I> combiner = collector.combiner();
        class ReducingSink extends Box<I>
                implements AccumulatingSink<T, I, ReducingSink> {
            @Override
            public void begin(long size) {
                state = supplier.get();
            }

            @Override
            public void accept(T t) {
                accumulator.accept(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }

            @Override
            public int getOpFlags() {
                return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                       ? StreamOpFlag.NOT_ORDERED
                       : 0;
            }
        };
    }
  • 这里创建了一个局部内部类ReducingSink,继承Box<I>,这个Box要留意一下。

  • return 了一个 ReduceOp 匿名对象,其中从写了makeSink方法,返回了一个局部内部内对象。

evaluate 方法:

    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
  • 这里参数 terminalOp 也即上一步创建的 ReduceOp 匿名对象

  • assert 判断这个流的 StreamShape 一定要等于 terminalOp 对象里面的 StreamShape,需要保持一致。

  • if 判断,同理,如果这个流被使用了,就不能再次使用了。这里的流指的时 collect 操作前一个流。

  • isParallel(): 平行流还是串行流,也即多线程还是单线程处理。

  • 这里是单线程,terminalOp.evaluateSequential

继续 terminalOp.evaluateSequential

        @Override
        public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<P_IN> spliterator) {
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
        }
  • 这里有个关键 makeSink(), 也即调用了前面 ReduceOp 匿名对象的方法,返回一个 ReducingSink 对象。

  • get(): 会等 helper.wrapAndCopyInto 执行完后取值

进入 helper.wrapAndCopyInto

    @Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }
  • 重点 :wrapSink(Objects.requireNonNull(sink))

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);
    // 这里就用到开头所属的depth作用。
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

最终的sink对象结构:

图片.png
Figure 9. 图片.png

这里通过debug,依次遍历,可知创建顺序,现实创建了 1 标注点的downStream,然后赋值给sink,然后将sink传递给下一个,创建了 2 标注点的downStream,再次赋值给sink,依次往前套娃,直到depth等于0,便形成这样一个 sink 链。这是很重要的一个操作。如下图,红色箭头表示创建wrapperSink的顺序,并且容易理解到,从下到上遍历创建,最终形成上一图 1/2/3/4/5/6 这种结构,最下层的sink在最后。

图片.png
Figure 10. 图片.png

Sink的作用:等价于每个流的操作,例如skip、sorted、map等等,形成wrapperSink这样嵌套结构,就是为了遍历元素,让元素按照sink链进行筛选,最终得到想要的结果。由此可知,以上所有操作都不涉及处理元素。

真正处理元素的是 copyInto 方法:

    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }
  • wrappedSink : 就是处理元素的层层条件

  • spliterator : 也即待处理的元素

由此可知,前面的所有操作都是为了得到一个执行链 wrappedSink。 这里会进入if代码块中,只分析if。

  1. wrappedSink.begin(spliterator.getExactSizeIfKnown());

通过debug进入begin,会发现依次调用了 stream2#Sink、stream3#sink、stream4#sink 就停止了(绿色箭头)。stream4 执行 sorted,查看源码会发现stream4#sink创建了一个list,便停止继续调用 downStream#begin 方法了。

图片.png
Figure 11. 图片.png
  1. spliterator.forEachRemaining(wrappedSink);

// 这里只分析核心代码。从这可以看出,开始遍历处理元素了。
for (; i < hi; ++i) {
   @SuppressWarnings("unchecked") E e = (E) a[i];
   action.accept(e);
}

同理通过源码分析可知,第一个元素 e, 调用第一个accept,发现这个是skip,e不符合规则,不操作,继续遍历第二个元素,同理,无法通过第一个 accept,直到第3个元素,可以通过第一个accept,然后进入第二个accept,第二个accept判断是否符合filter的Lambda表达式,不通过,不操作,通过则进入第3个accept,第三个accept,只做了一件事,就是把通过的元素e,放入begin链调用时创建的list中,因此可以知道此行代码进行的操作,是循环过滤符合第一个accept以及第二个accept的元素,并放入第三个accept中的list。

图片.png
Figure 12. 图片.png
  1. wrappedSink.end();

通过源码发现,当执行到第3个end时,也即 sorted sink 中的end,进行了list排序,也即

        @Override
        public void end() {
            list.sort(comparator);
            downstream.begin(list.size());
            if (!cancellationWasRequested) {
                list.forEach(downstream::accept);
            }
            else {
                for (T t : list) {
                    if (downstream.cancellationRequested()) break;
                    downstream.accept(t);
                }
            }
            downstream.end();
            list = null;
        }
  • 第一行代码,进行了排序。也即这个 sink 所要做的事

  • 第二行,又开始调用begin了。源码可知,此begin就是第一次begin执行中止后的begin,值得注意的是,进入distinct#sink#begin中,创建了一个HashSet对象,同上面创建list一样,这里通过set来进行distinct操作,最后一个begin则设置了如何获取目标元素。

  • if/else 又开始调用剩余accept函数,依次进行元素操作。可以看出,这里的循环对象是,上一次产生的list,已经执行过前3个accept操作了。

  • accept 执行完成后,又开始执行剩余的end操作。

  • 复制list = null,垃圾收集器回收,感悟:不用的对象,即使进行回收。

至此,整个执行流程应当如下:

图片.png
Figure 13. 图片.png

大致可以得出:

  • 依次调用begin链、accept链、end链。

  • 如果begin链发生中断,则调用accept链,在相同的sink中也会中断,然后执行end链,由end链再次发起 being链-accept链-end链流程。

  • 为什么会发生中断?这里以sorted进行说明。当遇到 sorted sink时,后面的sink操作必须等待排序完成后才能操作,也即 end 操作后。说简单点就是后面的操作必须依赖上一步操作执行完,才能正确处理。

  • 推论出,如果一个流中,包含多个中断操作流,那么一定会进行多个 being链-accept链-end链流程 ,而不是一个。

最后的最后,处理完成后,会通过最后一个sink的get操作获取目标对象,目标对象放在 Box 类中的 state 属性。

总结一下:

  1. 整个流为双向链表,直到一个结束流来进行处理。

  2. 通过将链表流中的 sink 操作包装起来,形成调用链。

  3. 然后将元素依次执行 sink 链的操作,每一个sink包含begin、accept、end操作,sink会出现中断和不中断2种情况,所以sink分2大类,也即对应流的stage分2大类, stagelessOp/stagefulOp。

  4. 最终将数据放入,最后生成的结束流中。

java.lang.stream 包介绍

流的相关类

图片.png
Figure 14. 图片.png

关键字:

  • stream:流,抽象接口

  • pipeline: 管道,也即一个流的具体实现,每个流对应响应流的操作。

  • ops:具体操作流,例如查找、匹配等

  • sink: 执行Lambda表达式的链路

分类:

图片.png
Figure 15. 图片.png
  • 从结构中,可知,核心类都是抽象类以及接口内,不能直接实例化对象,因此在很多代码内部都是创建匿名类来生成对应的对象。

  • 主要流接口由 Stream、LongStream、IntStream、DoubleStream

  • 每个流接口,都有一个抽象pipeline类实现。

  • 每个pipeline实现抽象类中,都有3个静态内部抽象类 Head、StagefulOp、StagefulOp,并且它们都继承外部的pipeline抽象类。

从 Stream 类开始

先知晓2个概念,从Stream接口中的方法的注释,会标注这个方法是属于

  • This is an intermediate operation. 这是一个中间操作,会产生一个新的流,其返回值必是一个流。(执行步骤)

  • This is a terminal operation. 这是一个终止操作,会结束整个流,其返回值不再是流,而是期望的结果。(结果)

中间操作有2个类型,stageless、stageful

  • StatelessOp: 表示这个操作会

  • StatefulOp:

图片.png
Figure 16. 图片.png

映射类(返回值均为流,表明是中间操作,其主要目的是将元素转换成目标类型。):

<R> Stream<R> map(Function<? super T, ? extends R> mapper);

IntStream mapToInt(ToIntFunction<? super T> mapper);

LongStream mapToLong(ToLongFunction<? super T> mapper);

DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);

<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);

IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper);

LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper);

DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper);

筛选和切片:

Stream<T> filter(Predicate<? super T> predicate);

Stream<T> distinct();

Stream<T> sorted();

Stream<T> sorted(Comparator<? super T> comparator);

Stream<T> limit(long maxSize);

Stream<T> skip(long n);

Optional<T> min(Comparator<? super T> comparator);

Optional<T> max(Comparator<? super T> comparator);

<T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b)

循环:

Stream<T> peek(Consumer<? super T> action);

void forEach(Consumer<? super T> action);

void forEachOrdered(Consumer<? super T> action);

void forEachOrdered(Consumer<? super T> action);

查找:

boolean anyMatch(Predicate<? super T> predicate);

boolean allMatch(Predicate<? super T> predicate);

boolean noneMatch(Predicate<? super T> predicate);

Optional<T> findFirst();

Optional<T> findAny();

收集、转为结果:

Object[] toArray();

<A> A[] toArray(IntFunction<A[]> generator);

T reduce(T identity, BinaryOperator<T> accumulator);

Optional<T> reduce(BinaryOperator<T> accumulator);

<U> U reduce(U identity,
                 BiFunction<U, ? super T, U> accumulator,
                 BinaryOperator<U> combiner);

<R> R collect(Supplier<R> supplier,
                  BiConsumer<R, ? super T> accumulator,
                  BiConsumer<R, R> combiner);

<R, A> R collect(Collector<? super T, A, R> collector);