# Source code for scine_chemoton.gears.scheduler

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__copyright__ = """ This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Department of Chemistry and Applied Biosciences, Reiher Group.
See LICENSE.txt for details.
"""

# Standard library imports
import time
from json import dumps
from typing import Dict

# Third party imports
import scine_database as db
from scine_database.queries import stop_on_timeout

# Local application imports
from . import Gear


class Scheduler(Gear):
    """
    This Gear is a basic scheduler for all calculations stored in the database.
    By default, all Calculations generated by other Gears will be set to
    'hold' upon creation. The scheduler will then, based on the given options,
    add them to the queue of Calculations run by the backend (Puffins).

    Attributes
    ----------
    options : Scheduler.Options
        The options for the Scheduler Gear.

    Notes
    -----
    For every order listed in ``options.job_counts`` the logic counts the
    Calculations of that order that are queued for execution (status 'new')
    and compares this number with the allowed number of queued Calculations.
    The gap between these numbers is closed by switching Calculations of that
    order from 'hold' to 'new'. Calculations in any other status do not count
    towards this limit.
    The order of the execution of the jobs as well as their activation does
    not necessarily follow the chronological order of their generation.
    """

    class Options(Gear.Options):
        """
        The options for the Scheduler Gear.
        """

        __slots__ = ("job_counts", "job_priorities")

        def __init__(self) -> None:
            super().__init__()
            self.job_counts: Dict[str, int] = {
                "scine_geometry_optimization": 5000,
                "scine_ts_optimization": 5000,
                "scine_single_point": 5000,
                "scine_bond_orders": 5000,
                "graph": 10000,
                "scine_hessian": 1000,
                "scine_react_complex_afir": 1000,
                "scine_react_complex_nt": 1000,
                "scine_react_complex_nt2": 1000,
                "scine_react_ts_guess": 1000,
                "scine_step_refinement": 1000,
                "scine_dissociation_cut": 1000,
                "conformers": 200,
                "final_conformer_deduplication": 200,
                "kinetx_kinetic_modeling": 1,
                "rms_kinetic_modeling": 1,
                "scine_bspline_optimization": 5000,
                "scine_reoptimization": 5000,
            }
            """
            Dict[str, int]
                The number of Calculations that may be queued to run
                (status 'new') at any given time. Counts are given per order
                type (i.e. 'scine_hessian'). Orders with a count of zero or
                less are never activated. Every key must also be present in
                ``job_priorities``.
            """
            self.job_priorities: Dict[str, int] = {
                "scine_geometry_optimization": 2,
                "scine_ts_optimization": 2,
                "scine_single_point": 5,
                "scine_bond_orders": 2,
                "graph": 2,
                "scine_hessian": 2,
                "scine_react_complex_afir": 5,
                "scine_react_complex_nt": 5,
                "scine_react_complex_nt2": 5,
                "scine_react_ts_guess": 5,
                "scine_step_refinement": 5,
                "scine_dissociation_cut": 5,
                "conformers": 2,
                "final_conformer_deduplication": 2,
                "kinetx_kinetic_modeling": 1,
                "rms_kinetic_modeling": 1,
                "scine_bspline_optimization": 5,
                "scine_reoptimization": 5,
            }
            """
            Dict[str, int]
                The priority number of Calculations, which determines which
                are executed first. Priorities are given per order type
                (i.e. 'scine_hessian'). A lower number corresponds to earlier
                execution. The possible range of numbers is 1 to 10.
            """

    options: Options

    def __init__(self) -> None:
        super().__init__()
        # Only the calculations collection is accessed by this Gear
        # (presumably the base Gear exposes it as ``self._calculations``).
        self._required_collections = ["calculations"]
        # Scheduling does not depend on a model.
        self._model_is_required = False

    def _loop_impl(self):
        """
        Activate held Calculations until each order reaches its allowed count.

        For each order in ``options.job_counts`` the number of Calculations
        with status 'new' is determined. If it is below the allowed count,
        Calculations of that order with status 'hold' receive their configured
        priority and are switched to 'new' until the gap is closed.

        Raises
        ------
        ValueError
            If an order in ``options.job_counts`` has no entry in
            ``options.job_priorities``.
        """
        self._options_sanity_check()
        # Nothing to schedule if no Calculation at all is on hold.
        if self._calculations.get_one_calculation(dumps({"status": "hold"})) is None:
            return
        for order, max_count in self.options.job_counts.items():
            # Skip orders that are not allowed to run at all (saves a DB query).
            if max_count <= 0:
                continue
            selection = {
                "$and": [
                    {"status": "new"},
                    {"job.order": order},
                ]
            }
            n_queued = self._calculations.count(dumps(selection))
            # Skip if enough jobs of this order are already queued.
            if n_queued >= max_count:
                continue
            if self._activate_held_calculations(order, max_count - n_queued):
                return

    def _activate_held_calculations(self, order: str, n_slots: int) -> bool:
        """
        Switch up to ``n_slots`` Calculations of ``order`` from 'hold' to 'new'.

        Parameters
        ----------
        order : str
            The job order whose held Calculations are activated.
        n_slots : int
            The maximum number of Calculations to activate (expected > 0).

        Returns
        -------
        bool
            True if a stop was requested and the caller should return
            immediately, False otherwise.
        """
        priority = self.options.job_priorities[order]
        selection = {"$and": [{"status": "hold"}, {"job.order": order}]}
        held = self._calculations.iterate_calculations(dumps(selection))
        for calculation in stop_on_timeout(held):
            if self.stop_at_next_break_point:
                return True
            calculation.link(self._calculations)
            # Sleep a bit in order not to make the DB choke
            time.sleep(0.001)
            # Set the priority before the status: switching to 'new' hands the
            # Calculation to the backend queue, so it must already carry its
            # intended priority at that moment.
            calculation.set_priority(priority)
            calculation.set_status(db.Status.NEW)
            n_slots -= 1
            if n_slots <= 0:
                break
        return self.stop_at_next_break_point

    def _options_sanity_check(self):
        """
        Ensure every order in ``job_counts`` has a matching priority.

        Raises
        ------
        ValueError
            For the first order in ``job_counts`` missing from
            ``job_priorities``.
        """
        for k in self.options.job_counts:
            if k not in self.options.job_priorities:
                raise ValueError(f"Missing key '{k}' in 'job_priorities'.")