Python: Running parallel tasks with multiprocessing.Pool
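To run work in parallel in Python you can manage threads yourself, but in CPython the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so CPU-bound code often does not speed up the way you would expect.
The simplest and safest route is usually the Pool class from the multiprocessing module, which runs each task in a separate worker process.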
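Because the work runs in separate processes, each worker has its own interpreter and its own GIL. A quick sketch to confirm that the pool really hands tasks to other processes, comparing os.getpid() in the workers with the parent's (worker_pid and main are names made up for this example; the with statement assumes Python 3.3 or later):

```python
import os
from multiprocessing import Pool


def worker_pid(_):
    # Runs inside a worker process and reports which process that is
    return os.getpid()


def main():
    with Pool(2) as pool:
        pids = pool.map(worker_pid, range(4))
    return os.getpid(), pids


if __name__ == '__main__':
    parent, pids = main()
    print('parent :', parent)
    print('workers:', sorted(set(pids)))  # none of these is the parent PID
```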
Basic usage
Define a function that takes a single argument and pass it to Pool.map together with an iterable of inputs.
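import time
from datetime import datetime
from multiprocessing import Pool, current_process

def f(x):
    # Name of the worker process running this task, e.g. PoolWorker-1
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d' % (proc_name, datetime.now(), x))
    time.sleep(1)  # simulate one second of work
    print('(%s) [%s] Ended : x=%d' % (proc_name, datetime.now(), x))
    return x * x

# The guard is required wherever workers are started with "spawn"
# (Windows, and macOS on Python 3.8+), because each worker re-imports this module
if __name__ == '__main__':
    pool = Pool(4)  # 4 worker processes
    print(pool.map(f, range(8)))  # results come back in input order
    pool.close()
    pool.join()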
Note that from Python 3.3 onward, Pool also supports the with statement.
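For reference, here is a sketch of the basic example written with the with statement, assuming Python 3.3 or later. Pool's __exit__ calls terminate() rather than close(), so the results are collected inside the block; main is a helper name introduced here for illustration.

```python
import time
from datetime import datetime
from multiprocessing import Pool, current_process


def f(x):
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d' % (proc_name, datetime.now(), x))
    time.sleep(1)
    print('(%s) [%s] Ended : x=%d' % (proc_name, datetime.now(), x))
    return x * x


def main():
    # Leaving the block terminates the workers, so map must finish inside it
    with Pool(4) as pool:
        return pool.map(f, range(8))


if __name__ == '__main__':
    print(main())
```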
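The output should look something like the following. This run used Python 2.7.9; on Python 3 the workers are named ForkPoolWorker-N or SpawnPoolWorker-N, and the order of the lines can differ between runs. The eight one-second tasks finish in about two seconds, because four of them run at a time.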
(PoolWorker-1) [2015-12-08 01:49:03.666114] Started: x=0
(PoolWorker-2) [2015-12-08 01:49:03.666246] Started: x=1
(PoolWorker-3) [2015-12-08 01:49:03.666400] Started: x=2
(PoolWorker-4) [2015-12-08 01:49:03.666620] Started: x=3
(PoolWorker-1) [2015-12-08 01:49:04.667421] Ended : x=0
(PoolWorker-3) [2015-12-08 01:49:04.667421] Ended : x=2
(PoolWorker-2) [2015-12-08 01:49:04.667423] Ended : x=1
(PoolWorker-4) [2015-12-08 01:49:04.667511] Ended : x=3
(PoolWorker-3) [2015-12-08 01:49:04.668070] Started: x=4
(PoolWorker-2) [2015-12-08 01:49:04.668151] Started: x=5
(PoolWorker-1) [2015-12-08 01:49:04.668282] Started: x=6
(PoolWorker-4) [2015-12-08 01:49:04.668382] Started: x=7
(PoolWorker-3) [2015-12-08 01:49:05.669206] Ended : x=4
(PoolWorker-2) [2015-12-08 01:49:05.669206] Ended : x=5
(PoolWorker-4) [2015-12-08 01:49:05.669218] Ended : x=7
(PoolWorker-1) [2015-12-08 01:49:05.669230] Ended : x=6
[0, 1, 4, 9, 16, 25, 36, 49]
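The workers are separate processes (forked from the parent on Unix), so keep a particularly close eye on memory usage: every worker holds its own copy of the data it touches, and arguments and return values are pickled and copied between processes.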
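If memory held by long-running workers becomes a problem, one option is Pool's maxtasksperchild parameter: a worker exits after completing that many tasks and the pool starts a fresh process in its place, so whatever the old worker had accumulated is released. A minimal sketch, where task is a hypothetical stand-in for real work and returns its worker's PID so the replacement is visible (assumes Python 3.3+ for the with statement):

```python
import os
from multiprocessing import Pool


def task(x):
    # Hypothetical work; a real task might allocate large buffers here
    return os.getpid()


def main():
    # Each worker handles a single task and is then replaced by a new process;
    # chunksize=1 makes every input its own task
    with Pool(2, maxtasksperchild=1) as pool:
        return pool.map(task, range(4), chunksize=1)


if __name__ == '__main__':
    pids = main()
    print('distinct worker PIDs:', len(set(pids)))
```

With only two workers but a fresh process per task, more than two distinct PIDs show up. Starting a new process for every task is slow, though, so in practice a value larger than 1 is likely the better trade-off.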
Passing multiple parameters
The usual trick is to pass a tuple and unpack it inside the worker.
import time
from datetime import datetime
from multiprocessing import Pool, current_process

def f(args):
    x, y = args  # unpack the tuple passed in by map
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    time.sleep(1)
    print('(%s) [%s] Ended : x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    return x * y

if __name__ == '__main__':
    pool = Pool(4)
    # zip yields the pairs (0, 10), (1, 11), ..., (7, 17)
    print(pool.map(f, zip(range(8), range(10, 18))))
    pool.close()
    pool.join()
Sample output:
(PoolWorker-1) [2015-12-08 01:55:01.700005] Started: x=0, y=10
(PoolWorker-2) [2015-12-08 01:55:01.700078] Started: x=1, y=11
(PoolWorker-3) [2015-12-08 01:55:01.700196] Started: x=2, y=12
(PoolWorker-4) [2015-12-08 01:55:01.700463] Started: x=3, y=13
(PoolWorker-3) [2015-12-08 01:55:02.701283] Ended : x=2, y=12
(PoolWorker-2) [2015-12-08 01:55:02.701275] Ended : x=1, y=11
(PoolWorker-1) [2015-12-08 01:55:02.701261] Ended : x=0, y=10
(PoolWorker-4) [2015-12-08 01:55:02.701293] Ended : x=3, y=13
(PoolWorker-3) [2015-12-08 01:55:02.701914] Started: x=4, y=14
(PoolWorker-1) [2015-12-08 01:55:02.702046] Started: x=5, y=15
(PoolWorker-2) [2015-12-08 01:55:02.702165] Started: x=6, y=16
(PoolWorker-4) [2015-12-08 01:55:02.702361] Started: x=7, y=17
(PoolWorker-1) [2015-12-08 01:55:03.703123] Ended : x=5, y=15
(PoolWorker-3) [2015-12-08 01:55:03.703106] Ended : x=4, y=14
(PoolWorker-4) [2015-12-08 01:55:03.703131] Ended : x=7, y=17
(PoolWorker-2) [2015-12-08 01:55:03.703140] Ended : x=6, y=16
[0, 11, 24, 39, 56, 75, 96, 119]
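On Python 3.3 and later, Pool.starmap does the tuple unpacking for you: each element of the iterable is unpacked into the function's arguments, so the worker can take ordinary parameters. A minimal sketch with the same inputs as above (main is a helper name introduced for illustration):

```python
from multiprocessing import Pool


def f(x, y):
    # starmap calls f(*pair) for each pair, so f takes two plain parameters
    return x * y


def main():
    with Pool(4) as pool:
        return pool.starmap(f, zip(range(8), range(10, 18)))


if __name__ == '__main__':
    print(main())
```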
Printing the worker's stack trace
Now consider what happens when a worker raises an exception. Here f returns y / x, which fails for x = 0.
import time
from datetime import datetime
from multiprocessing import Pool, current_process

def f(args):
    x, y = args
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    time.sleep(1)
    print('(%s) [%s] Ended : x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    return y / x  # raises ZeroDivisionError for x == 0

if __name__ == '__main__':
    pool = Pool(4)
    print(pool.map(f, zip(range(8), range(10, 18))))
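Left as is, the stack trace is hard to follow: it only points into multiprocessing/pool.py in the parent process, not to the line in f that actually failed.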
(PoolWorker-1) [2015-12-08 01:59:54.088732] Started: x=0, y=10
(PoolWorker-2) [2015-12-08 01:59:54.088835] Started: x=1, y=11
(PoolWorker-3) [2015-12-08 01:59:54.088960] Started: x=2, y=12
(PoolWorker-4) [2015-12-08 01:59:54.089304] Started: x=3, y=13
(PoolWorker-3) [2015-12-08 01:59:55.090067] Ended : x=2, y=12
(PoolWorker-2) [2015-12-08 01:59:55.090095] Ended : x=1, y=11
(PoolWorker-1) [2015-12-08 01:59:55.090049] Ended : x=0, y=10
(PoolWorker-4) [2015-12-08 01:59:55.090160] Ended : x=3, y=13
(PoolWorker-2) [2015-12-08 01:59:55.090750] Started: x=4, y=14
(PoolWorker-3) [2015-12-08 01:59:55.090926] Started: x=5, y=15
(PoolWorker-4) [2015-12-08 01:59:55.091020] Started: x=6, y=16
(PoolWorker-1) [2015-12-08 01:59:55.091336] Started: x=7, y=17
Traceback (most recent call last):
  File "xxx.py", line 14, in <module>
    print(pool.map(f, zip(range(8), range(10, 18))))
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
ZeroDivisionError: integer division or modulo by zero
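The trace above is from Python 2.7. On Python 3 (3.4 and later, as far as I know) CPython attaches the traceback captured in the worker to the re-raised exception as its __cause__, which is why Python 3 also prints a "RemoteTraceback" section. The sketch below reads it programmatically; it relies on a CPython implementation detail rather than a documented API, and main is a hypothetical helper name.

```python
from multiprocessing import Pool


def f(args):
    x, y = args
    return y / x  # raises ZeroDivisionError for x == 0


def main():
    with Pool(4) as pool:
        try:
            pool.map(f, zip(range(8), range(10, 18)))
        except ZeroDivisionError as e:
            # CPython stores the worker-side traceback text in e.__cause__
            # (implementation detail, may change between versions)
            return e, str(e.__cause__)
    return None, ''


if __name__ == '__main__':
    exc, remote_tb = main()
    print(repr(exc))
    print(remote_tb)
```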
So let's print the stack trace from inside the worker with a decorator. As a side benefit, f can now take multiple parameters directly.
import functools
import time
import traceback
from datetime import datetime
from multiprocessing import Pool, current_process

def with_stacktrace(func):
    @functools.wraps(func)  # keep the name "f" so the pool can still pickle it
    def wrapper(args):
        try:
            return func(*args)  # unpack the tuple into separate parameters
        except Exception:
            # Print the worker-side traceback, each line tagged with the process name
            proc_name = current_process().name
            for line in traceback.format_exc().splitlines():
                print('[TRACE:%s] %s' % (proc_name, line))
            raise  # re-raise so that pool.map still fails in the parent
    return wrapper

@with_stacktrace
def f(x, y):
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    time.sleep(1)
    print('(%s) [%s] Ended : x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    return y / x

if __name__ == '__main__':
    pool = Pool(4)
    print(pool.map(f, zip(range(8), range(10, 18))))
Sample output:
(PoolWorker-1) [2015-12-08 02:31:36.959575] Started: x=0, y=10
(PoolWorker-2) [2015-12-08 02:31:36.959657] Started: x=1, y=11
(PoolWorker-3) [2015-12-08 02:31:36.959721] Started: x=2, y=12
(PoolWorker-4) [2015-12-08 02:31:36.960174] Started: x=3, y=13
(PoolWorker-4) [2015-12-08 02:31:37.960911] Ended : x=3, y=13
(PoolWorker-2) [2015-12-08 02:31:37.960904] Ended : x=1, y=11
(PoolWorker-3) [2015-12-08 02:31:37.960920] Ended : x=2, y=12
(PoolWorker-1) [2015-12-08 02:31:37.960888] Ended : x=0, y=10
(PoolWorker-2) [2015-12-08 02:31:37.961423] Started: x=4, y=14
(PoolWorker-3) [2015-12-08 02:31:37.961634] Started: x=5, y=15
(PoolWorker-4) [2015-12-08 02:31:37.961724] Started: x=6, y=16
[TRACE:PoolWorker-1] Traceback (most recent call last):
[TRACE:PoolWorker-1]   File "xxx.py", line 9, in f
[TRACE:PoolWorker-1]     return func(*args)
[TRACE:PoolWorker-1]   File "xxx.py", line 23, in f
[TRACE:PoolWorker-1]     return y / x
[TRACE:PoolWorker-1] ZeroDivisionError: integer division or modulo by zero
(PoolWorker-1) [2015-12-08 02:31:37.962384] Started: x=7, y=17
Traceback (most recent call last):
  File "xxx.py", line 26, in <module>
    print(pool.map(f, zip(range(8), range(10, 18))))
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
ZeroDivisionError: integer division or modulo by zero
This makes debugging much easier.
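One detail of the decorator matters more than it looks: Pool.map sends the function to the workers by pickling it, and pickle stores a function as a reference to its module-level name. functools.wraps copies __name__ and __qualname__ from f onto wrapper, so that reference still resolves to the decorated function. A small sketch that checks this with pickle directly; with_wraps, without_wraps, good, bad and can_pickle are hypothetical names used only here.

```python
import functools
import pickle


def with_wraps(func):
    @functools.wraps(func)  # wrapper takes over func's name and qualname
    def wrapper(args):
        return func(*args)
    return wrapper


def without_wraps(func):
    def wrapper(args):  # keeps its own nested name, which pickle cannot look up
        return func(*args)
    return wrapper


@with_wraps
def good(x, y):
    return y / x


def _plain(x, y):
    return y / x


bad = without_wraps(_plain)


def can_pickle(func):
    # Round-trip through pickle, the same way Pool ships functions to workers
    try:
        return pickle.loads(pickle.dumps(func)) is func
    except Exception:
        return False


if __name__ == '__main__':
    print('good:', can_pickle(good))
    print('bad :', can_pickle(bad))
```

Running it prints good: True and bad : False; passing bad to pool.map would fail in the same way before any work is done.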