Rx.Net -получить изменения цен на акции и обработать их


Проблема, которую я пытаюсь решить

  1. получить биржевые тики
  2. Всегда учитывайте последнюю цену акций
  3. Каждый x во-вторых, сделайте снимок ТИКов и отправьте на обработку
Таким образом, у меня есть Observable Источник биржевых тиков. Он посылает только тики для акций, которые меня интересуют. Что мне нужно сделать, так это получить эти цены акций, и через каждые x секунд (для примера скажем, каждые 3 секунды) отправить в снимка цены для обработки. Если в течение 3 секунд я получаю 2 тика для одной и той же акции, мне нужен только последний ТИК. Эта обработка является вычислительно тяжелой, поэтому, если возможно, я хотел бы избежать отправки одной и той же цены акций для обработки дважды.

Приведу пример.

Допустим, в начале последовательности я получил 2 тика - > MSFT:1$, GOOG:2$.

В следующие 3 секунды я ничего не получаю, поэтому MSFT & GOOG тики должны быть отправлены для обработка.

Теперь в следующую секунду я получаю новые тики - > MSFT:1$, GOOG:3$, INTL:3$

Снова предположим, что в течение следующих 3 секунд ничего не происходит.

Здесь, посколькуMSFT цена не изменилась (это все еще 1$), толькоGOOG & INTL должны быть отправлены на обработку.

И это повторяется в течение всего дня.

Теперь я думаю, что Rx помогает легко и элегантно решать такого рода проблемы. Но у меня есть проблема, чтобы иметь правильный запросы. Это то, что я до сих пор, попытаюсь объяснить, что он делает и в чем проблема с ним

var finalQuery =
               from priceUpdate in **Observable<StockTick>**
               group priceUpdate by priceUpdate.Stock into grouped
               from combined in Observable.Interval(TimeSpan.FromSeconds(3))
                      .CombineLatest(grouped, (t, pei) => new { PEI = pei, Interval = t })
               group combined by new { combined.Interval } into combined
               select new
               {
                   Interval = combined.Key.Interval,
                   PEI = combined.Select(c => new StockTick(c.PEI.Stock, c.PEI.Price))
               };

            finalQuery
                .SelectMany(combined => combined.PEI)
                .Distinct(pu => new { pu.Stock, pu.Price })
                .Subscribe(priceUpdate =>
                {
                    Process(priceUpdate);
                });

public class StockTick
{
   public StockTick(string stock, decimal price)
   {    
      Stock = stock;
      Price = price;
   }
   public string Stock {get;set;}
   public decimal Price {get;set;}
}

Таким образом, это получает цену акции, группирует ее по акциям, а затем объединяет последние из этой группы

Последовательность Ed с Observable.Interval. Таким образом, я пытаюсь гарантировать, что обрабатываются только последние тики для акции, и она срабатывает каждые 3 секунды.

Затем он снова группирует его по интервалу на этот раз, в результате у меня есть группа последовательностей для каждого 3-секундного интервала, что пройденный.

И в качестве последнего шага я сглаживаю эту последовательность до последовательности обновлений цены акций с помощью SelectMany, а также применяю Distinct, чтобы гарантировать, что одна и та же цена для одной и той же акции не обрабатывается дважды.

Есть 2 проблемы с этим запросом, которые мне не нравятся. Во - первых, я действительно не люблю двойные группы-есть ли способ избежать этого ? Во-вторых - при таком подходе я должен обрабатывать цены одну за другой, что я действительно хотел бы иметь снимки - то есть в течение 3 секунд все, что у меня есть, я пристегну и отправлю на обработку, но не могу понять, как пристегнуться.

Я буду рад предложениям решить эту проблему другим способом, но я предпочел бы остаться в Rx, если только нет действительно чего-то намного лучшего.
1 2

1 ответ:

Пара вещей:

  1. вы захотите воспользоваться преимуществами Sample оператор:
  2. вы, вероятно, хотите DistinctUntilChanged вместо Distinct. Если вы используете Distinct, то если MSFT переходит от $1, к $2, а затем обратно к $1, Вы не получите событие на третьем ТИКе.

Я полагаю, что ваше решение будет выглядеть примерно так:

IObservable<StockTick> source;
source
    .GroupBy(st => st.Stock)
    .Select(stockObservable => stockObservable
        .Sample(TimeSpan.FromSeconds(3))
        .DistinctUntilChanged(st => st.Price)
    )
    .Merge()
    .Subscribe(st => Process(st));

Править (Distinct проблемы производительности):

Каждый оператор

Должен поддерживать в себе полную отчетливую историю. Если у вас есть дорогие акции, например AMZN, которые до сих пор колебались от $958 до$974, то вы можете получить много данных. Это ~1600 возможных точек данных, которые должны сидеть в памяти, пока вы не отпишетесь от Distinct. Это также в конечном счете ухудшит производительность, так как каждый тик AMZN должен быть сравнен с 1600-иш существующими точками данных перед прохождением. Если это для длительного процесса (охватывающего несколько торговых дней), то вы получите еще больше данных точки.

Учитывая N акций, у вас есть N операторов Distinct, которые должны работать соответственно. Умножьте это поведение на N акций, и вы столкнетесь с постоянно растущей проблемой.