Stream 流系列
参考:
学习研究心得体会
为何要提出流的概念?
Lambda 表达式使流的实现成为了可能。
在Stream以前,我们查询一个集合中的某个或某类元素,我们会使用常规for循环,再在循环体内,进行if、else判断,依次或多次,最终得到我们的结果。
int longest = 0;
for(String str : strings){
if(str.startsWith("A")){// 1. filter(), 保留以A开头的字符串
int len = str.length();// 2. mapToInt(), 转换成长度
longest = Math.max(len, longest);// 3. max(), 保留最长的长度
}
}
使用流后:
int longestStringLengthStartingWithA
= strings.stream()
.filter(s -> s.startsWith("A"))
.mapToInt(String::length)
.max();
前后对比可以看出:
-
filter 对应 if 判断
-
mapToInt 对应 str.length
-
max 对应 Max.max
-
每一个操作,都会产生一个流,链式调用产生一个双向链表流。
-
流的每一步操作实际就是将原本的功能代码,转为Lambda表达式,传递给流,让流来帮我们进行计算,并返回相应的结果。
效果图:

结论(个人看法):在一堆数据中,进行筛选或其他操作,可多步,最终得到想要的结果。而每一步操作都可视为模板代码,例如if判断,去重这些。而流的创建就是为了简化、消除这些模板代码。
一个完整的链路流包含3个部分:源头 —> 处理过程(多个) —> 结果
Stream 和 Pipeline 的关系:
Pipleline为Stream的具体实现,一个完整的链路流,包含多个Pipleline,也即多个Stream组成。
实例分析
需求:从一群学生中找到符合要求的学生名字集合
public static void main(String[] args) {
List<Student> objects = new ArrayList<>();
// 学号id、年龄age、性别sex、姓名name
objects.add(new Student(2, 15, true, "小明"));
objects.add(new Student(1, 16, false, "小红"));
objects.add(new Student(5, 15, false, "小芳"));
objects.add(new Student(4, 16, false, "小花"));
objects.add(new Student(6, 15, false, "小爱"));
objects.add(new Student(3, 18, true, "小新"));
Stream<Student> stream1 = objects.stream();
Stream<Student> stream2 = stream1.skip(2);
Stream<Student> stream3 = stream2.filter(e -> e.getAge() > 15);
Stream<Student> stream4 = stream3.sorted(Comparator.comparing(Student::getAge));
Stream<String> stream5 = stream4.map(Student::getName);
Stream<String> stream6 = stream5.distinct();
List<String> collect = stream6.collect(Collectors.toList());
}
这里没有采用链式调用,而是常规定义出各个操作时的流,便于debug查看元素组成。
如何生成双向链表流:
-
当debug走过stream1流时,查看源码可知,底层创建了一个ReferencePipelin$Head对象流:

-
当debug走过stream2流时,查看源码可知,底层使用SliceOps创建了一个ReferencePipeline.StatelessOp流:

-
当debug走过stream3流时,底层创建了一个ReferencePipeline.StatefulOp流

-
当debug走过stream4流时,查看源码可知,底层使用SortedOps创建了一个ReferencePipeline.StatelessOp流:

-
当debug走过stream5流时,查看源码可知,底层使用SortedOps创建了一个ReferencePipeline.StatefulOp流:

-
当debug走过stream6流时,查看源码可知,底层使用SortedOps创建了一个ReferencePipeline.StatelessOp流:

-
当debug走过collect时:

每走一步,可以单步进入方法内部,会发现每一个stage操作中,只包含创建对象。在collect之前,完全是在创建各种流,并且流承上启下,其中关键字段为previousStage、nextStage、depth以及linkedOrConsumed。当使用上一个流产生新的流时,就会将上一个流赋值给下一个流的previousStage,并且linkedOrConsued设置为true,可无限调用,直到一个结束动作(例如collect)。
-
其结构就是一个双向链表,previousStage和nextStage连接前后流,Head只有nextStage,无previousStage。
-
depth标示这个元素处于index位置(从0开始)。在结束操作中,会根据这个depth循环遍历每一个节点。
-
linkedOrConsumed,查看其源码,可知如果流被引用后,这个流就不能再次使用了。原因很简单,这个流被使用了,处于某个链表结构中,如果再被其他的流引用,可能会导致流的改变,从而改变原来链表的结果,因此,流一旦被使用就不能被修改了。
最终结构如下:
如何处理数据
目前为止,我们知道了,一个完整的流包含3个过程,Head流、中间操作stage(多个)以及一个结束操作,并且创建流链表没做任何业务处理,可知结束操作一定会通过创建的链表流来进行一系列的操作,下面就具体分析下`collect`,看看这个方法做了什么?
先看参数:Collectors.toList()
:
public static <T>
Collector<T, ?, List<T>> toList() {
return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
(left, right) -> { left.addAll(right); return left; },
CH_ID);
}
创建了一个CollectorImpl对象,有参构造器由四个参数组成:
-
Supplier<A> supplier
-
BiConsumer<A, T> accumulator
-
BinaryOperator<A> combiner
-
Set<Characteristics> characteristics
接着进入collect方法中:
@Override
@SuppressWarnings("unchecked")
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
A container;
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
container = evaluate(ReduceOps.makeRef(collector));
}
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}
-
这里 collector 参数即为Collectors.toList()创建的对象,由此可知泛型参数 ?super P_OUT 指元素类型(这里时String),A 代表一个List,R代表
-
A container:
-
第一个if:当时平行流时,处理。 todo:如何处理?
-
else: 串行处理流,本例就是这里进行处理,可知调用了一个evaluate方法。
-
return: todo
ReduceOps.makeRef(collector):
实质创建了 ReduceOp
匿名对象:
public static <T, I> TerminalOp<T, I>
makeRef(Collector<? super T, I, ?> collector) {
Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
BiConsumer<I, ? super T> accumulator = collector.accumulator();
BinaryOperator<I> combiner = collector.combiner();
class ReducingSink extends Box<I>
implements AccumulatingSink<T, I, ReducingSink> {
@Override
public void begin(long size) {
state = supplier.get();
}
@Override
public void accept(T t) {
accumulator.accept(state, t);
}
@Override
public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
}
return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
@Override
public int getOpFlags() {
return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
? StreamOpFlag.NOT_ORDERED
: 0;
}
};
}
-
这里创建了一个局部内部类ReducingSink,继承Box<I>,这个Box要留意一下。
-
return 了一个
ReduceOp
匿名对象,其中从写了makeSink方法,返回了一个局部内部内对象。
evaluate 方法:
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
-
这里参数 terminalOp 也即上一步创建的
ReduceOp
匿名对象 -
assert 判断这个流的 StreamShape 一定要等于 terminalOp 对象里面的 StreamShape,需要保持一致。
-
if 判断,同理,如果这个流被使用了,就不能再次使用了。这里的流指的时 collect 操作前一个流。
-
isParallel(): 平行流还是串行流,也即多线程还是单线程处理。
-
这里是单线程,terminalOp.evaluateSequential
继续 terminalOp.evaluateSequential
@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
-
这里有个关键 makeSink(), 也即调用了前面
ReduceOp
匿名对象的方法,返回一个 ReducingSink 对象。 -
get(): 会等
helper.wrapAndCopyInto
执行完后取值
进入 helper.wrapAndCopyInto
中:
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
-
重点 :
wrapSink(Objects.requireNonNull(sink))
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
// 这里就用到开头所属的depth作用。
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
最终的sink对象结构:

这里通过debug,依次遍历,可知创建顺序,现实创建了 1 标注点的downStream,然后赋值给sink,然后将sink传递给下一个,创建了 2 标注点的downStream,再次赋值给sink,依次往前套娃,直到depth等于0,便形成这样一个 sink 链。这是很重要的一个操作。如下图,红色箭头表示创建wrapperSink的顺序,并且容易理解到,从下到上遍历创建,最终形成上一图 1/2/3/4/5/6 这种结构,最下层的sink在最后。

Sink的作用:等价于每个流的操作,例如skip、sorted、map等等,形成wrapperSink这样嵌套结构,就是为了遍历元素,让元素按照sink链进行筛选,最终得到想要的结果。由此可知,以上所有操作都不涉及处理元素。
真正处理元素的是 copyInto
方法:
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
-
wrappedSink : 就是处理元素的层层条件
-
spliterator : 也即待处理的元素
由此可知,前面的所有操作都是为了得到一个执行链 wrappedSink。 这里会进入if代码块中,只分析if。
-
wrappedSink.begin(spliterator.getExactSizeIfKnown());
通过debug进入begin,会发现依次调用了 stream2#Sink、stream3#sink、stream4#sink 就停止了(绿色箭头)。stream4 执行 sorted,查看源码会发现stream4#sink创建了一个list,便停止继续调用 downStream#begin 方法了。

-
spliterator.forEachRemaining(wrappedSink);
// 这里只分析核心代码。从这可以看出,开始遍历处理元素了。
for (; i < hi; ++i) {
@SuppressWarnings("unchecked") E e = (E) a[i];
action.accept(e);
}
同理通过源码分析可知,第一个元素 e, 调用第一个accept,发现这个是skip,e不符合规则,不操作,继续遍历第二个元素,同理,无法通过第一个 accept,直到第3个元素,可以通过第一个accept,然后进入第二个accept,第二个accept判断是否符合filter的Lambda表达式,不通过,不操作,通过则进入第3个accept,第三个accept,只做了一件事,就是把通过的元素e,放入begin链调用时创建的list中,因此可以知道此行代码进行的操作,是循环过滤符合第一个accept以及第二个accept的元素,并放入第三个accept中的list。

-
wrappedSink.end();
通过源码发现,当执行到第3个end时,也即 sorted sink 中的end,进行了list排序,也即
@Override
public void end() {
list.sort(comparator);
downstream.begin(list.size());
if (!cancellationWasRequested) {
list.forEach(downstream::accept);
}
else {
for (T t : list) {
if (downstream.cancellationRequested()) break;
downstream.accept(t);
}
}
downstream.end();
list = null;
}
-
第一行代码,进行了排序。也即这个 sink 所要做的事
-
第二行,又开始调用begin了。源码可知,此begin就是第一次begin执行中止后的begin,值得注意的是,进入distinct#sink#begin中,创建了一个HashSet对象,同上面创建list一样,这里通过set来进行distinct操作,最后一个begin则设置了如何获取目标元素。
-
if/else 又开始调用剩余accept函数,依次进行元素操作。可以看出,这里的循环对象是,上一次产生的list,已经执行过前3个accept操作了。
-
accept 执行完成后,又开始执行剩余的end操作。
-
复制list = null,垃圾收集器回收,感悟:不用的对象,即使进行回收。
至此,整个执行流程应当如下:

大致可以得出:
-
依次调用begin链、accept链、end链。
-
如果begin链发生中断,则调用accept链,在相同的sink中也会中断,然后执行end链,由end链再次发起 being链-accept链-end链流程。
-
为什么会发生中断?这里以sorted进行说明。当遇到 sorted sink时,后面的sink操作必须等待排序完成后才能操作,也即 end 操作后。说简单点就是后面的操作必须依赖上一步操作执行完,才能正确处理。
-
推论出,如果一个流中,包含多个中断操作流,那么一定会进行多个 being链-accept链-end链流程 ,而不是一个。
最后的最后,处理完成后,会通过最后一个sink的get操作获取目标对象,目标对象放在 Box 类中的 state 属性。
总结一下:
-
整个流为双向链表,直到一个结束流来进行处理。
-
通过将链表流中的 sink 操作包装起来,形成调用链。
-
然后将元素依次执行 sink 链的操作,每一个sink包含begin、accept、end操作,sink会出现中断和不中断2种情况,所以sink分2大类,也即对应流的stage分2大类, stagelessOp/stagefulOp。
-
最终将数据放入,最后生成的结束流中。
java.lang.stream 包介绍
流的相关类

关键字:
-
stream:流,抽象接口
-
pipeline: 管道,也即一个流的具体实现,每个流对应响应流的操作。
-
ops:具体操作流,例如查找、匹配等
-
sink: 执行Lambda表达式的链路
分类:

-
从结构中,可知,核心类都是抽象类以及接口内,不能直接实例化对象,因此在很多代码内部都是创建匿名类来生成对应的对象。
-
主要流接口由 Stream、LongStream、IntStream、DoubleStream
-
每个流接口,都有一个抽象pipeline类实现。
-
每个pipeline实现抽象类中,都有3个静态内部抽象类 Head、StagefulOp、StagefulOp,并且它们都继承外部的pipeline抽象类。
从 Stream 类开始
先知晓2个概念,从Stream接口中的方法的注释,会标注这个方法是属于
-
This is an
intermediate
operation. 这是一个中间操作,会产生一个新的流,其返回值必是一个流。(执行步骤) -
This is a
terminal
operation. 这是一个终止操作,会结束整个流,其返回值不再是流,而是期望的结果。(结果)
中间操作有2个类型,stageless、stageful
-
StatelessOp: 表示这个操作会
-
StatefulOp:

映射类(返回值均为流,表明是中间操作,其主要目的是将元素转换成目标类型。):
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
IntStream mapToInt(ToIntFunction<? super T> mapper);
LongStream mapToLong(ToLongFunction<? super T> mapper);
DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper);
LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper);
DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper);
筛选和切片:
Stream<T> filter(Predicate<? super T> predicate);
Stream<T> distinct();
Stream<T> sorted();
Stream<T> sorted(Comparator<? super T> comparator);
Stream<T> limit(long maxSize);
Stream<T> skip(long n);
Optional<T> min(Comparator<? super T> comparator);
Optional<T> max(Comparator<? super T> comparator);
<T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b)
循环:
Stream<T> peek(Consumer<? super T> action);
void forEach(Consumer<? super T> action);
void forEachOrdered(Consumer<? super T> action);
void forEachOrdered(Consumer<? super T> action);
查找:
boolean anyMatch(Predicate<? super T> predicate);
boolean allMatch(Predicate<? super T> predicate);
boolean noneMatch(Predicate<? super T> predicate);
Optional<T> findFirst();
Optional<T> findAny();
收集、转为结果:
Object[] toArray();
<A> A[] toArray(IntFunction<A[]> generator);
T reduce(T identity, BinaryOperator<T> accumulator);
Optional<T> reduce(BinaryOperator<T> accumulator);
<U> U reduce(U identity,
BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner);
<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);
<R, A> R collect(Collector<? super T, A, R> collector);