Java 8 stream combiner никогда не вызывался


Я пишу пользовательский Java 8 collector, который должен вычислять среднее значение POJO, имеющего метод getValue(). Вот код:

public static Collector<BoltAggregationData, BigDecimal[], BigDecimal> avgCollector = new Collector<BoltAggregationData, BigDecimal[], BigDecimal>() {

        @Override
        public Supplier<BigDecimal[]> supplier() {
            return () -> {
                BigDecimal[] start = new BigDecimal[2];
                start[0] = BigDecimal.ZERO;
                start[1] = BigDecimal.ZERO;
                return start;
            };
        }

        @Override
        public BiConsumer<BigDecimal[], BoltAggregationData> accumulator() {
            return (a,b) ->  {
                a[0] = a[0].add(b.getValue());
                a[1] = a[1].add(BigDecimal.ONE);
            };
        }

        @Override
        public BinaryOperator<BigDecimal[]> combiner() {
            return (a,b) -> {
                a[0] = a[0].add(b[0]);
                a[1] = a[1].add(b[1]);
                return a;
            };
        }

        @Override
        public Function<BigDecimal[], BigDecimal> finisher() {
            return (a) -> {
                return a[0].divide(a[1], 6 , RoundingMode.HALF_UP);
            };
        }

        private final Set<Characteristics> CHARACTERISTICS = new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));

        @Override
        public Set<Characteristics> characteristics() {
            return CHARACTERISTICS;
        }

    };

Все это хорошо работает в непараллельном случае. Однако, когда я использую parallelStream(), это иногда не работает. Например, учитывая значения от 1 до 10, он вычисляет( 53/9 вместо 55/10). При отладке отладчик никогда не попадает в точку останова в функции combiner (). Есть ли какой-то флаг, который мне нужно установить?

2 22

2 ответа:

Похоже, что проблема заключается в характеристике CONCURRENT, которая делает что-то другое, чем вы могли бы подумать:

Указывает, что этот коллектор является параллельным , что означает, что контейнер результата может поддержать функцию аккумулятора будучи вызывается одновременно с одним и тем же контейнером результата из нескольких нити.

вместо вызова объединителя, аккумулятор вызывается одновременно, используя один и тот же BigDecimal[] a для всех нити. Доступ к a не является атомарным, поэтому он идет неправильно:

Thread1 -> retrieves value of a[0]: 3
Thread2 -> retrieves value of a[0]: 3
Thread1 -> adds own value: 3 + 3 = 6
Thread2 -> adds own value: 3 + 4 = 7
Thread1 -> writes 6 to a[0]
Thread2 -> writes 7 to a[0]

Делая значение a[0] 7, когда оно должно быть 10. То же самое может произойти и с a[1], поэтому результаты могут быть противоречивыми.


Если вы удалите характеристику CONCURRENT, вместо нее будет использоваться объединитель.

Ну, это именно то, что вы просите при указании Characteristics.CONCURRENT:

Указывает, что этот коллектор является параллельным, что означает, что контейнер результата может поддерживать функцию аккумулятора, вызываемую одновременно с одним и тем же контейнером результата из нескольких потоков.

Если это не так, как в случае с вашим Collector, Вы не должны указывать этот флаг.


В качестве дополнительной заметки, new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED)); довольно неэффективно для определения характеристик. Вы можете просто использовать EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED). Когда вы удаляете неправильнуюпараллельную характеристику, вы можете использовать либо EnumSet.of(Characteristics.UNORDERED), либо Collections.singleton(Characteristics.UNORDERED), но HashSet определенно перебор.