Неблокирующий ФИФО
Как я могу сделать fifo между двумя процессами python, которые позволяют отбрасывать строки, Если читатель не в состоянии обработать ввод?
- Если читатель пытается
read
илиreadline
быстрее, чем пишет писатель, он должен блокировать.
Если читатель не может работать так же быстро, как пишет писатель, писатель не должен блокировать. Строки не должны быть буферизованы (за исключением одной строки за раз) , и только последняя строка должна быть получена читателем на его следующем
readline
попытка.
Возможно ли это с именем fifo, или есть какой-либо другой простой способ для достижения этого?
2 ответа:
Следующий код использует именованный FIFO для обеспечения связи между двумя сценариями.
- Если читатель пытается
Если читатель не может идти в ногу с писателем, писатель не блокирует.read
быстрее, чем писатель, он блокирует.- операции ориентированы на буфер. Линейно-ориентированные операции в настоящее время не выполняются.
Этот код следует рассматривать как доказательство концепции. Задержки и размеры буфера: произвольный.Код
import argparse import errno import os from select import select import time class OneFifo(object): def __init__(self, name): self.name = name def __enter__(self): if os.path.exists(self.name): os.unlink(self.name) os.mkfifo(self.name) return self def __exit__(self, exc_type, exc_value, exc_traceback): if os.path.exists(self.name): os.unlink(self.name) def write(self, data): print "Waiting for client to open FIFO..." try: server_file = os.open(self.name, os.O_WRONLY | os.O_NONBLOCK) except OSError as exc: if exc.errno == errno.ENXIO: server_file = None else: raise if server_file is not None: print "Writing line to FIFO..." try: os.write(server_file, data) print "Done." except OSError as exc: if exc.errno == errno.EPIPE: pass else: raise os.close(server_file) def read_nonblocking(self): result = None try: client_file = os.open(self.name, os.O_RDONLY | os.O_NONBLOCK) except OSError as exc: if exc.errno == errno.ENOENT: client_file = None else: raise if client_file is not None: try: rlist = [client_file] wlist = [] xlist = [] rlist, wlist, xlist = select(rlist, wlist, xlist, 0.01) if client_file in rlist: result = os.read(client_file, 1024) except OSError as exc: if exc.errno == errno.EAGAIN or exc.errno == errno.EWOULDBLOCK: result = None else: raise os.close(client_file) return result def read(self): try: with open(self.name, 'r') as client_file: result = client_file.read() except OSError as exc: if exc.errno == errno.ENOENT: result = None else: raise if not len(result): result = None return result def parse_argument(): parser = argparse.ArgumentParser() parser.add_argument('-c', '--client', action='store_true', help='Set this flag for the client') parser.add_argument('-n', '--non-blocking', action='store_true', help='Set this flag to read without blocking') result = parser.parse_args() return result if __name__ == '__main__': args = parse_argument() if not args.client: with OneFifo('known_name') as one_fifo: while True: one_fifo.write('one line') time.sleep(0.1) else: one_fifo = OneFifo('known_name') while True: if args.non_blocking: result = one_fifo.read_nonblocking() else: result = one_fifo.read() if result is not None: print result
server
проверяет, открыл лиclient
FIFO. Еслиclient
открыл FIFO, тоserver
записывает строку. В противном случаеserver
продолжает работать. Я реализовал неблокирующее чтение, потому что блокирующее чтение вызывает проблему: еслиserver
перезапускается, большую часть времениclient
остается заблокированным и никогда не восстанавливается. С неблокирующимclient
перезапускомserver
проще допущенный.Вывод
[user@machine:~] python onefifo.py Waiting for client to open FIFO... Waiting for client to open FIFO... Writing line to FIFO... Done. Waiting for client to open FIFO... Writing line to FIFO... Done. [user@machine:~] python onefifo.py -c one line one line
Примечания
При запуске, если
server
обнаруживает, что FIFO уже существует, он удаляет его. Это самый простой способ уведомитьclients
, чтоserver
перезапустился. Это уведомление обычно игнорируется блокирующей версиейclient
.
Ну, это на самом деле не FIFO (очередь), насколько мне известно - это единственная переменная. Я предполагаю, что это может быть реализовано, если вы настроите очередь или канал с максимальным размером 1, но кажется, что это будет работать лучше использовать
Lock
об одном объекте в одном из процессов, на который другой процесс ссылается через прокси-объект . Читатель будет устанавливать его вNone
всякий раз, когда он читает, и писатель будет перезаписывать содержимое каждый раз, когда он пишет.Вы можете передайте их другим процессам, передав прокси объекта и прокси блокировки в качестве аргумента всем соответствующим процессам. Чтобы получить его немного удобнее, вы можете использовать
Manager
, который предоставляет один объект с прокси, который вы можете передать, который содержит и предоставляет прокси для любых других объектов (включая блокировки), которые вы хотите поместить в него. Этот ответ дает полезный пример правильного использования менеджера для передачи объектов в новый процесс.