Java 8 stream combiner никогда не вызывался
Я пишу пользовательский Java 8 collector, который должен вычислять среднее значение POJO, имеющего метод getValue()
. Вот код:
public static Collector<BoltAggregationData, BigDecimal[], BigDecimal> avgCollector = new Collector<BoltAggregationData, BigDecimal[], BigDecimal>() {
@Override
public Supplier<BigDecimal[]> supplier() {
return () -> {
BigDecimal[] start = new BigDecimal[2];
start[0] = BigDecimal.ZERO;
start[1] = BigDecimal.ZERO;
return start;
};
}
@Override
public BiConsumer<BigDecimal[], BoltAggregationData> accumulator() {
return (a,b) -> {
a[0] = a[0].add(b.getValue());
a[1] = a[1].add(BigDecimal.ONE);
};
}
@Override
public BinaryOperator<BigDecimal[]> combiner() {
return (a,b) -> {
a[0] = a[0].add(b[0]);
a[1] = a[1].add(b[1]);
return a;
};
}
@Override
public Function<BigDecimal[], BigDecimal> finisher() {
return (a) -> {
return a[0].divide(a[1], 6 , RoundingMode.HALF_UP);
};
}
private final Set<Characteristics> CHARACTERISTICS = new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));
@Override
public Set<Characteristics> characteristics() {
return CHARACTERISTICS;
}
};
Все это хорошо работает в непараллельном случае. Однако, когда я использую parallelStream()
, это иногда не работает. Например, учитывая значения от 1 до 10, он вычисляет( 53/9 вместо 55/10). При отладке отладчик никогда не попадает в точку останова в функции combiner (). Есть ли какой-то флаг, который мне нужно установить?
2 ответа:
Похоже, что проблема заключается в характеристике
CONCURRENT
, которая делает что-то другое, чем вы могли бы подумать:Указывает, что этот коллектор является параллельным , что означает, что контейнер результата может поддержать функцию аккумулятора будучи вызывается одновременно с одним и тем же контейнером результата из нескольких нити.
вместо вызова объединителя, аккумулятор вызывается одновременно, используя один и тот же
BigDecimal[] a
для всех нити. Доступ кa
не является атомарным, поэтому он идет неправильно:Thread1 -> retrieves value of a[0]: 3 Thread2 -> retrieves value of a[0]: 3 Thread1 -> adds own value: 3 + 3 = 6 Thread2 -> adds own value: 3 + 4 = 7 Thread1 -> writes 6 to a[0] Thread2 -> writes 7 to a[0]
Делая значение
a[0]
7, когда оно должно быть 10. То же самое может произойти и сa[1]
, поэтому результаты могут быть противоречивыми.
Если вы удалите характеристику
CONCURRENT
, вместо нее будет использоваться объединитель.
Ну, это именно то, что вы просите при указании
Characteristics.CONCURRENT
:Указывает, что этот коллектор является параллельным, что означает, что контейнер результата может поддерживать функцию аккумулятора, вызываемую одновременно с одним и тем же контейнером результата из нескольких потоков.
Если это не так, как в случае с вашим
Collector
, Вы не должны указывать этот флаг.
В качестве дополнительной заметки,
new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));
довольно неэффективно для определения характеристик. Вы можете просто использоватьEnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED)
. Когда вы удаляете неправильнуюпараллельную характеристику, вы можете использовать либоEnumSet.of(Characteristics.UNORDERED)
, либоCollections.singleton(Characteristics.UNORDERED)
, ноHashSet
определенно перебор.