Почему filter () после flatMap ()" не полностью " ленив в потоках Java?


у меня есть следующий код:

System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);
System.out.println("-----------");
System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);

вывод выглядит следующим образом:

1
Result: 1
-----------
-1
0
1
0
1
2
1
2
3
Result: -1

отсюда я вижу, что в первом случае stream действительно ведет себя лениво - мы используем findFirst() поэтому, как только у нас есть первый элемент, наша фильтрация лямбда не вызывается. Однако, во втором случае, который использует flatMaps мы видим, что несмотря на то, что первый элемент, который выполняет условие фильтра, найден (это просто любой первый элемент, поскольку лямбда всегда возвращает true), дальнейшее содержимое потока все еще подается через функцию фильтрации.

Я пытаюсь понять, почему он ведет себя так, а не сдается после того, как первый элемент вычисляется, как в первом случае. Любая полезная информация будет оценена.

6 63

6 ответов:

TL; DR, это было рассмотрено в JDK-8075939 и исправлено в Java 10.

при рассмотрении реализации (ReferencePipeline.java) мы видим метод [ссылке]

@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}

который будет вызывать для findFirst операции. Особенная вещь, чтобы заботиться о это sink.cancellationRequested() что позволяет завершить цикл на первом матче. Сравните с [ссылке]

@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);
    // We can do better than this, by polling cancellationRequested when stream is infinite
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    try (Stream<? extends R> result = mapper.apply(u)) {
                        // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
                        if (result != null)
                            result.sequential().forEach(downstream);
                    }
                }
            };
        }
    };
}

метод для продвижения одного элемента заканчивается вызовом forEach на подпотоке без какой-либо возможности для более раннего завершения и комментария в начале flatMap метод даже говорит об этой отсутствующей функции.

поскольку это больше, чем просто оптимизация, поскольку это означает, что код просто ломается, когда подпоток бесконечен, я надеюсь, что разработчики скоро докажут, что они "могут сделать лучше, чем это"...


чтобы проиллюстрировать последствия, в то время как Stream.iterate(0, i->i+1).findFirst() работает, как ожидалось, Stream.of("").flatMap(x->Stream.iterate(0, i->i+1)).findFirst() будет в конечном итоге в бесконечный цикл.

Что касается спецификации, большинство из них можно найти в

глава" потоковые операции и трубопроводы " спецификации пакета:

...

промежуточные операции возвращают новый поток. Они всегда лень;

...

... Лень также позволяет избежать изучения всех данных, когда это не нужно; для операций например, "найти первую строку длиной более 1000 символов", необходимо только изучить достаточно строк, чтобы найти ту, которая имеет желаемые характеристики, не изучая все строки, доступные из источника. (Это поведение становится еще более важным, когда входной поток бесконечен, а не просто велик.)

...

далее, некоторые операции считаются короткое замыкание операции. Промежуточной операцией является короткое замыкание, если оно представлено с бесконечным входом, может привести к конечному потоку в результате. Терминальная операция является коротким замыканием, если при представлении с бесконечным входом она может завершиться за конечное время. Наличие короткого замыкания работы в трубопроводе является необходимым, но не достаточным условием для переработки бесконечного потока завершается за конечное время.

ясно, что операция короткого замыкания не гарантирует конечного времени завершение, например, когда фильтр не соответствует какому-либо элементу, обработка не может быть завершена, но реализация, которая не поддерживает никакого завершения в конечное время, просто игнорируя характер короткого замыкания операции, далека от спецификации.

элементы входного потока лениво потребляются один за другим. Первый элемент 1, преобразуется двумя flatMap s в поток -1, 0, 1, 0, 1, 2, 1, 2, 3, Так что весь поток соответствует только первому входному элементу. Вложенные потоки охотно материализуются трубопроводом, затем сплющиваются, а затем подаются в

что касается обрыва с бесконечными подпотоками, поведение flatMap становится еще более удивительным, когда один бросает в средний (в отличие от терминала) короткое замыкание.

в то время как следующие работы, как и ожидалось, печать бесконечной последовательности целых чисел

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).forEach(System.out::println);

следующий код выводит только "1", но до сих пор не завершить:

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).limit(1).forEach(System.out::println);

Я не могу представить себе чтение спецификация, в которой это не было ошибкой.

В свободное StreamEx библиотека я представил короткозамыкающие коллекторы. При сборе последовательного потока с короткозамыкающим коллектором (например MoreCollectors.first()) ровно один элемент потребляется из источника. Внутренне это реализовано довольно грязным способом: используя пользовательское исключение, чтобы сломать поток управления. Используя мою библиотеку, ваш образец можно переписать следующим образом:

System.out.println(
        "Result: " +
                StreamEx.of(1, 2, 3)
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .collect(MoreCollectors.first())
                .get()
        );

результат следующий:

-1
Result: -1

Я согласен с другими людьми, это ошибка открыт в JDK-8075939. И так как это все еще не исправлено более чем через год. Я хотел бы рекомендовать вам: AbacusUtil

N.println("Result: " + Stream.of(1, 2, 3).peek(N::println).first().get());

N.println("-----------");

N.println("Result: " + Stream.of(1, 2, 3)
                        .flatMap(i -> Stream.of(i - 1, i, i + 1))
                        .flatMap(i -> Stream.of(i - 1, i, i + 1))
                        .peek(N::println).first().get());

// output:
// 1
// Result: 1
// -----------
// -1
// Result: -1

раскрытие информации: я разработчик AbacusUtil.

к сожалению .flatMap() не лень. Тем не менее, пользовательская flatMap обходной путь доступен здесь:почему .flatMap () настолько неэффективен (не ленив) в java 8 и java 9