Вы можете разделить поток на два потока?
у меня есть набор данных, представленный потоком Java 8:
Stream<T> stream = ...;
Я вижу, как отфильтровать его, чтобы получить случайное подмножество - например
Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();
Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));
Я также вижу, как я мог бы уменьшить этот поток, чтобы получить, например, два списка, представляющие две случайные половины набора данных, а затем превратить их обратно в потоки. Но есть ли прямой способ генерировать два потока из начального? Что-то вроде
(heads, tails) = stream.[some kind of split based on filter]
Спасибо за любое понимание.
9 ответов:
не совсем. Вы не можете получить два
Stream
s из одного; это не имеет смысла - как бы вы перебирали один без необходимости генерировать другой в то же время? Потоком можно управлять только один раз.однако, если вы хотите сбросить их в списке или что-то, вы могли бы сделать
stream.forEach((x) -> ((x == 0) ? heads : tails).add(x));
A коллектор можно использовать для этого.
- для двух категорий используйте
Collectors.partitioningBy()
фабрики.это создаст
Map
СBoolean
доList
, и поместить элементы в один или другой список на основеPredicate
.Примечание: поскольку поток должен потребляться целиком, это не может работать на бесконечных потоках. Поскольку поток потребляется в любом случае, этот метод просто помещает их в списки вместо создания нового поток-с-памятью.
кроме того, нет необходимости в итераторе, даже в головах-только пример, который вы предоставили.
Random r = new Random(); Map<Boolean, List<String>> groups = stream .collect(Collectors.partitioningBy(x -> r.nextBoolean())); System.out.println(groups.get(false).size()); System.out.println(groups.get(true).size());
- для большего количества категорий используйте
Collectors.groupingBy()
фабрики.Map<Object, List<String>> groups = stream .collect(Collectors.groupingBy(x -> r.nextInt(3))); System.out.println(groups.get(0).size()); System.out.println(groups.get(1).size()); System.out.println(groups.get(2).size());
в случае, если потоки не являются
Stream
, но один из примитивных потоков вродеIntStream
, то этот.collect(Collectors)
метод недоступен. Вам придется сделать это вручную, без фабрики коллектора. Это реализация выглядит так это:IntStream intStream = IntStream.iterate(0, i -> i + 1).limit(1000000); Predicate<Integer> p = x -> r.nextBoolean(); Map<Boolean, List<Integer>> groups = intStream.collect(() -> { Map<Boolean, List<Integer>> map = new HashMap<>(); map.put(false, new ArrayList<>()); map.put(true, new ArrayList<>()); return map; }, (map, x) -> { boolean partition = p.test(x); List<Integer> list = map.get(partition); list.add(x); }, (map1, map2) -> { map1.get(false).addAll(map2.get(false)); map1.get(true).addAll(map2.get(true)); }); System.out.println(groups.get(false).size()); System.out.println(groups.get(true).size());
Edit
как уже отмечалось, выше "обходной путь" не является потокобезопасным. Преобразование в нормальный
Stream
перед сбором это путь:Stream<Integer> stream = intStream.boxed();
к сожалению, то, что вы просите, прямо нахмурилось в JavaDoc потока:
поток должен работать (вызывая промежуточный или терминал потоковая операция) только один раз. Это исключает, например, " раздвоение" потоки, где один и тот же источник питает два или более трубопроводов, или несколько обходов одного и того же потока.
Вы можете обойти это с помощью
peek
или другие методы, Если вы действительно хотите, чтобы этот тип о поведении. В этом случае вместо того, чтобы пытаться поддерживать два потока из одного и того же исходного источника потока с помощью фильтра разветвления, вы должны дублировать свой поток и фильтровать каждый из дубликатов соответствующим образом.однако, вы можете пересмотреть, если a
Stream
является подходящей структурой для вашего случая.
я наткнулся на этот вопрос к себе, и я чувствую, что раздвоенный поток имеет некоторые случаи использования, которые могут оказаться действительными. Я написал код ниже в качестве потребителя, чтобы он ничего не делал, но вы могли бы применить его к функциям и всему остальному, с чем вы можете столкнуться.
class PredicateSplitterConsumer<T> implements Consumer<T> { private Predicate<T> predicate; private Consumer<T> positiveConsumer; private Consumer<T> negativeConsumer; public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative) { this.predicate = predicate; this.positiveConsumer = positive; this.negativeConsumer = negative; } @Override public void accept(T t) { if (predicate.test(t)) { positiveConsumer.accept(t); } else { negativeConsumer.accept(t); } } }
теперь ваша реализация кода может быть примерно такой:
personsArray.forEach( new PredicateSplitterConsumer<>( person -> person.getDateOfBirth().isPresent(), person -> System.out.println(person.getName()), person -> System.out.println(person.getName() + " does not have Date of birth")));
это противоречит общему механизму потока. Скажем, вы можете разделить поток S0 на Sa и Sb, как вы хотели. Выполнение любой терминальной операции, скажем
count()
, на Sa обязательно будут "потреблять" все элементы в S0. Поэтому Sb потерял свой источник данных.ранее поток имел
tee()
метод, я думаю, который дублирует поток на два. Теперь он удален.Stream имеет метод peek (), хотя вы можете использовать его для достижения своих требований.
не совсем, но вы можете быть в состоянии выполнить то, что вам нужно, вызывая
Collectors.groupingBy()
. вы создаете новую коллекцию, а затем можете создавать экземпляры потоков в этой новой коллекции.
Это был наименее плохой ответ, который я мог придумать.
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; public class Test { public static <T, L, R> Pair<L, R> splitStream(Stream<T> inputStream, Predicate<T> predicate, Function<Stream<T>, L> trueStreamProcessor, Function<Stream<T>, R> falseStreamProcessor) { Map<Boolean, List<T>> partitioned = inputStream.collect(Collectors.partitioningBy(predicate)); L trueResult = trueStreamProcessor.apply(partitioned.get(Boolean.TRUE).stream()); R falseResult = falseStreamProcessor.apply(partitioned.get(Boolean.FALSE).stream()); return new ImmutablePair<L, R>(trueResult, falseResult); } public static void main(String[] args) { Stream<Integer> stream = Stream.iterate(0, n -> n + 1).limit(10); Pair<List<Integer>, String> results = splitStream(stream, n -> n > 5, s -> s.filter(n -> n % 2 == 0).collect(Collectors.toList()), s -> s.map(n -> n.toString()).collect(Collectors.joining("|"))); System.out.println(results); } }
это занимает поток целых чисел и разбивает их на 5. Для тех, кто больше 5 он фильтрует только четные числа и помещает их в список. В остальном он присоединяется к ним с |.
выходы:
([6, 8],0|1|2|3|4|5)
его не идеально, так как он собирает все в промежуточные коллекции, разбивая поток (и имеет слишком много аргументов!)
я наткнулся на этот вопрос, ища способ фильтровать определенные элементы из потока и регистрировать их как ошибки. Поэтому мне не нужно было разбивать поток настолько, чтобы прикрепить преждевременное завершающее действие к предикату с ненавязчивым синтаксисом. Вот что я придумал:
public class MyProcess { /* Return a Predicate that performs a bail-out action on non-matching items. */ private static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) { return x -> { if (pred.test(x)) { return true; } altAction.accept(x); return false; }; /* Example usage in non-trivial pipeline */ public void processItems(Stream<Item> stream) { stream.filter(Objects::nonNull) .peek(this::logItem) .map(Item::getSubItems) .filter(withAltAction(SubItem::isValid, i -> logError(i, "Invalid"))) .peek(this::logSubItem) .filter(withAltAction(i -> i.size() > 10, i -> logError(i, "Too large"))) .map(SubItem::toDisplayItem) .forEach(this::display); } }