# Source code for scine_chemoton.gears.scheduler

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__copyright__ = """ This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Laboratory of Physical Chemistry, Reiher Group.
See LICENSE.txt for details.
"""

# Standard library imports
import time
from json import dumps
from typing import Dict

# Third party imports
import scine_database as db

# Local application imports
from . import Gear
from ..utilities.queries import stop_on_timeout


class Scheduler(Gear):
    """
    This Gear is a basic scheduler for all calculations stored in the database.
    By default, all Calculations generated by other Gears will be set to 'hold'
    upon creation. The scheduler will then, based on the given options, add them
    to the queue of Calculations run by the backend (Puffins).

    Attributes
    ----------
    options : Scheduler.Options
        The options for the Scheduler Gear.

    Notes
    -----
    The logic counts the number of active Calculations per Order and compares it
    with the allowed number of active Calculations. The gap between these numbers
    is closed by activating Calculations that are currently on hold.
    The order of the execution of the jobs as well as their activation does not
    necessarily follow the chronological order of their generation.
    """

    class Options:
        """
        The options for the Scheduler Gear.
        """

        __slots__ = ("cycle_time", "job_counts", "job_priorities")

        def __init__(self):
            self.cycle_time = 10
            """
            int
                The minimum number of seconds between two cycles of the Gear.
                Cycles are always run to completion independent of this option;
                a cycle that takes longer than ``cycle_time`` simply lengthens the
                effective cycle time and does not cause multiple overlapping cycles
                of the same Gear.
            """
            self.job_counts: Dict[str, int] = {
                "scine_geometry_optimization": 50,
                "scine_bond_orders": 50,
                "graph": 100,
                "scine_hessian": 10,
                "scine_react_complex_afir": 10,
                "scine_react_complex_nt": 10,
                "scine_react_complex_nt2": 10,
                "scine_dissociation_cut": 10,
                "conformers": 2,
                "final_conformer_deduplication": 2,
            }
            """
            Dict[str, int]
                The number of Calculations to be set to run at any given time.
                Counts are given per order type (i.e. 'scine_hessian').
                Only orders listed here are ever activated by the scheduler;
                a count of zero or less disables activation for that order.
                Defaults are:
                scine_geometry_optimization: 50, scine_bond_orders: 50, graph: 100,
                scine_hessian: 10, scine_react_complex_afir: 10,
                scine_react_complex_nt: 10, scine_react_complex_nt2: 10,
                scine_dissociation_cut: 10, conformers: 2,
                final_conformer_deduplication: 2
            """
            self.job_priorities: Dict[str, int] = {
                "scine_geometry_optimization": 2,
                "scine_ts_optimization": 2,
                "scine_bond_orders": 2,
                "graph": 2,
                "scine_hessian": 2,
                "scine_react_complex_afir": 5,
                "scine_react_complex_nt": 5,
                "scine_react_complex_nt2": 5,
                "scine_dissociation_cut": 5,
                "conformers": 2,
                "final_conformer_deduplication": 2,
                "scine_step_refinement": 5,
                "scine_single_point": 5
            }
            """
            Dict[str, int]
                The priority number of Calculations, which determines which are
                executed first. Priorities are given per order type
                (i.e. 'scine_hessian'). A lower number corresponds to earlier
                execution. The possible range of numbers is 1 to 10.
                Orders without an entry here are activated without changing the
                priority stored on the Calculation.
                Defaults are:
                scine_geometry_optimization: 2, scine_ts_optimization: 2,
                scine_bond_orders: 2, graph: 2, scine_hessian: 2,
                scine_react_complex_afir: 5, scine_react_complex_nt: 5,
                scine_react_complex_nt2: 5, scine_dissociation_cut: 5,
                conformers: 2, final_conformer_deduplication: 2,
                scine_step_refinement: 5, scine_single_point: 5
            """

    def __init__(self):
        super().__init__()
        self.options = self.Options()
        # NOTE(review): presumably the Gear base class uses this list to set up
        # ``self._calculations`` (used in _loop_impl) — confirm in Gear.
        self._required_collections = ["calculations"]

    def _loop_impl(self):
        """
        Run one scheduling cycle.

        For every order in ``options.job_counts``, count the Calculations with
        status 'new', stopping as soon as the allowed number is reached. If fewer
        are found, switch Calculations of that order from 'hold' to 'new' until
        the gap is closed. Each activated Calculation gets the priority from
        ``options.job_priorities``; orders without a configured priority are
        activated without calling ``set_priority``.
        """
        for order, max_jobs in self.options.job_counts.items():
            # A non-positive limit means nothing of this order may be activated.
            if max_jobs <= 0:
                continue

            # Count already queued ('new') jobs, but only up to the limit; the
            # exact number beyond it is irrelevant.
            new_selection = {"$and": [{"status": "new"}, {"job.order": order}]}
            n_new = 0
            for _ in self._calculations.iterate_calculations(dumps(new_selection)):
                n_new += 1
                if n_new >= max_jobs:
                    break
            if n_new >= max_jobs:
                continue

            # Free slots exist: start held jobs (set them from 'hold' to 'new').
            # Look the priority up once, and tolerate orders that have a job
            # count but no configured priority instead of raising KeyError
            # halfway through activating them.
            priority = self.options.job_priorities.get(order)
            free_slots = max_jobs - n_new
            hold_selection = {"$and": [{"status": "hold"}, {"job.order": order}]}
            hold_iterator = self._calculations.iterate_calculations(dumps(hold_selection))
            for calculation in stop_on_timeout(hold_iterator):
                calculation.link(self._calculations)
                # Sleep a bit in order not to make the DB choke
                time.sleep(0.001)
                calculation.set_status(db.Status.NEW)
                if priority is not None:
                    calculation.set_priority(priority)
                free_slots -= 1
                if free_slots <= 0:
                    break