Python: как я могу запускать функции python параллельно?


Я исследовал сначала и не мог найти ответ на мой вопрос. Я пытаюсь запустить несколько функций параллельно в Python.

у меня что-то вроде этого:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:folder1'
dir2 = 'C:folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

Я хочу вызвать func1 и func2 и запустить их одновременно. Функции не взаимодействуют друг с другом или с одним и тем же объектом. Прямо сейчас я должен ждать func1, чтобы закончить, прежде чем func2, чтобы начать. Как мне сделать что-то вроде ниже:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

Я хочу быть в состоянии создайте оба каталога довольно близко к тому же времени, потому что каждую минуту я подсчитываю, сколько файлов создается. Если каталога там нет, он сбросит мое время.

3 55

3 ответа:

вы могли бы использовать threading или multiprocessing.

из-за особенности CPython,threading вряд ли удастся достичь истинного параллелизма. По этой причине, multiprocessing как правило, лучше ставка.

вот пример:

from multiprocessing import Process

def func1():
  print 'func1: starting'
  for i in xrange(10000000): pass
  print 'func1: finishing'

def func2():
  print 'func2: starting'
  for i in xrange(10000000): pass
  print 'func2: finishing'

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()

механика запуска / присоединения дочерних процессов может быть легко инкапсулирована в функцию по линиям вашего runBothFunc:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)

нет никакого способа гарантировать, что две функции будут выполняться синхронно друг с другом, что, кажется, то, что вы хотите сделать.

лучшее, что вы можете сделать, это разделить функцию на несколько шагов, а затем дождаться завершения обоих в критических точках синхронизации с помощью Process.join как упоминает ответ @aix.

это лучше, чем time.sleep(10) потому что вы не можете гарантировать точные сроки. С явным ожиданием вы говорите, что функции должны быть выполнены этот шаг перед переходом к следующему, вместо того, чтобы предполагать, что это будет сделано в течение 10 мс, что не гарантируется на основе того, что еще происходит на машине.

Если вы являетесь пользователем windows и используете python 3, то этот пост поможет вам сделать параллельное программирование на python.когда вы запускаете обычное программирование пула многопроцессорной библиотеки, вы получите ошибку относительно основной функции в вашей программе. Это связано с тем, что windows не имеет функции fork (). Ниже Сообщение дает решение указанной проблемы .

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

Так как я использовал python 3, я изменил программу немного так:

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

после этой функции приведенный выше код проблемы также немного изменяется следующим образом:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

и я получил выход как:

[1, 8, 27, 64, 125, 216]

Я думаю, что этот пост может быть полезным для некоторых пользователей Windows.