Снятие нескольких блокировок без инверсии приоритета
Краткая версия: как я могу освободить несколько блокировок из одного потока, не будучи вытесненным на полпути?
У меня есть программа, которая предназначена для работы на N-core машине. Он состоит из одного основного потока и N рабочих потоков. Каждый поток (включая основной поток) имеет семафор, который он может блокировать. Обычно каждый рабочий поток блокируется при уменьшении его семафора, и основной поток выполняется. Время от времени, однако, главная нить должна будить работника. нити делают свое дело в течение определенного количества времени, а затем блокируют свой собственный семафор, ожидая, пока все они снова заснут. Вот так:def main_thread(n):
for i = 1 to n:
worker_semaphore[i] = semaphore(0)
spawn_thread(worker_thread, i)
main_semaphore = semaphore(0)
while True:
...do some work...
workers_to_wake = foo()
for i in workers_to_wake:
worker_semaphore[i].increment() # wake up worker n
for i in workers_to_wake:
main_semaphore.decrement() # wait for all workers
def worker_thread(i):
while True:
worker_semaphore(i).decrement() # wait to be woken
...do some work...
main_semaphore.increment() # report done with step
Все хорошо и хорошо. Проблема в том, что один из проснувшихся рабочих может в конечном итоге опередить основной поток на полпути к пробуждению рабочих: это может произойти, например, когда планировщик Windows решит повысить приоритет этого рабочего. Это не приводит к взаимоблокировке, но это неэффективно, потому что остальные потоки остаются спящими до тех пор, пока вытесняющий работник заканчивает свою работу. Это в основном инверсия приоритетов, когда главный поток ожидает одного из рабочих потоков, а некоторые рабочие потоки ждут главного потока.
Я, вероятно, могу вычислить специфические для ОС и планировщика хаки для этого, такие как отключение повышения приоритета под Windows и возня с приоритетами потоков и сходством процессоров, но я хотел бы что-то кросс-платформенное, надежное и чистое. Итак: как я могу разбудить кучу нитей атомарно?
4 ответа:
TL; DR
Если вам действительно нужно получить как можно больше от ваших работников, просто используйте семафор событий, блок управления и барьер вместо ваших семафоров. Обратите внимание, однако, что это более хрупкое решение, и поэтому вам нужно сбалансировать любые потенциальные выгоды с этой обратной стороной.Контекст
Сначала я должен подвести итог более широкому контексту нашей дискуссии...
У вас есть графическое приложение Windows. Он имеет желаемую частоту кадров, и поэтому вам нужно основной поток должен работать с такой скоростью, планируя всех своих работников точно по расписанию так, чтобы они завершили свою работу в интервале обновления. Это означает, что у вас есть очень жесткие ограничения на время запуска и выполнения для каждого потока. Кроме того, рабочие потоки не все идентичны, поэтому вы не можете просто использовать одну рабочую очередь.
Проблема
Как и любая современная операционная система, Windows имеет множество примитивов синхронизации . Однако, ни один из них напрямую не обеспечивает механизм уведомления сразу нескольких примитивов. Просматривая другие операционные системы, я вижу похожую картину; все они предоставляют способы ожидания на нескольких примитивах, но ни один не предоставляет атомарный способ запуска их. Так что же мы можем сделать вместо этого? Проблемы, которые вам нужно решить:
- точное время начала работы всех необходимых работников.
- подталкивание рабочих, которые на самом деле должны работать в следующий кадр.
Варианты
Наиболее очевидным решением проблемы 1 является использование только одного семафора событий, но вы также можете использовать блокировку чтения/записи (приобретая блокировку записи после того, как рабочие закончат и заставляя рабочих использовать блокировку чтения). Все остальные параметры больше не являются атомарными, и поэтому потребуется дальнейшая синхронизация, чтобы заставить потоки делать то, что вы хотите - например, предложение lossleader для блокировок внутри ваших семафоров.Но мы хотим оптимального решение, которое максимально сокращает переключение контекста из-за жестких временных ограничений вашего приложения, поэтому давайте посмотрим, можно ли использовать любой из них для решения проблемы 2... Как вы можете выбрать, какие рабочие потоки должны выполняться из основного, если у нас есть только семафор событий или блокировка чтения/записи?
Хорошо... Блокировка чтения / записи-отличный способ для одного потока записать некоторые важные данные в блок управления, а для многих других-прочитать из него. Почему бы просто не иметь простой массив boolean флаги (по одному для каждого рабочего потока), которые ваш основной поток обновляет каждый кадр? К сожалению, вам все еще нужно остановить выполнение рабочих, пока таймер не выскочит. Короче говоря, мы снова вернулись к решению семафора и блокировки.
Однако, в силу характера вашего приложения, вы можете сделать еще один шаг. Вы можете положиться на тот факт, что вы знаете, что ваши работники не работают вне вашего времени нарезки и использовать семафор событий в качестве грубой формы блокировки вместо этого.Окончательная оптимизация (если ваше окружение поддерживает их), это использовать барьер вместо основного семафора. Вы знаете, что все N потоков должны быть простаивающими, прежде чем вы сможете продолжить, поэтому просто настаивайте на этом.
Решение
Применяя вышеизложенное, ваш псевдокод будет выглядеть примерно так:
def main_thread(n): main_event = event() for i = 1 to n: worker_scheduled[i] = False spawn_thread(worker_thread, i) main_barrier = barrier(n+1) while True: ...do some work... workers_to_wake = foo() for i in workers_to_wake: worker_scheduled[i] = True main_event.set() main_barrier.enter() # wait for all workers main_event.reset() def worker_thread(i): while True: main_event.wait() if worker_scheduled[i]: worker_scheduled[i] = False ...do some work... main_barrier.enter() # report finished for this frame. main_event.reset() # to catch the case that a worker is scheduled before the main thread
Поскольку нет явного контроля массива worker_scheduled, это гораздо более хрупкое решение.
Поэтому я лично использовал бы его только в том случае, если бы мне пришлось выжать все до последней унции. обработки из моего процессора, но похоже, что это именно то, что вы ищете.
Это невозможно при использовании нескольких объектов синхронизации (семафоров), когда сложность алгоритма пробуждения равна O(n). Есть несколько способов, как решить эту проблему.
Отпустите все сразу
Я не уверен, есть ли у Python необходимый метод (ваш вопрос специфичен для Python?), но, как правило, семафоры имеют операции с аргументом, указывающим число на декременты / инкременты. Таким образом, вы просто ставите все свои нити на один и тот же семафор и будите их все вместе. Аналогичный подход заключается в использовании условной переменной и уведомить всех .
Циклы событий
Если вы все еще хотите иметь возможность управлять каждым потоком индивидуально, но как подход с уведомлением один ко многим, попробуйте библиотеки для асинхронного ввода-вывода, такие как
libuv
(и его Python-аналог). Здесь вы можете создать одно событие, которое пробуждает все потоки сразу, а также создать для каждого потока свое отдельное событие, а затем просто дождаться обоих (или нескольких) объектов событий в событии петли в каждой нити. Есть еще одна библиотекаpevents
который реализуетWaitForMultipleObjects
поверх условных переменных pthreads.Делегат пробуждается
Другой подход заключается в замене вашего алгоритма O(n) древовидным алгоритмом ( O(log n)), где каждый поток пробуждает только фиксированное число других потоков, но делегирует их пробуждению других. В крайнем случае основная нить может разбудить только одну другую нить, которая разбудит всех остальных или запустит цепную реакцию. Это может быть полезно если вы хотите уменьшить задержку для основного потока за счет задержек пробуждения других потоков.
Блокировка Чтения/Записи
Решение, которое я обычно использую в системах POSIX для отношения один ко многим, - это блокировка чтения / записи. Меня удивляет, что они не являются полной универсалией, но большинство языков либо реализуют версию, либо, по крайней мере, имеют пакет, доступный для реализации их на любых существующих примитивах, например, python'S prwlock:
from prwlock import RWLock def main_thread(n): for i = 1 to n: worker_semaphore[i] = semaphore(0) spawn_thread(worker_thread, i) main_lock = RWLock() while True: main_lock.acquire_write() ...do some work... workers_to_wake = foo() # The above acquire could be moved as low as here, # depending on how independent the above processing is.. for i in workers_to_wake: worker_semaphore[i].increment() # wake up worker n main_lock.release() def worker_thread(i): while True: worker_semaphore(i).decrement() # wait to be woken main_lock.acquire_read() ...do some work... main_lock.release() # report done with step
Барьеры
Барьеры кажутся наиболее близким по назначению встроенным механизмом Python к задержите все нити, пока они все не будут предупреждены, но:
Это довольно необычное решение, поэтому они сделают ваш код/опыт более трудным для перевода на другие языки.
Я бы не хотел использовать их для этого случая, когда количество потоков, которые нужно разбудить, постоянно меняется. Учитывая, что ваш n звучит мало, я бы хотел использовать константу
Barrier(n)
и уведомить все потоки, чтобы проверить, работают ли они в этом цикле. Но:Я был бы обеспокоен это использование барьера приведет к обратным результатам, так как любой из потоков, удерживаемых чем-то внешним, будет удерживать их все, и даже планировщик с повышением зависимости от ресурсов может пропустить эту связь. Необходимость во всех n, чтобы достичь барьера, могла только ухудшить ситуацию.
Решение Питера Бриттена плюс предложение Антона о "пробуждении, подобном дереву", привели меня к другому решению: цепные пробуждения. В принципе, вместо того, чтобы основная нить делала все пробуждения, она пробуждает только одну нить; и тогда каждая нить отвечает за пробуждение следующей. Элегантный бит здесь заключается в том, что есть только один приостановленный поток, готовый к запуску, поэтому потоки редко заканчиваются переключением ядер. На самом деле, это прекрасно работает со строгими процессорными сродствами, даже если один из рабочих нити имеют общее сродство с основной нитью.
Другая вещь, которую я сделал, это использовал атомарный счетчик, который рабочие потоки декрементируют перед сном; таким образом, только последний из них будит основной поток, так что нет также никакого шанса, что основной поток будет разбужен несколько раз, просто чтобы сделать больше ожидания семафора.
workers_to_wake = [] main_semaphore = semaphore(0) num_woken_workers = atomic_integer() def main_thread(n): for i = 1 to n: worker_semaphore[i] = semaphore(0) spawn_thread(worker_thread, i) main_semaphore = semaphore(0) while True: ...do some work... workers_to_wake = foo() num_woken_workers.atomic_set(len(workers_to_wake)) # set completion countdown one_to_wake = workers_to_wake.pop() worker_semaphore[one_to_wake].increment() # wake the first worker main_semaphore.decrement() # wait for all workers def worker_thread(i): while True: worker_semaphore[i].decrement() # wait to be woken if workers_to_wake.len() > 0: # more pending wakeups one_to_wake = workers_to_wake.pop() worker_semaphore[one_to_wake].increment() # wake the next worker ...do some work... if num_woken_workers.atomic_decrement() == 0: # see whether we're the last one main_semaphore.increment() # report all done with step