Коллектор для разделения потока на куски заданного размера
У меня есть проблема, которую я пытаюсь решить с помощью чего-то, что я совершенно уверен, что не должен делать, но не вижу альтернативы. Мне дают список строк, и я должен разбить его на куски заданного размера. Затем результат должен быть передан какому-либо методу для дальнейшей обработки. Поскольку список может быть огромным, обработка должна выполняться асинхронно.
Мой подход заключается в создании пользовательского коллектора, который берет поток строк и преобразует его в Поток>:
final Stream<List<Long>> chunks = list
.stream()
.parallel()
.collect(MyCollector.toChunks(CHUNK_SIZE))
.flatMap(p -> doStuff(p))
.collect(MyCollector.toChunks(CHUNK_SIZE))
.map(...)
...
Код для коллектора:
public final class MyCollector<T, A extends List<List<T>>, R extends Stream<List<T>>> implements Collector<T, A, R> {
private final AtomicInteger index = new AtomicInteger(0);
private final AtomicInteger current = new AtomicInteger(-1);
private final int chunkSize;
private MyCollector(final int chunkSize){
this.chunkSize = chunkSize;
}
@Override
public Supplier<A> supplier() {
return () -> (A)new ArrayList<List<T>>();
}
@Override
public BiConsumer<A, T> accumulator() {
return (A candidate, T acc) -> {
if (index.getAndIncrement() % chunkSize == 0){
candidate.add(new ArrayList<>(chunkSize));
current.incrementAndGet();
}
candidate.get(current.get()).add(acc);
};
}
@Override
public BinaryOperator<A> combiner() {
return (a1, a2) -> {
a1.addAll(a2);
return a1;
};
}
@Override
public Function<A, R> finisher() {
return (a) -> (R)a.stream();
}
@Override
public Set<Characteristics> characteristics() {
return Collections.unmodifiableSet(EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED));
}
public static <T> MyCollector<T, List<List<T>>, Stream<List<T>>> toChunks(final int chunkSize){
return new MyCollector<>(chunkSize);
}
}
Это, кажется, работает в большинстве случаев, но иногда я получаю NPE.. Я уверен, что в аккумуляторе нет потокобезопасности, так как при добавлении новых списков в основной список могут вмешиваться два потока. Я не возражаю против того, чтобы кусок имел несколько слишком много или слишком мало элементов.
Я попробовал это вместо текущей функции поставщика:
return () -> (A)new ArrayList<List<T>>(){{add(new ArrayList<T>());}};
Чтобы убедиться, что список присутствует всегда. Это не работает вообще и приводит к пустым спискам.
Вопросы:
- я почти уверен, что пользовательский сплитератор был бы хорошим решением. Однако это не будет работать для синхронных сценариев. Кроме того, я уверен, что сплитератор называется?
- Я знаю, что у меня вообще не должно быть состояния, но не знаю, как его изменить.
Вопросы:
- Является ли этот подход полностью неправильным или каким-то образом исправимым?
- Если я использую Spliterator-могу ли я быть уверен, что он называется, или это решает базовая реализация?
- я почти уверен, что приведения к (A) и (R) в поставщике и финишере не нужны, но IntelliJ жалуется. Может быть, я чего-то не понимаю?
EDIT:
- я добавил еще несколько в клиентский код в качестве предложений с IntStream.диапазон не будет работать, когда прикован.
- я понимаю, что мог бы сделать это по-другому, как предложено в комментарии, но это также немного о стиле и о том, возможно ли это.
- У меня есть параллельная характеристика, потому что я предполагаю, что в противном случае Stream API вернется к синхронной обработке. Решение не является потокобезопасным, как было указано ранее.
Лучший, D
3 ответа:
Я пока не могу комментировать, но я хотел бы опубликовать следующую ссылку на очень похожий вопрос (хотя и не дубликат, насколько я понимаю): Java 8 Stream with batch processing
Вас также может заинтересовать следующий выпуск на GitHub: https://github.com/jOOQ/jOOL/issues/296
Теперь, ваше использование характеристики
CONCURRENT
неверно-док говорит следующее оCollector.Characteristics.CONCURRENT
:Это означает, чтоУказывает, что этот коллектор является concurrent , что означает, что результирующий контейнер может поддерживать функцию-накопитель, вызываемую одновременно с одним и тем же результирующим контейнером из нескольких потоков.
supplier
вызывается только один раз, аcombiner
фактически никогда не вызывается (ср. источник методаReferencePipeline.collect()
). Вот почему у тебя иногда бывают НПВ. В результате я предлагаю упрощенную версию того, что вы придумали:public static <T> Collector<T, List<List<T>>, Stream<List<T>>> chunked(int chunkSize) { return Collector.of( ArrayList::new, (outerList, item) -> { if (outerList.isEmpty() || last(outerList).size() >= chunkSize) { outerList.add(new ArrayList<>(chunkSize)); } last(outerList).add(item); }, (a, b) -> { a.addAll(b); return a; }, List::stream, Collector.Characteristics.UNORDERED ); } private static <T> T last(List<T> list) { return list.get(list.size() - 1); }
В качестве альтернативы, вы могли бы написать действительно параллельный
Collector
с использованием правильной синхронизации, но если вы не возражаете иметь более одного списка размером меньшеchunkSize
(что является эффектом, который вы можете получить с не-параллельнымCollector
, Как я предложил выше), я бы не беспокоился.
Вот один способ, в духе выполнения всего этого в одном выражении, который странно удовлетворяет: сначала свяжите каждую строку с ее индексом в списке, а затем используйте его в сборщике, чтобы выбрать список строк, в который будет помещена каждая строка. Затем потоковая передача этих списков параллельно с вашим методом конвертера.
final Stream<List<Long>> longListStream = IntStream.range(0, strings.size()) .parallel() .mapToObj(i -> new AbstractMap.SimpleEntry<>(i, strings.get(i))) .collect( () -> IntStream.range(0, strings.size() / CHUNK_SIZE + 1) .mapToObj(i -> new LinkedList<String>()) .collect(Collectors.toList()), (stringListList, entry) -> { stringListList.get(entry.getKey() % CHUNK_SIZE).add(entry.getValue()); }, (stringListList1, stringListList2) -> { }) .parallelStream() .map(this::doStuffWithStringsAndGetLongsBack);
Я думаю, что вам не нужно писать пользовательский
Collector
, скорее это можно сделать, используя существующие функциональные возможности, доступные вstream
API. Это один из способов сделать это.final int pageSize = 3; List<Long> chunks = IntStream.range(0, (numbers.size() + pageSize - 1) / pageSize) .peek(System.out::println) .mapToObj(i -> numbers.subList(i * pageSize, Math.min(pageSize * (i + 1), numbers.size()))) .flatMap(l -> doStuff(l).stream()) .collect(Collectors.toList());
Также я не вижу никакого смысла иметь
Stream<List<Long>> chunks
в качестве конечного результата, скорее это будетList<Long>
.