RxJava-группировка, эмиссия и Zip отсортированных "кусков" с общим свойством?


Мои коллеги и я часто сталкиваемся с проблемой, и я надеюсь, что реактивное программирование может ее решить. Это, вероятно, потребует моей собственной реализации Operator или Transformer, хотя.

Я хочу взять любые Observable<T> испускающие T элементы, но я хочу, чтобы оператор сгруппировал их на отображении T и испускал каждую группировку как List<T>, или даже лучше какой-то общий накопитель, очень похожий на Collector из потоков Java 8.

Но вот хитрая часть, которую я не думаю, что groupBy() можете сделать. Я хочу взять две наблюдаемые объекты, проходящие через этот оператор, и предположить, что испускаемые элементы сортируются по этому свойству (входящие данные будут испускаться из отсортированного SQL-запроса и сопоставляться объекту T). Оператор будет последовательно накапливать элементы до тех пор, пока свойство не изменится, а затем он выдает эту группу и переходит к следующей. Таким образом, я могу взять каждую группу данных из каждой наблюдаемой, сжать и обработать эти два куска, а затем выбросить их и перейти к следующему. Таким образом, я может поддерживать полу-буферизованное состояние и поддерживать низкий уровень использования памяти.

Итак, если я сортировал, группировал и пролистывал PARTITION_ID, это визуально то, что я пытаюсь сделать.

Введите описание изображения здесь

Я делаю это только потому, что у меня может быть два запроса, каждый из которых содержит более миллиона записей, и мне нужно делать сложные сравнения бок о бок. У меня нет памяти, чтобы импортировать все данные с обеих сторон сразу, но я могу ограничить его до каждого отсортированного значения свойства и разбить его на партии. После каждой партии GC отбрасывает ее, и оператор может перейти к следующей.

Это код, который у меня есть до сих пор, но я не совсем понимаю, как действовать, поскольку я не хочу ничего выдавать, пока пакет не будет завершен. Как именно мне это сделать?

public final class  SortedPartitioner<T,P,C,R> implements Transformer<T,R> {

 private final Function<T,P> mappedPartitionProperty;
 private final Supplier<C> acculatorSupplier;
 private final BiConsumer<T,R> accumulator;
 private final Function<C,R> finalResult;


 private SortedPartitioner(Function<T, P> mappedPartitionProperty, Supplier<C> acculatorSupplier,
   BiConsumer<T, R> accumulator, Function<C, R> finalResult) {
  this.mappedPartitionProperty = mappedPartitionProperty;
  this.acculatorSupplier = acculatorSupplier;
  this.accumulator = accumulator;
  this.finalResult = finalResult;
 }
 public static <T,P,C,R> SortedPartitioner<T,P,C,R> of(
   Function<T,P> mappedPartitionProperty, 
   Supplier<C> accumulatorSupplier,
   BiConsumer<T,R> accumulator,
   Function<C,R> finalResult) {

  return new SortedPartitioner<>(mappedPartitionProperty, accumulatorSupplier, accumulator, finalResult);

 }
 @Override
 public Observable<R> call(Observable<T> t) {
  return null;
 }

}
2 4

2 ответа:

Еще один ответ для вас, который использует библиотеку на Maven Central и намного короче.

Добавьте эту зависимость к вашему pom.xml.

<dependency>
    <groupId>com.github.davidmoten</groupId>
    <artifactId>rxjava-extras</artifactId>
    <version>0.5.13</version>
</dependency>

В терминах группировки элементов с одинаковыми partition_id Делаем так:

import com.github.davidmoten.rx.Transformers;

Observable<List<Item>> grouped = items.compose(
    Transformers.toListWhile(
        (list, item) -> list.isEmpty() || list.get(0).partitionId == item.partitionId));

Тестирование довольно полно для этого метода (см. Также Transformers.collectWhile для структур данных, отличных от списков), но вы можете проверить источник самостоятельно наgithub .

Затем переходите к zip.

Это сложный вопрос, но я также часто сталкивался с ним.

Фокус в том, чтобы использовать materialize, scan и flatMap. scan накапливает список значений с одинаковым идентификатором partitionId и следующим другим значением, если оно существует. materialize необходимо, потому что нам нужно знать, когда источник завершается, чтобы мы могли излучать оставшееся значение по другому значению, если оно существует. flatMap принимает список и значение и выдает список, если значение присутствует (мы только что переключились на новый partitionId) и выдает значение (остаток), если поток завершается.

Ниже приведен модульный тест, который из списка 1, 1, 2, 2, 2, 3 выдает списки {1, 1}, {2, 2, 2}, {3}.

Для вашего варианта использования вам просто нужно применить эту технику к обоим источникам и застегнуть их вместе.

Код:

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import org.junit.Test;

import rx.Observable;

public class StateMachineExampleTest {

    @Test
    public void testForStackOverflow() {
        Observable<Integer> a = Observable.just(1, 1, 2, 2, 2, 3);
        State<Integer> initial = new State<Integer>(Collections.emptyList(), Optional.empty(),
                false);
        List<List<Integer>> lists = a.materialize()
                // accumulate lists and uses onCompleted notification to emit
                // left overs when source completes
                .scan(initial,
                        (state, notification) -> {
                            if (notification.isOnCompleted()) {
                                return new State<>(null, state.value, true);
                            } else if (notification.isOnError())
                                throw new RuntimeException(notification.getThrowable());
                            else if (state.list.size() == 0) {
                                return new State<>(Arrays.asList(notification.getValue()), Optional
                                        .empty(), false);
                            } else if (partitionId(notification.getValue()) == partitionId(state.list
                                    .get(0))) {
                                List<Integer> list = new ArrayList<>();
                                list.addAll(state.list);
                                list.add(notification.getValue());
                                return new State<>(list, Optional.empty(), false);
                            } else if (state.value.isPresent()) {
                                if (partitionId(state.value.get()) == partitionId(notification
                                        .getValue())) {
                                    return new State<>(Arrays.asList(state.value.get(),
                                            notification.getValue()), Optional.empty(), false);
                                } else {
                                    return new State<>(Arrays.asList(state.value.get()), Optional
                                            .of(notification.getValue()), false);
                                }
                            } else {
                                return new State<>(state.list,
                                        Optional.of(notification.getValue()), false);
                            }
                        })
                // emit lists from state
                .flatMap(state -> {
                    if (state.completed) {
                        if (state.value.isPresent())
                            return Observable.just(Arrays.asList(state.value.get()));
                        else
                            return Observable.empty();
                    } else if (state.value.isPresent()) {
                        return Observable.just(state.list);
                    } else {
                        return Observable.empty();
                    }
                })
                // get as a list of lists to check
                .toList().toBlocking().single();
        assertEquals(Arrays.asList(Arrays.asList(1, 1), Arrays.asList(2, 2, 2), Arrays.asList(3)),
                lists);
    }

    private static int partitionId(Integer n) {
        return n;
    }

    private static final class State<T> {
        final List<T> list;
        final Optional<T> value;
        final boolean completed;

        State(List<T> list, Optional<T> value, boolean completed) {
            this.list = list;
            this.value = value;
            this.completed = completed;
        }
    }

}
Имейте в виду, что этот код был быстро собран и может иметь дыры. Обязательно выполните полное модульное тестирование с вашей адаптацией этого кода.

Дополнительная заметка для вас заключается в том, что мы используем поддержку противодавления Операторы materialize, scan и flatMap результирующее преобразование также поддерживает обратное давление и, таким образом, может быть безопасно объединено с zip.