Построение наблюдаемого репозитория с помощью Rx


Я работаю в общем сценарии, в котором я хотел бы получить доступ к подмножеству репозитория и не беспокоиться о том, чтобы держать его обновленным, например, "получить все заказы, цена которых больше 10". Я реализовал решение, но у меня есть две проблемы с ним (перечислены в конце).

Подмножество репозитория может быть достигнуто с помощью чего-то эквивалентного

var expensiveOrders = Repository.GetOrders().Where(o => o.Price > 10);

Но это IEnumerable и не будет обновляться при обновлении исходной коллекции. Я мог бы добавить обработчиков для CollectionChanged, но что, если мы хотим получить доступ к следующему подмножеству?

var expensiveOrdersFromBob = expensiveOrders.Where(o => o.Name == Bob);

Мы должны были бы подключить коллекцию-измененную и для этого. Концепция живых обновлений привела меня к мысли о Rx, поэтому я приступил к созданиюObservableCache , который содержит как ObservableCollection элементов, которые автоматически обновляют себя, так и поток RX для уведомления. (Поток-это также то, что обновляет кэш под капотом.)

class ObservableCache<T> : IObservableCache<T>
{
    private readonly ObservableCollection<T> _cache;
    private readonly IObservable<Tuple<T, CRUDOperationType>> _updates;

    public ObservableCache(IEnumerable<T> initialCache
       , IObservable<Tuple<T, CRUDOperationType>> currentStream, Func<T, bool> filter)
        {
          _cache = new ObservableCollection<T>(initialCache.Where(filter));
          _updates = currentStream.Where(tuple => filter(tuple.Item1));
          _updates.Subscribe(ProcessUpdate);
        }

    private void ProcessUpdate(Tuple<T, CRUDOperationType> update)
    {
        var item = update.Item1;
        lock (_cache)
        {
            switch (update.Item2)
            {
                case CRUDOperationType.Create:
                    _cache.Add(item);
                    break;
                case CRUDOperationType.Delete:
                    _cache.Remove(item);
                    break;
                case CRUDOperationType.Replace:
                case CRUDOperationType.Update:
                    _cache.Remove(item); // ToDo: implement some key-based equality
                    _cache.Add(item);
                    break;
            }
        }
    }

    public ObservableCollection<T> Cache
    {
        get { return _cache; }
    }

    public IObservable<T> Updates
    {
        get { return _updates.Select(tuple => tuple.Item1); }
    }

    public IObservableCache<T> Where(Func<T, bool> predicate)
    {
        return new ObservableCache<T>(_cache, _updates, predicate);
    }
}

Затем вы можете использовать его следующим образом:

var expensiveOrders = new ObservableCache<Order>(_orders
                                                 , updateStream
                                                 , o => o.Price > 10);
expensiveOrders.Updates.Subscribe
     (o => Console.WriteLine("Got new expensive order: " + o));
_observableBoundToSomeCtrl = expensiveOrders.Cache;

var expensiveOrdersFromBob = expensiveOrders
                             .Where(o => o.Name == "Bob");
expensiveOrdersFromBob.Updates.Subscribe
         (o => Console.WriteLine("Got new expensive order from Bob: " + o));
_observableBoundToSomeOtherCtrl = expensiveOrdersFromBob.Cache;

И так далее, идея это означает, что вы можете проецировать кэш на все более узкие подмножества и никогда не беспокоиться о том, что он не синхронизирован. Так в чем же тогда моя проблема?

  1. я задаюсь вопросом, Могу ли я покончить с этой дрянью, если RX внутренне обновит коллекции. Может быть, "проектировать" обновления с помощью выбора или что-то в этом роде?
  2. существует условие гонки, присущее шаблону repository-with-update, в котором я могу пропустить некоторые обновления во время построения новый кэш. Я думаю, что мне нужна какая-то последовательность, но это означало бы, что все мои t-объекты реализуют интерфейс ISequenceableItem. Есть ли лучший способ сделать это? RX великолепен, потому что он обрабатывает все потоки для вас. Я бы хотел воспользоваться этим.
4 4

4 ответа:

Вам не нужно будет запирать _cache внутри ProcessUpdate. Если ваш источник observable currentStream соблюдает рекомендации Rx, вы гарантированно будете находиться только в пределах одного вызова OnNext одновременно. В других словах вы не получите другое значение из потока, пока вы все еще обрабатываете Предыдущее значение.

Единственный надежный способ решить проблему вашей расы - это убедиться, что вы создали кэш до того, как updateStream начнет генерировать данные.

Возможно, вы захотите взглянуть на расширения для реактивных расширений (Rxx) . Я считаю, что Дэйв создал ряд утилит для привязки элементов управления пользовательского интерфейса к наблюдаемым данным. Документация скудна. Я не знаю, есть ли там что-нибудь для того, что вы делаете.

Проект OLinq в http://github.com/wasabii/OLinq предназначен для такого рода реактивного обновления, и ObservableView, я думаю, то, что вы ищете.

Взгляните на эти два проекта, которые достигают того, что вы хотите, хотя и разными способами:

Https://github.com/RolandPheasant/DynamicData

Https://bitbucket.org/mendelmonteiro/reactivetables [Отказ от ответственности: это мой проект]

Предположим, что у вас есть такое определение:

class SetOp<T>
{
    public T Value { get; private set; }
    public bool Include { get; private set; }
    public SetOp(T value, bool include)
    {
        Value = value;
        Include = include;
    }
}

Использование Observable.Scan и еще System.Collections.Immutable вы можете сделать что-то вроде этого:

IObservable<SetOp<int>> ops = ...;
IImmutableSet<int> empty = ImmutableSortedSet<int>.Empty;
var observableSet = ops
    .Scan(empty, (s, op) => op.Include ? s.Add(op.Value) : s.Remove(op.Value))
    .StartWith(empty);
Ключевым трюком здесь является использование неизменяемого типа коллекции: любой наблюдатель observableSet может делать все, что захочет, со значениями, которые ему навязывают, потому что они неизменны. Добавить его эффективно, потому что он повторно использует большую часть структуры данных набора между последовательными значениями.

Вот пример потока ops и соответствующий observableSet.

ops       observableSet
--------  ------------------
          {}
Add 7     {7}
Add 4     {4,7}
Add 5     {4,5,7}
Add 6     {4,5,6,7}
Remove 5  {4,6,7}
Add 8     {4,6,7,8}
Remove 4  {6,7,8}