Source code for scine_chemoton.gears

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__copyright__ = """ This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Department of Chemistry and Applied Biosciences, Reiher Group.
See LICENSE.txt for details.
"""

# Standard library imports
from abc import ABC, abstractmethod
from collections import UserDict
from ctypes import c_int, c_bool
from enum import Enum
from multiprocessing import Value
from setproctitle import setproctitle
from typing import List
import time

# Third party imports
import scine_database as db

from scine_chemoton.utilities import connect_to_db
from scine_chemoton.utilities.options import BaseOptions
from scine_chemoton.utilities.place_holder_model import (
    ModelNotSetError,
    construct_place_holder_model,
    PlaceHolderModelType
)


class HoldsCollections:
    """
    Mixin for objects that hold handles to a database manager and its collections.

    The names of the handles an instance needs are listed in ``_required_collections``
    (only names returned by :meth:`possible_attributes` are supported).
    :meth:`initialize_collections` stores each requested handle in the member
    ``_<name>`` and :meth:`unset_collections` resets the handles to ``None`` again.
    """
    _required_collections: List[str]
    _manager: db.Manager
    _calculations: db.Collection
    _compounds: db.Collection
    _elementary_steps: db.Collection
    _flasks: db.Collection
    _properties: db.Collection
    _reactions: db.Collection
    _structures: db.Collection

    def __init__(self) -> None:
        super().__init__()  # keep the cooperative __init__ chain going for multiple inheritance
        self._required_collections = []
        self._manager = None  # type: ignore
        self._calculations = None  # type: ignore
        self._compounds = None  # type: ignore
        self._elementary_steps = None  # type: ignore
        self._flasks = None  # type: ignore
        self._properties = None  # type: ignore
        self._reactions = None  # type: ignore
        self._structures = None  # type: ignore

    @staticmethod
    def possible_attributes() -> List[str]:
        """
        Returns
        -------
        List[str]
            The names that may be listed in ``_required_collections``. Each name ``x``
            corresponds to the member ``_x``.
        """
        return [
            "manager",
            "calculations",
            "compounds",
            "elementary_steps",
            "flasks",
            "properties",
            "reactions",
            "structures",
        ]

    def initialize_collections(self, manager: db.Manager) -> None:
        """
        Set every member requested in ``_required_collections`` from the given manager.

        Parameters
        ----------
        manager : db.Manager
            The manager that is stored as ``_manager`` (if requested) and that is queried
            with ``get_collection(name)`` for every other requested name.

        Raises
        ------
        NotImplementedError
            If ``_required_collections`` contains a name that is not part of
            :meth:`possible_attributes`.
        """
        supported = self.possible_attributes()
        # Validate all names first, so that an unsupported entry leaves the members untouched.
        for attr in self._required_collections:
            if attr not in supported:
                raise NotImplementedError(f"The initialization of member {attr} for class {self.__class__.__name__}, "
                                          f"is not possible, we are only supporting {supported}.")
        for attr in self._required_collections:
            handle = manager if attr == "manager" else manager.get_collection(attr)
            setattr(self, f"_{attr}", handle)

    def unset_collections(self) -> None:
        """
        Reset all database handles held by this object to ``None``.

        This covers the ``_parent`` member (if present), every member named after
        :meth:`possible_attributes` and, recursively, handles found in the attributes
        of this object.
        """
        if hasattr(self, "_parent"):
            setattr(self, "_parent", None)
        for attr in self.possible_attributes():
            setattr(self, f"_{attr}", None)
        self._unset_collections_of_attributes(self)

    def _unset_collections_of_attributes(self, inst):
        """
        Recursively replace ``db.Collection`` and ``db.Manager`` objects reachable from
        ``inst`` with ``None``.

        The entries of ``inst`` are taken from the first matching source: its items if it is
        a ``dict`` / ``UserDict``, else its ``__dict__``, else the names in its ``__slots__``.
        Objects offering none of these are returned untouched.

        Parameters
        ----------
        inst : Any
            The object to clean; it is modified in place.

        Returns
        -------
        Any
            The same object that was passed in.
        """
        is_mapping = isinstance(inst, (dict, UserDict))
        if is_mapping:
            items = list(inst.items())
        elif hasattr(inst, "__dict__"):
            items = list(inst.__dict__.items())
        elif hasattr(inst, "__slots__"):
            slots = inst.__slots__
            if isinstance(slots, str):
                # a single slot may be declared as a plain string
                slots = [slots]
            # A slot that was declared but never assigned raises AttributeError on access;
            # there is nothing to unset in it, so it is skipped instead of aborting the pass.
            items = [(s, getattr(inst, s)) for s in slots if hasattr(inst, s)]
        else:
            return inst

        def store(key, value) -> None:
            # write back into the container in the way matching its kind
            if is_mapping:
                inst[key] = value
            else:
                setattr(inst, key, value)

        # `items` is a snapshot, hence writing back into `inst` while looping is safe
        for key, attr in items:
            if isinstance(attr, HoldsCollections):
                # no `continue`: the attribute is additionally traversed by the generic code below
                attr.unset_collections()
            elif isinstance(attr, (db.Collection, db.Manager)):
                store(key, None)
                continue
            if isinstance(attr, Enum):
                continue
            if hasattr(attr, "__dict__") or hasattr(attr, "__slots__") or isinstance(attr, (dict, UserDict)):
                attr = self._unset_collections_of_attributes(attr)
                if isinstance(attr, (dict, UserDict)):
                    store(key, attr)
                    # mappings must not reach the index based handling below
                    continue
            if hasattr(attr, "__iter__") and hasattr(attr, "__setitem__") and not isinstance(attr, str):
                # iterable that supports item assignment (e.g. a list): clean every element
                for i, a in list(enumerate(attr)):
                    attr[i] = self._unset_collections_of_attributes(a)
                store(key, attr)
        return inst
class HasName:
    """
    Mixin that gives an object a human readable, changeable name.

    The default name is ``Chemoton`` followed by the name of the class.
    """

    def __init__(self) -> None:
        super().__init__()  # necessary for multiple inheritance
        self._name = f"Chemoton{self.__class__.__name__}"

    @property
    def name(self) -> str:
        """The current name of the object."""
        return self._name

    @name.setter
    def name(self, n: str):
        self._name = n

    def _give_current_process_own_name(self) -> None:
        # use the name of this object as title of the running process
        setproctitle(self.name)

    def _remove_chemoton_from_name(self) -> None:
        # drops every occurrence of 'Chemoton', not only a leading one
        stripped = self._name.replace("Chemoton", "")
        self._name = stripped

    def _join_names(self, objects: list) -> None:
        # append the comma separated names of the given objects in parentheses
        joined = ", ".join(obj.name for obj in objects)
        self._name = f"{self._name}({joined})"

    def __str__(self):
        return self.name
class Gear(ABC, HoldsCollections, HasName):
    """
    The base class for all Gears.

    A Gear in Chemoton is a continuous loop that alters, analyzes or interacts
    in any other way with a reaction network stored in a SCINE Database.
    Each Gear has to be attached to an Engine(:class:`scine_chemoton.engine.Engine`)
    and will then help in driving the exploration of chemical reaction networks
    forward.

    Extending the features of Chemoton can be done by adding new Gears or altering
    existing ones.
    """

    class Options(BaseOptions):
        """
        The options of a Gear.
        """
        __slots__ = ("model", "cycle_time")

        def __init__(self) -> None:
            super().__init__()
            self.model: db.Model = construct_place_holder_model()
            """
            db.Model
                The model the Gear is working with.
            """
            self.cycle_time: int = 10
            """
            int
                The minimum number of seconds between two cycles of the Gear.
                Cycles are finished independently of this option; hence a cycle
                that takes longer than the cycle_time will effectively lead to
                longer cycle times and not cause multiple cycles of the same Gear.
            """

    def __init__(self) -> None:
        super().__init__()
        self.options = self.Options()
        self.name = 'Chemoton' + self.__class__.__name__ + 'Gear'
        # stop flag kept in a multiprocessing Value
        self._stop_at_next_break_point = Value(c_bool, False)
        # if True, __call__ refuses to run while the model option is still the place holder
        self._model_is_required = True

    @property
    def stop_at_next_break_point(self) -> bool:
        """Whether the Gear stops before starting its next cycle."""
        if self._stop_at_next_break_point is None:
            # the flag may have been reset to None; recreate it on demand
            self._stop_at_next_break_point = Value(c_bool, False)
        return self._stop_at_next_break_point.value  # type: ignore

    @stop_at_next_break_point.setter
    def stop_at_next_break_point(self, stop: bool) -> None:
        if self._stop_at_next_break_point is None:
            self._stop_at_next_break_point = Value(c_bool, stop)
        else:
            self._stop_at_next_break_point.value = stop  # type: ignore

    # NOTE: defining __eq__ without __hash__ makes instances of this class unhashable.
    def __eq__(self, other):
        """Two Gears are equal if their stop flags and their options are equal."""
        if not isinstance(other, Gear):
            return False
        return self.stop_at_next_break_point == other.stop_at_next_break_point \
            and self.options == other.options

    def __call__(self, credentials: db.Credentials, loop_count: c_int, single: bool = False):
        """
        Starts the main loop of the Gear, then acting on the database referenced
        by the given credentials.

        Parameters
        ----------
        credentials : db.Credentials (Scine::Database::Credentials)
            The credentials to a database storing a reaction network.
        loop_count : c_int
            A shared memory integer that allows to communicate the number of
            loops across processes.
        single : bool
            If true, runs only a single iteration of the actual loop.
            Default: false, meaning endless repetition of the loop.

        Raises
        ------
        ModelNotSetError
            If the Gear requires a model and the model option is still the place holder.
        """
        # a stop requested before this call is discarded
        self.stop_at_next_break_point = False
        self._give_current_process_own_name()
        # Make sure cycle time exists
        sleep_time = self.options.cycle_time
        if self._model_is_required and isinstance(self.options.model, PlaceHolderModelType):
            raise ModelNotSetError(f"The model option has not been set for {self.name}, "
                                   f"please specify a model before running the gear.")
        # Prepare database connection and members
        _initialize_a_gear_to_a_db(self, credentials)

        # The pacing uses a monotonic clock: a wall clock that is set backwards while the
        # Gear is running would otherwise stretch the waiting time by the size of the jump.
        last_cycle = time.monotonic()
        # Instant first loop
        self._loop_impl()
        loop_count.value += 1
        # Stop if only a single loop was requested
        if single:
            return
        # Repeat until a stop is requested; the flag is only checked between cycles
        while not self.stop_at_next_break_point:
            # Wait if needed
            elapsed = time.monotonic() - last_cycle
            if elapsed < sleep_time:
                time.sleep(sleep_time - elapsed)
            last_cycle = time.monotonic()

            self._loop_impl()
            loop_count.value += 1

    @abstractmethod
    def _loop_impl(self):
        # Main loop to be implemented by all derived Gears.
        pass

    def _propagate_db_manager(self, manager: db.Manager):
        # Hook that receives the manager after the collections were initialized; no-op by default.
        pass

    def stop(self) -> None:
        """
        Request the Gear to stop before it starts its next cycle.
        """
        self.stop_at_next_break_point = True
def _initialize_a_gear_to_a_db(gear: Gear, credentials: db.Credentials) -> None:
    """
    Connect the given gear to the database referenced by the credentials.

    A new connection is only made if the gear has no manager yet or if its manager
    holds different credentials. The collections of the gear are initialized and the
    manager is propagated in every case.

    Parameters
    ----------
    gear : Gear
        The gear to prepare.
    credentials : db.Credentials
        The credentials of the database the gear shall work on.
    """
    current = gear._manager
    reconnect = current is None or current.get_credentials() != credentials
    if reconnect:
        gear._manager = connect_to_db(credentials)
    active_manager = gear._manager
    # Get required collections
    gear.initialize_collections(active_manager)
    # always propagate in case a member has been changed
    gear._propagate_db_manager(active_manager)