Это CorrelationManager.LogicalOperationStack совместим с Parallel.Для, задачи, потоки и т. д
Пожалуйста, смотрите этот вопрос для получения справочной информации:
Как задачи в параллельной библиотеке задач влияют на ActivityID?
Этот вопрос задает вопрос о том, как задачи влияют на трассировку .CorrelationManager.ActivityId . @Greg Samson ответил на свой собственный вопрос с помощью тестовой программы, показывающей, что ActivityId надежен в контексте задач. Программа испытаний устанавливает значение activityid в начале делегировать задачу, проживающих в имитации работы, а затем проверяет идентификатор activityid в конец, чтобы убедиться, что это то же самое значение (т. е. что оно не было изменено другим потоком). Программа работает успешно.Исследуя другие варианты "контекста" для потоковой обработки, задач и параллельных операций (в конечном счете, чтобы обеспечить лучший контекст для ведения журнала), я столкнулся со странной проблемой с трассировкой.CorrelationManager.LogicalOperationStack (мне все равно было странно). Я скопировал свой "ответ" на его вопрос ниже.
Я думаю, что это адекватно описывает проблему, с которой я столкнулся (след.CorrelationManager.LogicalOperationStack, по-видимому, повреждается - или что - то еще-при использовании в контексте Parallel.Ибо, но только если параллель.Для себя заключена в логическую операцию).
Вот мои вопросы:
-
Надо Проследить.CorrelationManager.LogicalOperationStack можно использовать с Parallel.За что? Если да, то должно ли это иметь значение, если логическая операция уже действует с Параллельный.Для начала?
-
Есть ли "правильный" способ использовать LogicalOperationStack с параллельным.За что? Могу ли я закодировать этот пример программы по-другому, чтобы он "работал"? Под "работами" я подразумеваю, что LogicalOperationStack всегда имеет ожидаемое количество записей, и сами записи являются ожидаемыми записями.
Я провел некоторые дополнительные тесты с использованием потоков и потоков ThreadPool, но мне придется вернуться и повторить эти тесты, чтобы увидеть, если я запустил в схожие проблемы.
Я скажу, что похоже, что Task / Parallel threads и ThreadPool threads действительно "наследуют" трассировку.CorrelationManager.ActivityId и след.CorrelationManager.LogicalOperationStack значения из родительского потока. Это ожидается, поскольку эти значения хранятся в CorrelationManager с помощью метода LogicalSetDataCallContext (в отличие от метода SetData).
Еще раз, пожалуйста, вернитесь к этому вопросу, чтобы получить исходный контекст для "ответ", который я разместил ниже:
Как задачи в параллельной библиотеке задач влияют на ActivityID?
Смотрите также этот аналогичный вопрос (на который до сих пор не было ответа) на форуме параллельных расширений Microsoft:
[НАЧАТЬ ВСТАВКУ]
Пожалуйста, простите, что я публикую это в качестве ответа, поскольку это не совсем ответ ваш вопрос, однако, связан с вашим вопросом, поскольку он имеет дело с поведением CorrelationManager и потоками / задачами / и т. д. Я рассматривал использование методов CorrelationManager LogicalOperationStack
(и StartLogicalOperation/StopLogicalOperation
) для обеспечения дополнительного контекста в многопоточных сценариях.
StartLogicalOperation/StopLogicalOperation
для скобок (внутри) DoLongRunningWork
. Концептуально DoLongRunningWork
делает что-то подобное каждый раз выполняется:
DoLongRunningWork
StartLogicalOperation
Thread.Sleep(3000)
StopLogicalOperation
Я обнаружил, что если я добавляю эти логические операции в ваш код (более или менее как есть), все логические операции остаются синхронизированными (всегда ожидаемое число операций в стеке и значения операций в стеке всегда ожидаются).
В некоторых из моих собственных тестов я обнаружил, что это не всегда так. Стек логических операций был "поврежден". Лучшее объяснение, которое я мог бы придумать, заключается в том, что" слияние" информация CallContext в контексте "родительского" потока при выходе из" дочернего "потока приводила к тому, что" старая "информация контекста дочернего потока (логическая операция)" наследуется " другим дочерним потоком-родителем.
Проблема может быть также связана с тем, что параллельна.For, по-видимому, использует основной поток (по крайней мере, в коде примера, как написано) в качестве одного из "рабочих потоков" (или как они должны называться в параллельном домене). Всякий раз, когда Долонграннингворк выполняется, новая логическая операция запускается (в начале) и останавливается (в конце) (то есть помещается в LogicalOperationStack и выскакивает из него). Если основной поток уже имеет логическую операцию в действии и если DoLongRunningWork выполняется в основном потоке, то запускается новая логическая операция, поэтому LogicalOperationStack основного потока теперь имеет две операции. Любые последующие выполнения DoLongRunningWork (до тех пор, пока выполняется эта" итерация " DoLongRunningWork на главном потоке) будет (по-видимому) наследовать LogicalOperationStack главного потока (который теперь имеет две операции над ним, а не только одну ожидаемую операцию).
Мне потребовалось много времени, чтобы понять, почему поведение LogicalOperationStack в моем примере отличалось от моей модифицированной версии вашего примера. Наконец я увидел, что в моем коде я заключил всю программу в скобки в логической операции, тогда как в моей модифицированной версии вашей тестовой программы я этого не сделал. То подразумевается, что в моей тестовой программе каждый раз, когда выполнялась моя "работа" (аналогично DoLongRunningWork), уже существовала логическая операция. В моей модифицированной версии вашей тестовой программы я не заключил всю программу в скобки в логической операции.Итак, когда я модифицировал вашу тестовую программу, чтобы заключить всю программу в скобки в логической операции, и если я использую параллель.Ибо я столкнулся с точно такой же проблемой.
Используя приведенную выше концептуальную модель, это позволит выполнить успешно:
Parallel.For
DoLongRunningWork
StartLogicalOperation
Sleep(3000)
StopLogicalOperation
В то время как это в конечном итоге будет утверждать из-за явно несинхронизированного LogicalOperationStack:
StartLogicalOperation
Parallel.For
DoLongRunningWork
StartLogicalOperation
Sleep(3000)
StopLogicalOperation
StopLogicalOperation
Вот мой пример программы. Он похож на ваш в том, что он имеет метод DoLongRunningWork, который манипулирует ActivityId, а также LogicalOperationStack. У меня также есть два вкуса пинка Долонграннинга. Один вкус использует задачи, которые используются параллельно.Для. Каждый аромат также может быть выполнен таким образом, что вся распараллеленная операция является заключен в логическую операцию или нет. Таким образом, существует в общей сложности 4 способа выполнения параллельной операции. Чтобы попробовать каждый из них, просто раскомментируйте желаемое "использование...- метод, перекомпиляция и запуск. UseTasks
, UseTasks(true)
, и UseParallelFor
должны все бежать к завершению. UseParallelFor(true)
будет утверждать в какой-то момент, потому что LogicalOperationStack не имеет ожидаемого количества записей.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace CorrelationManagerParallelTest
{
class Program
{
static void Main(string[] args)
{
//UseParallelFor(true) will assert because LogicalOperationStack will not have expected
//number of entries, all others will run to completion.
UseTasks(); //Equivalent to original test program with only the parallelized
//operation bracketed in logical operation.
////UseTasks(true); //Bracket entire UseTasks method in logical operation
////UseParallelFor(); //Equivalent to original test program, but use Parallel.For
//rather than Tasks. Bracket only the parallelized
//operation in logical operation.
////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
}
private static List<int> threadIds = new List<int>();
private static object locker = new object();
private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;
private static int mainThreadUsedInDelegate = 0;
// baseCount is the expected number of entries in the LogicalOperationStack
// at the time that DoLongRunningWork starts. If the entire operation is bracketed
// externally by Start/StopLogicalOperation, then baseCount will be 1. Otherwise,
// it will be 0.
private static void DoLongRunningWork(int baseCount)
{
lock (locker)
{
//Keep a record of the managed thread used.
if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
threadIds.Add(Thread.CurrentThread.ManagedThreadId);
if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
{
mainThreadUsedInDelegate++;
}
}
Guid lo1 = Guid.NewGuid();
Trace.CorrelationManager.StartLogicalOperation(lo1);
Guid g1 = Guid.NewGuid();
Trace.CorrelationManager.ActivityId = g1;
Thread.Sleep(3000);
Guid g2 = Trace.CorrelationManager.ActivityId;
Debug.Assert(g1.Equals(g2));
//This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
//in effect when the Parallel.For operation was started.
Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));
Trace.CorrelationManager.StopLogicalOperation();
}
private static void UseTasks(bool encloseInLogicalOperation = false)
{
int totalThreads = 100;
TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
Task task = null;
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StartLogicalOperation();
}
Task[] allTasks = new Task[totalThreads];
for (int i = 0; i < totalThreads; i++)
{
task = Task.Factory.StartNew(() =>
{
DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
}, taskCreationOpt);
allTasks[i] = task;
}
Task.WaitAll(allTasks);
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StopLogicalOperation();
}
stopwatch.Stop();
Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
Console.ReadKey();
}
private static void UseParallelFor(bool encloseInLogicalOperation = false)
{
int totalThreads = 100;
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StartLogicalOperation();
}
Parallel.For(0, totalThreads, i =>
{
DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
});
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StopLogicalOperation();
}
stopwatch.Stop();
Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
Console.ReadKey();
}
}
}
Весь этот вопрос if LogicalOperationStack может быть использован с Parallel.Для (и/или других потоковых/задачных конструкций) или как его можно использовать, вероятно, заслуживает своего собственного вопроса. Может быть, я напишу вопрос. В то же время, я хотел бы знать, есть ли у вас какие-либо мысли по этому поводу (или, я хотел бы знать, рассматривали ли вы возможность использования LogicalOperationStack, так как ActivityId кажется безопасным).
[КОНЕЦ ВСТАВКИ]
Есть ли у кого-нибудь мысли по этому поводу?2 ответа:
[Начать Обновление]
Я также задал этот вопрос на параллельных расширениях Microsoft для форума поддержки .Net и в конечном итоге получил ответ от Стивена Тоуба. Оказывается, в LogicalCallContext есть ошибка, которая приводит к повреждению LogicalOperationStack. Есть также хорошее описание (в продолжении Стивена к ответу, который я сделал на его ответ), которое дает краткий обзор того, как параллельна.Для работ, касающихся раздача заданий и почему это делает параллель.Для восприимчивых к клопу.
В своем ответе ниже я предполагаю, что LogicalOperationStack не совместим с Parallel.Потому что параллельно.For использует основной поток в качестве одного из" рабочих " потоков. Исходя из объяснений Стивена, мои предположения были неверны. Параллельный.For использует основной поток как один из" рабочих " потоков, но он не просто используется "как есть". Первая задача выполняется в основном потоке, но выполняется таким образом, что она это как если бы он был запущен на новом потоке. Прочитайте описание Стивена для получения дополнительной информации.
[Завершить Обновление]
Из того, что я могу сказать, ответ таков:ActivityId и LogicalOperationStack хранятся через CallContext.LogicalSetData . Это означает, что эти значения будут "перетекать" в любые "дочерние" потоки. Это довольно круто, как вы могли бы, например, установить ActivityId в точке входа в многопоточный сервер (скажем, вызов службы) и все потоки то, что в конечном счете начинается с этой точки входа, может быть частью той же самой"деятельности". Аналогично, логические операции (через LogicalOperationStack) также поступают в дочерние потоки.
Что касается трассировки.CorrelationManager.ActivityId:
ActivityId, кажется, совместим со всеми потоковыми моделями, с которыми я его тестировал: с использованием потоков напрямую, с использованием ThreadPool, с использованием задач, с использованием Parallel.*. Во всех случаях ActivityId имеет ожидаемое значение.
В отношении След.CorrelationManager.LogicalOperationStack:
LogicalOperationStack, по-видимому, совместим с большинством потоковых моделей, но не с параллельными.*. Используя потоки напрямую, ThreadPool и задачи, LogicalOperationStack (как манипулируется в примере кода, представленном в моем вопросе) сохраняет свою целостность. Во всех случаях содержимое пакета LogicalOperationStack соответствует ожидаемому.LogicalOperationStack не совместим с Parallel.Для. Если логическая операция "в действии", то есть, если вы вызвали CorrelationManager.StartLogicalOperation, до начала параллели.* операция, а затем вы начинаете новую логическую операцию в контексте Paralle.* (то есть в делегате), то LogicalOperationStack будет поврежден. (Я должен сказать, что он, вероятно, будет испорчен. Параллельный.* может не создавать никаких дополнительных потоков, что означает, что LogicalOperationStack будет безопасным).
Проблема проистекает из того, что параллельна.* использует основные поток (или, возможно, более правильно, поток, который запускает параллельную операцию) как один из своих "рабочих" потоков. Это означает, что по мере запуска и остановки" логических операций" в "рабочем" потоке, который является тем же самым, что и "главный" поток, изменяется LogicalOperationStack "главного" потока. Даже если вызывающий код (то есть делегат) поддерживает стек правильно (гарантируя, что каждая StartLogicalOperation "остановлена" с соответствующей StopLogicalOperation)," основные " потоки стек модифицирован. В конечном счете кажется (по крайней мере, мне), что LogicalOperationStack "основного" потока существенно модифицируется двумя различными "логическими" потоками: "основным" потоком и "рабочим" потоком, которые оба оказываются одним и тем же потоком.
Я не знаю глубинных особенностей того, почему это не работает (по крайней мере, как я ожидал бы, что это сработает). Мое лучшее предположение состоит в том, что каждый раз, когда делегат выполняется в потоке (это не то же самое, что основной поток), поток "наследует" текущее состояние LogicalOperationStack основного потока. Если делегат в данный момент выполняется в основном потоке (повторно используется как рабочий поток) и запустил логическую операцию, то один (или более) из других распараллеленных делегатов "наследует" LogicalOperationStack основного потока, который теперь имеет одну (или более) новых логических операций в действии!
FWIW, я реализовал (в основном для тестирования, я фактически не использую его в данный момент), следующий "логический стек" должен имитировать LogicalOperationStack, но делать это так, чтобы он работал параллельно.* Не стесняйтесь попробовать его и / или использовать. Для проверки замените вызовы на
Trace.CorrelationManager.StartLogicalOperation/StopLogicalOperation
В примере кода из моего исходного вопроса с вызовами
LogicalOperation.OperationStack.Push()/Pop(). //OperationStack.cs using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Runtime.Remoting.Messaging; namespace LogicalOperation { public static class OperationStack { private const string OperationStackSlot = "OperationStackSlot"; public static IDisposable Push(string operation) { OperationStackItem parent = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem; OperationStackItem op = new OperationStackItem(parent, operation); CallContext.LogicalSetData(OperationStackSlot, op); return op; } public static object Pop() { OperationStackItem current = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem; if (current != null) { CallContext.LogicalSetData(OperationStackSlot, current.Parent); return current.Operation; } else { CallContext.FreeNamedDataSlot(OperationStackSlot); } return null; } public static object Peek() { OperationStackItem top = Top(); return top != null ? top.Operation : null; } internal static OperationStackItem Top() { OperationStackItem top = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem; return top; } public static IEnumerable<object> Operations() { OperationStackItem current = Top(); while (current != null) { yield return current.Operation; current = current.Parent; } } public static int Count { get { OperationStackItem top = Top(); return top == null ? 0 : top.Depth; } } public static IEnumerable<string> OperationStrings() { foreach (object o in Operations()) { yield return o.ToString(); } } } } //OperationStackItem.cs using System; using System.Collections.Generic; using System.Linq; using System.Text; namespace LogicalOperation { public class OperationStackItem : IDisposable { private OperationStackItem parent = null; private object operation; private int depth; private bool disposed = false; internal OperationStackItem(OperationStackItem parentOperation, object operation) { parent = parentOperation; this.operation = operation; depth = parent == null ? 1 : parent.Depth + 1; } internal object Operation { get { return operation; } } internal int Depth { get { return depth; } } internal OperationStackItem Parent { get { return parent; } } public override string ToString() { return operation != null ? operation.ToString() : ""; } #region IDisposable Members public void Dispose() { if (disposed) return; OperationStack.Pop(); disposed = true; } #endregion } }
Это было вдохновлено объектами scope, описанными Брентом Вандермейдом здесь: http://www.dnrtv.com/default.aspx?showNum=114
Вы можете использовать этот класс следующим образом:
public void MyFunc() { using (LogicalOperation.OperationStack.Push("MyFunc")) { MyOtherFunc(); } } public void MyOtherFunc() { using (LogicalOperation.OperationStack.Push("MyOtherFunc")) { MyFinalFunc(); } } public void MyFinalFunc() { using (LogicalOperation.OperationStack.Push("MyFinalFunc")) { Console.WriteLine("Hello"); } }
Я исследовал способ иметь логический стек, который должен легко работать в приложении, использующем TPL в большой степени. Я решил использовать LogicalOperationStack, потому что он делал все, что мне было нужно, без изменения существующего кода. Но потом я прочитал об ошибке в LogicalCallContext:
Поэтому я попытался найти обходной путь для эта ошибка, и я думаю, что я получил его работу для TPL (спасибо ILSpy):
public static class FixLogicalOperationStackBug { private static bool _fixed = false; public static void Fix() { if (!_fixed) { _fixed = true; Type taskType = typeof(Task); var s_ecCallbackField = taskType.GetFields(BindingFlags.Static | BindingFlags.NonPublic).First(f => f.Name == "s_ecCallback"); ContextCallback s_ecCallback = (ContextCallback)s_ecCallbackField.GetValue(null); ContextCallback injectedCallback = new ContextCallback(obj => { // Next line will set the private field m_IsCorrelationMgr of LogicalCallContext which isn't cloned CallContext.LogicalSetData("System.Diagnostics.Trace.CorrelationManagerSlot", Trace.CorrelationManager.LogicalOperationStack); s_ecCallback(obj); }); s_ecCallbackField.SetValue(null, injectedCallback); } } }