Должен ли я использовать список или просто действие, чтобы отслеживать подписчиков IObservable?


Я реализую интерфейс IObservable<T> На некоторых классах. Я использовал Reflector , чтобы выяснить, как это обычно делается в Rx. Относительно того, как наблюдаемый отслеживает своих подписчиков и уведомляет их с помощью своего метода OnNext, я наткнулся на код, подобный этому:

private List<Observer<T>> observers;

// subscribe a new observer:
public IDisposable Subscribe(IObserver<T> observer)
{
    observers.Add(observer);
    ...
}

// trigger all observers' OnNext method:
...
foreach (IObserver<T> observer in observers)
{
    observer.OnNext(value);
}

Поскольку все делегаты являются мультиспециализированными, не может ли это быть упрощено до:

Action<T> observers;

// subscribe observer:
public IDisposable Subscribe(IObserver<T> observer)
{
    observers += observer.OnNext;
    ...
}

// trigger observers' OnNext:
...
observers(value);

Или есть определенные преимущества первого подхода (производительность, проблемы потоковой передачи / параллелизма, ...)?

2 3

2 ответа:

В общем, вызов делегатов по отдельности дает вам больше контроля над поведением:

  • Если один делегат вызывает исключение, вы можете продолжать вызывать другие, например, или удалить неисправный делегат из списка.
  • Если вы хотите вызвать делегатов параллельно, это действительно легко.
  • Если вам нужно вызвать их в определенном порядке, вы можете легко гарантировать правильный порядок (я не уверен, что порядок вызовов делегатов многоадресной рассылки определен).

Обычно вы не реализуете IObservable<T> самостоятельно, вы возвращаете IObservable<T> из метода, используя один из методов генерации (например, Observable.Create).

Однако, если вы собираетесь реализовать интерфейс самостоятельно, вы должны обернуть внутренний Subject<T> , который будет обрабатывать все проблемы параллелизма для вас:

public class CustomObservable<T> : IObservable<T>
{
    private Subject<T> subject = new Subject<T>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return subject.Subscribe(observer);
    }

    private void EmitValue(T value)
    {
        subject.OnNext(value);
    }
}

NB: Если вы решили придерживаться делегата (по какой-либо причине), по крайней мере убедитесь, что вы отписываетесь в своем IDisposable возвращаемом значении:

observers += observer.OnNext;
return Disposable.Create(() => observers -= observer.OnNext);