Это CorrelationManager.LogicalOperationStack совместим с Parallel.Для, задачи, потоки и т. д


Пожалуйста, смотрите этот вопрос для получения справочной информации:

Как задачи в параллельной библиотеке задач влияют на ActivityID?

Этот вопрос задает вопрос о том, как задачи влияют на трассировку .CorrelationManager.ActivityId . @Greg Samson ответил на свой собственный вопрос с помощью тестовой программы, показывающей, что ActivityId надежен в контексте задач. Программа испытаний устанавливает значение activityid в начале делегировать задачу, проживающих в имитации работы, а затем проверяет идентификатор activityid в конец, чтобы убедиться, что это то же самое значение (т. е. что оно не было изменено другим потоком). Программа работает успешно.

Исследуя другие варианты "контекста" для потоковой обработки, задач и параллельных операций (в конечном счете, чтобы обеспечить лучший контекст для ведения журнала), я столкнулся со странной проблемой с трассировкой.CorrelationManager.LogicalOperationStack (мне все равно было странно). Я скопировал свой "ответ" на его вопрос ниже.

Я думаю, что это адекватно описывает проблему, с которой я столкнулся (след.CorrelationManager.LogicalOperationStack, по-видимому, повреждается - или что - то еще-при использовании в контексте Parallel.Ибо, но только если параллель.Для себя заключена в логическую операцию).

Вот мои вопросы:

  1. Надо Проследить.CorrelationManager.LogicalOperationStack можно использовать с Parallel.За что? Если да, то должно ли это иметь значение, если логическая операция уже действует с Параллельный.Для начала?

  2. Есть ли "правильный" способ использовать LogicalOperationStack с параллельным.За что? Могу ли я закодировать этот пример программы по-другому, чтобы он "работал"? Под "работами" я подразумеваю, что LogicalOperationStack всегда имеет ожидаемое количество записей, и сами записи являются ожидаемыми записями.

Я провел некоторые дополнительные тесты с использованием потоков и потоков ThreadPool, но мне придется вернуться и повторить эти тесты, чтобы увидеть, если я запустил в схожие проблемы.

Я скажу, что похоже, что Task / Parallel threads и ThreadPool threads действительно "наследуют" трассировку.CorrelationManager.ActivityId и след.CorrelationManager.LogicalOperationStack значения из родительского потока. Это ожидается, поскольку эти значения хранятся в CorrelationManager с помощью метода LogicalSetDataCallContext (в отличие от метода SetData).

Еще раз, пожалуйста, вернитесь к этому вопросу, чтобы получить исходный контекст для "ответ", который я разместил ниже:

Как задачи в параллельной библиотеке задач влияют на ActivityID?

Смотрите также этот аналогичный вопрос (на который до сих пор не было ответа) на форуме параллельных расширений Microsoft:

Http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

[НАЧАТЬ ВСТАВКУ]

Пожалуйста, простите, что я публикую это в качестве ответа, поскольку это не совсем ответ ваш вопрос, однако, связан с вашим вопросом, поскольку он имеет дело с поведением CorrelationManager и потоками / задачами / и т. д. Я рассматривал использование методов CorrelationManager LogicalOperationStackStartLogicalOperation/StopLogicalOperation) для обеспечения дополнительного контекста в многопоточных сценариях.

Я взял ваш пример и немного изменил его, чтобы добавить возможность выполнять работу параллельно, используя Parallel.Для. Кроме того, я использую StartLogicalOperation/StopLogicalOperation для скобок (внутри) DoLongRunningWork. Концептуально DoLongRunningWork делает что-то подобное каждый раз выполняется:
DoLongRunningWork
  StartLogicalOperation
  Thread.Sleep(3000)
  StopLogicalOperation
Я обнаружил, что если я добавляю эти логические операции в ваш код (более или менее как есть), все логические операции остаются синхронизированными (всегда ожидаемое число операций в стеке и значения операций в стеке всегда ожидаются). В некоторых из моих собственных тестов я обнаружил, что это не всегда так. Стек логических операций был "поврежден". Лучшее объяснение, которое я мог бы придумать, заключается в том, что" слияние" информация CallContext в контексте "родительского" потока при выходе из" дочернего "потока приводила к тому, что" старая "информация контекста дочернего потока (логическая операция)" наследуется " другим дочерним потоком-родителем.

Проблема может быть также связана с тем, что параллельна.For, по-видимому, использует основной поток (по крайней мере, в коде примера, как написано) в качестве одного из "рабочих потоков" (или как они должны называться в параллельном домене). Всякий раз, когда Долонграннингворк выполняется, новая логическая операция запускается (в начале) и останавливается (в конце) (то есть помещается в LogicalOperationStack и выскакивает из него). Если основной поток уже имеет логическую операцию в действии и если DoLongRunningWork выполняется в основном потоке, то запускается новая логическая операция, поэтому LogicalOperationStack основного потока теперь имеет две операции. Любые последующие выполнения DoLongRunningWork (до тех пор, пока выполняется эта" итерация " DoLongRunningWork на главном потоке) будет (по-видимому) наследовать LogicalOperationStack главного потока (который теперь имеет две операции над ним, а не только одну ожидаемую операцию).

Мне потребовалось много времени, чтобы понять, почему поведение LogicalOperationStack в моем примере отличалось от моей модифицированной версии вашего примера. Наконец я увидел, что в моем коде я заключил всю программу в скобки в логической операции, тогда как в моей модифицированной версии вашей тестовой программы я этого не сделал. То подразумевается, что в моей тестовой программе каждый раз, когда выполнялась моя "работа" (аналогично DoLongRunningWork), уже существовала логическая операция. В моей модифицированной версии вашей тестовой программы я не заключил всю программу в скобки в логической операции.

Итак, когда я модифицировал вашу тестовую программу, чтобы заключить всю программу в скобки в логической операции, и если я использую параллель.Ибо я столкнулся с точно такой же проблемой.

Используя приведенную выше концептуальную модель, это позволит выполнить успешно:

Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation

В то время как это в конечном итоге будет утверждать из-за явно несинхронизированного LogicalOperationStack:

StartLogicalOperation
Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation
StopLogicalOperation

Вот мой пример программы. Он похож на ваш в том, что он имеет метод DoLongRunningWork, который манипулирует ActivityId, а также LogicalOperationStack. У меня также есть два вкуса пинка Долонграннинга. Один вкус использует задачи, которые используются параллельно.Для. Каждый аромат также может быть выполнен таким образом, что вся распараллеленная операция является заключен в логическую операцию или нет. Таким образом, существует в общей сложности 4 способа выполнения параллельной операции. Чтобы попробовать каждый из них, просто раскомментируйте желаемое "использование...- метод, перекомпиляция и запуск. UseTasks, UseTasks(true), и UseParallelFor должны все бежать к завершению. UseParallelFor(true) будет утверждать в какой-то момент, потому что LogicalOperationStack не имеет ожидаемого количества записей.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace CorrelationManagerParallelTest
{
  class Program 
  {     
    static void Main(string[] args)     
    { 
      //UseParallelFor(true) will assert because LogicalOperationStack will not have expected
      //number of entries, all others will run to completion.

      UseTasks(); //Equivalent to original test program with only the parallelized
                      //operation bracketed in logical operation.
      ////UseTasks(true); //Bracket entire UseTasks method in logical operation
      ////UseParallelFor();  //Equivalent to original test program, but use Parallel.For
                             //rather than Tasks.  Bracket only the parallelized
                             //operation in logical operation.
      ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
    }       

    private static List<int> threadIds = new List<int>();     
    private static object locker = new object();     

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;

    private static int mainThreadUsedInDelegate = 0;

    // baseCount is the expected number of entries in the LogicalOperationStack
    // at the time that DoLongRunningWork starts.  If the entire operation is bracketed
    // externally by Start/StopLogicalOperation, then baseCount will be 1.  Otherwise,
    // it will be 0.
    private static void DoLongRunningWork(int baseCount)     
    {
      lock (locker)
      {
        //Keep a record of the managed thread used.             
        if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
          threadIds.Add(Thread.CurrentThread.ManagedThreadId);

        if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
        {
          mainThreadUsedInDelegate++;
        }
      }         

      Guid lo1 = Guid.NewGuid();
      Trace.CorrelationManager.StartLogicalOperation(lo1);

      Guid g1 = Guid.NewGuid();         
      Trace.CorrelationManager.ActivityId = g1;

      Thread.Sleep(3000);         

      Guid g2 = Trace.CorrelationManager.ActivityId;
      Debug.Assert(g1.Equals(g2));

      //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
      //in effect when the Parallel.For operation was started.
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));

      Trace.CorrelationManager.StopLogicalOperation();
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
      Task task = null;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Task[] allTasks = new Task[totalThreads];
      for (int i = 0; i < totalThreads; i++)
      {
        task = Task.Factory.StartNew(() =>
        {
          DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
        }, taskCreationOpt);
        allTasks[i] = task;
      }
      Task.WaitAll(allTasks);

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

    private static void UseParallelFor(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Parallel.For(0, totalThreads, i =>
      {
        DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
      });

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

  } 
}

Весь этот вопрос if LogicalOperationStack может быть использован с Parallel.Для (и/или других потоковых/задачных конструкций) или как его можно использовать, вероятно, заслуживает своего собственного вопроса. Может быть, я напишу вопрос. В то же время, я хотел бы знать, есть ли у вас какие-либо мысли по этому поводу (или, я хотел бы знать, рассматривали ли вы возможность использования LogicalOperationStack, так как ActivityId кажется безопасным).

[КОНЕЦ ВСТАВКИ]

Есть ли у кого-нибудь мысли по этому поводу?
2 9

2 ответа:

[Начать Обновление]

Я также задал этот вопрос на параллельных расширениях Microsoft для форума поддержки .Net и в конечном итоге получил ответ от Стивена Тоуба. Оказывается, в LogicalCallContext есть ошибка, которая приводит к повреждению LogicalOperationStack. Есть также хорошее описание (в продолжении Стивена к ответу, который я сделал на его ответ), которое дает краткий обзор того, как параллельна.Для работ, касающихся раздача заданий и почему это делает параллель.Для восприимчивых к клопу.

В своем ответе ниже я предполагаю, что LogicalOperationStack не совместим с Parallel.Потому что параллельно.For использует основной поток в качестве одного из" рабочих " потоков. Исходя из объяснений Стивена, мои предположения были неверны. Параллельный.For использует основной поток как один из" рабочих " потоков, но он не просто используется "как есть". Первая задача выполняется в основном потоке, но выполняется таким образом, что она это как если бы он был запущен на новом потоке. Прочитайте описание Стивена для получения дополнительной информации.

[Завершить Обновление]

Из того, что я могу сказать, ответ таков:

ActivityId и LogicalOperationStack хранятся через CallContext.LogicalSetData . Это означает, что эти значения будут "перетекать" в любые "дочерние" потоки. Это довольно круто, как вы могли бы, например, установить ActivityId в точке входа в многопоточный сервер (скажем, вызов службы) и все потоки то, что в конечном счете начинается с этой точки входа, может быть частью той же самой"деятельности". Аналогично, логические операции (через LogicalOperationStack) также поступают в дочерние потоки.

Что касается трассировки.CorrelationManager.ActivityId:

ActivityId, кажется, совместим со всеми потоковыми моделями, с которыми я его тестировал: с использованием потоков напрямую, с использованием ThreadPool, с использованием задач, с использованием Parallel.*. Во всех случаях ActivityId имеет ожидаемое значение.

В отношении След.CorrelationManager.LogicalOperationStack:

LogicalOperationStack, по-видимому, совместим с большинством потоковых моделей, но не с параллельными.*. Используя потоки напрямую, ThreadPool и задачи, LogicalOperationStack (как манипулируется в примере кода, представленном в моем вопросе) сохраняет свою целостность. Во всех случаях содержимое пакета LogicalOperationStack соответствует ожидаемому.

LogicalOperationStack не совместим с Parallel.Для. Если логическая операция "в действии", то есть, если вы вызвали CorrelationManager.StartLogicalOperation, до начала параллели.* операция, а затем вы начинаете новую логическую операцию в контексте Paralle.* (то есть в делегате), то LogicalOperationStack будет поврежден. (Я должен сказать, что он, вероятно, будет испорчен. Параллельный.* может не создавать никаких дополнительных потоков, что означает, что LogicalOperationStack будет безопасным).

Проблема проистекает из того, что параллельна.* использует основные поток (или, возможно, более правильно, поток, который запускает параллельную операцию) как один из своих "рабочих" потоков. Это означает, что по мере запуска и остановки" логических операций" в "рабочем" потоке, который является тем же самым, что и "главный" поток, изменяется LogicalOperationStack "главного" потока. Даже если вызывающий код (то есть делегат) поддерживает стек правильно (гарантируя, что каждая StartLogicalOperation "остановлена" с соответствующей StopLogicalOperation)," основные " потоки стек модифицирован. В конечном счете кажется (по крайней мере, мне), что LogicalOperationStack "основного" потока существенно модифицируется двумя различными "логическими" потоками: "основным" потоком и "рабочим" потоком, которые оба оказываются одним и тем же потоком.

Я не знаю глубинных особенностей того, почему это не работает (по крайней мере, как я ожидал бы, что это сработает). Мое лучшее предположение состоит в том, что каждый раз, когда делегат выполняется в потоке (это не то же самое, что основной поток), поток "наследует" текущее состояние LogicalOperationStack основного потока. Если делегат в данный момент выполняется в основном потоке (повторно используется как рабочий поток) и запустил логическую операцию, то один (или более) из других распараллеленных делегатов "наследует" LogicalOperationStack основного потока, который теперь имеет одну (или более) новых логических операций в действии!

FWIW, я реализовал (в основном для тестирования, я фактически не использую его в данный момент), следующий "логический стек" должен имитировать LogicalOperationStack, но делать это так, чтобы он работал параллельно.* Не стесняйтесь попробовать его и / или использовать. Для проверки замените вызовы на

Trace.CorrelationManager.StartLogicalOperation/StopLogicalOperation

В примере кода из моего исходного вопроса с вызовами

LogicalOperation.OperationStack.Push()/Pop().


//OperationStack.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

using System.Runtime.Remoting.Messaging;

namespace LogicalOperation
{
  public static class OperationStack
  {
    private const string OperationStackSlot = "OperationStackSlot";

    public static IDisposable Push(string operation)
    {
      OperationStackItem parent = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
      OperationStackItem op = new OperationStackItem(parent, operation);
      CallContext.LogicalSetData(OperationStackSlot, op);
      return op;
    }

    public static object Pop()
    {
      OperationStackItem current = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;

      if (current != null)
      {
        CallContext.LogicalSetData(OperationStackSlot, current.Parent);
        return current.Operation;
      }
      else
      {
        CallContext.FreeNamedDataSlot(OperationStackSlot);
      }
      return null;
    }

    public static object Peek()
    {
      OperationStackItem top = Top();
      return top != null ? top.Operation : null;
    }

    internal static OperationStackItem Top()
    {
      OperationStackItem top = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
      return top;
    }

    public static IEnumerable<object> Operations()
    {
      OperationStackItem current = Top();
      while (current != null)
      {
        yield return current.Operation;
        current = current.Parent;
      }
    }

    public static int Count
    {
      get
      {
        OperationStackItem top = Top();
        return top == null ? 0 : top.Depth;
      }
    }

    public static IEnumerable<string> OperationStrings()
    {
      foreach (object o in Operations())
      {
        yield return o.ToString();
      }
    }
  }
}


//OperationStackItem.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace LogicalOperation
{
  public class OperationStackItem : IDisposable
  {
    private OperationStackItem parent = null;
    private object operation;
    private int depth;
    private bool disposed = false;

    internal OperationStackItem(OperationStackItem parentOperation, object operation)
    {
      parent = parentOperation;
      this.operation = operation;
      depth = parent == null ? 1 : parent.Depth + 1;
    }

    internal object Operation { get { return operation; } }
    internal int Depth { get { return depth; } }

    internal OperationStackItem Parent { get { return parent; } }

    public override string ToString()
    {
      return operation != null ? operation.ToString() : "";
    }

    #region IDisposable Members

    public void Dispose()
    {
      if (disposed) return;

      OperationStack.Pop();

      disposed = true;
    }

    #endregion
  }
}

Это было вдохновлено объектами scope, описанными Брентом Вандермейдом здесь: http://www.dnrtv.com/default.aspx?showNum=114

Вы можете использовать этот класс следующим образом:

public void MyFunc()
{
  using (LogicalOperation.OperationStack.Push("MyFunc"))
  {
    MyOtherFunc();
  }
}

public void MyOtherFunc()
{
  using (LogicalOperation.OperationStack.Push("MyOtherFunc"))
  {
    MyFinalFunc();
  }
}

public void MyFinalFunc()
{
  using (LogicalOperation.OperationStack.Push("MyFinalFunc"))
  {
    Console.WriteLine("Hello");
  }
}

Я исследовал способ иметь логический стек, который должен легко работать в приложении, использующем TPL в большой степени. Я решил использовать LogicalOperationStack, потому что он делал все, что мне было нужно, без изменения существующего кода. Но потом я прочитал об ошибке в LogicalCallContext:

Https://connect.microsoft.com/VisualStudio/feedback/details/609929/logicalcallcontext-clone-bug-when-correlationmanager-slot-is-present

Поэтому я попытался найти обходной путь для эта ошибка, и я думаю, что я получил его работу для TPL (спасибо ILSpy):

public static class FixLogicalOperationStackBug
{
    private static bool _fixed = false;

    public static void Fix()
    {
        if (!_fixed)
        {
            _fixed = true;

            Type taskType = typeof(Task);
            var s_ecCallbackField = taskType.GetFields(BindingFlags.Static | BindingFlags.NonPublic).First(f => f.Name == "s_ecCallback");
            ContextCallback s_ecCallback = (ContextCallback)s_ecCallbackField.GetValue(null);

            ContextCallback injectedCallback = new ContextCallback(obj =>
            {
                // Next line will set the private field m_IsCorrelationMgr of LogicalCallContext which isn't cloned
                CallContext.LogicalSetData("System.Diagnostics.Trace.CorrelationManagerSlot", Trace.CorrelationManager.LogicalOperationStack);
                s_ecCallback(obj);
            });

            s_ecCallbackField.SetValue(null, injectedCallback);
        }
    }
}