# Source code for spawn.tasks.simulation

# spawn
# Copyright (C) 2018-2019, Simmovation Ltd.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
"""luigi Tasks
"""
import traceback
from os import path
import logging
import luigi

from spawn.runners import ProcessRunner

from .base import SpawnTask


LOGGER = logging.getLogger(__name__)

class SimulationTask(SpawnTask):
    """Implementation of :class:`luigi.Task` that runs a simulation via a runner

    The runner class is looked up in :attr:`available_runners` using the
    ``_runner_type`` parameter and is constructed from the input file path,
    the executable path and the working directory.
    """
    # Path of the input file; passed to the runner and used to derive
    # :attr:`run_name_with_path`
    _input_file_path = luigi.Parameter()
    # Key into :attr:`available_runners` selecting the runner class
    _runner_type = luigi.Parameter()
    # Executable path handed to the runner; when falsy, :meth:`run` does
    # nothing and :meth:`complete` reports ``True``
    _exe_path = luigi.Parameter()
    # Passed to the runner as ``cwd``
    _working_dir = luigi.Parameter(default=None)

    def run(self):
        """Run this task

        Does nothing when no executable path is set.
        """
        if self._exe_path:
            self._create_runner().run()

    def complete(self):
        """Determine if this task is complete

        A task without an executable path is always considered complete;
        otherwise the decision is delegated to the runner.

        :returns: ``True`` if this task is complete; otherwise ``False``
        :rtype: bool
        """
        if not self._exe_path:
            return True
        return self._create_runner().complete()

    def on_failure(self, exception):
        """Interprets any exceptions raised by the run method.

        Attempts to find any logs associated with the runner. If no logs are
        found, or the logs cannot be retrieved at all, the formatted
        traceback of ``exception`` is returned instead.

        :param exception: The exception raised while running the task.
        :type exception: BaseException

        :returns: A string representation of the error.
        :rtype: str
        """
        try:
            all_logs = self._collect_runner_logs()
        except Exception:  # pylint: disable=broad-except
            # Collecting logs can fail for the same reason the task failed
            # (e.g. an unknown runner type). Raising from the failure handler
            # would hide the original error, so fall back to its traceback.
            LOGGER.warning(
                'could not retrieve runner logs for failed task', exc_info=True
            )
            all_logs = []
        if all_logs:
            return '\n\n'.join(all_logs)
        error_string = traceback.format_exception(
            type(exception), exception, exception.__traceback__
        )
        return 'Unhandled exception running task:\n\n{}'.format(''.join(error_string))

    def _collect_runner_logs(self):
        """Gather the non-empty log sections reported by the runner.

        :returns: Formatted log sections, error logs first; empty if the
            runner reports no logs.
        :rtype: list
        """
        runner = self._create_runner()
        all_logs = []
        error_logs = runner.error_logs()
        if error_logs:
            all_logs.append('Error logs:\n\n{}'.format(error_logs))
        logs = runner.logs()
        if logs:
            all_logs.append('Logs:\n\n{}'.format(logs))
        return all_logs

    @property
    def run_name_with_path(self):
        """Return the run name of this task

        This is the input file path with its extension removed.
        """
        return path.splitext(self._input_file_path)[0]

    def _create_runner(self):
        """Create the runner selected by ``_runner_type``.

        :returns: A new runner instance for this task.
        :raises ValueError: If ``_runner_type`` is not a key of
            :attr:`available_runners`.
        """
        # Read the property once; derived tasks may override it.
        runners = self.available_runners
        if self._runner_type not in runners:
            raise ValueError(
                'could not find runner for runner_type {} and task type {}'
                .format(self._runner_type, type(self))
            )
        # ``_id`` is not defined in this class - presumably provided by
        # SpawnTask; verify against the base class.
        return runners[self._runner_type](
            self._id, self._input_file_path,
            exe_path=self._exe_path, cwd=self._working_dir
        )

    @property
    def available_runners(self):
        """Runners available for this task. Can be overridden by derived tasks

        :returns: Mapping of runner type name to runner class.
        :rtype: dict
        """
        return {
            'process': ProcessRunner
        }