Объединение нескольких функций генератора блокировок в python


У меня есть два итератора. Каждый из них представляет собой, возможно, бесконечный поток данных, поступающих из блокирующего ресурса, например сокета.

Я хочу объединить данные в двух итераторах, в том порядке, в котором они поступают - т. е. недетерминированно. Более подробно, если у меня есть итераторы iter1 и iter2, я хочу, чтобы мой результат был итератором, эквивалентным merged.

iter1 : 1 2 3     4   5 ...
iter2 :       1 2   3   ... 
merged: 1 2 3 1 2 4 3 5 ...

   --- > increasing time ---> 
Я предполагаю, что мне понадобится параллельная программа, но я не уверен, что есть пифонический способ сделать это. Я бы очень хотел получить ответ. это работает в Python 2.6.

Например, предположим, что у меня есть два итератора, которые "под колпаком" читают из сокета. Вот быстрый серверный "прослушиватель", который многократно повторяет дату / время подключения клиента:

==> message.sh <==
#!/usr/bin/env bash
set -e;

# Repeatedly echo the date/time of client connection
MSG=$(date)
while true; do
  echo $MSG;
  sleep 1;
done

==> server.sh <==
#!/usr/bin/env bash
socat TCP-LISTEN:8008,reuseaddr,fork system:"./message.sh"

Вы можете запустить сервер с помощью ./server.sh.

Ниже приведен пример скрипта python, который пытается объединить сообщения из двух сокетов. Однако это неверно - он должен получать значение от каждого итератора, чтобы продолжить. Используя приведенный выше пример, "слияние" результатом будет:

iter1 : 1 2 3     4   5 ...
iter2 :       1 2   3   ... 
merged: 1     1 2 2 3 3 4     ...

Вот сценарий:

#!/usr/bin/env python2
import socket
import time

HOST = "127.0.0.1"
PORT = 8008


def iterate_socket(sock):
    while True:
        yield sock.recv(1024)


def merge(xs, ys):
    iters = [xs, ys]
    while True:
        for it in iters:
            try:
                i = it.next()
                yield i
            except StopIteration:
                pass

sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock1.connect((HOST, PORT))
time.sleep(1)
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock2.connect((HOST, PORT))

iter1 = iterate_socket(sock1)
iter2 = iterate_socket(sock2)

for msg in merge(iter1, iter2):
    print msg,

Наконец: я получаю итераторы из библиотеки, поэтому, пожалуйста, предположите для целей этого вопроса, что я должен иметь дело с итераторами, и я не могу сделать что-то вроде установки сокета на неблокирующий и опрос.

1 2

1 ответ:

Можно переместить итерацию сокета в фоновые потоки, а затем использовать Queue для отправки данных, полученных каждым из них, в основной поток. Тогда ваш основной поток может просто потреблять данные из очереди по мере поступления:

import socket
import time
from Queue import Queue
from threading import Thread

HOST = "127.0.0.1"
PORT = 8008


def iterate_socket(sock):
    while True:
        data = sock.recv(1024)
        yield data
        if not data: # End of the stream
            return

def consume(q, s):
    for i in s:
        q.put(i)

def merge(xs, ys):
    q = Queue()
    iters = [xs, ys]
    for it in iters:
        t = Thread(target=consume, args=(q, it))
        t.start()

    done = 0
    while True:
        out = q.get()
        if out == '':  # End of the stream.
            done += 1
            if done == len(iters): # When all iters are done, break out.
                return
        else:
            yield out

sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock1.connect((HOST, PORT))
time.sleep(1)
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock2.connect((HOST, PORT))

iter1 = iterate_socket(sock1)
iter2 = iterate_socket(sock2)

for msg in merge(iter1, iter2):
    print msg,