Tuesday 29 May 2018

Python subprocess stdout and stderr

In our machine learning pipeline, a number of Python subprocesses are started from the main Python process. Originally, the subprocesses were started using this simple procedure:

import subprocess

def check_call(commands, shell=False):
    process = subprocess.Popen(commands, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    if process.returncode != 0:
        raise IOError(
                'Call failed, cmd=%s, return_code=%s, stderr=%s, stdout=%s' % (commands, process.returncode, stderr, stdout))
    return stdout, stderr

The drawback of this solution is that process.communicate() blocks, and the subprocess's standard output and standard error are accumulated in variables that become available only after execution ends or fails. In our case that could take tens of minutes or even a few hours. We partly worked around the issue by logging to a file in the subprocess, which gives some real-time feedback about its progress. Also, when the subprocess hangs, it blocks the main process forever, and manual action might be necessary to unblock the pipeline.

I've been looking for a better solution that would make standard output lines available as soon as they are produced by the subprocess. While it is an old problem, I've found only prototypes and no ready-to-use code. So here is my take on it, based on unutbu's Stack Overflow answer combined with another one from Vadim Fint, merged together and pimped up a bit. The code is for Python 2.7. It will probably work in Python 3 as well, but there you should probably go with a proper Python 3 solution.

The asynchronous version is lightweight, as it does not need to spawn threads to read the outputs or perform timeout checks. It uses only the Python standard library and works very well for us.


import sys
import subprocess
import errno
import fcntl
import select
import os
import datetime

def execute(commands, sysout_handlers=[sys.stdout.write], syserr_handlers=[sys.stderr.write], shell=False, cwd=None, timeout_sec=None):
    """Executes commands using subprocess.Popen, with non-blocking select and fcntl handling of the subprocess standard output and standard error streams
    
    Examples:
        execute(['python2.7', 'random_print.py'])
        execute(['python2.7', 'random_print.py'], outlist.append, errlist.append)
        execute(['python2.7', 'random_print.py'], timeout_sec = 3)
        execute(['ls', '-l'], cwd = '/tmp')
    
    :param commands: list of strings passed to subprocess.Popen()
    :param sysout_handlers: list of handlers for standard output stream
    :param syserr_handlers: list of handlers for standard error stream
    :param shell: boolean, default False, passed to subprocess.Popen()
    :param cwd: string, default None, passed to subprocess.Popen()
    :param timeout_sec: int, default None, number of seconds after which the process is terminated. Cannot be less than 1 second
    :return: process returncode, or None when the timeout is reached. Always check this value
    """
    
    def make_async(fd):
        # https://stackoverflow.com/a/7730201/190597
        '''add the O_NONBLOCK flag to a file descriptor'''
        fcntl.fcntl(fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
    
    def read_async(fd):
        # https://stackoverflow.com/a/7730201/190597
        '''read some data from a file descriptor, ignoring EAGAIN errors'''
        try:
            return fd.read()
        except IOError as e:
            if e.errno != errno.EAGAIN:
                raise
            else:
                return ''
    
    def handle_line(line, handlers):
        if isinstance(handlers, list):
            for handler in handlers:
                handler(line)
        else:
            handlers(line)

    def handle_output(fds, handler_map):
        for fd in fds:
            line = read_async(fd)
            handlers = handler_map[fd.fileno()]
            handle_line(line, handlers)

    process = subprocess.Popen(commands, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, shell=shell, cwd=cwd)
    make_async(process.stdout)
    make_async(process.stderr)
    handler_map = { process.stdout.fileno() : sysout_handlers, process.stderr.fileno() : syserr_handlers }
    
    started = datetime.datetime.now()
    while True:
        rlist, wlist, xlist = select.select([process.stdout, process.stderr], [], [], 1) # 1 second wait for any output
        handle_output(rlist, handler_map)
        if process.poll() is not None:
            handle_output([process.stdout, process.stderr], handler_map)
            break
        if timeout_sec is not None and (datetime.datetime.now() - started).total_seconds() > timeout_sec:
            sys.stderr.write('Terminating subprocess %s on timeout %d seconds\n' % (commands, timeout_sec))
            process.kill()
            break

    return process.returncode

With the default handlers printing the subprocess outputs, usage is pretty simple:


returncode = execute(['python2.7', '/somewhere/random_print.py'])
print(returncode)

In case you really want to accumulate the outputs because you have plans for them, such as parsing them or passing them as input to some other process, pass list append methods as handlers:


stdoutlist = []
stderrlist = []
returncode = execute(['python2.7', '/somewhere/random_print.py'],[stdoutlist.append],[stderrlist.append])
print(returncode)
print(stdoutlist)
print(stderrlist)
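
One thing to keep in mind when parsing the accumulated output: read_async() returns whatever is available on the pipe at that moment, so a handler may receive several lines at once, only part of a line, or an empty string. If you need whole lines, a small buffering wrapper can help. The sketch below is a hypothetical helper (LineSplitter is not part of the code above), and it assumes the handlers receive str chunks, as they do in Python 2.7.

```python
class LineSplitter(object):
    """Hypothetical helper: buffers chunks and calls handler once per complete line."""

    def __init__(self, handler):
        self.handler = handler
        self.pending = ''  # text received so far that has no newline yet

    def write(self, chunk):
        # Append the new chunk, then emit every complete line found so far.
        self.pending += chunk
        lines = self.pending.split('\n')
        # The last element is the unfinished tail (or '' if chunk ended with '\n').
        self.pending = lines.pop()
        for line in lines:
            self.handler(line + '\n')

    def flush(self):
        # Emit a trailing line that was never terminated by a newline.
        if self.pending:
            self.handler(self.pending)
            self.pending = ''
```

To use it with execute(), you could create splitter = LineSplitter(stdoutlist.append), pass [splitter.write] as the handler list, and call splitter.flush() once execute() returns.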

Or you can go ham and use multiple handlers for output lines


import StringIO
stdoutio = StringIO.StringIO()
stderrio = StringIO.StringIO()
stdout_lines = []
stderr_lines = []
stdout_handlers = [sys.stdout.write, stdout_lines.append, stdoutio.write]
stderr_handlers = [sys.stderr.write, stderr_lines.append, stderrio.write]
returncode = execute(['python2.7', '/somewhere/random_print.py'], stdout_handlers, stderr_handlers)
print(returncode)
print(stdout_lines)
print(stdoutio.getvalue())
print(stderr_lines)
print(stderrio.getvalue())

To be complete, here is the random_print.py I've been testing with:


import sys
import time
import random

for i in range(50):
    if random.choice([True, False]):
        sys.stdout.write('random out-'+ str(i)+'\n')
        sys.stdout.flush()
    else:
        sys.stderr.write('random err-'+ str(i)+'\n')
        sys.stderr.flush()
    time.sleep(0.1)
    
raise Exception('You shall not pass')
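
For readers on Python 3, here is a minimal sketch of what the "proper Python 3 solution" mentioned earlier could look like, using asyncio from the standard library. This is not the code we run in our pipeline. The names execute_async and _pump are made up for illustration, it takes a single handler per stream to keep things short, and it assumes Python 3.7 or newer for asyncio.run().

```python
import asyncio
import sys


async def _pump(stream, handler):
    # Forward each line to the handler as soon as the subprocess produces it.
    while True:
        line = await stream.readline()
        if not line:  # empty bytes means the pipe was closed
            break
        handler(line.decode(errors='replace'))


async def execute_async(commands, out_handler=sys.stdout.write,
                        err_handler=sys.stderr.write, timeout_sec=None):
    """Illustrative only: returns the returncode, or None on timeout."""
    process = await asyncio.create_subprocess_exec(
        *commands,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    # Read both streams concurrently until they are closed.
    pumps = asyncio.gather(_pump(process.stdout, out_handler),
                           _pump(process.stderr, err_handler))
    try:
        await asyncio.wait_for(pumps, timeout_sec)
    except asyncio.TimeoutError:
        # Same contract as execute(): kill the subprocess and return None.
        process.kill()
        await process.wait()
        return None
    return await process.wait()
```

Usage would be along the lines of returncode = asyncio.run(execute_async(['python3', '/somewhere/random_print.py'])). As with execute(), the return value should always be checked.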