Резервный процессор в многопроцессорных и многопоточных приложений
Я работаю над проектом с Raspberry Pi 3 для некоторого экологического контроля с рядом простых повторяющихся событий в непрерывном цикле. RP3 слишком квалифицирован для этой работы, но это позволяет мне сосредоточиться на других вещах.
Характеристики приложения:
-
Приложение должно собирать сенсорные данные (с переменным интервалом n-секунд) от десятка датчиков (температуры, влажности, pH, ORP и т. д.).
- основываясь на времени и этих сенсорных данных, контроллер вычисляет выходной сигнал (переключатели, клапаны и ШИМ-драйверы).
- почти ни одно событие не должно выполняться последовательно.
- Некоторые события относятся к категории "Безопасность" и должны запускаться мгновенно при срабатывании (отказоустойчивые датчики, аварийная кнопка). Большинство событий повторяются с интервалом в несколько секунд (каждую секунду, вплоть до каждых 30 секунд).
- Некоторые события запускают действие, активируя реле в течение от 1 до 120 секунд.
- Некоторые события используют значение, основанное на времени. Это значение должно быть вычисляется несколько раз в день и является довольно интенсивным процессором (использует несколько итеративных интерполяционных формул, и поэтому имеет переменную времени выполнения).
- отображение состояния среды (в непрерывном цикле)
Я знаком (не по профессии) с VB.NET, но решил сделать этот проект в Python 3.6. За последние несколько месяцев я много читал о таких предметах, как шаблоны проектирования, потоки, процессы, события, paralel prcessing и т. д.
Основываясь на моем чтении, я думаю, что Asyncio объединил с некоторыми задачами в Экзекьюторе справятся сами.
Большинство задач/событий не критичны по времени. На выходе контроллера можно использовать "самые последние" сенсорные данные. Некоторые задачи, с другой стороны, активируют ретранслятор на определенный период времени. Я хотел бы знать, как запрограммировать эти задачи без возможности того, что другая "трудоемкая" задача блокирует процессор в течение периода времени (например), когда открыт клапан CO2. Это может быть катастрофой для моего окружения.
Поэтому мне нужно немного совет.
Смотрите ниже мой код до сих пор. Я не уверен, что правильно использую функции Asyncio в Python. Для удобства чтения я буду хранить содержимое различных задач в отдельных модулях.
import asyncio
import concurrent.futures
import datetime
import time
import random
import math
# define a task...
async def firstTask():
while True:
await asyncio.sleep(1)
print("First task executed")
# define another task...
async def secondTask():
while True:
await asyncio.sleep(5)
print("Second Worker Executed")
# define/simulate heavy CPU-bound task
def heavy_load():
while True:
print('Heavy_load started')
i = 0
for i in range(50000000):
f = math.sqrt(i)*math.sqrt(i)
print('Heavy_load finished')
time.sleep(4)
def main():
# Create a process pool (for CPU bound tasks).
processpool = concurrent.futures.ProcessPoolExecutor()
# Create a thread pool (for I/O bound tasks).
threadpool = concurrent.futures.ThreadPoolExecutor()
loop = asyncio.get_event_loop()
try:
# Add all tasks. (Correct use?)
asyncio.ensure_future(firstTask())
asyncio.ensure_future(secondTask())
loop.run_in_executor(processpool, heavy_load)
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
print("Loop will be ended")
loop.close()
if __name__ == '__main__':
main()
2 ответа:
Большинство задач / событий не критичны по времени. Выход контроллера может использовать "самые последние" сенсорные данные. Некоторые задачи, с другой стороны, активируют ретранслятор на определенный период времени. Я хотел бы знать, как запрограммировать эти задачи без возможности того, что другая "трудоемкая" задача блокирует процессор в течение периода времени (например), когда открыт клапан CO2. Это может быть катастрофой для моего окружения.Позвольте мне подчеркнуть, что Python-это не реальное время. язык , и asyncio не является компонентом реального времени. Они не имеют инфраструктуры для выполнения в реальном времени (Python-это мусор, собираемый и обычно работающий на системах с общим временем), и они не были протестированы в таких средах на практике. Поэтому я настоятельно рекомендую не использовать их в любом сценарии, где оплошность может быть гибельной для вашего окружения.
С этим из пути, ваш код имеет проблему: в то время как
heavy_load
вычисление не будет блокируйте цикл событий, он также никогда не завершится и не предоставит информацию о своем прогрессе. Идея, лежащая в основеrun_in_executor
, заключается в том, что вычисление, которое вы выполняете, в конечном счете остановится, и что цикл событий захочет получить уведомление об этом. Идиоматическое использованиеrun_in_executor
может выглядеть следующим образом:def do_heavy_calc(param): print('Heavy_load started') f = 0 for i in range(50000000): f += math.sqrt(i)*math.sqrt(i) return f def heavy_calc(param): loop = asyncio.get_event_loop() return loop.run_in_executor(processpool, do_heavy_calc)
Выражение
heavy_calc(...)
не только выполняется без блокировки цикла событий, но и являетсяожидаемым . Это означает, что асинхронный код может ожидать своего результата, также без блокировки другие сопрограммы:async def sum_params(p1, p2): s1 = await heavy_calc(p1) s2 = await heavy_calc(p2) return s1 + s2
Выше приводятся два расчета один за другим. Это также можно сделать параллельно:
async def sum_params_parallel(p1, p2): s1, s2 = await asyncio.gather(heavy_calc(p1), heavy_calc(p2)) return s1 + s2
Еще одна вещь, которую можно было бы улучшить, - это код установки:
asyncio.ensure_future(firstTask()) asyncio.ensure_future(secondTask()) loop.run_in_executor(processpool, heavy_load) loop.run_forever()
Вызов
asyncio.ensure_future
и затем никогда не ожидание результата является чем-то вроде анти-паттерна asyncio. Исключения, вызванные неожиданными задачами, молча проглатываются, что почти наверняка не то, что вам нужно. Иногда люди просто забывают писатьawait
, Вот почему asyncio жалуется на невыполненные незавершенные задачи, когда цикл разрушается.Хорошая практика кодирования состоит в том, чтобы организовать каждую задачу, ожидаемуюкем-то , либо сразу с
await
, либоgather
, чтобы объединить ее с другой задачей, или в более поздний момент. Например, если задача должна выполняться в фоновом режиме, вы можете сохранить ее где-нибудь иawait
или отменить ее в конце жизненного цикла приложения. В вашем случае я бы объединилgather
сloop.run_until_complete
:everything = asyncio.gather(firstTask(), secondTask(), loop.run_in_executor(processpool, heavy_load)) loop.run_until_complete(everything)
Тогда я настоятельно рекомендую вам не полагаться на программное обеспечение для выполнения этой функции. Кнопки аварийной остановки, которые отключают питание, - это то, как это обычно делается. Если у вас есть программное обеспечение, делающее это, и вы действительно имеете ситуацию угрозы для жизни, чтобы справиться, вы находитесь в целой куче горя - почти наверняка есть тонна правил с которым вы должны согласиться.Некоторые события относятся к категории "Безопасность" и должны запускаться мгновенно при срабатывании (отказоустойчивые датчики, аварийная кнопка).