English 中文(简体)
我该如何在Python中异步运行外部命令?
原标题:
  • 时间:2009-03-11 22:01:42
  •  标签:

我需要从Python脚本异步运行一个shell命令。这意味着当外部命令启动并执行它需要完成的任务时,我希望Python脚本继续运行。

我看了这篇帖子:

在Python中调用外部命令

然后,我进行了一些测试,看起来 os.system()将完成工作,只要我在命令的结尾使用&,这样我就不必等待它返回。我想知道的是这是否是完成此事的正确方式?我尝试了commands.call(),但它不起作用,因为它会在外部命令上阻塞。

请告诉我这是否适合使用os.system(),或者我应该尝试其他方法。

最佳回答

subprocess.Popen 正好做你想要的事情。

from subprocess import Popen
p = Popen([ watch ,  ls ]) # something long running
# ... do other stuff while subprocess is running
p.terminate()

请提供原文以便翻译。

Popen实例可以执行各种其他操作,例如您可以poll() 查看它是否仍在运行,还可以communicate()与它通信,将数据发送到stdin中,然后等待其终止。

问题回答

如果您想并行运行许多过程,然后在它们产生结果时处理它们,可以使用类似以下的轮询:

from subprocess import Popen, PIPE
import time

running_procs = [
    Popen([ /usr/bin/my_cmd ,  -i %s  % path], stdout=PIPE, stderr=PIPE)
    for path in  /tmp/file0 /tmp/file1 /tmp/file2 .split()]

while running_procs:
    for proc in running_procs:
        retcode = proc.poll()
        if retcode is not None: # Process finished.
            running_procs.remove(proc)
            break
        else: # No process is done, wait a bit and check again.
            time.sleep(.1)
            continue

    # Here, `proc` has finished with return code `retcode`
    if retcode != 0:
        """Error handling."""
    handle_results(proc.stdout)

控制流有点复杂,因为我试图让它更小-你可以根据自己的口味进行重构。 :-)

这样做的好处是优先处理早期完成的请求。如果您在第一个正在运行的进程上调用communicate并且发现它运行时间最长,则在您处理其结果时,其他正在运行的进程将一直闲置。

这在Python 3子进程示例中被涵盖,标题为“异步等待命令终止”。使用IPythonpython -m asyncio运行此代码: Python 3子进程示例

import asyncio

proc = await asyncio.create_subprocess_exec(
    ls , -lha ,
   stdout=asyncio.subprocess.PIPE,
   stderr=asyncio.subprocess.PIPE)

# do something else while ls is working

# if proc takes very long to complete, the CPUs are free to use cycles for 
# other processes
stdout, stderr = await proc.communicate()

一旦 await asyncio.create_subprocess_exec(...) 完成,进程就会开始运行。如果在调用 await proc.communicate() 时它还没有完成,它将在那里等待以便让你获取输出状态。如果它已经完成,proc.communicate() 将立即返回。

这里的要点与Terrels的回答类似,但我认为Terrels的回答似乎过于复杂化了事情。

请参见asyncio.create_subprocess_exec了解更多信息。

编辑

您可以使用asyncio.run()函数运行上面的代码,就像运行其他Python代码一样,无需传递-m asyncio

import asyncio

def main():
    proc = await asyncio.create_subprocess_exec(
        ls , -lha ,
       stdout=asyncio.subprocess.PIPE,
       stderr=asyncio.subprocess.PIPE)

    # do something else while ls is working

    # if proc takes very long to complete, the CPUs are free to use   cycles for 
    # other processes
    stdout, stderr = await proc.communicate()
 
asyncio.run(main())

请参阅asyncio了解更多信息。

我想知道的是,这[os.system()]是否是完成这件事情的正确方式?

不是的。 os.system() 不是正确的方法。这就是为什么每个人都建议使用 subprocess 的原因。

获取更多信息,请阅读http://docs.python.org/library/os.html#os.system

The subprocess module provides more powerful facilities for spawning new processes and retrieving their results; using that module is preferable to using this function. Use the subprocess module. Check especially the Replacing Older Functions with the subprocess Module section.

被接受的答案非常古老。

我在这里找到了一个更好的现代答案:

使用asyncio在Python中流式处理子进程的标准输入和输出https://kevinmccarthy.org/2016/07/25/streaming-subprocess-stdin-and-stdout-with-asyncio-in-python/

并做出了一些更改:

  1. make it work on windows
  2. make it work with multiple commands
import sys
import asyncio

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())


async def _read_stream(stream, cb):
    while True:
        line = await stream.readline()
        if line:
            cb(line)
        else:
            break


async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )

        await asyncio.wait(
            [
                _read_stream(process.stdout, stdout_cb),
                _read_stream(process.stderr, stderr_cb),
            ]
        )
        rc = await process.wait()
        return process.pid, rc
    except OSError as e:
        # the program will hang if we let any exception propagate
        return e


def execute(*aws):
    """ run the given coroutines in an asyncio loop
    returns a list containing the values returned from each coroutine.
    """
    loop = asyncio.get_event_loop()
    rc = loop.run_until_complete(asyncio.gather(*aws))
    loop.close()
    return rc


def printer(label):
    def pr(*args, **kw):
        print(label, *args, **kw)

    return pr


def name_it(start=0, template="s{}"):
    """a simple generator for task names
    """
    while True:
        yield template.format(start)
        start += 1


def runners(cmds):
    """
    cmds is a list of commands to excecute as subprocesses
    each item is a list appropriate for use by subprocess.call
    """
    next_name = name_it().__next__
    for cmd in cmds:
        name = next_name()
        out = printer(f"{name}.stdout")
        err = printer(f"{name}.stderr")
        yield _stream_subprocess(cmd, out, err)


if __name__ == "__main__":
    cmds = (
        [
            "sh",
            "-c",
            """echo "$SHELL"-stdout && sleep 1 && echo stderr 1>&2 && sleep 1 && echo done""",
        ],
        [
            "bash",
            "-c",
            "echo  hello, Dave.  && sleep 1 && echo dave_err 1>&2 && sleep 1 && echo done",
        ],
        [sys.executable, "-c",  print("hello from python");import sys;sys.exit(2) ],
    )

    print(execute(*runners(cmds)))

很可能示例命令在你的系统上不会完美地工作,它也无法处理奇怪的错误,但这段代码展示了使用asyncio运行多个子进程并流式传输输出的一种方法。

我在使用asyncproc模块时取得了很好的成功,它能很好地处理进程输出。例如:

import os
from asynproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll is not None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

使用非阻塞 readline 和 pexpect 一起使用是另一种实现此操作的方法。Pexpect 解决了死锁问题,允许您轻松在后台运行进程,并提供了在进程输出预定义字符串时回调的简单方法,并且通常使与进程的交互更加容易。

考虑到“我不必等待它返回”,最简单的解决方案之一将是这样:

subprocess.Popen( 
    [path_to_executable, arg1, arg2, ... argN],
    creationflags = subprocess.CREATE_NEW_CONSOLE,
).pid

但是,根据我所读的,这不是“完成此任务的正确方式”,因为subprocess.CREATE_NEW_CONSOLE标志会创建安全风险。

这里发生的关键事情是使用 subprocess.CREATE_NEW_CONSOLE 创建新的控制台,以及使用 .pid(返回进程 ID,以便稍后检查程序是否已完成工作),不必等待程序完成其工作。

我在尝试使用Python的s3270脚本软件连接到3270终端时遇到了同样的问题。现在我正在使用我在这里找到的进程子类来解决这个问题:

将此翻译为中文:http://code.activestate.com/recipes/440554/ http://code.activestate.com/recipes/440554/

这是从文件中取出的样本:

def recv_some(p, t=.1, e=1, tr=5, stderr=0):
    if tr < 1:
        tr = 1
    x = time.time()+t
    y = []
    r =   
    pr = p.recv
    if stderr:
        pr = p.recv_err
    while time.time() < x or r:
        r = pr()
        if r is None:
            if e:
                raise Exception(message)
            else:
                break
        elif r:
            y.append(r)
        else:
            time.sleep(max((x-time.time())/tr, 0))
    return   .join(y)

def send_all(p, data):
    while len(data):
        sent = p.send(data)
        if sent is None:
            raise Exception(message)
        data = buffer(data, sent)

if __name__ ==  __main__ :
    if sys.platform ==  win32 :
        shell, commands, tail = ( cmd , ( dir /w ,  echo HELLO WORLD ),  
 )
    else:
        shell, commands, tail = ( sh , ( ls ,  echo HELLO WORLD ),  
 )

    a = Popen(shell, stdin=PIPE, stdout=PIPE)
    print recv_some(a),
    for cmd in commands:
        send_all(a, cmd + tail)
        print recv_some(a),
    send_all(a,  exit  + tail)
    print recv_some(a, e=0)
    a.wait()

这里有几个答案,但都没有满足我的要求:

  1. 我不想等待命令完成或使用子进程输出污染我的终端。

  2. 我想使用重定向运行BASH脚本。

  3. 我想在我的bash脚本中支持管道(例如find ... | tar ...)。

满足以上要求的唯一组合是:

subprocess.Popen([ ./my_script.sh "arg1" > "redirect/path/to" ],
                 stdout=subprocess.PIPE, 
                 stderr=subprocess.PIPE,
                 shell=True)




相关问题
热门标签