В RX как объединить два источника разных типов
Настройка:
- First
IObservable
производит значения типа A - Second
IObservable
производит значения типа B - они производят значение в разном темпе (довольно быстро, до каждых 10 МС)
Чего я пытаюсь достичь:
Каждый N раз (N довольно медленно около 500 мс) вызов должен быть сделан в службу и предоставить последние значения как из Первого, так и из второго IObservable
.
Вопрос: Интересно, как я могу использовать RX.
Текущее решение (не работает):
var stateObs = from drag in dragObs.MostRecent(0).ToObservable()
from roll in rollObs.MostRecent(0).ToObservable()
select new ClientState
{
FileDragPerc = drag,
PhoneRoll = roll,
PendingFileType = FileType.Image,
TransferState = TransferState.SelectiveTransfer
};
stateObs.Sample(TimeSpan.FromMilliseconds(300))
.Subscribe(x => _lsService.SetClientStateAsync(x),
x => Debug.WriteLine("Error in observable "),
() => Debug.WriteLine("Error observable finished! "));
2 ответа:
Вы правы. Вот что делает оператор
CombineLatest
:Последнее значение любой последовательности сохраняется для создания пары, на которую действует селектор. Выходной поток будетA: 1...2...3...4...5... B: a.....b.........c...
(1,a) (2,a) (2,b) (3,b)
и так далее.Если вам нужно построить правильные пары из любого потока, используйте оператор Zip, который даст вам
(1,a) (2,b) (3,c)
и так далее.P.S.
Я бы предложил попытаться лучше понять, как компилятор переписывает понимание запросов. Она разрешит большую часть из ваше замешательство.
from a in oA from b in oB select ...
Эффективно
SelectMany(oA, oB)
Я думаю, что сам нашел ответ
Операция CombineLatest () делает то, что мне просто нужно, Вот что я получаю:
var stateObs = dragObs.CombineLatest(rollObs, (d, r) => new ClientState { FileDragPerc = d, PhoneRoll = r, TransferState = TransferState.SelectiveTransfer, PendingFileType = FileType.Image }); stateObs.Sample(TimeSpan.FromMilliseconds(300)) .Subscribe(x => _lsService.SetClientStateAsync(x), x => Debug.WriteLine("Error in observable "), () => Debug.WriteLine("Error observable finished! "));