Неблокирующий ФИФО


Как я могу сделать fifo между двумя процессами python, которые позволяют отбрасывать строки, Если читатель не в состоянии обработать ввод?

  • Если читатель пытается read или readline быстрее, чем пишет писатель, он должен блокировать.
  • Если читатель не может работать так же быстро, как пишет писатель, писатель не должен блокировать. Строки не должны быть буферизованы (за исключением одной строки за раз) , и только последняя строка должна быть получена читателем на его следующем readline попытка.

Возможно ли это с именем fifo, или есть какой-либо другой простой способ для достижения этого?

2 3

2 ответа:

Следующий код использует именованный FIFO для обеспечения связи между двумя сценариями.

  • Если читатель пытается read быстрее, чем писатель, он блокирует.
  • Если читатель не может идти в ногу с писателем, писатель не блокирует.
  • операции ориентированы на буфер. Линейно-ориентированные операции в настоящее время не выполняются.
  • Этот код следует рассматривать как доказательство концепции. Задержки и размеры буфера: произвольный.

Код

import argparse
import errno
import os
from select import select
import time

class OneFifo(object):
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        if os.path.exists(self.name):
            os.unlink(self.name)
        os.mkfifo(self.name)
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        if os.path.exists(self.name):
            os.unlink(self.name)

    def write(self, data):
        print "Waiting for client to open FIFO..."
        try:
            server_file = os.open(self.name, os.O_WRONLY | os.O_NONBLOCK)
        except OSError as exc:
            if exc.errno == errno.ENXIO:
                server_file = None
            else:
                raise
        if server_file is not None:
            print "Writing line to FIFO..."
            try:
                os.write(server_file, data)
                print "Done."
            except OSError as exc:
                if exc.errno == errno.EPIPE:
                    pass
                else:
                    raise
            os.close(server_file)

    def read_nonblocking(self):
        result = None
        try:
            client_file = os.open(self.name, os.O_RDONLY | os.O_NONBLOCK)
        except OSError as exc:
            if exc.errno == errno.ENOENT:
                client_file = None
            else:
                raise
        if client_file is not None:
            try:
                rlist = [client_file]
                wlist = []
                xlist = []
                rlist, wlist, xlist = select(rlist, wlist, xlist, 0.01)
                if client_file in rlist:
                    result = os.read(client_file, 1024)
            except OSError as exc:
                if exc.errno == errno.EAGAIN or exc.errno == errno.EWOULDBLOCK:
                    result = None
                else:
                    raise
            os.close(client_file)
        return result

    def read(self):
        try:
            with open(self.name, 'r') as client_file:
                result = client_file.read()
        except OSError as exc:
            if exc.errno == errno.ENOENT:
                result = None
            else:
                raise
        if not len(result):
            result = None
        return result

def parse_argument():
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--client', action='store_true',
                        help='Set this flag for the client')
    parser.add_argument('-n', '--non-blocking', action='store_true',
                        help='Set this flag to read without blocking')
    result = parser.parse_args()
    return result

if __name__ == '__main__':
    args = parse_argument()
    if not args.client:
        with OneFifo('known_name') as one_fifo:
            while True:
                one_fifo.write('one line')
                time.sleep(0.1)
    else:
        one_fifo = OneFifo('known_name')
        while True:
            if args.non_blocking:
                result = one_fifo.read_nonblocking()
            else:
                result = one_fifo.read()
            if result is not None:
                print result

server проверяет, открыл ли client FIFO. Если client открыл FIFO, то server записывает строку. В противном случае server продолжает работать. Я реализовал неблокирующее чтение, потому что блокирующее чтение вызывает проблему: если server перезапускается, большую часть времени client остается заблокированным и никогда не восстанавливается. С неблокирующим client перезапуском server проще допущенный.

Вывод

[user@machine:~] python onefifo.py
Waiting for client to open FIFO...
Waiting for client to open FIFO...
Writing line to FIFO...           
Done.
Waiting for client to open FIFO...
Writing line to FIFO...
Done.

[user@machine:~] python onefifo.py -c
one line
one line

Примечания

При запуске, если server обнаруживает, что FIFO уже существует, он удаляет его. Это самый простой способ уведомить clients, что server перезапустился. Это уведомление обычно игнорируется блокирующей версией client.

Ну, это на самом деле не FIFO (очередь), насколько мне известно - это единственная переменная. Я предполагаю, что это может быть реализовано, если вы настроите очередь или канал с максимальным размером 1, но кажется, что это будет работать лучше использовать Lock об одном объекте в одном из процессов, на который другой процесс ссылается через прокси-объект . Читатель будет устанавливать его в None всякий раз, когда он читает, и писатель будет перезаписывать содержимое каждый раз, когда он пишет.

Вы можете передайте их другим процессам, передав прокси объекта и прокси блокировки в качестве аргумента всем соответствующим процессам. Чтобы получить его немного удобнее, вы можете использовать Manager, который предоставляет один объект с прокси, который вы можете передать, который содержит и предоставляет прокси для любых других объектов (включая блокировки), которые вы хотите поместить в него. Этот ответ дает полезный пример правильного использования менеджера для передачи объектов в новый процесс.