Резервный процессор в многопроцессорных и многопоточных приложений


Я работаю над проектом с Raspberry Pi 3 для некоторого экологического контроля с рядом простых повторяющихся событий в непрерывном цикле. RP3 слишком квалифицирован для этой работы, но это позволяет мне сосредоточиться на других вещах.

Характеристики приложения:

    Приложение должно собирать сенсорные данные (с переменным интервалом n-секунд) от десятка датчиков (температуры, влажности, pH, ORP и т. д.).
  1. основываясь на времени и этих сенсорных данных, контроллер вычисляет выходной сигнал (переключатели, клапаны и ШИМ-драйверы).
  2. почти ни одно событие не должно выполняться последовательно.
  3. Некоторые события относятся к категории "Безопасность" и должны запускаться мгновенно при срабатывании (отказоустойчивые датчики, аварийная кнопка).
  4. Большинство событий повторяются с интервалом в несколько секунд (каждую секунду, вплоть до каждых 30 секунд).
  5. Некоторые события запускают действие, активируя реле в течение от 1 до 120 секунд.
  6. Некоторые события используют значение, основанное на времени. Это значение должно быть вычисляется несколько раз в день и является довольно интенсивным процессором (использует несколько итеративных интерполяционных формул, и поэтому имеет переменную времени выполнения).
  7. отображение состояния среды (в непрерывном цикле)

Я знаком (не по профессии) с VB.NET, но решил сделать этот проект в Python 3.6. За последние несколько месяцев я много читал о таких предметах, как шаблоны проектирования, потоки, процессы, события, paralel prcessing и т. д.

Основываясь на моем чтении, я думаю, что Asyncio объединил с некоторыми задачами в Экзекьюторе справятся сами.

Большинство задач/событий не критичны по времени. На выходе контроллера можно использовать "самые последние" сенсорные данные. Некоторые задачи, с другой стороны, активируют ретранслятор на определенный период времени. Я хотел бы знать, как запрограммировать эти задачи без возможности того, что другая "трудоемкая" задача блокирует процессор в течение периода времени (например), когда открыт клапан CO2. Это может быть катастрофой для моего окружения.

Поэтому мне нужно немного совет.

Смотрите ниже мой код до сих пор. Я не уверен, что правильно использую функции Asyncio в Python. Для удобства чтения я буду хранить содержимое различных задач в отдельных модулях.

import asyncio
import concurrent.futures
import datetime
import time
import random
import math

# define a task...
async def firstTask():
    while True:
        await asyncio.sleep(1)
        print("First task executed")

# define another task...        
async def secondTask():
    while True:
        await asyncio.sleep(5)
        print("Second Worker Executed")

# define/simulate heavy CPU-bound task
def heavy_load():
    while True:
        print('Heavy_load started')
        i = 0
        for i in range(50000000):
            f = math.sqrt(i)*math.sqrt(i)
        print('Heavy_load finished')
        time.sleep(4)

def main():

    # Create a process pool (for CPU bound tasks).
    processpool = concurrent.futures.ProcessPoolExecutor()

    #  Create a thread pool (for I/O bound tasks).
    threadpool = concurrent.futures.ThreadPoolExecutor()

    loop = asyncio.get_event_loop()

    try:
        # Add all tasks. (Correct use?)
        asyncio.ensure_future(firstTask())
        asyncio.ensure_future(secondTask())
        loop.run_in_executor(processpool, heavy_load)
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        print("Loop will be ended")
        loop.close()    

if __name__ == '__main__':
    main()
2 2

2 ответа:

Большинство задач / событий не критичны по времени. Выход контроллера может использовать "самые последние" сенсорные данные. Некоторые задачи, с другой стороны, активируют ретранслятор на определенный период времени. Я хотел бы знать, как запрограммировать эти задачи без возможности того, что другая "трудоемкая" задача блокирует процессор в течение периода времени (например), когда открыт клапан CO2. Это может быть катастрофой для моего окружения.

Позвольте мне подчеркнуть, что Python-это не реальное время. язык , и asyncio не является компонентом реального времени. Они не имеют инфраструктуры для выполнения в реальном времени (Python-это мусор, собираемый и обычно работающий на системах с общим временем), и они не были протестированы в таких средах на практике. Поэтому я настоятельно рекомендую не использовать их в любом сценарии, где оплошность может быть гибельной для вашего окружения.

С этим из пути, ваш код имеет проблему: в то время как heavy_load вычисление не будет блокируйте цикл событий, он также никогда не завершится и не предоставит информацию о своем прогрессе. Идея, лежащая в основе run_in_executor, заключается в том, что вычисление, которое вы выполняете, в конечном счете остановится, и что цикл событий захочет получить уведомление об этом. Идиоматическое использование run_in_executor может выглядеть следующим образом:

def do_heavy_calc(param):
    print('Heavy_load started')
    f = 0
    for i in range(50000000):
        f += math.sqrt(i)*math.sqrt(i)
    return f

def heavy_calc(param):
    loop = asyncio.get_event_loop()
    return loop.run_in_executor(processpool, do_heavy_calc)

Выражение heavy_calc(...) не только выполняется без блокировки цикла событий, но и являетсяожидаемым . Это означает, что асинхронный код может ожидать своего результата, также без блокировки другие сопрограммы:

async def sum_params(p1, p2):
    s1 = await heavy_calc(p1)
    s2 = await heavy_calc(p2)
    return s1 + s2

Выше приводятся два расчета один за другим. Это также можно сделать параллельно:

async def sum_params_parallel(p1, p2):
    s1, s2 = await asyncio.gather(heavy_calc(p1), heavy_calc(p2))
    return s1 + s2

Еще одна вещь, которую можно было бы улучшить, - это код установки:

asyncio.ensure_future(firstTask())
asyncio.ensure_future(secondTask())
loop.run_in_executor(processpool, heavy_load)
loop.run_forever()

Вызов asyncio.ensure_future и затем никогда не ожидание результата является чем-то вроде анти-паттерна asyncio. Исключения, вызванные неожиданными задачами, молча проглатываются, что почти наверняка не то, что вам нужно. Иногда люди просто забывают писать await, Вот почему asyncio жалуется на невыполненные незавершенные задачи, когда цикл разрушается.

Хорошая практика кодирования состоит в том, чтобы организовать каждую задачу, ожидаемуюкем-то , либо сразу с await, либо gather, чтобы объединить ее с другой задачей, или в более поздний момент. Например, если задача должна выполняться в фоновом режиме, вы можете сохранить ее где-нибудь и await или отменить ее в конце жизненного цикла приложения. В вашем случае я бы объединил gather с loop.run_until_complete:

everything = asyncio.gather(firstTask(), secondTask(),
                           loop.run_in_executor(processpool, heavy_load))
loop.run_until_complete(everything)

Некоторые события относятся к категории "Безопасность" и должны запускаться мгновенно при срабатывании (отказоустойчивые датчики, аварийная кнопка).

Тогда я настоятельно рекомендую вам не полагаться на программное обеспечение для выполнения этой функции. Кнопки аварийной остановки, которые отключают питание, - это то, как это обычно делается. Если у вас есть программное обеспечение, делающее это, и вы действительно имеете ситуацию угрозы для жизни, чтобы справиться, вы находитесь в целой куче горя - почти наверняка есть тонна правил с которым вы должны согласиться.