В RX как объединить два источника разных типов


Настройка:

  • First IObservable производит значения типа A
  • Second IObservable производит значения типа B
  • они производят значение в разном темпе (довольно быстро, до каждых 10 МС)

Чего я пытаюсь достичь:

Каждый N раз (N довольно медленно около 500 мс) вызов должен быть сделан в службу и предоставить последние значения как из Первого, так и из второго IObservable.

Вопрос: Интересно, как я могу использовать RX.

Текущее решение (не работает):

var stateObs = from drag in dragObs.MostRecent(0).ToObservable()
                from roll in rollObs.MostRecent(0).ToObservable()
                select new ClientState
                            {
                                FileDragPerc = drag,
                                PhoneRoll = roll,
                                PendingFileType = FileType.Image,
                                TransferState = TransferState.SelectiveTransfer
                            };

stateObs.Sample(TimeSpan.FromMilliseconds(300))
        .Subscribe(x => _lsService.SetClientStateAsync(x),
                    x => Debug.WriteLine("Error in observable "),
                    () => Debug.WriteLine("Error observable finished! "));
2 3

2 ответа:

Вы правы. Вот что делает оператор CombineLatest:

A: 1...2...3...4...5...

B: a.....b.........c...
Последнее значение любой последовательности сохраняется для создания пары, на которую действует селектор. Выходной поток будет (1,a) (2,a) (2,b) (3,b) и так далее.

Если вам нужно построить правильные пары из любого потока, используйте оператор Zip, который даст вам (1,a) (2,b) (3,c) и так далее.

P.S.

Я бы предложил попытаться лучше понять, как компилятор переписывает понимание запросов. Она разрешит большую часть из ваше замешательство.

from a in oA
from b in oB
select ...

Эффективно SelectMany(oA, oB)

Я думаю, что сам нашел ответ

Операция CombineLatest () делает то, что мне просто нужно, Вот что я получаю:

var stateObs = dragObs.CombineLatest(rollObs, (d, r) => new ClientState
                                                       {
                                                           FileDragPerc = d,
                                                           PhoneRoll = r,
                                                           TransferState = TransferState.SelectiveTransfer,
                                                           PendingFileType = FileType.Image
                                                       });

    stateObs.Sample(TimeSpan.FromMilliseconds(300))
            .Subscribe(x => _lsService.SetClientStateAsync(x),
                        x => Debug.WriteLine("Error in observable "),
                        () => Debug.WriteLine("Error observable finished! "));