#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__copyright__ = """ This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Department of Chemistry and Applied Biosciences, Reiher Group.
See LICENSE.txt for details.
"""
# Standard library imports
import time
from json import dumps
from typing import Dict
# Third party imports
import scine_database as db
from scine_database.queries import stop_on_timeout
# Local application imports
from . import Gear
class Scheduler(Gear):
    """
    This Gear is a basic scheduler for all calculations stored in the database.
    By default, all Calculations generated by other Gears will be set to 'hold'
    upon creation.
    The scheduler will then, based on the given options, add them to the queue
    of Calculations run by the backend (Puffins).

    Attributes
    ----------
    options : Scheduler.Options
        The options for the Scheduler Gear.

    Notes
    -----
    The logic counts the number of active Calculations per Order and compares
    with the allowed number of active Calculations.
    The gap between these numbers will be closed by activating Calculations
    that are currently on hold.
    The order of the execution of the jobs as well as their activation does not
    necessarily follow the chronological order of their generation.
    """

    class Options(Gear.Options):
        """
        The options for the Scheduler Gear.
        """

        __slots__ = ("job_counts", "job_priorities")

        def __init__(self) -> None:
            super().__init__()
            self.job_counts: Dict[str, int] = {
                "scine_geometry_optimization": 5000,
                "scine_ts_optimization": 5000,
                "scine_single_point": 5000,
                "scine_bond_orders": 5000,
                "graph": 10000,
                "scine_hessian": 1000,
                "scine_react_complex_afir": 1000,
                "scine_react_complex_nt": 1000,
                "scine_react_complex_nt2": 1000,
                "scine_react_ts_guess": 1000,
                "scine_step_refinement": 1000,
                "scine_dissociation_cut": 1000,
                "conformers": 200,
                "final_conformer_deduplication": 200,
                "kinetx_kinetic_modeling": 1,
                "rms_kinetic_modeling": 1,
                "scine_bspline_optimization": 5000,
                "scine_reoptimization": 5000
            }
            """
            Dict[str, int]
                The number of Calculations to be set to run at any given time.
                Counts are given per order type (i.e. 'scine_hessian').
            """
            self.job_priorities: Dict[str, int] = {
                "scine_geometry_optimization": 2,
                "scine_ts_optimization": 2,
                "scine_single_point": 5,
                "scine_bond_orders": 2,
                "graph": 2,
                "scine_hessian": 2,
                "scine_react_complex_afir": 5,
                "scine_react_complex_nt": 5,
                "scine_react_complex_nt2": 5,
                "scine_react_ts_guess": 5,
                "scine_step_refinement": 5,
                "scine_dissociation_cut": 5,
                "conformers": 2,
                "final_conformer_deduplication": 2,
                "kinetx_kinetic_modeling": 1,
                "rms_kinetic_modeling": 1,
                "scine_bspline_optimization": 5,
                "scine_reoptimization": 5
            }
            """
            Dict[str, int]
                The priority number of Calculations, which determines which are executed first.
                Priorities are given per order type (i.e. 'scine_hessian').
                A lower number corresponds to earlier execution.
                The possible range of numbers is 1 to 10.
            """

    options: Options

    def __init__(self) -> None:
        super().__init__()
        self._required_collections = ["calculations"]
        self._model_is_required = False

    def _loop_impl(self):
        """
        Move on-hold Calculations to 'new' for every order in ``job_counts``.

        For each order, the number of Calculations with status 'new' is
        counted.  If it is below the allowed count for that order, up to the
        remaining number of 'hold' Calculations of that order are switched to
        'new' and given the order's priority from ``job_priorities``.

        Returns early if no Calculation at all is on hold, or as soon as
        ``stop_at_next_break_point`` is set.

        Raises
        ------
        ValueError
            If an order in ``job_counts`` has no entry in ``job_priorities``.
        """
        self._options_sanity_check()
        # Nothing to schedule if no Calculation of any order is on hold.
        if self._calculations.get_one_calculation(dumps({"status": "hold"})) is None:
            return

        for order, max_count in self.options.job_counts.items():
            # Skip if no jobs of this order are allowed to run at all
            if max_count <= 0:
                continue

            # Skip if enough jobs of this order are already queued ('new')
            new_selection = {
                "$and": [
                    {"status": "new"},
                    {"job.order": order},
                ]
            }
            n_new = self._calculations.count(dumps(new_selection))
            if n_new >= max_count:
                continue

            # Fill the free slots (as defined by job_counts) by switching
            # on-hold Calculations of this order from 'hold' to 'new'.
            # Key presence is guaranteed by _options_sanity_check above.
            priority = self.options.job_priorities[order]
            free_slots = max_count - n_new
            hold_selection = {"$and": [{"status": "hold"}, {"job.order": order}]}
            hold_calculations = self._calculations.iterate_calculations(dumps(hold_selection))
            for calculation in stop_on_timeout(hold_calculations):
                if self.stop_at_next_break_point:
                    return
                calculation.link(self._calculations)
                # Sleep a bit in order not to make the DB choke
                time.sleep(0.001)
                calculation.set_status(db.Status.NEW)
                calculation.set_priority(priority)
                free_slots -= 1
                if free_slots <= 0:
                    break
            if self.stop_at_next_break_point:
                return

    def _options_sanity_check(self):
        """
        Ensure every order with a job count also has a priority.

        Raises
        ------
        ValueError
            If a key of ``options.job_counts`` is missing in
            ``options.job_priorities``.
        """
        for k in self.options.job_counts:
            if k not in self.options.job_priorities:
                raise ValueError(f"Missing key '{k}' in 'job_priorities'.")