
Tuesday, 29 May 2018

Python subprocess stdout and stderr

In our machine learning pipeline, a number of Python subprocesses are started from the main Python process. Originally, the subprocesses were started using this simple procedure:

import subprocess

def check_call(commands, shell=False):
    process = subprocess.Popen(commands, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    if process.returncode != 0:
        raise IOError(
                'Call failed, cmd=%s, return_code=%s, stderr=%s, stdout=%s' % (commands, process.returncode, stderr, stdout))
    return stdout, stderr

The drawback of this solution is that process.communicate() blocks, and the subprocess's standard output and standard error are accumulated into variables that become available only after the execution ends or fails. In our case, that could be tens of minutes or even a few hours. We partly worked around this by logging into a file in the subprocess, which gives some sort of real-time feedback about its progress. Also, when the subprocess hangs, it blocks the main process forever, and manual action might be necessary to unblock the pipeline.
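For comparison, the hang part of the problem has a standard answer in Python 3: `communicate()` accepts a `timeout` argument since 3.3, and `subprocess.run()` (3.5+) kills and reaps the child before raising `subprocess.TimeoutExpired`. The sketch below assumes Python 3.7+ for `capture_output`; `check_call_with_timeout` is a hypothetical name. Note that it still returns the output only at the end, so it does not solve the real-time feedback part.

```python
import subprocess
import sys


def check_call_with_timeout(commands, timeout_sec, shell=False):
    """Python 3.7+ variant of check_call that cannot block forever."""
    # run() kills and reaps the child, then raises subprocess.TimeoutExpired,
    # once timeout_sec elapses. Output is still only available at the end.
    result = subprocess.run(commands, shell=shell, capture_output=True,
                            timeout=timeout_sec)
    if result.returncode != 0:
        raise IOError(
            'Call failed, cmd=%s, return_code=%s, stderr=%s, stdout=%s'
            % (commands, result.returncode, result.stderr, result.stdout))
    return result.stdout, result.stderr


# Usage: a quick child process that finishes well within the timeout.
stdout, stderr = check_call_with_timeout([sys.executable, '-c', 'print("hi")'], 10)
```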

I've been looking for a better solution that would make standard output lines available as soon as the subprocess produces them. While it is an old problem, I've found only prototypes and no ready-to-use code. So here comes my take on it, based on unutbu's Stack Overflow answer combined with another one from Vadim Fint, merged together and pimped up a bit. The code is for Python 2.7. It will probably work in Python 3 as well, but there you should probably go with a proper Python 3 solution.

The asynchronous version is lightweight because it does not need to spawn threads to read the outputs or to perform timeout checks. It uses only the standard Python library and works very well for us.
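The two building blocks can be tried in isolation on a plain OS pipe. This sketch (Python 3, POSIX only, because `fcntl` is not available on Windows; the helper names are hypothetical) shows that a read on an empty non-blocking descriptor fails with `EAGAIN` instead of blocking, and that `select.select` reports the descriptor as readable only once data has arrived.

```python
import errno
import fcntl
import os
import select


def make_nonblocking(fd):
    # Same technique as make_async below: OR O_NONBLOCK into the file status flags.
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)


def read_available(fd):
    # Return whatever is buffered right now, or b'' when nothing is (EAGAIN).
    # Careful: os.read() also returns b'' at EOF, so b'' alone is ambiguous.
    try:
        return os.read(fd, 65536)
    except OSError as e:
        if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
            raise
        return b''


read_fd, write_fd = os.pipe()
make_nonblocking(read_fd)

# Nothing written yet: select() times out with an empty list and the read does not block.
ready_before, _, _ = select.select([read_fd], [], [], 0.1)
empty = read_available(read_fd)

os.write(write_fd, b'line 1\n')
ready_after, _, _ = select.select([read_fd], [], [], 0.1)
data = read_available(read_fd)

os.close(read_fd)
os.close(write_fd)
print(ready_before, empty, ready_after, data)
```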


import sys
import subprocess
import fcntl
import select
import os
import errno
import datetime

def execute(commands, sysout_handlers=[sys.stdout.write], syserr_handlers=[sys.stderr.write], shell=False, cwd=None, timeout_sec=None):
    """Executes commands using subprocess.Popen and asynchronous select and fcntl to handle subprocess standard output and standard error stream
    
    Examples:
        execute(['python2.7', 'random_print.py'])
        execute(['python2.7', 'random_print.py'], outlist.append, errlist.append)
        execute(['python2.7', 'random_print.py'], timeout_sec = 3)
        execute(['ls', '-l'], cwd = '/tmp')
    
    :param commands: list of strings passed to subprocess.Popen()
    :param sysout_handlers: list of handlers (or a single handler) for standard output stream
    :param syserr_handlers: list of handlers (or a single handler) for standard error stream
    :param shell: boolean, default False, passed to subprocess.Popen()
    :param cwd: string, default None, passed to subprocess.Popen()
    :param timeout_sec: int, default None, number of seconds after which the process is killed. It is checked at least once per second, so the precision is about 1 second
    :return: process returncode, or None when the timeout is reached. Always check this value
    """
    
    def make_async(fd):
        # https://stackoverflow.com/a/7730201/190597
        '''add the O_NONBLOCK flag to a file descriptor'''
        fcntl.fcntl(fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
    
    def read_async(fd):
        # https://stackoverflow.com/a/7730201/190597
        '''read some data from a file descriptor, ignoring EAGAIN errors'''
        try:
            return fd.read()
        except IOError as e:
            if e.errno != errno.EAGAIN:
                raise
            return ''
    
    def handle_line(line, handlers):
        if isinstance(handlers, list):
            for handler in handlers:
                handler(line)
        else:
            handlers(line)

    def handle_output(fds, handler_map):
        for fd in fds:
            # read() returns whatever is buffered: several lines, a partial line or nothing
            output = read_async(fd)
            if output:
                handle_line(output, handler_map[fd.fileno()])

    process = subprocess.Popen(commands, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, shell=shell, cwd=cwd)
    make_async(process.stdout)
    make_async(process.stderr)
    handler_map = { process.stdout.fileno() : sysout_handlers, process.stderr.fileno() : syserr_handlers }
    
    started = datetime.datetime.now()
    while True:
        rlist, wlist, xlist = select.select([process.stdout, process.stderr], [], [], 1) # 1 second wait for any output
        handle_output(rlist, handler_map)
        if process.poll() is not None:
            # process has finished, drain whatever is still left in the pipes
            handle_output([process.stdout, process.stderr], handler_map)
            break
        if timeout_sec is not None and (datetime.datetime.now() - started).total_seconds() > timeout_sec:
            sys.stderr.write('Terminating subprocess %s on timeout %d seconds\n' % (commands, timeout_sec))
            process.kill()
            process.wait() # reap the killed child so it does not linger as a zombie
            return None

    return process.returncode

With default handlers printing subprocess outputs, usage is pretty simple...


returncode = execute(['python2.7', '/somewhere/random_print.py'])
print(returncode)

If you really want to accumulate the outputs, because you plan to parse them or pass them as input to some other process, use list appenders as handlers:


stdoutlist = []
stderrlist = []
returncode = execute(['python2.7', '/somewhere/random_print.py'],[stdoutlist.append],[stderrlist.append])
print(returncode)
print(stdoutlist)
print(stderrlist)

Or you can go ham and use multiple handlers for output lines


import sys
import StringIO
stdoutio = StringIO.StringIO()
stderrio = StringIO.StringIO()
stdout_lines = []
stderr_lines = []
stdout_handlers = [sys.stdout.write, stdout_lines.append, stdoutio.write]
stderr_handlers = [sys.stderr.write, stderr_lines.append, stderrio.write]
returncode = execute(['python2.7', '/somewhere/random_print.py'], stdout_handlers, stderr_handlers)
print(returncode)
print(stdout_lines)
print(stdoutio.getvalue())
print(stderr_lines)
print(stderrio.getvalue())
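Keep in mind that `read_async` returns whatever happens to be buffered, so a handler may receive several lines at once or a line cut in half. If your handlers really need whole lines, a small adapter can re-split the chunks. A minimal sketch; `LineSplitter` is a hypothetical helper, not part of the code above, and it works in both Python 2.7 and 3:

```python
class LineSplitter(object):
    """Wraps a per-line handler so it can be used as an execute() output handler."""

    def __init__(self, line_handler):
        self.line_handler = line_handler
        self.pending = ''

    def __call__(self, chunk):
        # Join with any leftover partial line, emit every complete line,
        # and keep the trailing fragment until more data arrives.
        self.pending += chunk
        lines = self.pending.split('\n')
        self.pending = lines.pop()
        for line in lines:
            self.line_handler(line)

    def flush(self):
        # Emit a final line that was not terminated by a newline.
        if self.pending:
            self.line_handler(self.pending)
            self.pending = ''


lines = []
splitter = LineSplitter(lines.append)
for chunk in ['random out-0\nrandom ', 'out-1\nrandom out-2\n', 'tail']:
    splitter(chunk)
splitter.flush()
print(lines)  # ['random out-0', 'random out-1', 'random out-2', 'tail']
```

With execute it would be passed like any other handler, e.g. `execute(commands, [splitter])`, followed by `splitter.flush()` once execute returns.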

For completeness, here is the random_print.py I've been testing with:


import sys
import time
import random

for i in range(50):
    if random.choice([True, False]):
        sys.stdout.write('random out-'+ str(i)+'\n')
        sys.stdout.flush()
    else:
        sys.stderr.write('random err-'+ str(i)+'\n')
        sys.stderr.flush()
    time.sleep(0.1)
    
raise Exception('You shall not pass')
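As said above, on Python 3 you should probably go with a proper Python 3 solution. For reference, here is a rough sketch of the same idea for Python 3 on POSIX systems: the standard `selectors` module replaces `select`/`fcntl`, `time.monotonic()` tracks the deadline, and `os.read()` is only called on descriptors the selector reports as readable, so no O_NONBLOCK flag is needed. `execute3` and its `encoding` parameter are hypothetical names; this is not the code we run in our pipeline.

```python
import codecs
import os
import selectors
import subprocess
import sys
import time


def execute3(commands, stdout_handlers=None, stderr_handlers=None,
             shell=False, cwd=None, timeout_sec=None, encoding='utf-8'):
    """Stream subprocess output to handlers as it arrives.

    Handlers receive decoded text chunks, which are not necessarily whole lines.
    Returns the process returncode, or None when timeout_sec is reached.
    """
    if stdout_handlers is None:
        stdout_handlers = [sys.stdout.write]
    if stderr_handlers is None:
        stderr_handlers = [sys.stderr.write]
    process = subprocess.Popen(commands, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE, shell=shell, cwd=cwd)
    selector = selectors.DefaultSelector()
    for stream, handlers in ((process.stdout, stdout_handlers),
                             (process.stderr, stderr_handlers)):
        # An incremental decoder keeps multi-byte characters split across reads intact.
        decoder = codecs.getincrementaldecoder(encoding)(errors='replace')
        selector.register(stream, selectors.EVENT_READ, (handlers, decoder))
    deadline = None if timeout_sec is None else time.monotonic() + timeout_sec
    try:
        # Loop until both pipes report EOF, which normally means the child exited.
        while selector.get_map():
            wait = 1.0
            if deadline is not None:
                wait = min(wait, deadline - time.monotonic())
                if wait <= 0:
                    sys.stderr.write('Terminating subprocess %s on timeout %s seconds\n'
                                     % (commands, timeout_sec))
                    process.kill()
                    process.wait()  # reap the killed child
                    return None
            for key, _ in selector.select(wait):
                handlers, decoder = key.data
                # The pipe is readable, so os.read() will not block: it returns
                # what is available (up to 64 KiB), or b'' at EOF.
                chunk = os.read(key.fd, 65536)
                text = decoder.decode(chunk, final=not chunk)
                if text:
                    for handler in handlers:
                        handler(text)
                if not chunk:
                    selector.unregister(key.fileobj)
    finally:
        selector.close()
        process.stdout.close()
        process.stderr.close()
    return process.wait()
```

It can be tried with the random_print.py script above, e.g. `execute3([sys.executable, 'random_print.py'], timeout_sec=3)`. As with `kill()` in the Python 2.7 version, with `shell=True` only the shell itself is killed on timeout.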

Monday, 4 March 2013

ConnectTimeoutTest

From time to time I need to simulate one special type of timeout: the connect timeout.

It differs from the more common read timeout, which may happen when the connection is already open and the request has been sent, but it takes too long to get a response. Connection refused is also a different beast: it means that the server actively refused your request, probably because you have been accessing the wrong host or port. You should not ignore the connect timeout just because it is less common. When it hits, it will take you down just as quickly as a read timeout.

A connect timeout can occur during the connection opening process, that is, after the connection attempt was not refused and before any possible read timeout.
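The other two failure modes are easy to reproduce on localhost, which helps when checking that client code tells the three apart. A Python 3 sketch for Linux and macOS (the post's own test is Java; `classify_failure` is a hypothetical helper): connecting to a port nobody listens on is refused immediately, while a listening socket that never accepts or answers still lets the connect succeed, because the operating system completes the TCP handshake, and then produces a read timeout on `recv`. The genuine connect timeout is exactly the case that cannot be triggered this simply.

```python
import socket


def classify_failure(host, port, timeout_sec=0.5):
    """Return 'refused', 'connect timeout', 'read timeout' or 'ok' for one request."""
    try:
        conn = socket.create_connection((host, port), timeout=timeout_sec)
    except ConnectionRefusedError:
        return 'refused'          # server actively rejected the connection
    except socket.timeout:
        return 'connect timeout'  # no answer at all while opening the connection
    try:
        conn.sendall(b'ping\n')
        conn.recv(1)              # waits for data, EOF or the timeout
        return 'ok'
    except socket.timeout:
        return 'read timeout'     # connected, request sent, no response in time
    finally:
        conn.close()


# A free port: bind to port 0, remember the number, release it again.
probe = socket.socket()
probe.bind(('127.0.0.1', 0))
free_port = probe.getsockname()[1]
probe.close()

# A server that listens but never accepts or answers.
silent = socket.socket()
silent.bind(('127.0.0.1', 0))
silent.listen(5)
silent_port = silent.getsockname()[1]

print(classify_failure('127.0.0.1', free_port))    # refused
print(classify_failure('127.0.0.1', silent_port))  # read timeout
silent.close()
```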

Fairly rare circumstances lead to this type of timeout. It may be a firewall misconfiguration silently throwing away your packets, a network failure, or a server that is dying and unable even to open a socket correctly. None of those conditions is easy to reproduce, so when you need to write software that anticipates this type of timeout, a deterministic way to induce it on demand comes in handy.

The basic idea is to create a ServerSocket with a backlog of one. The backlog is basically the queue of pending connection requests, and if we fill it up with an artificial request, any subsequent request is doomed.

Update: The backlog queue implementation differs between platforms. This works on Mac OS X but does NOT work on Linux or Windows. Damn it!
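For trying the backlog idea outside Java, here is a rough Python 3 sketch of the same trick; it is not the post's test case, and `probe_full_backlog` is a hypothetical name. It listens with a backlog of one, never calls `accept()`, parks a few non-blocking filler connections to occupy the queue, and then makes a real connection attempt with a short timeout. Given the platform differences noted in the update, the sketch does not assume a particular outcome and simply reports which one it observed.

```python
import socket
import time


def probe_full_backlog(timeout_sec=0.5, fillers=3):
    """Try one connect against a listener with a filled backlog; report the outcome."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('127.0.0.1', 0))
    server.listen(1)  # backlog of one; accept() is deliberately never called
    address = server.getsockname()
    parked = []
    try:
        for _ in range(fillers):
            # Non-blocking, so fillers that do not fit in the queue cannot hang here.
            filler = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            filler.setblocking(False)
            filler.connect_ex(address)  # EINPROGRESS is expected and ignored
            parked.append(filler)
        time.sleep(0.1)  # give the filler handshakes a moment to settle
        try:
            client = socket.create_connection(address, timeout=timeout_sec)
        except socket.timeout:
            return 'connect timeout'
        except ConnectionRefusedError:
            return 'refused'
        client.close()
        return 'connected'
    finally:
        for filler in parked:
            filler.close()
        server.close()


print(probe_full_backlog())
```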

The test method uses the standard Java HttpURLConnection to make the request and its setConnectTimeout method to set the connection timeout to some reasonable value.

Here comes the complete test case:

Oddly enough, a different exception (java.net.ConnectException: Operation timed out) is thrown after 75 seconds when the connection timeout is NOT set, compared to the case when it IS set (java.net.SocketTimeoutException: connect timed out).

Bonus code! Setup code for Apache HttpClient might look like this:

HttpParams httpParams = new BasicHttpParams();
HttpConnectionParamBean connectionBean = new HttpConnectionParamBean(httpParams);
// same as httpParams.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 1000); the value must be an Integer, not a Long
connectionBean.setConnectionTimeout(getConnectTimeout());
// same as httpParams.setParameter(CoreConnectionPNames.SO_TIMEOUT, 5000);
connectionBean.setSoTimeout(getReadTimeout());
DefaultHttpClient httpClient = new DefaultHttpClient(httpParams);

Enjoy your timeouts!