12.31.2015

Python: Do Not Use Pool.map Method in multiprocessing Module

Python: multiprocessing モジュールの Pool.map を使ったときの罠

 

Pool.map を使った並行処理をバッチ処理などで実行した際、キーボードによる中断(KeyboardInterrupt)をすると
プロセスがハングアップすることがある。

コード
#!/usr/bin/env python

from multiprocessing import Pool, current_process
import time
from datetime import datetime

def f(x):
    print('[%s] start %d: %s' % (datetime.now(), x, current_process().name))
    time.sleep(5)
    print('[%s] end   %d: %s' % (datetime.now(), x, current_process().name))
    return x

pool = Pool(2)
ret = pool.map(f, range(4))
print('[%s] result: %s' % (datetime.now(), ret))
通常の実行例
[2015-12-31 22:10:02.319094] start 0: PoolWorker-1
[2015-12-31 22:10:02.319267] start 1: PoolWorker-2
[2015-12-31 22:10:07.320538] end   1: PoolWorker-2
[2015-12-31 22:10:07.320538] end   0: PoolWorker-1
[2015-12-31 22:10:07.321345] start 2: PoolWorker-2
[2015-12-31 22:10:07.321434] start 3: PoolWorker-1
[2015-12-31 22:10:12.321765] end   2: PoolWorker-2
[2015-12-31 22:10:12.321765] end   3: PoolWorker-1
[2015-12-31 22:10:12.322721] result: [0, 1, 2, 3]
Ctrl-C を押した場合
[2015-12-31 22:05:42.108238] start 0: PoolWorker-1
[2015-12-31 22:05:42.109395] start 1: PoolWorker-2
^CProcess PoolWorker-1:
Process PoolWorker-2:
Traceback (most recent call last):
Traceback (most recent call last):
(snip)
    return map(*args)
    time.sleep(5)
  File "./x.py", line 9, in f
KeyboardInterrupt
    time.sleep(5)
KeyboardInterrupt
[2015-12-31 22:05:42.631444] start 2: PoolWorker-3
[2015-12-31 22:05:42.632162] start 3: PoolWorker-4
[2015-12-31 22:05:47.632713] end   3: PoolWorker-4
[2015-12-31 22:05:47.632664] end   2: PoolWorker-3

プログラムが停止せず、kill コマンドでプロセスを終了する必要に迫られる。
何度 Ctrl-C を押しても駄目である。

回避策1

try 節で KeyboardInterrupt をトラップすれば、プログラムのハングアップは防げる。
しかし、中断した後もプログラムが続行してしまう。

#!/usr/bin/env python

from multiprocessing import Pool, current_process
import time
from datetime import datetime

def f(x):
    try:
        print('[%s] start %d: %s' % (datetime.now(), x, current_process().name))
        time.sleep(5)
        print('[%s] end   %d: %s' % (datetime.now(), x, current_process().name))
    except KeyboardInterrupt:
        pass
    return x

pool = Pool(2)
ret = pool.map(f, range(4))
print('[%s] result: %s' % (datetime.now(), ret))

実行例

[2015-12-31 22:21:41.647775] start 0: PoolWorker-1
[2015-12-31 22:21:41.647928] start 1: PoolWorker-2
^C[2015-12-31 22:21:42.716623] start 2: PoolWorker-2
[2015-12-31 22:21:42.716755] start 3: PoolWorker-1
[2015-12-31 22:21:47.717815] end   2: PoolWorker-2
[2015-12-31 22:21:47.717832] end   3: PoolWorker-1
Traceback (most recent call last):
(snip)
    waiter.acquire()
KeyboardInterrupt
回避策2

map の代わりに map_async を使えば、問題を回避できる。この場合は即座にプログラムが終了する。
ただし、結果を利用する際には get メソッドとともにタイムアウトの時間(秒数)を指定する必要がある。

#!/usr/bin/env python

from multiprocessing import Pool, current_process
import time
from datetime import datetime

def f(x):
    print('[%s] start %d: %s' % (datetime.now(), x, current_process().name))
    time.sleep(5)
    print('[%s] end   %d: %s' % (datetime.now(), x, current_process().name))
    return x

pool = Pool(2)
p = pool.map_async(f, range(4))
ret = p.get(86400)
print('[%s] result: %s' % (datetime.now(), ret))

実行例

[2015-12-31 22:24:32.482984] start 0: PoolWorker-1
[2015-12-31 22:24:32.483816] start 1: PoolWorker-2
^CTraceback (most recent call last):
(snip)
    time.sleep(5)
KeyboardInterrupt

 

References

12.21.2015

color-ssh: Runs Remote Commands, Colorfully!

color-ssh: カラフルなリモートコマンドの実行ツール

 

color-ssh というコマンドライン・ツールを作った。

 

インストール

# pip install color-ssh
  • 環境によっては「sudo」コマンドが必要
  • バージョンアップする場合は、「pip install --upgrade color-ssh」

 

「color-cat」と「color-ssh」という 2個のコマンドがインストールされる。
以下に、基本的な使い方を説明する。

$ color-cat --version
color-cat 0.1.0
$ color-ssh --version
color-ssh 0.1.0

 

color-cat

UNIX/Linux の cat コマンドにターミナル上で色を付けるだけのツール。

x

  • 引数なしで実行すれば、標準入力から読み取った文字列が、ターミナルに赤色で表示される。
  • 「-l」オプションでラベルを指定。各行の左側に反転色で表示される。
    • ラベルを変えれば色も変わる。
      ラベルの文字列によって、色が一意に決まるというのが最大の特徴。
  • 「-c」オプションで色を直接指定することも可能。
  • 「-s」オプションでセパレータ文字列も変えられる。
  • もちろん、通常の cat コマンド同様、引数にファイルパスを指定すれば、その内容を全て表示する。
  • 「color-cat --help」でヘルプを表示。

 

color-ssh

ssh コマンドの出力を色付きで表示するためのツール。
内部では color-cat コマンドが実行されている。

x

  • ssh コマンドを使ってリモートサーバ上でコマンドを実行し、その出力を色付きで表示する。
  • サーバの名前が color-cat のラベルとして利用される。そのため同じサーバは同じ色で出力される。
  • 標準エラー出力はセパレータが「+」に変わる。
  • 複数のサーバ上で、同じコマンドを同時に実行することが可能。(parallel-ssh と同じような機能)
    • ホスト名のリストは、「-h」オプションでファイルから読み込むか、「-H」オプションで文字列として指定する。
  • 「-p」オプションで多重度を制限可能。
  • 「color-ssh --help」でヘルプを表示。

 

コマンドライン引数の「分配」

「--distribute」オプションを指定すれば、引数を各サーバに分割・分配して実行することができる。
(これをやりたかった)

x

上記の例では、「a」「b」「c」「d」「e」という 5個の引数を、各サーバにほぼ均等になるように割り当てて、そのコマンドを並行実行する。

server-1 では「echo xxx a b c」
server-2 では「echo xxx d e」

というコマンドが同時に実行されている。

この機能は、多数の入力ファイルと複数のサーバがある場合、手軽に仕事を分割するのに役立つと思う。

12.18.2015

How to Handle Binary-data stdin/stdout and Command-line Arguments in Python3

Python: 標準入出力およびコマンドライン引数でバイナリデータを取り扱う方法

 

Python2 の場合

str = bytes なので、普通に書けばバイナリデータにも対応できる。

import sys

sys.stdout.write('### sys.argv\n')
for arg in sys.argv:
    sys.stdout.write(arg)
    sys.stdout.write('\n')

sys.stderr.write('### sys.stdin\n')
for line in iter(sys.stdin.readline, ''):
    sys.stderr.write(line.rstrip())
    sys.stderr.write('\n')
  • 実行例 (?? の箇所は表示非対応)
$ echo $'abc\nあいう\n\xff\xfe' | python2 ./bin_stdin.py abc あいう $'\xff\xfe'
### sys.argv
./bin_stdin.py
abc
あいう
??
### sys.stdin
abc
あいう
??

 

Python3 の場合

上記のコードはエンコードエラーになる。
これは、Python3 の sys.stdout.write が str = unicode を引数に取るため。

$ echo $'abc\nあいう\n\xff\xfe' | python3 ./bin_stdin.py abc あいう $'\xff\xfe'
### sys.argv
./bin_stdin.py
abc
あいう
Traceback (most recent call last):
  File "./bin_stdin.py", line 5, in
    sys.stdout.write(arg)
UnicodeEncodeError: 'utf-8' codec can't encode character '\udcff' in position 0: surrogates not allowed

 

ポイント
  • 1. sys.stdXXX.buffer を利用する
  • 2. sys.args の要素を os.fsencode でエンコードする
    • ただしOptionParserなどのパース処理はエンコード前に適用しないと正しく処理されない模様

以下のようなコードを書けば、Python2/3 両方に対応できる。 (flush は必ずしも必須ではない)

import sys
import os

PY3 = sys.version_info >= (3, )

stdin = sys.stdin.buffer if PY3 else sys.stdin
stdout = sys.stdout.buffer if PY3 else sys.stdout
stderr = sys.stderr.buffer if PY3 else sys.stderr

stdout.write(b'### sys.argv\n')
for arg in sys.argv:
    stdout.write(os.fsencode(arg) if PY3 else arg)
    stdout.write(b'\n')
    stdout.flush()

stderr.write(b'### sys.stdin\n')
for line in iter(stdin.readline, b''):
    stderr.write(line.rstrip())
    stderr.write(b'\n')
    stderr.flush()
  • 実行例
$ echo $'abc\nあいう\n\xff\xfe' | python3 ./bin_stdin.py abc あいう $'\xff\xfe'
### sys.argv
./bin_stdin.py
abc
あいう
??
### sys.stdin
abc
あいう
??

12.17.2015

How to Color the Output from SSH Commands

SSH の出力をカラフルにする方法

 

SSH に限ったことではないが、ターミナルに表示されるテキストを目的に応じて着色したい場合、
パイプラインと sed を使うのが一番簡単だ。

$ ssh server-1 'python -c "import this"' \
  | sed -e $'s/^\\(.*\\)$/\e[96mserver-1\e[0m|\e[36m\\1\e[0m/'

出力は以下のようになる。
x

\e[96m や \e[36m の部分を変更すれば、色が変わる。

この応用として、複数のサーバに対して SSH をバックグラウンドで実行し(コマンドラインの末尾に「&」を付ける)
最後に wait コマンドで処理を待つようなシェルを書けば、サーバに応じた色のアウトプットをリアルタイムに観察できる。
y

12.16.2015

Manipulating Many Servers by Using parallel-ssh

parallel-ssh を使って複数サーバを同時に操作する

 

tl; dr

  • 2台〜100台程度のサーバに対して、同じオペレーションを同時に実行したい
  • parallel-ssh (pssh) の各種コマンドを使うと便利
  • ただし、リモートサーバで sudo が必要な場合は Ansible 等の他ツールが必要

 

parallel-ssh のセットアップ

 

インストール

parallel-ssh - PSSH: Parallel SSH Tools - Google Project Hosting
  • Mac
    $ brew install pssh
  • RHEL系
    # yum install pssh
ホストリストの準備
  • 任意のパスにテキストファイルを作成
    server-1
    server-2
    server-3 root
  • 1行ずつ、接続先サーバのホスト名またはIPアドレスを記述
  • ログインユーザの指定も可能

 

目的別コマンド実行例

 

SSH 認証のための設定

parallel-ssh を使うには、各サーバに SSH で接続できることが前提となる。
ssh-copy-id コマンドを利用して、~/.ssh/id_rsa の内容を登録する場合は以下。

$ for h in $(cat ~/hosts); do ssh-copy-id $h; done

 

pssh の使い方
  • 基本形
    $ pssh -h ~/hosts -i 'hostname'
    [1] 04:09:13 [SUCCESS] server-1
    server-1
    [2] 04:09:13 [SUCCESS] server-2
    server-2
  • ホスト名を直接指定
    $ pssh -H server-1 -H server-2 -i 'hostname'
    [1] 04:09:13 [SUCCESS] server-1
    server-1
    [2] 04:09:13 [SUCCESS] server-2
    server-2
  • タイムアウトを無制限に
    $ pssh -h ~/hosts -t 0 -i 'hostname'
    [1] 04:09:13 [SUCCESS] server-1
    server-1
    [2] 04:09:13 [SUCCESS] server-2
    server-2
  • アウトプットをすぐに表示
    $ pssh -h ~/hosts -P 'hostname'
    server-1: server-1
    [1] 04:13:28 [SUCCESS] server-1
    server-2: server-2
    [2] 04:13:28 [SUCCESS] server-2
    
  • リモートでバックグラウンド実行
    $ pssh -h ~/hosts -i 'nohup your_command > /path/to/output 2>&1 </dev/null &'
    [1] 04:13:28 [SUCCESS] server-1
    [2] 04:13:28 [SUCCESS] server-2
    
    バッファの影響で、標準出力の出力先ファイルがリアルタイムに更新されない場合がある。
    対応にはハックが必要: linux - redirect nohup stdout and flush - Stack Overflow

 

ファイル転送
  • ローカルからリモートに転送
    $ prsync -h ~/hosts -av /data/input/ /data/input/
    • パスの末尾にスラッシュを付けると、ディレクトリ間の同期となる。(rsync の仕様)
    • -z (圧縮オプション) は付けると CPU がボトルネックになって大幅に性能が劣化する場合がある。ナローバンドでなければ、付けないほうが良さそうだ。
  • リモートからローカルに転送
    $ pssh -h ~/hosts -p 4 -i 'rsync -e "ssh -o StrictHostKeyChecking=no" -av \
    /data/output/ server-1:/data/output/'
    • 全サーバから server-1 に集約する例
    • -p オプションを指定して、並列度を制限している。
      こうしないと、コネクションが多すぎて接続できない場合がある。
    • rsync -e "ssh -o ..." という書式で SSH オプションを指定。
      ここではホストキーのチェックを回避している。

 

リモートサーバで sudo が必要な場合
  • Ansible で代用
    $ ansible -i ~/hosts all --sudo -K \
    -a 'bash -c "cd /path/to/your/app && make install"'
    

 

References

12.08.2015

Python: How to Execute Parallel Processing using multiprocessing.Pool

Python: multiprocessing.Pool を使った並列処理の実行

 

Python で並列処理を行う場合、自分でスレッドを書くこともできるが、グローバルインタプリタロック (GIL) の制約を受けて意図しない結果に陥ることが少なくない。

multithreading モジュールの Pool を利用するのが最も簡単かつ安全である。

 

基本形

引数を1個取る関数を定義し、Pool#map を実行する。

import time
from datetime import datetime
from multiprocessing import Pool, current_process

def f(x):
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d' % (proc_name, datetime.now(), x))
    time.sleep(1)
    print('(%s) [%s] Ended  : x=%d' % (proc_name, datetime.now(), x))
    return x * x

pool = Pool(4)
print(pool.map(f, range(8)))

尚、Python 3.4 以降なら with 句を使った表現もできる。

実行結果は以下のようになるはずだ。

(PoolWorker-1) [2015-12-08 01:49:03.666114] Started: x=0
(PoolWorker-2) [2015-12-08 01:49:03.666246] Started: x=1
(PoolWorker-3) [2015-12-08 01:49:03.666400] Started: x=2
(PoolWorker-4) [2015-12-08 01:49:03.666620] Started: x=3
(PoolWorker-1) [2015-12-08 01:49:04.667421] Ended  : x=0
(PoolWorker-3) [2015-12-08 01:49:04.667421] Ended  : x=2
(PoolWorker-2) [2015-12-08 01:49:04.667423] Ended  : x=1
(PoolWorker-4) [2015-12-08 01:49:04.667511] Ended  : x=3
(PoolWorker-3) [2015-12-08 01:49:04.668070] Started: x=4
(PoolWorker-2) [2015-12-08 01:49:04.668151] Started: x=5
(PoolWorker-1) [2015-12-08 01:49:04.668282] Started: x=6
(PoolWorker-4) [2015-12-08 01:49:04.668382] Started: x=7
(PoolWorker-3) [2015-12-08 01:49:05.669206] Ended  : x=4
(PoolWorker-2) [2015-12-08 01:49:05.669206] Ended  : x=5
(PoolWorker-4) [2015-12-08 01:49:05.669218] Ended  : x=7
(PoolWorker-1) [2015-12-08 01:49:05.669230] Ended  : x=6
[0, 1, 4, 9, 16, 25, 36, 49]

実行中はプロセスがフォークされているので、特にメモリ使用量は気にかける必要がある。

 

複数のパラメータを取る

タプルを渡して、ワーカー側で分解するのが常套手段。

import time
from datetime import datetime
from multiprocessing import Pool, current_process

def f(args):
    x, y = args
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    time.sleep(1)
    print('(%s) [%s] Ended  : x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    return x * y

pool = Pool(4)
print(pool.map(f, zip(range(8), range(10, 18))))

実行例

(PoolWorker-1) [2015-12-08 01:55:01.700005] Started: x=0, y=10
(PoolWorker-2) [2015-12-08 01:55:01.700078] Started: x=1, y=11
(PoolWorker-3) [2015-12-08 01:55:01.700196] Started: x=2, y=12
(PoolWorker-4) [2015-12-08 01:55:01.700463] Started: x=3, y=13
(PoolWorker-3) [2015-12-08 01:55:02.701283] Ended  : x=2, y=12
(PoolWorker-2) [2015-12-08 01:55:02.701275] Ended  : x=1, y=11
(PoolWorker-1) [2015-12-08 01:55:02.701261] Ended  : x=0, y=10
(PoolWorker-4) [2015-12-08 01:55:02.701293] Ended  : x=3, y=13
(PoolWorker-3) [2015-12-08 01:55:02.701914] Started: x=4, y=14
(PoolWorker-1) [2015-12-08 01:55:02.702046] Started: x=5, y=15
(PoolWorker-2) [2015-12-08 01:55:02.702165] Started: x=6, y=16
(PoolWorker-4) [2015-12-08 01:55:02.702361] Started: x=7, y=17
(PoolWorker-1) [2015-12-08 01:55:03.703123] Ended  : x=5, y=15
(PoolWorker-3) [2015-12-08 01:55:03.703106] Ended  : x=4, y=14
(PoolWorker-4) [2015-12-08 01:55:03.703131] Ended  : x=7, y=17
(PoolWorker-2) [2015-12-08 01:55:03.703140] Ended  : x=6, y=16
[0, 11, 24, 39, 56, 75, 96, 119]

 

ワーカーのスタックトレースを出力する

何らかの例外が発生する場合を考える。

import time
from datetime import datetime
from multiprocessing import Pool, current_process

def f(args):
    x, y = args
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    time.sleep(1)
    print('(%s) [%s] Ended  : x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    return y / x

pool = Pool(4)
print(pool.map(f, zip(range(8), range(10, 18))))

そのままだと、スタックトレースの内容が分かりづらい。

(PoolWorker-1) [2015-12-08 01:59:54.088732] Started: x=0, y=10
(PoolWorker-2) [2015-12-08 01:59:54.088835] Started: x=1, y=11
(PoolWorker-3) [2015-12-08 01:59:54.088960] Started: x=2, y=12
(PoolWorker-4) [2015-12-08 01:59:54.089304] Started: x=3, y=13
(PoolWorker-3) [2015-12-08 01:59:55.090067] Ended  : x=2, y=12
(PoolWorker-2) [2015-12-08 01:59:55.090095] Ended  : x=1, y=11
(PoolWorker-1) [2015-12-08 01:59:55.090049] Ended  : x=0, y=10
(PoolWorker-4) [2015-12-08 01:59:55.090160] Ended  : x=3, y=13
(PoolWorker-2) [2015-12-08 01:59:55.090750] Started: x=4, y=14
(PoolWorker-3) [2015-12-08 01:59:55.090926] Started: x=5, y=15
(PoolWorker-4) [2015-12-08 01:59:55.091020] Started: x=6, y=16
(PoolWorker-1) [2015-12-08 01:59:55.091336] Started: x=7, y=17
Traceback (most recent call last):
  File "xxx.py", line 14, in
    print(pool.map(f, zip(range(8), range(10, 18))))
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
ZeroDivisionError: integer division or modulo by zero

デコレータでスタックトレースを表示してみる。副産物として、関数 f が複数のパラメータを取れるようになった。

import time
from datetime import datetime
from multiprocessing import Pool, current_process
import traceback

def with_stacktrace(func):
    import functools

    @functools.wraps(func)
    def wrapper(args):
        try:
            return func(*args)
        except:
            proc_name = current_process().name
            for line in traceback.format_exc().splitlines():
                print('[TRACE:%s] %s' % (proc_name, line))
            raise
    return wrapper

@with_stacktrace
def f(x, y):
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    time.sleep(1)
    print('(%s) [%s] Ended  : x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    return y / x

pool = Pool(4)
print(pool.map(f, zip(range(8), range(10, 18))))

実行例

(PoolWorker-1) [2015-12-08 02:31:36.959575] Started: x=0, y=10
(PoolWorker-2) [2015-12-08 02:31:36.959657] Started: x=1, y=11
(PoolWorker-3) [2015-12-08 02:31:36.959721] Started: x=2, y=12
(PoolWorker-4) [2015-12-08 02:31:36.960174] Started: x=3, y=13
(PoolWorker-4) [2015-12-08 02:31:37.960911] Ended  : x=3, y=13
(PoolWorker-2) [2015-12-08 02:31:37.960904] Ended  : x=1, y=11
(PoolWorker-3) [2015-12-08 02:31:37.960920] Ended  : x=2, y=12
(PoolWorker-1) [2015-12-08 02:31:37.960888] Ended  : x=0, y=10
(PoolWorker-2) [2015-12-08 02:31:37.961423] Started: x=4, y=14
(PoolWorker-3) [2015-12-08 02:31:37.961634] Started: x=5, y=15
(PoolWorker-4) [2015-12-08 02:31:37.961724] Started: x=6, y=16
[TRACE:PoolWorker-1] Traceback (most recent call last):
[TRACE:PoolWorker-1]   File "xxx.py", line 9, in f
[TRACE:PoolWorker-1]     return func(*args)
[TRACE:PoolWorker-1]   File "xxx.py", line 23, in f
[TRACE:PoolWorker-1]     return y / x
[TRACE:PoolWorker-1] ZeroDivisionError: integer division or modulo by zero
(PoolWorker-1) [2015-12-08 02:31:37.962384] Started: x=7, y=17
Traceback (most recent call last):
  File "xxx.py", line 26, in
    print(pool.map(f, zip(range(8), range(10, 18))))
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
ZeroDivisionError: integer division or modulo by zero

これでデバッグが捗る。

 

 

References