Source code for spawn.schedulers.luigi

# spawn
# Copyright (C) 2018-2019, Simmovation Ltd.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
""":mod:`spawn` scheduler for luigi
"""
import logging

from luigi import build, worker, rpc, scheduler, execution_summary

from spawn.tasks.generate import generate_tasks_from_spec

LOGGER = logging.getLogger()

class _LuigiWorkerSchedulerFactory():
    @staticmethod
    def create_local_scheduler():
        """Creates a local scheduler

        :returns: The local scheduler
        :rtype: :class:`scheduler.Scheduler`
        """
        return scheduler.Scheduler(prune_on_get_work=True, record_task_history=False)

    @staticmethod
    def create_remote_scheduler(url):
        """Creates a remote scheduler

        :returns: The remote scheduler
        :rtype: :class:`rpc.RemoteScheduler`
        """
        return rpc.RemoteScheduler(url)

    @staticmethod
    #pylint: disable=redefined-outer-name
    def create_worker(scheduler, worker_processes, assistant=False):
        """Creates a worker

        :returns: The worker
        :rtype: :class:`worker.Worker`
        """
        return worker.Worker(
            scheduler=scheduler, worker_processes=worker_processes, assistant=assistant)

[docs]class LuigiScheduler: """Scheduler implementation for Luigi Because this is currently the only scheduler implementation it's probable that the interface will evolve in time. """ def __init__(self, config): """Initialise the :class:`LuigiScheduler` :param config: Configuration object :type config: :class:`ConfigurationBase` Config Values ============= workers The number of workers (int) outdir The output directory (path-like) local ``True`` if running locally; otherwise, ``False``. (bool) port The port on which the remote scheduler is running, if ``local`` is ``False``. (int) """ self._workers = config.get(config.default_category, 'workers') self._out_dir = config.get(config.default_category, 'outdir') self._local = config.get(config.default_category, 'local', parameter_type=bool) self._host = config.get('server', 'host') self._port = config.get('server', 'port', parameter_type=int) self._worker_scheduler_factory = _LuigiWorkerSchedulerFactory()
[docs] def run(self, spawner, spec): """Run the spec by generating tasks using the spawner :param spawner: The task spawner :type spawner: :class:`TaskSpawner` :param spec: The specification :type spec: :class:`SpecificationModel` """ tasks = generate_tasks_from_spec(spawner, spec.root_node, self._out_dir) success = build( tasks, worker_scheduler_factory=self._worker_scheduler_factory, local_scheduler=self._local, workers=self._workers, scheduler_port=self._port, scheduler_host=self._host ) if not success: LOGGER.error('Error running spawn tasks - see logs for details')
[docs] def add_worker(self): """Add a worker """ if self._local: scheduler_ = self._worker_scheduler_factory.create_local_scheduler() else: url = 'http://{host}:{port:d}/'.format(host=self._host, port=self._port) scheduler_ = self._worker_scheduler_factory.create_remote_scheduler(url) assistant_worker = self._worker_scheduler_factory.create_worker(scheduler_, self._workers, True) with assistant_worker: success = assistant_worker.run() LOGGER.info(execution_summary.summary(assistant_worker)) if not success: LOGGER.error('Error running spawn tasks - see logs for details')