Rx.Net -получить изменения цен на акции и обработать их
Проблема, которую я пытаюсь решить
- получить биржевые тики
- Всегда учитывайте последнюю цену акций
- Каждый x во-вторых, сделайте снимок ТИКов и отправьте на обработку
Observable
Источник биржевых тиков. Он посылает только тики для акций, которые меня интересуют. Что мне нужно сделать, так это получить эти цены акций, и через каждые x секунд (для примера скажем, каждые 3 секунды) отправить в снимка цены для обработки. Если в течение 3 секунд я получаю 2 тика для одной и той же акции, мне нужен только последний ТИК. Эта обработка является вычислительно тяжелой, поэтому, если возможно, я хотел бы избежать отправки одной и той же цены акций для обработки дважды.
Приведу пример.
Допустим, в начале последовательности я получил 2 тика - > MSFT:1$, GOOG:2$.В следующие 3 секунды я ничего не получаю, поэтому MSFT & GOOG тики должны быть отправлены для обработка.
Теперь в следующую секунду я получаю новые тики - > MSFT:1$, GOOG:3$, INTL:3$
Снова предположим, что в течение следующих 3 секунд ничего не происходит.Здесь, посколькуMSFT цена не изменилась (это все еще 1$), толькоGOOG & INTL должны быть отправлены на обработку.
И это повторяется в течение всего дня.Теперь я думаю, что Rx помогает легко и элегантно решать такого рода проблемы. Но у меня есть проблема, чтобы иметь правильный запросы. Это то, что я до сих пор, попытаюсь объяснить, что он делает и в чем проблема с ним
var finalQuery =
from priceUpdate in **Observable<StockTick>**
group priceUpdate by priceUpdate.Stock into grouped
from combined in Observable.Interval(TimeSpan.FromSeconds(3))
.CombineLatest(grouped, (t, pei) => new { PEI = pei, Interval = t })
group combined by new { combined.Interval } into combined
select new
{
Interval = combined.Key.Interval,
PEI = combined.Select(c => new StockTick(c.PEI.Stock, c.PEI.Price))
};
finalQuery
.SelectMany(combined => combined.PEI)
.Distinct(pu => new { pu.Stock, pu.Price })
.Subscribe(priceUpdate =>
{
Process(priceUpdate);
});
public class StockTick
{
public StockTick(string stock, decimal price)
{
Stock = stock;
Price = price;
}
public string Stock {get;set;}
public decimal Price {get;set;}
}
Таким образом, это получает цену акции, группирует ее по акциям, а затем объединяет последние из этой группы
Последовательность Ed с Observable.Interval
. Таким образом, я пытаюсь гарантировать, что обрабатываются только последние тики для акции, и она срабатывает каждые 3 секунды.
Затем он снова группирует его по интервалу на этот раз, в результате у меня есть группа последовательностей для каждого 3-секундного интервала, что пройденный.
И в качестве последнего шага я сглаживаю эту последовательность до последовательности обновлений цены акций с помощью SelectMany
, а также применяю Distinct
, чтобы гарантировать, что одна и та же цена для одной и той же акции не обрабатывается дважды.
Есть 2 проблемы с этим запросом, которые мне не нравятся. Во - первых, я действительно не люблю двойные группы-есть ли способ избежать этого ? Во-вторых - при таком подходе я должен обрабатывать цены одну за другой, что я действительно хотел бы иметь снимки - то есть в течение 3 секунд все, что у меня есть, я пристегну и отправлю на обработку, но не могу понять, как пристегнуться.
Я буду рад предложениям решить эту проблему другим способом, но я предпочел бы остаться в Rx, если только нет действительно чего-то намного лучшего.1 ответ:
Пара вещей:
- вы захотите воспользоваться преимуществами
Sample
оператор:- вы, вероятно, хотите
DistinctUntilChanged
вместоDistinct
. Если вы используетеDistinct
, то если MSFT переходит от $1, к $2, а затем обратно к $1, Вы не получите событие на третьем ТИКе.Я полагаю, что ваше решение будет выглядеть примерно так:
IObservable<StockTick> source; source .GroupBy(st => st.Stock) .Select(stockObservable => stockObservable .Sample(TimeSpan.FromSeconds(3)) .DistinctUntilChanged(st => st.Price) ) .Merge() .Subscribe(st => Process(st));
Править (
Каждый операторDistinct
проблемы производительности):Должен поддерживать в себе полную отчетливую историю. Если у вас есть дорогие акции, например AMZN, которые до сих пор колебались от $958 до$974, то вы можете получить много данных. Это ~1600 возможных точек данных, которые должны сидеть в памяти, пока вы не отпишетесь от
Distinct
. Это также в конечном счете ухудшит производительность, так как каждый тик AMZN должен быть сравнен с 1600-иш существующими точками данных перед прохождением. Если это для длительного процесса (охватывающего несколько торговых дней), то вы получите еще больше данных точки.Учитывая N акций, у вас есть N операторов
Distinct
, которые должны работать соответственно. Умножьте это поведение на N акций, и вы столкнетесь с постоянно растущей проблемой.