Коллектор для разделения потока на куски заданного размера


У меня есть проблема, которую я пытаюсь решить с помощью чего-то, что я совершенно уверен, что не должен делать, но не вижу альтернативы. Мне дают список строк, и я должен разбить его на куски заданного размера. Затем результат должен быть передан какому-либо методу для дальнейшей обработки. Поскольку список может быть огромным, обработка должна выполняться асинхронно.

Мой подход заключается в создании пользовательского коллектора, который берет поток строк и преобразует его в Поток>:

final Stream<List<Long>> chunks = list
                        .stream()
                        .parallel()
                        .collect(MyCollector.toChunks(CHUNK_SIZE)) 
                        .flatMap(p -> doStuff(p))
                        .collect(MyCollector.toChunks(CHUNK_SIZE))
                        .map(...)
                        ...

Код для коллектора:

public final class MyCollector<T, A extends List<List<T>>, R extends Stream<List<T>>> implements Collector<T, A, R> {
private final AtomicInteger index = new AtomicInteger(0);
private final AtomicInteger current = new AtomicInteger(-1);
private final int chunkSize;

private MyCollector(final int chunkSize){
    this.chunkSize = chunkSize;
}

@Override
public Supplier<A> supplier() {
    return () -> (A)new ArrayList<List<T>>();
}

@Override
public BiConsumer<A, T> accumulator() {
    return (A candidate, T acc) -> {
        if (index.getAndIncrement() % chunkSize == 0){
            candidate.add(new ArrayList<>(chunkSize));
            current.incrementAndGet();
        }
        candidate.get(current.get()).add(acc);
    };
}

@Override
public BinaryOperator<A> combiner() {
    return (a1, a2) -> {
        a1.addAll(a2);
        return a1;
    };
}
@Override
public Function<A, R> finisher() {
    return (a) -> (R)a.stream();
}

@Override
public Set<Characteristics> characteristics() {
    return Collections.unmodifiableSet(EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED));
}

public static <T> MyCollector<T, List<List<T>>, Stream<List<T>>> toChunks(final int chunkSize){
    return new MyCollector<>(chunkSize);
}

}

Это, кажется, работает в большинстве случаев, но иногда я получаю NPE.. Я уверен, что в аккумуляторе нет потокобезопасности, так как при добавлении новых списков в основной список могут вмешиваться два потока. Я не возражаю против того, чтобы кусок имел несколько слишком много или слишком мало элементов.

Я попробовал это вместо текущей функции поставщика:

 return () -> (A)new ArrayList<List<T>>(){{add(new ArrayList<T>());}};

Чтобы убедиться, что список присутствует всегда. Это не работает вообще и приводит к пустым спискам.

Вопросы:

  • я почти уверен, что пользовательский сплитератор был бы хорошим решением. Однако это не будет работать для синхронных сценариев. Кроме того, я уверен, что сплитератор называется?
      Я знаю, что у меня вообще не должно быть состояния, но не знаю, как его изменить.

Вопросы:

    Является ли этот подход полностью неправильным или каким-то образом исправимым?
  • Если я использую Spliterator-могу ли я быть уверен, что он называется, или это решает базовая реализация?
  • я почти уверен, что приведения к (A) и (R) в поставщике и финишере не нужны, но IntelliJ жалуется. Может быть, я чего-то не понимаю?

EDIT:

  • я добавил еще несколько в клиентский код в качестве предложений с IntStream.диапазон не будет работать, когда прикован.
  • я понимаю, что мог бы сделать это по-другому, как предложено в комментарии, но это также немного о стиле и о том, возможно ли это.
  • У меня есть параллельная характеристика, потому что я предполагаю, что в противном случае Stream API вернется к синхронной обработке. Решение не является потокобезопасным, как было указано ранее.
Любая помощь будет весьма признательна.

Лучший, D

3 3

3 ответа:

Я пока не могу комментировать, но я хотел бы опубликовать следующую ссылку на очень похожий вопрос (хотя и не дубликат, насколько я понимаю): Java 8 Stream with batch processing

Вас также может заинтересовать следующий выпуск на GitHub: https://github.com/jOOQ/jOOL/issues/296


Теперь, ваше использование характеристики CONCURRENT неверно-док говорит следующее о Collector.Characteristics.CONCURRENT:

Указывает, что этот коллектор является concurrent , что означает, что результирующий контейнер может поддерживать функцию-накопитель, вызываемую одновременно с одним и тем же результирующим контейнером из нескольких потоков.

Это означает, что supplier вызывается только один раз, а combiner фактически никогда не вызывается (ср. источник метода ReferencePipeline.collect()). Вот почему у тебя иногда бывают НПВ. В результате я предлагаю упрощенную версию того, что вы придумали:
public static <T> Collector<T, List<List<T>>, Stream<List<T>>> chunked(int chunkSize) {
  return Collector.of(
          ArrayList::new,
          (outerList, item) -> {
            if (outerList.isEmpty() || last(outerList).size() >= chunkSize) {
              outerList.add(new ArrayList<>(chunkSize));
            }
            last(outerList).add(item);
          },
          (a, b) -> {
            a.addAll(b);
            return a;
          },
          List::stream,
          Collector.Characteristics.UNORDERED
  );
}

private static <T> T last(List<T> list) {
  return list.get(list.size() - 1);
}

В качестве альтернативы, вы могли бы написать действительно параллельный Collector с использованием правильной синхронизации, но если вы не возражаете иметь более одного списка размером меньше chunkSize (что является эффектом, который вы можете получить с не-параллельным Collector, Как я предложил выше), я бы не беспокоился.

Вот один способ, в духе выполнения всего этого в одном выражении, который странно удовлетворяет: сначала свяжите каждую строку с ее индексом в списке, а затем используйте его в сборщике, чтобы выбрать список строк, в который будет помещена каждая строка. Затем потоковая передача этих списков параллельно с вашим методом конвертера.

  final Stream<List<Long>> longListStream = IntStream.range(0, strings.size())
    .parallel()
    .mapToObj(i -> new AbstractMap.SimpleEntry<>(i, strings.get(i)))
    .collect(
        () -> IntStream.range(0, strings.size() / CHUNK_SIZE + 1)
            .mapToObj(i -> new LinkedList<String>())
            .collect(Collectors.toList()),
        (stringListList, entry) -> {
            stringListList.get(entry.getKey() % CHUNK_SIZE).add(entry.getValue());
        },
        (stringListList1, stringListList2) -> { })
    .parallelStream()
    .map(this::doStuffWithStringsAndGetLongsBack);

Я думаю, что вам не нужно писать пользовательский Collector, скорее это можно сделать, используя существующие функциональные возможности, доступные в stream API. Это один из способов сделать это.

final int pageSize = 3;
List<Long> chunks  = IntStream.range(0, (numbers.size() + pageSize - 1) / pageSize)
        .peek(System.out::println)
        .mapToObj(i -> numbers.subList(i * pageSize, Math.min(pageSize * (i + 1), numbers.size())))
        .flatMap(l -> doStuff(l).stream())
        .collect(Collectors.toList());

Также я не вижу никакого смысла иметь Stream<List<Long>> chunks в качестве конечного результата, скорее это будет List<Long>.