Возвращаемое значение из потока
Как я могу получить поток, чтобы вернуть кортеж или любое значение по моему выбору обратно к родителю в Python?
11 ответов:
Я предлагаю вам создать очереди.Очередь перед запуском потока и передайте его как один из аргументов потока: до завершения потока он
.put
s результат в очереди, который он получил в качестве аргумента. Родитель может.get
или.get_nowait
это по желанию.очереди, как правило, лучший способ организовать синхронизацию потоков и связь в Python: они являются внутренне потокобезопасными, передающими сообщения транспортными средствами - лучший способ организовать многозадачность в генерал!- )
Если вы вызывали join (), чтобы дождаться завершения потока, вы можете просто прикрепить результат к самому экземпляру потока, а затем получить его из основного потока после возврата join ().
с другой стороны, вы не расскажите, как вы собираетесь обнаружить, что нить сделать и результат налицо. Если у вас уже есть способ сделать это, он, вероятно, укажет вам (и нам, если вы скажете нам) на лучший способ получения результатов из.
вы должны передать экземпляр очереди в качестве параметра, то вы должны .поместите () возвращаемый объект в очередь. Вы можете собрать возвращаемое значение через очередь.получить() любой объект, который вы поставили.
пример:
queue = Queue.Queue() thread_ = threading.Thread( target=target_method, name="Thread1", args=[params, queue], ) thread_.start() thread_.join() queue.get() def target_method(self, params, queue): """ Some operations right here """ your_return = "Whatever your object is" queue.put(your_return)
использовать для нескольких потоков:
#Start all threads in thread pool for thread in pool: thread.start() response = queue.get() thread_results.append(response) #Kill all threads for thread in pool: thread.join()
Я использую эту реализацию, и она отлично работает для меня. Я хочу, чтобы вы это сделали.
использовать лямда - чтобы обернуть целевую функцию потока и передать ее возвращаемое значение обратно в родительский поток с помощью очереди. (Исходная целевая функция остается неизменной без дополнительного параметра очереди.)
пример кода:
import threading import queue def dosomething(param): return param * 2 que = queue.Queue() thr = threading.Thread(target = lambda q, arg : q.put(dosomething(arg)), args = (que, 2)) thr.start() thr.join() while not que.empty(): print(que.get())
выход:
4
Я удивлен, что никто не упомянул, что вы можете просто передать его изменчивое:
>>> thread_return={'success': False} >>> from threading import Thread >>> def task(thread_return): ... thread_return['success'] = True ... >>> Thread(target=task, args=(thread_return,)).start() >>> thread_return {'success': True}
возможно, это имеет серьезные проблемы, о которых я не знаю.
другой подход заключается в передаче функции обратного вызова потоку. Это дает простой, безопасный и гибкий способ вернуть значение родителю в любое время из Нового потока.
# A sample implementation import threading import time class MyThread(threading.Thread): def __init__(self, cb): threading.Thread.__init__(self) self.callback = cb def run(self): for i in range(10): self.callback(i) time.sleep(1) # test import sys def count(x): print x sys.stdout.flush() t = MyThread(count) t.start()
вы можете использовать synchronised очереди модуль.
Рассмотрим, что вам нужно проверить информацию пользователя из базы данных с известным идентификатором:def check_infos(user_id, queue): result = send_data(user_id) queue.put(result)
теперь вы можете получить ваши данные следующим образом:
import queue, threading queued_request = queue.Queue() check_infos_thread = threading.Thread(target=check_infos, args=(user_id, queued_request)) check_infos_thread.start() final_result = queued_request.get()
POC:
import random import threading class myThread( threading.Thread ): def __init__( self, arr ): threading.Thread.__init__( self ) self.arr = arr self.ret = None def run( self ): self.myJob( self.arr ) def join( self ): threading.Thread.join( self ) return self.ret def myJob( self, arr ): self.ret = sorted( self.arr ) return #Call the main method if run from the command line. if __name__ == '__main__': N = 100 arr = [ random.randint( 0, 100 ) for x in range( N ) ] th = myThread( arr ) th.start( ) sortedArr = th.join( ) print "arr2: ", sortedArr
Ну, в модуле Python threading есть объекты условий, которые связаны с блокировками. Один метод
acquire()
вернет любое значение, возвращаемое из базового метода. Для получения дополнительной информации: Python Condition Objects
на основе предложения jcomeau_ictx. Самый простой, с которым я столкнулся. Требование здесь состояло в том, чтобы получить статус выхода staus из трех разных процессов, запущенных на сервере, и запустить другой скрипт, если все три будут успешными. Это, кажется, работает нормально
class myThread(threading.Thread): def __init__(self,threadID,pipePath,resDict): threading.Thread.__init__(self) self.threadID=threadID self.pipePath=pipePath self.resDict=resDict def run(self): print "Starting thread %s " % (self.threadID) if not os.path.exists(self.pipePath): os.mkfifo(self.pipePath) pipe_fd = os.open(self.pipePath, os.O_RDWR | os.O_NONBLOCK ) with os.fdopen(pipe_fd) as pipe: while True: try: message = pipe.read() if message: print "Received: '%s'" % message self.resDict['success']=message break except: pass tResSer={'success':'0'} tResWeb={'success':'0'} tResUisvc={'success':'0'} threads = [] pipePathSer='/tmp/path1' pipePathWeb='/tmp/path2' pipePathUisvc='/tmp/path3' th1=myThread(1,pipePathSer,tResSer) th2=myThread(2,pipePathWeb,tResWeb) th3=myThread(3,pipePathUisvc,tResUisvc) th1.start() th2.start() th3.start() threads.append(th1) threads.append(th2) threads.append(th3) for t in threads: print t.join() print "Res: tResSer %s tResWeb %s tResUisvc %s" % (tResSer,tResWeb,tResUisvc) # The above statement prints updated values which can then be further processed
следующая функция-оболочка обернет существующую функцию и вернет объект, который указывает как на поток (так что вы можете вызвать
start()
,join()
и т. д. на нем), а также доступ/просмотр его возможного возвращаемого значения.def threadwrap(func,args,kwargs): class res(object): result=None def inner(*args,**kwargs): res.result=func(*args,**kwargs) import threading t = threading.Thread(target=inner,args=args,kwargs=kwargs) res.thread=t return res def myFun(v,debug=False): import time if debug: print "Debug mode ON" time.sleep(5) return v*2 x=threadwrap(myFun,[11],{"debug":True}) x.thread.start() x.thread.join() print x.result
это выглядит нормально, и
threading.Thread
класс, кажется, легко расширяется (*) с такой функциональностью, поэтому мне интересно, почему его еще нет. Есть ли недостаток в приведенном выше методе?(*) обратите внимание, что ответ Хусану для этого вопрос делает именно это, подклассы
threading.Thread
в результате получается версия, гдеjoin()
дает возвращаемое значение.