#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__copyright__ = """ This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Laboratory of Physical Chemistry, Reiher Group.
See LICENSE.txt for details.
"""
# Standard library imports
import time
from json import dumps
from typing import Dict
# Third party imports
import scine_database as db
# Local application imports
from . import Gear
from ..utilities.queries import stop_on_timeout
class Scheduler(Gear):
    """
    This Gear is a basic scheduler for all calculations stored in the database.

    By default, all Calculations generated by other Gears will be set to 'hold'
    upon creation.
    The scheduler will then, based on the given options, add them to the queue
    of Calculations run by the backend (Puffins).

    Attributes
    ----------
    options :: Scheduler.Options
        The options for the Scheduler Gear.

    Notes
    -----
    The logic counts the number of active Calculations per Order and compares
    with the allowed number of active Calculations.
    The gap between these numbers will be closed by activating Calculations
    that are currently on hold.
    The order of the execution of the jobs as well as their activation does not
    necessarily follow the chronological order of their generation.
    """

    class Options:
        """
        The options for the Scheduler Gear.
        """

        __slots__ = ("cycle_time", "job_counts", "job_priorities")

        def __init__(self):
            self.cycle_time = 10
            """
            int
                The minimum number of seconds between two cycles of the Gear.
                Cycles are finished independent of this option, thus a cycle
                that takes longer than the cycle_time will effectively lead to
                longer cycle times and not cause multiple cycles of the same
                Gear.
            """
            self.job_counts: Dict[str, int] = {
                "scine_geometry_optimization": 50,
                "scine_bond_orders": 50,
                "graph": 100,
                "scine_hessian": 10,
                "scine_react_complex_afir": 10,
                "scine_react_complex_nt": 10,
                "scine_react_complex_nt2": 10,
                "scine_dissociation_cut": 10,
                "conformers": 2,
                "final_conformer_deduplication": 2,
            }
            """
            Dict[str, int]
                The number of Calculations to be set to run at any given time.
                Counts are given per order type (i.e. 'scine_hessian').
                Only orders listed here are scheduled; an order with a count of
                zero or less is skipped.
                Defaults are:
                  scine_geometry_optimization: 50,
                  scine_bond_orders: 50,
                  graph: 100,
                  scine_hessian: 10,
                  scine_react_complex_afir: 10,
                  scine_react_complex_nt: 10,
                  scine_react_complex_nt2: 10,
                  scine_dissociation_cut: 10,
                  conformers: 2,
                  final_conformer_deduplication: 2
            """
            self.job_priorities: Dict[str, int] = {
                "scine_geometry_optimization": 2,
                "scine_ts_optimization": 2,
                "scine_bond_orders": 2,
                "graph": 2,
                "scine_hessian": 2,
                "scine_react_complex_afir": 5,
                "scine_react_complex_nt": 5,
                "scine_react_complex_nt2": 5,
                "scine_dissociation_cut": 5,
                "conformers": 2,
                "final_conformer_deduplication": 2,
                "scine_step_refinement": 5,
                "scine_single_point": 5,
            }
            """
            Dict[str, int]
                The priority number of Calculations, which determines which are executed first.
                Priorities are given per order type (i.e. 'scine_hessian').
                A lower number corresponds to earlier execution.
                The possible range of numbers is 1 to 10.
                Calculations of an order that is scheduled via job_counts but has
                no entry here are activated without changing their priority.
                Defaults are:
                  scine_geometry_optimization: 2,
                  scine_ts_optimization: 2,
                  scine_bond_orders: 2,
                  graph: 2,
                  scine_hessian: 2,
                  scine_react_complex_afir: 5,
                  scine_react_complex_nt: 5,
                  scine_react_complex_nt2: 5,
                  scine_dissociation_cut: 5,
                  conformers: 2,
                  final_conformer_deduplication: 2,
                  scine_step_refinement: 5,
                  scine_single_point: 5
            """

    def __init__(self):
        super().__init__()
        self.options = self.Options()
        self._required_collections = ["calculations"]

    def _loop_impl(self):
        """
        Run one scheduling cycle.

        For every order listed in ``options.job_counts`` the Calculations with
        status 'new' are counted. If fewer than the allowed number are found,
        Calculations of the same order that are on 'hold' are activated until
        the allowed number is reached or no further held Calculation is
        delivered by the query.
        """
        for order, allowed in self.options.job_counts.items():
            # A non-positive limit can never leave a free slot; skip the queries.
            if allowed <= 0:
                continue
            free_slots = allowed - self._count_new(order, allowed)
            if free_slots > 0:
                self._activate_held(order, free_slots)

    def _count_new(self, order: str, limit: int) -> int:
        """
        Count the Calculations of the given order that have the status 'new'.

        Counting stops as soon as ``limit`` is reached, hence the returned
        value is never larger than ``limit`` (for a positive ``limit``).
        """
        # NOTE(review): self._calculations is not assigned in this class; it is
        # presumably provided by the Gear base class via _required_collections.
        selection = {"$and": [{"status": "new"}, {"job.order": order}]}
        count = 0
        for _ in self._calculations.iterate_calculations(dumps(selection)):
            count += 1
            if count >= limit:
                break
        return count

    def _activate_held(self, order: str, n_slots: int) -> None:
        """
        Set up to ``n_slots`` Calculations of the given order from 'hold' to 'new'.

        The configured priority (if any) is written before the status is
        changed, so that a Calculation never carries the status 'new' together
        with an outdated priority.
        """
        selection = {"$and": [{"status": "hold"}, {"job.order": order}]}
        # Resolve the priority once per order. Orders without a configured
        # priority keep whatever priority is already stored in the database.
        priority = self.options.job_priorities.get(order)
        remaining = n_slots
        for calculation in stop_on_timeout(self._calculations.iterate_calculations(dumps(selection))):
            calculation.link(self._calculations)
            # Sleep a bit in order not to make the DB choke
            time.sleep(0.001)
            if priority is not None:
                calculation.set_priority(priority)
            calculation.set_status(db.Status.NEW)
            remaining -= 1
            if remaining <= 0:
                break