RxJava-группировка, эмиссия и Zip отсортированных "кусков" с общим свойством?
Мои коллеги и я часто сталкиваемся с проблемой, и я надеюсь, что реактивное программирование может ее решить. Это, вероятно, потребует моей собственной реализации Operator
или Transformer
, хотя.
Я хочу взять любые Observable<T>
испускающие T
элементы, но я хочу, чтобы оператор сгруппировал их на отображении T
и испускал каждую группировку как List<T>
, или даже лучше какой-то общий накопитель, очень похожий на Collector
из потоков Java 8.
Но вот хитрая часть, которую я не думаю, что groupBy()
можете сделать. Я хочу взять две наблюдаемые объекты, проходящие через этот оператор, и предположить, что испускаемые элементы сортируются по этому свойству (входящие данные будут испускаться из отсортированного SQL-запроса и сопоставляться объекту T
). Оператор будет последовательно накапливать элементы до тех пор, пока свойство не изменится, а затем он выдает эту группу и переходит к следующей. Таким образом, я могу взять каждую группу данных из каждой наблюдаемой, сжать и обработать эти два куска, а затем выбросить их и перейти к следующему. Таким образом, я может поддерживать полу-буферизованное состояние и поддерживать низкий уровень использования памяти.
PARTITION_ID
, это визуально то, что я пытаюсь сделать.
Я делаю это только потому, что у меня может быть два запроса, каждый из которых содержит более миллиона записей, и мне нужно делать сложные сравнения бок о бок. У меня нет памяти, чтобы импортировать все данные с обеих сторон сразу, но я могу ограничить его до каждого отсортированного значения свойства и разбить его на партии. После каждой партии GC отбрасывает ее, и оператор может перейти к следующей.
Это код, который у меня есть до сих пор, но я не совсем понимаю, как действовать, поскольку я не хочу ничего выдавать, пока пакет не будет завершен. Как именно мне это сделать?
public final class SortedPartitioner<T,P,C,R> implements Transformer<T,R> {
private final Function<T,P> mappedPartitionProperty;
private final Supplier<C> acculatorSupplier;
private final BiConsumer<T,R> accumulator;
private final Function<C,R> finalResult;
private SortedPartitioner(Function<T, P> mappedPartitionProperty, Supplier<C> acculatorSupplier,
BiConsumer<T, R> accumulator, Function<C, R> finalResult) {
this.mappedPartitionProperty = mappedPartitionProperty;
this.acculatorSupplier = acculatorSupplier;
this.accumulator = accumulator;
this.finalResult = finalResult;
}
public static <T,P,C,R> SortedPartitioner<T,P,C,R> of(
Function<T,P> mappedPartitionProperty,
Supplier<C> accumulatorSupplier,
BiConsumer<T,R> accumulator,
Function<C,R> finalResult) {
return new SortedPartitioner<>(mappedPartitionProperty, accumulatorSupplier, accumulator, finalResult);
}
@Override
public Observable<R> call(Observable<T> t) {
return null;
}
}
2 ответа:
Еще один ответ для вас, который использует библиотеку на Maven Central и намного короче.
Добавьте эту зависимость к вашему
pom.xml
.<dependency> <groupId>com.github.davidmoten</groupId> <artifactId>rxjava-extras</artifactId> <version>0.5.13</version> </dependency>
В терминах группировки элементов с одинаковыми
partition_id
Делаем так:import com.github.davidmoten.rx.Transformers; Observable<List<Item>> grouped = items.compose( Transformers.toListWhile( (list, item) -> list.isEmpty() || list.get(0).partitionId == item.partitionId));
Тестирование довольно полно для этого метода (см. Также
Transformers.collectWhile
для структур данных, отличных от списков), но вы можете проверить источник самостоятельно наgithub .Затем переходите к
zip
.
Это сложный вопрос, но я также часто сталкивался с ним.
Фокус в том, чтобы использовать
Ниже приведен модульный тест, который из спискаmaterialize
,scan
иflatMap
.scan
накапливает список значений с одинаковым идентификатором partitionId и следующим другим значением, если оно существует.materialize
необходимо, потому что нам нужно знать, когда источник завершается, чтобы мы могли излучать оставшееся значение по другому значению, если оно существует.flatMap
принимает список и значение и выдает список, если значение присутствует (мы только что переключились на новый partitionId) и выдает значение (остаток), если поток завершается.1, 1, 2, 2, 2, 3
выдает списки{1, 1}, {2, 2, 2}, {3}
.Для вашего варианта использования вам просто нужно применить эту технику к обоим источникам и застегнуть их вместе.
Код:
Имейте в виду, что этот код был быстро собран и может иметь дыры. Обязательно выполните полное модульное тестирование с вашей адаптацией этого кода.import static org.junit.Assert.assertEquals; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; import org.junit.Test; import rx.Observable; public class StateMachineExampleTest { @Test public void testForStackOverflow() { Observable<Integer> a = Observable.just(1, 1, 2, 2, 2, 3); State<Integer> initial = new State<Integer>(Collections.emptyList(), Optional.empty(), false); List<List<Integer>> lists = a.materialize() // accumulate lists and uses onCompleted notification to emit // left overs when source completes .scan(initial, (state, notification) -> { if (notification.isOnCompleted()) { return new State<>(null, state.value, true); } else if (notification.isOnError()) throw new RuntimeException(notification.getThrowable()); else if (state.list.size() == 0) { return new State<>(Arrays.asList(notification.getValue()), Optional .empty(), false); } else if (partitionId(notification.getValue()) == partitionId(state.list .get(0))) { List<Integer> list = new ArrayList<>(); list.addAll(state.list); list.add(notification.getValue()); return new State<>(list, Optional.empty(), false); } else if (state.value.isPresent()) { if (partitionId(state.value.get()) == partitionId(notification .getValue())) { return new State<>(Arrays.asList(state.value.get(), notification.getValue()), Optional.empty(), false); } else { return new State<>(Arrays.asList(state.value.get()), Optional .of(notification.getValue()), false); } } else { return new State<>(state.list, Optional.of(notification.getValue()), false); } }) // emit lists from state .flatMap(state -> { if (state.completed) { if (state.value.isPresent()) return Observable.just(Arrays.asList(state.value.get())); else return Observable.empty(); } else if (state.value.isPresent()) { return Observable.just(state.list); } else { return Observable.empty(); } }) // get as a list of lists to check .toList().toBlocking().single(); assertEquals(Arrays.asList(Arrays.asList(1, 1), Arrays.asList(2, 2, 2), Arrays.asList(3)), lists); } private static int partitionId(Integer n) { return n; } private static final class State<T> { final List<T> list; final Optional<T> value; final boolean completed; State(List<T> list, Optional<T> value, boolean completed) { this.list = list; this.value = value; this.completed = completed; } } }
Дополнительная заметка для вас заключается в том, что мы используем поддержку противодавления Операторы
materialize
,scan
иflatMap
результирующее преобразование также поддерживает обратное давление и, таким образом, может быть безопасно объединено сzip
.