Объединение нескольких функций генератора блокировок в python
У меня есть два итератора. Каждый из них представляет собой, возможно, бесконечный поток данных, поступающих из блокирующего ресурса, например сокета.
Я хочу объединить данные в двух итераторах, в том порядке, в котором они поступают - т. е. недетерминированно. Более подробно, если у меня есть итераторы iter1
и iter2
, я хочу, чтобы мой результат был итератором, эквивалентным merged
.
iter1 : 1 2 3 4 5 ...
iter2 : 1 2 3 ...
merged: 1 2 3 1 2 4 3 5 ...
--- > increasing time --->
Я предполагаю, что мне понадобится параллельная программа, но я не уверен, что есть пифонический способ сделать это. Я бы очень хотел получить ответ. это работает в Python 2.6.
Например, предположим, что у меня есть два итератора, которые "под колпаком" читают из сокета. Вот быстрый серверный "прослушиватель", который многократно повторяет дату / время подключения клиента:
==> message.sh <==
#!/usr/bin/env bash
set -e;
# Repeatedly echo the date/time of client connection
MSG=$(date)
while true; do
echo $MSG;
sleep 1;
done
==> server.sh <==
#!/usr/bin/env bash
socat TCP-LISTEN:8008,reuseaddr,fork system:"./message.sh"
Вы можете запустить сервер с помощью ./server.sh
.
Ниже приведен пример скрипта python, который пытается объединить сообщения из двух сокетов. Однако это неверно - он должен получать значение от каждого итератора, чтобы продолжить. Используя приведенный выше пример, "слияние" результатом будет:
iter1 : 1 2 3 4 5 ...
iter2 : 1 2 3 ...
merged: 1 1 2 2 3 3 4 ...
Вот сценарий:
#!/usr/bin/env python2
import socket
import time
HOST = "127.0.0.1"
PORT = 8008
def iterate_socket(sock):
while True:
yield sock.recv(1024)
def merge(xs, ys):
iters = [xs, ys]
while True:
for it in iters:
try:
i = it.next()
yield i
except StopIteration:
pass
sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock1.connect((HOST, PORT))
time.sleep(1)
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock2.connect((HOST, PORT))
iter1 = iterate_socket(sock1)
iter2 = iterate_socket(sock2)
for msg in merge(iter1, iter2):
print msg,
Наконец: я получаю итераторы из библиотеки, поэтому, пожалуйста, предположите для целей этого вопроса, что я должен иметь дело с итераторами, и я не могу сделать что-то вроде установки сокета на неблокирующий и опрос.
1 ответ:
Можно переместить итерацию сокета в фоновые потоки, а затем использовать
Queue
для отправки данных, полученных каждым из них, в основной поток. Тогда ваш основной поток может просто потреблять данные из очереди по мере поступления:import socket import time from Queue import Queue from threading import Thread HOST = "127.0.0.1" PORT = 8008 def iterate_socket(sock): while True: data = sock.recv(1024) yield data if not data: # End of the stream return def consume(q, s): for i in s: q.put(i) def merge(xs, ys): q = Queue() iters = [xs, ys] for it in iters: t = Thread(target=consume, args=(q, it)) t.start() done = 0 while True: out = q.get() if out == '': # End of the stream. done += 1 if done == len(iters): # When all iters are done, break out. return else: yield out sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock1.connect((HOST, PORT)) time.sleep(1) sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock2.connect((HOST, PORT)) iter1 = iterate_socket(sock1) iter2 = iterate_socket(sock2) for msg in merge(iter1, iter2): print msg,