Python: Parallel processing with multiprocessing.Pool
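When you need parallel processing in Python, you can write your own threads. However, because of the Global Interpreter Lock (GIL), only one thread executes Python bytecode at a time, so CPU-bound work often fails to speed up the way you intended.
The easiest and safest option is Pool from the multiprocessing module, which spreads the work across separate worker processes.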
Basic usage
Define a function that takes a single argument, then call Pool.map.
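import time
from datetime import datetime
from multiprocessing import Pool, current_process

def f(x):
    # Runs inside a worker process; log start and end with the worker's name
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d' % (proc_name, datetime.now(), x))
    time.sleep(1)
    print('(%s) [%s] Ended : x=%d' % (proc_name, datetime.now(), x))
    return x * x

# The __main__ guard is required where workers are started with spawn
# (Windows, and macOS on Python 3.8+), since the module is re-imported there.
if __name__ == '__main__':
    pool = Pool(4)  # 4 worker processes
    print(pool.map(f, range(8)))
    pool.close()
    pool.join()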
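In Python 3.4 and later, you can also write this with a with statement. Note that leaving the with block calls terminate() on the pool rather than close(), so collect the results inside the block.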
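The output should look something like this. (This log was captured under Python 2.7.9; in Python 3 the workers are named ForkPoolWorker-1, SpawnPoolWorker-1, and so on, depending on the start method.)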
(PoolWorker-1) [2015-12-08 01:49:03.666114] Started: x=0
(PoolWorker-2) [2015-12-08 01:49:03.666246] Started: x=1
(PoolWorker-3) [2015-12-08 01:49:03.666400] Started: x=2
(PoolWorker-4) [2015-12-08 01:49:03.666620] Started: x=3
(PoolWorker-1) [2015-12-08 01:49:04.667421] Ended : x=0
(PoolWorker-3) [2015-12-08 01:49:04.667421] Ended : x=2
(PoolWorker-2) [2015-12-08 01:49:04.667423] Ended : x=1
(PoolWorker-4) [2015-12-08 01:49:04.667511] Ended : x=3
(PoolWorker-3) [2015-12-08 01:49:04.668070] Started: x=4
(PoolWorker-2) [2015-12-08 01:49:04.668151] Started: x=5
(PoolWorker-1) [2015-12-08 01:49:04.668282] Started: x=6
(PoolWorker-4) [2015-12-08 01:49:04.668382] Started: x=7
(PoolWorker-3) [2015-12-08 01:49:05.669206] Ended : x=4
(PoolWorker-2) [2015-12-08 01:49:05.669206] Ended : x=5
(PoolWorker-4) [2015-12-08 01:49:05.669218] Ended : x=7
(PoolWorker-1) [2015-12-08 01:49:05.669230] Ended : x=6
[0, 1, 4, 9, 16, 25, 36, 49]
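The with form mentioned above might look like the following minimal sketch. The square function is a hypothetical stand-in for f, with the logging and sleep removed so that only the pool handling remains.

```python
from multiprocessing import Pool

def square(x):
    # Hypothetical stand-in for f above, without the logging and sleep
    return x * x

if __name__ == '__main__':
    # __exit__() calls terminate(), so the work must finish inside the block;
    # map() blocks until every result is ready, so this is safe.
    with Pool(4) as pool:
        results = pool.map(square, range(8))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]
```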
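While the pool is running, the worker processes are forked from the parent, so memory usage in particular deserves attention: each worker is a separate process with its own memory space.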
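A small sketch of what "its own memory space" means in practice. The counter and bump names are hypothetical: every worker increments a module-level counter, yet the parent's copy stays at 0, and none of the returned process IDs is the parent's.

```python
import os
from multiprocessing import Pool

# Hypothetical module-level state; each worker process gets its own copy.
counter = 0

def bump(_):
    global counter
    counter += 1        # changes only the calling worker's copy
    return os.getpid()  # report which process handled this item

if __name__ == '__main__':
    with Pool(4) as pool:
        pids = pool.map(bump, range(8))
    print(counter)              # 0: the parent's copy was never touched
    print(os.getpid() in pids)  # False: all work ran in other processes
```

The same separation is why large inputs cost memory: whatever a worker needs has to exist in that worker's own address space.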
Taking multiple parameters
The usual approach is to pass a tuple and unpack it on the worker side.
import time
from datetime import datetime
from multiprocessing import Pool, current_process

def f(args):
    # map() passes a single argument, so unpack the (x, y) tuple here
    x, y = args
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    time.sleep(1)
    print('(%s) [%s] Ended : x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    return x * y

if __name__ == '__main__':
    pool = Pool(4)
    # zip() yields the tuples (0, 10), (1, 11), ..., (7, 17)
    print(pool.map(f, zip(range(8), range(10, 18))))
    pool.close()
    pool.join()
Sample output
(PoolWorker-1) [2015-12-08 01:55:01.700005] Started: x=0, y=10
(PoolWorker-2) [2015-12-08 01:55:01.700078] Started: x=1, y=11
(PoolWorker-3) [2015-12-08 01:55:01.700196] Started: x=2, y=12
(PoolWorker-4) [2015-12-08 01:55:01.700463] Started: x=3, y=13
(PoolWorker-3) [2015-12-08 01:55:02.701283] Ended : x=2, y=12
(PoolWorker-2) [2015-12-08 01:55:02.701275] Ended : x=1, y=11
(PoolWorker-1) [2015-12-08 01:55:02.701261] Ended : x=0, y=10
(PoolWorker-4) [2015-12-08 01:55:02.701293] Ended : x=3, y=13
(PoolWorker-3) [2015-12-08 01:55:02.701914] Started: x=4, y=14
(PoolWorker-1) [2015-12-08 01:55:02.702046] Started: x=5, y=15
(PoolWorker-2) [2015-12-08 01:55:02.702165] Started: x=6, y=16
(PoolWorker-4) [2015-12-08 01:55:02.702361] Started: x=7, y=17
(PoolWorker-1) [2015-12-08 01:55:03.703123] Ended : x=5, y=15
(PoolWorker-3) [2015-12-08 01:55:03.703106] Ended : x=4, y=14
(PoolWorker-4) [2015-12-08 01:55:03.703131] Ended : x=7, y=17
(PoolWorker-2) [2015-12-08 01:55:03.703140] Ended : x=6, y=16
[0, 11, 24, 39, 56, 75, 96, 119]
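On Python 3.3 and later, Pool.starmap() does the tuple unpacking for you, so the worker can take ordinary positional parameters. A sketch with a hypothetical mul function that should produce the same list as above:

```python
from multiprocessing import Pool

def mul(x, y):
    # Hypothetical worker taking two ordinary parameters
    return x * y

if __name__ == '__main__':
    with Pool(4) as pool:
        # starmap() calls mul(*t) for each tuple t yielded by zip()
        results = pool.starmap(mul, zip(range(8), range(10, 18)))
    print(results)  # [0, 11, 24, 39, 56, 75, 96, 119]
```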
Printing the worker's stack trace
Now consider a case where an exception is raised. Here, the task with x=0 causes a division by zero.
import time
from datetime import datetime
from multiprocessing import Pool, current_process

def f(args):
    x, y = args
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    time.sleep(1)
    print('(%s) [%s] Ended : x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    return y / x  # raises ZeroDivisionError when x == 0

if __name__ == '__main__':
    pool = Pool(4)
    print(pool.map(f, zip(range(8), range(10, 18))))
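Left as is, the stack trace is hard to use: it only shows where the parent process re-raised the exception inside pool.py, not where in f the error actually occurred.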
(PoolWorker-1) [2015-12-08 01:59:54.088732] Started: x=0, y=10
(PoolWorker-2) [2015-12-08 01:59:54.088835] Started: x=1, y=11
(PoolWorker-3) [2015-12-08 01:59:54.088960] Started: x=2, y=12
(PoolWorker-4) [2015-12-08 01:59:54.089304] Started: x=3, y=13
(PoolWorker-3) [2015-12-08 01:59:55.090067] Ended : x=2, y=12
(PoolWorker-2) [2015-12-08 01:59:55.090095] Ended : x=1, y=11
(PoolWorker-1) [2015-12-08 01:59:55.090049] Ended : x=0, y=10
(PoolWorker-4) [2015-12-08 01:59:55.090160] Ended : x=3, y=13
(PoolWorker-2) [2015-12-08 01:59:55.090750] Started: x=4, y=14
(PoolWorker-3) [2015-12-08 01:59:55.090926] Started: x=5, y=15
(PoolWorker-4) [2015-12-08 01:59:55.091020] Started: x=6, y=16
(PoolWorker-1) [2015-12-08 01:59:55.091336] Started: x=7, y=17
Traceback (most recent call last):
  File "xxx.py", line 14, in <module>
    print(pool.map(f, zip(range(8), range(10, 18))))
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
ZeroDivisionError: integer division or modulo by zero
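The traceback above comes from Python 2.7. On Python 3.4 and later, the pool attaches the worker-side traceback to the re-raised exception as its __cause__, an instance of multiprocessing.pool.RemoteTraceback, so the parent can inspect it. A minimal sketch; note that RemoteTraceback is an internal class of multiprocessing.pool rather than a documented API, so relying on it is an assumption that may not hold in every version.

```python
from multiprocessing import Pool
from multiprocessing.pool import RemoteTraceback  # internal, undocumented class

def f(args):
    x, y = args
    return y / x  # raises ZeroDivisionError when x == 0

if __name__ == '__main__':
    cause = None
    with Pool(4) as pool:
        try:
            pool.map(f, zip(range(8), range(10, 18)))
        except ZeroDivisionError as exc:
            # Assumption: Python 3.4+ stores the worker's formatted traceback here
            cause = exc.__cause__
    print(isinstance(cause, RemoteTraceback))
    print(cause)  # the traceback text as seen inside the worker
```

This is also why, on Python 3, an uncaught error from pool.map() prints the worker's traceback followed by "The above exception was the direct cause of the following exception".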
Let's print the stack trace from inside the worker using a decorator. As a side benefit, f can now take multiple parameters directly.
import functools
import time
import traceback
from datetime import datetime
from multiprocessing import Pool, current_process

def with_stacktrace(func):
    # Unpack the argument tuple, and on failure print the worker-side
    # traceback (prefixed with the worker name) before re-raising
    @functools.wraps(func)  # keeps the name f, so the wrapper can still be pickled
    def wrapper(args):
        try:
            return func(*args)
        except Exception:
            proc_name = current_process().name
            for line in traceback.format_exc().splitlines():
                print('[TRACE:%s] %s' % (proc_name, line))
            raise
    return wrapper

@with_stacktrace
def f(x, y):
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    time.sleep(1)
    print('(%s) [%s] Ended : x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    return y / x

if __name__ == '__main__':
    pool = Pool(4)
    print(pool.map(f, zip(range(8), range(10, 18))))
Sample output
(PoolWorker-1) [2015-12-08 02:31:36.959575] Started: x=0, y=10
(PoolWorker-2) [2015-12-08 02:31:36.959657] Started: x=1, y=11
(PoolWorker-3) [2015-12-08 02:31:36.959721] Started: x=2, y=12
(PoolWorker-4) [2015-12-08 02:31:36.960174] Started: x=3, y=13
(PoolWorker-4) [2015-12-08 02:31:37.960911] Ended : x=3, y=13
(PoolWorker-2) [2015-12-08 02:31:37.960904] Ended : x=1, y=11
(PoolWorker-3) [2015-12-08 02:31:37.960920] Ended : x=2, y=12
(PoolWorker-1) [2015-12-08 02:31:37.960888] Ended : x=0, y=10
(PoolWorker-2) [2015-12-08 02:31:37.961423] Started: x=4, y=14
(PoolWorker-3) [2015-12-08 02:31:37.961634] Started: x=5, y=15
(PoolWorker-4) [2015-12-08 02:31:37.961724] Started: x=6, y=16
[TRACE:PoolWorker-1] Traceback (most recent call last):
[TRACE:PoolWorker-1]   File "xxx.py", line 9, in f
[TRACE:PoolWorker-1]     return func(*args)
[TRACE:PoolWorker-1]   File "xxx.py", line 23, in f
[TRACE:PoolWorker-1]     return y / x
[TRACE:PoolWorker-1] ZeroDivisionError: integer division or modulo by zero
(PoolWorker-1) [2015-12-08 02:31:37.962384] Started: x=7, y=17
Traceback (most recent call last):
  File "xxx.py", line 26, in <module>
    print(pool.map(f, zip(range(8), range(10, 18))))
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
ZeroDivisionError: integer division or modulo by zero
This makes debugging much easier.
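The functools.wraps call in the decorator matters more than it looks. Pool sends the function to workers by pickling it, and functions are pickled by module and qualified name; wraps copies f's __name__ and __qualname__ onto the wrapper, so the module-level name f resolves to the wrapper itself. A sketch that checks this without starting a pool (same decorator idea, simplified f):

```python
import functools
import pickle
import traceback
from multiprocessing import current_process

def with_stacktrace(func):
    @functools.wraps(func)  # copies __name__, __qualname__, __module__ from func
    def wrapper(args):
        try:
            return func(*args)
        except Exception:
            proc_name = current_process().name
            for line in traceback.format_exc().splitlines():
                print('[TRACE:%s] %s' % (proc_name, line))
            raise
    return wrapper

@with_stacktrace
def f(x, y):
    return y / x

print(f.__name__)  # f
# Pickling stores only "module + qualified name"; thanks to wraps, the
# name f in this module is the wrapper itself, so the round trip succeeds.
print(pickle.loads(pickle.dumps(f)) is f)  # True
print(f((2, 10)))  # 5.0
```

Without functools.wraps, the wrapper's qualified name would be with_stacktrace.<locals>.wrapper, and pickling it (and therefore sending it to a worker) would fail with a "Can't pickle local object" error.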