Многопроцессорная обработка и сокеты в Python
Я пытаюсь заставить многопроцессорную обработку и программирование сокетов работать вместе, но я застрял на этом этапе. Проблема в том, что я получаю эту ошибку:
File "multiprocesssockserv.py", line 11, in worker
clientsocket = socket.fromfd(clientfileno, socket.AF_INET, socket.SOCK_STREAM)
error: [Errno 9] Bad file descriptor
Полный код, вызывающий ошибку, выглядит следующим образом:
import multiprocessing as mp
import logging
import socket
logger = mp.log_to_stderr(logging.WARN)
def worker(queue):
while True:
clientfileno = queue.get()
print clientfileno
clientsocket = socket.fromfd(clientfileno, socket.AF_INET, socket.SOCK_STREAM)
clientsocket.recv()
clientsocket.send("Hello World")
clientsocket.close()
if __name__ == '__main__':
num_workers = 5
socket_queue = mp.Queue()
workers = [mp.Process(target=worker, args=(socket_queue,)) for i in
range(num_workers)]
for p in workers:
p.daemon = True
p.start()
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind(('',9090))
serversocket.listen(5)
while True:
client, address = serversocket.accept()
socket_queue.put(client.fileno())
Edit: я использую сокет.fromfd, потому что я не могу поместить сокеты в очередь :) мне нужен способ доступа к одним и тем же сокетам из разных процессов. Вот в чем суть моей проблемы.
3 ответа:
Поработав над этим некоторое время, я решил подойти к этой проблеме с другой стороны, и следующий метод, кажется, работает для меня.
import multiprocessing as mp import logging import socket import time logger = mp.log_to_stderr(logging.DEBUG) def worker(socket): while True: client, address = socket.accept() logger.debug("{u} connected".format(u=address)) client.send("OK") client.close() if __name__ == '__main__': num_workers = 5 serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serversocket.bind(('',9090)) serversocket.listen(5) workers = [mp.Process(target=worker, args=(serversocket,)) for i in range(num_workers)] for p in workers: p.daemon = True p.start() while True: try: time.sleep(10) except: break
Я не эксперт, поэтому не могу дать реальное объяснение, но если вы хотите использовать очереди, вам нужно уменьшить дескриптор, а затем воссоздать его:
В вашем главном :
client, address = serversocket.accept() client_handle = multiprocessing.reduction.reduce_handle(client.fileno()) socket_queue.put(client_handle)
И в вашем работнике:
clientHandle = queue.get() file_descriptor = multiprocessing.reduction.rebuild_handle(client_handle) clientsocket = socket.fromfd(file_descriptor, socket.AF_INET, socket.SOCK_STREAM)
Также
import multiprocessing.reduction
Это будет работать с вашим исходным кодом. Однако в настоящее время у меня возникают проблемы с закрытием сокетов в рабочих процессах после того, как они были созданы, как я описал.
Вот некоторый рабочий код на то, что упомянуто выше - https://gist.github.com/sunilmallya/4662837 многопроцессорная обработка.сокращение сокета сервера с родительской обработкой передачи соединений клиенту после принятия соединений