Source code for spawn.runners.process_runner
# spawn
# Copyright (C) 2018-2019, Simmovation Ltd.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""Implementation of the :class:`ProcessRunner`
"""
import subprocess
import logging
from os import path, getcwd
import json
from spawn.util.validation import validate_file
LOGGER = logging.getLogger(__name__)
SUCCESS = 'success'
FAILURE = 'failure'
[docs]class ProcessRunner:
"""Runner that uses the native os process (provided by :mod:`subprocess`) in order to run tasks
"""
def __init__(self, id_, input_file_path, exe_path, run_name=None, output_dir=None, cwd=None):
"""Initialises the :class:`ProcessRunner`
:param id_: The ID of the runner
:type id_: str
:param input_file_path: The path to the input file
:type input_file_path: path-like
:param exe_path: The path to the executable
:type exe_path: path-like
:param run_name: The name of the run. Defaults to the base name of the input file path.
:type run_name: str
:param output_dir: The output directory for the run. Defaults to the directory of the input file.
:type output_dir: path-like
:param cwd: The current working directory for the child process.
Defaults to the current working directory for the parent process.
:type cwd: path-like
"""
self._id = id_
self._input_file_path = input_file_path
self._exe_path = exe_path
self._run_name = run_name or path.splitext(path.basename(input_file_path))[0]
self._output_dir = output_dir or path.dirname(input_file_path)
self._cwd = cwd or getcwd()
[docs] def run(self):
"""Runs the process synchronously.
Runs the process synchronously and when complete writes the log files and a status file.
"""
validate_file(self._input_file_path, 'input_file_path')
validate_file(self._exe_path, 'exe_path')
LOGGER.info('Executing \'%s\': %s', self._id, self.process_args)
output = subprocess.run(args=self.process_args, cwd=self._cwd, check=False,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self._write_logs(output)
state = self._output_to_state(output)
with open(self.state_file, 'w') as fp:
json.dump(state, fp)
if output.returncode != 0:
raise ChildProcessError('process exited with {}'.format(output.returncode))
[docs] def error_logs(self):
"""Error logs produced by the process, if any
:returns: The output written to stderr, if any, otherwise ``None``
:rtype: str
"""
error_file = self.output_file_base + '.err'
if path.isfile(error_file):
with open(error_file) as fp:
return fp.read()
return None
[docs] def logs(self):
"""Stdout logs produced by the process, if any
:returns: The output written to stdout, if any, otherwise ``None``
:rtype: str
"""
log_file = self.output_file_base + '.log'
if path.isfile(log_file):
with open(log_file) as fp:
return fp.read()
return None
[docs] def complete(self):
"""Determine if the run is complete.
:returns: ``True`` if the run is complete; ``False`` otherwise.
:rtype: bool
"""
if path.isfile(self.output_file_base + '.state.json'):
with open(self.state_file) as fp:
state = json.load(fp)
return state['result'] == SUCCESS
return False
@property
def process_args(self):
"""The arguments to be provided to the subprocess.
Can be overridden in derived classes.
:returns: An array containing the process arguments
:rtype: list
"""
return [self._exe_path, self._input_file_path]
@property
def output_file_base(self):
"""The base path of the output file (without extension)
:returns: The path to the output file, without extension.
:rtype: path-like
"""
return path.join(self._output_dir, self._run_name)
@property
def state_file(self):
"""The path to the state file
:returns: The path to the state file
:rtype: path-like
"""
return self.output_file_base + '.state.json'
def _write_logs(self, output):
with open(self.output_file_base + '.log', 'wb') as fp:
fp.write(output.stdout)
if output.stderr:
with open(self.output_file_base + '.err', 'wb') as fp:
fp.write(output.stderr)
elif output.returncode != 0:
with open(self.output_file_base + '.err', 'w') as fp:
fp.write(str(output.returncode))
@staticmethod
def _output_to_state(output):
return {
'result': SUCCESS if output.returncode == 0 else FAILURE,
'returncode': output.returncode
}