Многопроцессорная обработка и сокеты в Python


Я пытаюсь заставить многопроцессорную обработку и программирование сокетов работать вместе, но я застрял на этом этапе. Проблема в том, что я получаю эту ошибку:

  File "multiprocesssockserv.py", line 11, in worker
    clientsocket = socket.fromfd(clientfileno, socket.AF_INET, socket.SOCK_STREAM)
error: [Errno 9] Bad file descriptor

Полный код, вызывающий ошибку, выглядит следующим образом:

import multiprocessing as mp
import logging
import socket

logger = mp.log_to_stderr(logging.WARN)

def worker(queue):
    while True:
        clientfileno = queue.get()
        print clientfileno
        clientsocket = socket.fromfd(clientfileno, socket.AF_INET, socket.SOCK_STREAM)
        clientsocket.recv()
        clientsocket.send("Hello World")
        clientsocket.close()

if __name__ == '__main__':
    num_workers = 5
    socket_queue = mp.Queue()
    workers = [mp.Process(target=worker, args=(socket_queue,)) for i in
            range(num_workers)]

    for p in workers:
        p.daemon = True
        p.start()

    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.bind(('',9090))
    serversocket.listen(5)
    while True:
        client, address = serversocket.accept()
        socket_queue.put(client.fileno())

Edit: я использую сокет.fromfd, потому что я не могу поместить сокеты в очередь :) мне нужен способ доступа к одним и тем же сокетам из разных процессов. Вот в чем суть моей проблемы.

3 4

3 ответа:

Поработав над этим некоторое время, я решил подойти к этой проблеме с другой стороны, и следующий метод, кажется, работает для меня.

import multiprocessing as mp
import logging
import socket
import time

logger = mp.log_to_stderr(logging.DEBUG)

def worker(socket):
    while True:
        client, address = socket.accept()
        logger.debug("{u} connected".format(u=address))
        client.send("OK")
        client.close()
if __name__ == '__main__':
    num_workers = 5

    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.bind(('',9090))
    serversocket.listen(5)

    workers = [mp.Process(target=worker, args=(serversocket,)) for i in
            range(num_workers)]

    for p in workers:
        p.daemon = True
        p.start()

    while True:
        try:
            time.sleep(10)
        except:
            break

Я не эксперт, поэтому не могу дать реальное объяснение, но если вы хотите использовать очереди, вам нужно уменьшить дескриптор, а затем воссоздать его:

В вашем главном :

client, address = serversocket.accept()
client_handle = multiprocessing.reduction.reduce_handle(client.fileno())
socket_queue.put(client_handle)

И в вашем работнике:

clientHandle = queue.get()
file_descriptor = multiprocessing.reduction.rebuild_handle(client_handle)
clientsocket = socket.fromfd(file_descriptor, socket.AF_INET, socket.SOCK_STREAM)

Также

import multiprocessing.reduction

Это будет работать с вашим исходным кодом. Однако в настоящее время у меня возникают проблемы с закрытием сокетов в рабочих процессах после того, как они были созданы, как я описал.

Вот некоторый рабочий код на то, что упомянуто выше - https://gist.github.com/sunilmallya/4662837 многопроцессорная обработка.сокращение сокета сервера с родительской обработкой передачи соединений клиенту после принятия соединений