Pythonのsubprocessで標準出力をリアルタイムに取得しつつthreading.Timerとpsutilでタイムアウトを実装してみる
Pythonのsubprocessで何かコマンドを実行する際にタイムアウト処理を実装したいことがあると思います。subprocessはrun()、Popen.communicate()、call()、check_call()、check_output()でタイムアウトの秒数を設定できますが、どれも子プロセスの標準出力をリアルタイムで取得することはできません。
run()でのtimeout値の指定は内部的にはcommunicate()の使用と同じで、check_output()はrun()でcheck=Trueにすれば同じことができます。それらはコマンドが終了するまで標準出力の結果を受け取れません。そしてcall()、check_call()は標準出力を取得する目的の関数ではなく、普通に使うとコマンドの実行結果がコマンドの終了を待たずリアルタイムにシェルに出力されますが、それは内部的に使用されているPopenでstdout/stderrに何も指定されていないためです。そんな感じでタイムアウトをサポートしたsubprocessの関数は標準出力のリアルタイム取得には活かせそうにありません。なお後述しますがPopen.wait()のタイムアウトについては標準出力のリアルタイム取得とは相性が良くないです。
この記事ではsubprocess.Popenを使用して標準出力をリアルタイムに取得することを前提とした上で、タイムアウトによる子プロセスのkillを実装してみます。
タイムアウトのタイマーはthreadingモジュールのTimerを用いて実装します。デコレータでタイムアウトが実装できるwrapt-timeout-decoratorも使えることは確認しましたが、どうもWindows環境では書き方が制限されるっぽいのと、Timerの書き方の方が個人的に好みなのでTimerを使うことにしました。
プロセスのkillについてはpsutilを使用します。これはkillの書き方や子孫プロセスのkillまでを考えた時にOS(UNIX、Windows)の違いやexecコマンドの有無等で悩まなくて良いようにするためです。
subprocess.Popen.kill()では子孫プロセスまでkillすることができないため、タイムアウトを実装するならpsutilを使うのが楽です。この記事ではpsutilのchildren()で子孫プロセスの取得を体験するためにsubprocessにshell=Trueを設定しています。
環境は Windows 10、Python 3.12.1
sub.py (出力テスト用)
まずテスト用に以下のようなsub.pyを用意しました。printで出力したら1秒待って繰り返すだけです。subprocessを使用してリアルタイムに標準出力を受け取れるようにprintではflush=Trueを設定しています。
import time
for i in range(0, 10):
print(i, flush=True)
time.sleep(1)
main.py (標準出力のリアルタイム取得)
標準出力のリアルタイム取得の確認のために以下のようなmain.pyを用意しました。ここでは標準エラー出力を標準出力として受け取れるようにstderr=subprocess.STDOUTにしていますが、stderrが不要な場合やstderrを分けて取得する場合等、ケースバイケースです。
import subprocess
def main(cmd):
proc = subprocess.Popen(
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
for line in iter(proc.stdout.readline, b""):
line = line.rstrip().decode()
print(line)
proc.wait()
if __name__ == "__main__":
cmd = "python sub.py"
main(cmd)
ここでforの中でタイムアウトにPopen.wait()が使えなくはないです。ただ使った時点でリアルタイム取得ができなくなるので、やるなら何かしらの条件を設けた上でwait()を使い、それ以降のリアルタイム取得は諦めて正常に終了するのを待つかTimeoutExpiredを待つかという感じになると思います。
補足
標準出力と標準エラー出力を完全に分けて取得したい場合(stdout=PIPE, stderr=PIPE)、状況次第ではパイプバッファが詰まりデッドロックが発生します。対処法等は以下が参考になるかもしれません。
https://stackoverflow.com/questions/18421757
https://akirayou.net/wp/2021/python-%E3%81%AEsubprocess/
デッドロックを回避しつつ標準出力と標準エラー出力の両方でリアルタイム性に拘るのであれば、個人的にはStack Overflowのアンサーの1つにもあるようにThreadPoolExecutorを使うのが楽だと思っています。
threading.Timerでタイマーの実装
上のmain.pyにthreading.Timerでタイマーを追加すると以下のような形になります。Timerに設定したkill_process()が指定の秒後に機能するようになっています。ここからは好きにkillすれば良いのですが、この記事ではpsutilを使います。
from threading import Timer
def main(cmd, timeout=3):
proc = subprocess.Popen(
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
def kill_process(pid):
print(pid)
# ここにkillの処理を書く
t = Timer(timeout, kill_process, [proc.pid])
t.start()
for line in iter(proc.stdout.readline, b""):
line = line.rstrip().decode()
print(line)
t.cancel()
proc.stdout.close()
proc.wait()
プロセスのkill
kill_process()は以下のようなコードにしてみました。
※kill_process()とkilled()は上のコードのmain()の中に書いています。
import psutil
def killed(proc):
print(f"process {proc} killed with exit code {proc.returncode}")
def kill_process(pid):
t.cancel()
try:
parent = psutil.Process(pid)
except psutil.NoSuchProcess as e:
print(e)
return
procs = parent.children(recursive=True)
procs.append(parent)
for p in procs:
try:
p.kill()
except Exception as e:
print(e)
gone, alive = psutil.wait_procs(procs, callback=killed)
まずタイマーをcancel()で終了しておきます。main()の最後の方でも書いているのでここでは書かなくても問題はないのですが、タイムアウトの時点で即タイマーを終了させておきたいという場合はこういう書き方になると思います。
subprocessのプロセスIDを psutil.Process() に渡してそのプロセスを取得します。プロセスが見つからなければ “process PID not found” というメッセージと共にpsutil.NoSuchProcessという例外が投げられるのでtry-exceptで囲んでおきます。
続けて取得したプロセスの子プロセスのリストを children() で取得します。recursive=Trueで全ての子孫プロセスが返されます。リストに親を追加し、子から順にkill()します。念のためkill()もtry-exceptで囲んでいます。
ドキュメントによるとpsutilのkill()はUNIXではos.kill(pid, signal.SIGKILL)を実行し、WindowsではTerminateProcessが使用されるようです。psutilのterminate()はUNIXではos.kill(pid, signal.SIGTERM)が実行されますが、Windowsではkill()のエイリアスのようなのでWindowsの場合はkill()でもterminate()でもどちらでも良さそうです。
psutil.wait_procs()にプロセスのリストを渡して生きているプロセスと終了済みプロセスのリストを取得できます。また、引数に関数を渡すことで終了済みの各プロセスに対してその関数が実行されます。上のコードではkilled()が終了済みの各プロセスに対して実行されます。wait_procs()には引数にtimeout値を渡すことも可能で、これはwait_procs()がリストを返すまでの最大時間の指定です。指定しないかtimeout値よりも早く全プロセスが終了した場合は即座にリストが返されます。wait_procs()によるリスト取得の使い道としては、UNIXの場合に最初はkill()ではなくterminate()してまだ生きているプロセスがあればkill()するといった使い方があります。ドキュメントのwait_procs()の項目でもそんな例が書かれています。
コード全体
以下、main.pyのコード全体です。
import psutil
import subprocess
from threading import Timer
def main(cmd, timeout=3):
proc = subprocess.Popen(
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
def killed(proc):
print(f"process {proc} killed with exit code {proc.returncode}")
def kill_process(pid):
t.cancel()
try:
parent = psutil.Process(pid)
except psutil.NoSuchProcess as e:
print(e)
return
procs = parent.children(recursive=True)
procs.append(parent)
for p in procs:
try:
p.kill()
except Exception as e:
print(e)
gone, alive = psutil.wait_procs(procs, callback=killed)
t = Timer(timeout, kill_process, [proc.pid])
t.start()
for line in iter(proc.stdout.readline, b""):
line = line.rstrip().decode()
print(line)
t.cancel()
proc.stdout.close()
proc.wait()
if __name__ == "__main__":
cmd = "python sub.py"
timeout = 3
main(cmd, timeout)
psutil.Popen()を使うと若干楽になるかもしれません。以下のようにsubprocessには無いpsutilの関数が使用可能で、subprocessと共通しているsend_signal()、terminate()、kill()はpsutilsの実装が使用されるようです。
proc = psutil.Popen(
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
procs = proc.parent().children(recursive=True)
雑感
これが標準出力のリアルタイム取得時のタイムアウトのベストな方法かは分かりませんが、とりあえず実装はできました。ちなみに、この記事は以下が元ネタです。同じことで悩んでいたのと、この方が採用した os.killpg(os.getpgid(p.pid), signal.SIGTERM) というkillの方法がUNIX限定のやり方なので私はpsutilを採用してみました。
subprocess popen real time output with timeout - Stack Overflow
ディスカッション
コメント一覧
まだ、コメントがありません