ReactiveX: вычислить частоту отдельных элементов в наблюдаемом


У меня есть Observable<String>. Я хотел бы превратить это в Map<String, Int>, который говорит мне число вхождений для каждой отдельной строки.

Наблюдаемое содержит ~1 миллиард элементов, из которых 1000 различны (поэтому хранение всего набора данных в оперативной памяти не является вариантом). В настоящее время я перебираю Observable и обновляю HashMap. Я также удостоверяюсь, что наблюдаю на той же самой нити, чтобы избежать условий гонки. Однако получение частоты элемента должно быть изначально легко распараллелить, поэтому было бы неплохо воспользоваться этим.

Есть ли способ сделать это?

1 2

1 ответ:

Вы можете использовать groupBy вместо того, чтобы поддерживать HashMap самостоятельно. groupBy создаст Observable для каждого ключа, и вы можете подписаться на него в другом планировщике. Например,

public class KeyCounter {
    int key;
    long count;

    public KeyCounter(int key, long count) {
        this.key = key;
        this.count = count;
    }

    @Override
    public String toString() {
        return "key: " + key + " count: "  + count;
    }
}

@Test
public void foo() {
    Observable<Integer> o = Observable.just(1, 2, 3, 2, 1);
    o.groupBy(i -> i).flatMap(
        group ->
            group.subscribeOn(Schedulers.computation()).countLong().map(count -> new KeyCounter(group.getKey(), count))
    ).subscribe(System.out::println);

    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}