Потокобезопасное свойство списка


Я хочу реализацию List<T> как свойство, которое может быть использовано потокобезопасно без каких-либо сомнений.

что-то вроде этого:

private List<T> _list;

private List<T> MyT
{
    get { // return a copy of _list; }
    set { _list = value; }
}

Кажется, мне все еще нужно вернуть копию (клонированную) коллекции, поэтому, если где-то мы повторяем коллекцию и в то же время коллекция установлена, то никаких исключений не возникает.

Как реализовать потокобезопасное свойство коллекции?

12 79

12 ответов:

Если вы нацелены на .Net 4 есть несколько вариантов в система.Коллекции.Одновременно пространство имен

вы могли бы использовать ConcurrentBag<T> в данном случае вместо List<T>

даже если он получил наибольшее количество голосов, обычно никто не может взять System.Collections.Concurrent.ConcurrentBag<T> Как потокобезопасная замена для System.Collections.Generic.List<T> как есть (Радек Стромский уже указывал на это) не заказывал.

но есть класс с именем System.Collections.Generic.SynchronizedCollection<T> это уже начиная с .NET 3.0 часть фреймворка, но это то, что хорошо скрыто в месте, где никто не ожидает, что это мало известно, и, вероятно, вы никогда не натыкались на него (по крайней мере, я никогда не делал).

SynchronizedCollection<T> компилируется в сборка

Я думаю, что сделать пример класса ThreadSafeList будет легко:

public class ThreadSafeList<T> : IList<T>
{
    protected List<T> _interalList = new List<T>();

    // Other Elements of IList implementation

    public IEnumerator<T> GetEnumerator()
    {
        return Clone().GetEnumerator();
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return Clone().GetEnumerator();
    }

    protected static object _lock = new object();

    public List<T> Clone()
    {
        List<T> newList = new List<T>();

        lock (_lock)
        {
            _interalList.ForEach(x => newList.Add(x));
        }

        return newList;
    }
}

вы просто клонируете список перед запросом перечислителя, и таким образом любое перечисление отрабатывает копию, которая не может быть изменена во время работы.

даже принятый ответ ConcurrentBag, я не думаю, что это реальная замена списка во всех случаях, так как комментарий Радека к ответу говорит: "ConcurrentBag-это неупорядоченная коллекция, поэтому в отличие от списка он не гарантирует порядок. Также вы не можете получить доступ к элементам по индексу".

Так что если вы используете .NET 4.0 или выше, обходной путь может быть использовать ConcurrentDictionary с целочисленными TKey и TValue как индекс массива, а значения массива. Это рекомендуемый способ замены списка в Pluralsight это C# параллельные коллекции курс. ConcurrentDictionary решает обе проблемы, упомянутые выше: доступ к индексам и порядок (мы не можем полагаться на порядок, поскольку это хэш-таблица под капотом, но текущая реализация .NET сохраняет порядок добавления элементов).

вы можете использовать:

var threadSafeArrayList = ArrayList.Synchronized(new ArrayList());

для создания потокобезопасного ArrayLsit

Если вы посмотрите на исходный код для списка T (https://referencesource.microsoft.com/#mscorlib/system/collections/generic/list.cs,c66df6f36c131877) Вы заметите, что там есть класс (который, конечно, внутренний-почему, Microsoft, почему?!?!) называется SynchronizedList Т. Я копировать вставить вот этот код:

   [Serializable()]
    internal class SynchronizedList : IList<T> {
        private List<T> _list;
        private Object _root;

        internal SynchronizedList(List<T> list) {
            _list = list;
            _root = ((System.Collections.ICollection)list).SyncRoot;
        }

        public int Count {
            get {
                lock (_root) { 
                    return _list.Count; 
                }
            }
        }

        public bool IsReadOnly {
            get {
                return ((ICollection<T>)_list).IsReadOnly;
            }
        }

        public void Add(T item) {
            lock (_root) { 
                _list.Add(item); 
            }
        }

        public void Clear() {
            lock (_root) { 
                _list.Clear(); 
            }
        }

        public bool Contains(T item) {
            lock (_root) { 
                return _list.Contains(item);
            }
        }

        public void CopyTo(T[] array, int arrayIndex) {
            lock (_root) { 
                _list.CopyTo(array, arrayIndex);
            }
        }

        public bool Remove(T item) {
            lock (_root) { 
                return _list.Remove(item);
            }
        }

        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() {
            lock (_root) { 
                return _list.GetEnumerator();
            }
        }

        IEnumerator<T> IEnumerable<T>.GetEnumerator() {
            lock (_root) { 
                return ((IEnumerable<T>)_list).GetEnumerator();
            }
        }

        public T this[int index] {
            get {
                lock(_root) {
                    return _list[index];
                }
            }
            set {
                lock(_root) {
                    _list[index] = value;
                }
            }
        }

        public int IndexOf(T item) {
            lock (_root) {
                return _list.IndexOf(item);
            }
        }

        public void Insert(int index, T item) {
            lock (_root) {
                _list.Insert(index, item);
            }
        }

        public void RemoveAt(int index) {
            lock (_root) {
                _list.RemoveAt(index);
            }
        }
    }

лично я думаю, что они знали лучшую реализацию с помощью SemaphoreSlim может быть создан, но не попал в него.

вы также можете использовать более примитивное

Monitor.Enter(lock);
Monitor.Exit(lock);

какой замок использует (см. Этот пост C# блокировка объекта, который переназначен в блоке блокировки).

Если вы ожидаете исключения в коде это не безопасно, но это позволяет вам сделать что-то вроде следующего:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Linq;

public class Something
{
    private readonly object _lock;
    private readonly List<string> _contents;

    public Something()
    {
        _lock = new object();

        _contents = new List<string>();
    }

    public Modifier StartModifying()
    {
        return new Modifier(this);
    }

    public class Modifier : IDisposable
    {
        private readonly Something _thing;

        public Modifier(Something thing)
        {
            _thing = thing;

            Monitor.Enter(Lock);
        }

        public void OneOfLotsOfDifferentOperations(string input)
        {
            DoSomethingWith(input);
        }

        private void DoSomethingWith(string input)
        {
            Contents.Add(input);
        }

        private List<string> Contents
        {
            get { return _thing._contents; }
        }

        private object Lock
        {
            get { return _thing._lock; }
        }

        public void Dispose()
        {
            Monitor.Exit(Lock);
        }
    }
}

public class Caller
{
    public void Use(Something thing)
    {
        using (var modifier = thing.StartModifying())
        {
            modifier.OneOfLotsOfDifferentOperations("A");
            modifier.OneOfLotsOfDifferentOperations("B");

            modifier.OneOfLotsOfDifferentOperations("A");
            modifier.OneOfLotsOfDifferentOperations("A");
            modifier.OneOfLotsOfDifferentOperations("A");
        }
    }
}

одна из приятных вещей в этом заключается в том, что вы получите блокировку на время выполнения серии операций (а не блокировку в каждой операции). Что означает, что выход должен выходить в правильных кусках (мое использование этого получало некоторый выход на экран из внешнего процесса)

мне очень нравится простота + прозрачность ThreadSafeList + что делает важный бит в остановке сбоев

Я считаю _list.ToList() сделает вам копию. Вы также можете запросить его, если вам нужно, например :

_list.Select("query here").ToList(); 

в любом случае, msdn говорит, что это действительно копия, а не просто ссылка. О, И да, вам нужно будет заблокировать метод set, как указывали другие.

похоже, что многие из людей, обнаружив это, хотят потокобезопасную индексированную коллекцию динамического размера. Самое близкое и простое, что я знаю, что будет.

System.Collections.Concurrent.ConcurrentDictionary<int, YourDataType>

Это потребует от вас убедиться, что ваш ключ правильно инкриминируется, если вы хотите нормальное поведение индексирования. Если вы будете осторожны .count может быть достаточным в качестве ключа для любых новых пар значений ключей, которые вы добавляете.

вот класс, который вы просили:

namespace AI.Collections {
    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Linq;
    using System.Runtime.Serialization;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    /// <summary>
    ///     Just a simple thread safe collection.
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <value>Version 1.5</value>
    /// <remarks>TODO replace locks with AsyncLocks</remarks>
    [DataContract( IsReference = true )]
    public class ThreadSafeList<T> : IList<T> {
        /// <summary>
        ///     TODO replace the locks with a ReaderWriterLockSlim
        /// </summary>
        [DataMember]
        private readonly List<T> _items = new List<T>();

        public ThreadSafeList( IEnumerable<T> items = null ) { this.Add( items ); }

        public long LongCount {
            get {
                lock ( this._items ) {
                    return this._items.LongCount();
                }
            }
        }

        public IEnumerator<T> GetEnumerator() { return this.Clone().GetEnumerator(); }

        IEnumerator IEnumerable.GetEnumerator() { return this.GetEnumerator(); }

        public void Add( T item ) {
            if ( Equals( default( T ), item ) ) {
                return;
            }
            lock ( this._items ) {
                this._items.Add( item );
            }
        }

        public Boolean TryAdd( T item ) {
            try {
                if ( Equals( default( T ), item ) ) {
                    return false;
                }
                lock ( this._items ) {
                    this._items.Add( item );
                    return true;
                }
            }
            catch ( NullReferenceException ) { }
            catch ( ObjectDisposedException ) { }
            catch ( ArgumentNullException ) { }
            catch ( ArgumentOutOfRangeException ) { }
            catch ( ArgumentException ) { }
            return false;
        }

        public void Clear() {
            lock ( this._items ) {
                this._items.Clear();
            }
        }

        public bool Contains( T item ) {
            lock ( this._items ) {
                return this._items.Contains( item );
            }
        }

        public void CopyTo( T[] array, int arrayIndex ) {
            lock ( this._items ) {
                this._items.CopyTo( array, arrayIndex );
            }
        }

        public bool Remove( T item ) {
            lock ( this._items ) {
                return this._items.Remove( item );
            }
        }

        public int Count {
            get {
                lock ( this._items ) {
                    return this._items.Count;
                }
            }
        }

        public bool IsReadOnly { get { return false; } }

        public int IndexOf( T item ) {
            lock ( this._items ) {
                return this._items.IndexOf( item );
            }
        }

        public void Insert( int index, T item ) {
            lock ( this._items ) {
                this._items.Insert( index, item );
            }
        }

        public void RemoveAt( int index ) {
            lock ( this._items ) {
                this._items.RemoveAt( index );
            }
        }

        public T this[ int index ] {
            get {
                lock ( this._items ) {
                    return this._items[ index ];
                }
            }
            set {
                lock ( this._items ) {
                    this._items[ index ] = value;
                }
            }
        }

        /// <summary>
        ///     Add in an enumerable of items.
        /// </summary>
        /// <param name="collection"></param>
        /// <param name="asParallel"></param>
        public void Add( IEnumerable<T> collection, Boolean asParallel = true ) {
            if ( collection == null ) {
                return;
            }
            lock ( this._items ) {
                this._items.AddRange( asParallel
                                              ? collection.AsParallel().Where( arg => !Equals( default( T ), arg ) )
                                              : collection.Where( arg => !Equals( default( T ), arg ) ) );
            }
        }

        public Task AddAsync( T item ) {
            return Task.Factory.StartNew( () => { this.TryAdd( item ); } );
        }

        /// <summary>
        ///     Add in an enumerable of items.
        /// </summary>
        /// <param name="collection"></param>
        public Task AddAsync( IEnumerable<T> collection ) {
            if ( collection == null ) {
                throw new ArgumentNullException( "collection" );
            }

            var produce = new TransformBlock<T, T>( item => item, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount } );

            var consume = new ActionBlock<T>( action: async obj => await this.AddAsync( obj ), dataflowBlockOptions: new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount } );
            produce.LinkTo( consume );

            return Task.Factory.StartNew( async () => {
                collection.AsParallel().ForAll( item => produce.SendAsync( item ) );
                produce.Complete();
                await consume.Completion;
            } );
        }

        /// <summary>
        ///     Returns a new copy of all items in the <see cref="List{T}" />.
        /// </summary>
        /// <returns></returns>
        public List<T> Clone( Boolean asParallel = true ) {
            lock ( this._items ) {
                return asParallel
                               ? new List<T>( this._items.AsParallel() )
                               : new List<T>( this._items );
            }
        }

        /// <summary>
        ///     Perform the <paramref name="action" /> on each item in the list.
        /// </summary>
        /// <param name="action">
        ///     <paramref name="action" /> to perform on each item.
        /// </param>
        /// <param name="performActionOnClones">
        ///     If true, the <paramref name="action" /> will be performed on a <see cref="Clone" /> of the items.
        /// </param>
        /// <param name="asParallel">
        ///     Use the <see cref="ParallelQuery{TSource}" /> method.
        /// </param>
        /// <param name="inParallel">
        ///     Use the
        ///     <see
        ///         cref="Parallel.ForEach{TSource}(System.Collections.Generic.IEnumerable{TSource},System.Action{TSource})" />
        ///     method.
        /// </param>
        public void ForEach( Action<T> action, Boolean performActionOnClones = true, Boolean asParallel = true, Boolean inParallel = false ) {
            if ( action == null ) {
                throw new ArgumentNullException( "action" );
            }
            var wrapper = new Action<T>( obj => {
                try {
                    action( obj );
                }
                catch ( ArgumentNullException ) {
                    //if a null gets into the list then swallow an ArgumentNullException so we can continue adding
                }
            } );
            if ( performActionOnClones ) {
                var clones = this.Clone( asParallel: asParallel );
                if ( asParallel ) {
                    clones.AsParallel().ForAll( wrapper );
                }
                else if ( inParallel ) {
                    Parallel.ForEach( clones, wrapper );
                }
                else {
                    clones.ForEach( wrapper );
                }
            }
            else {
                lock ( this._items ) {
                    if ( asParallel ) {
                        this._items.AsParallel().ForAll( wrapper );
                    }
                    else if ( inParallel ) {
                        Parallel.ForEach( this._items, wrapper );
                    }
                    else {
                        this._items.ForEach( wrapper );
                    }
                }
            }
        }

        /// <summary>
        ///     Perform the <paramref name="action" /> on each item in the list.
        /// </summary>
        /// <param name="action">
        ///     <paramref name="action" /> to perform on each item.
        /// </param>
        /// <param name="performActionOnClones">
        ///     If true, the <paramref name="action" /> will be performed on a <see cref="Clone" /> of the items.
        /// </param>
        /// <param name="asParallel">
        ///     Use the <see cref="ParallelQuery{TSource}" /> method.
        /// </param>
        /// <param name="inParallel">
        ///     Use the
        ///     <see
        ///         cref="Parallel.ForEach{TSource}(System.Collections.Generic.IEnumerable{TSource},System.Action{TSource})" />
        ///     method.
        /// </param>
        public void ForAll( Action<T> action, Boolean performActionOnClones = true, Boolean asParallel = true, Boolean inParallel = false ) {
            if ( action == null ) {
                throw new ArgumentNullException( "action" );
            }
            var wrapper = new Action<T>( obj => {
                try {
                    action( obj );
                }
                catch ( ArgumentNullException ) {
                    //if a null gets into the list then swallow an ArgumentNullException so we can continue adding
                }
            } );
            if ( performActionOnClones ) {
                var clones = this.Clone( asParallel: asParallel );
                if ( asParallel ) {
                    clones.AsParallel().ForAll( wrapper );
                }
                else if ( inParallel ) {
                    Parallel.ForEach( clones, wrapper );
                }
                else {
                    clones.ForEach( wrapper );
                }
            }
            else {
                lock ( this._items ) {
                    if ( asParallel ) {
                        this._items.AsParallel().ForAll( wrapper );
                    }
                    else if ( inParallel ) {
                        Parallel.ForEach( this._items, wrapper );
                    }
                    else {
                        this._items.ForEach( wrapper );
                    }
                }
            }
        }
    }
}

в основном, если вы хотите перечислить безопасно, вам нужно использовать блокировку.

пожалуйста, обратитесь к MSDN на этом. http://msdn.microsoft.com/en-us/library/6sh2ey19.aspx

вот часть MSDN, которая может вас заинтересовать:

открытые статические (shared в Visual Basic) члены этого типа являются потокобезопасными. Члены экземпляров не гарантируется потокобезопасность.

список может поддерживать несколько читателей одновременно, пока коллекция не изменяется. Перечисление через коллекцию по своей сути не является потокобезопасной процедурой. В редких случаях, когда перечисление конкурирует с одним или несколькими правами доступа на запись, единственным способом обеспечения потокобезопасности является блокировка коллекции на протяжении всего перечисления. Чтобы разрешить доступ к коллекции нескольким потокам для чтения и записи, необходимо реализовать собственную синхронизацию.

использовать lock заявление для этого. (читать здесь для получения дополнительной информации.)

private List<T> _list;

private List<T> MyT
{
    get { return _list; }
    set
    {
        //Lock so only one thread can change the value at any given time.
        lock (_list)
        {
            _list = value;
        }
    }
}

к вашему сведению, это, вероятно, не совсем то, что вы просите - вы, вероятно, хотите заблокировать дальше в своем коде, но я не могу этого предположить. Взгляните на lock сайта и адаптировать его к конкретной ситуации.

Если вам нужно, вы могли бы lock и в get и set блок с помощью _list переменная, которая сделает его таким, что чтение / запись может не происходят одновременно.