Source code for scine_chemoton.gears.kinetics

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__copyright__ = """ This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Department of Chemistry and Applied Biosciences, Reiher Group.
See LICENSE.txt for details.
"""

# Standard library imports
from abc import ABC, abstractmethod
from warnings import warn
import os
import datetime
from json import dumps
from typing import Optional, Dict, List, Tuple, Union

# Third party imports
import scine_database as db
from scine_database.energy_query_functions import get_barriers_for_elementary_step_by_type
from scine_database.queries import stop_on_timeout, model_query
from scine_database.concentration_query_functions import (
    query_concentration_with_model_object,
    query_concentration_with_object
)
from scine_database.compound_and_flask_creation import get_compound_or_flask

# Local application imports
from . import Gear
from .pathfinder import Pathfinder as pf
from scine_chemoton.filters.aggregate_filters import AggregateFilter
from scine_chemoton.filters.reaction_filters import ReactionFilter
from scine_chemoton.utilities.place_holder_model import (
    construct_place_holder_model,
    PlaceHolderModelType
)


[docs]class KineticsBase(Gear, ABC): """ Base class for kinetics gears. Attributes ---------- options : KineticsBase.Options The options for the Kinetics Gear. aggregate_filter : AggregateFilter An optional filter to limit the activated aggregates, by default none are filtered """
[docs] class Options(Gear.Options): """ The options for the KineticsBase Gear. """ __slots__ = ("restart", "stop_if_no_new_aggregates_are_activated") def __init__(self) -> None: super().__init__() self.restart = False """ bool Option to restart the filtering of the network, by disabling all aggregates again. Set this to ``True`` if you want to reevaluate a network with different settings. """ self.stop_if_no_new_aggregates_are_activated = False """ bool Option to stop the gear if no new aggregates are activated during a loop. This is useful if you want to run a gear for a certain number of iterations, but want to stop early if there is no more progress. """
options: Options def __init__(self) -> None: super().__init__() self.aggregate_filter = AggregateFilter() self._user_cache: Dict[str, Tuple[int, bool]] = {} self._filtered_cache: Dict[str, bool] = {} self._required_collections = ["compounds", "elementary_steps", "flasks", "properties", "reactions", "structures"] self._enabled_count: int = 0 self._model_is_required = False
[docs] def clear_cache(self) -> None: self._user_cache = {} self._filtered_cache = {}
def _propagate_db_manager(self, manager: db.Manager): self._sanity_check_configuration() self.aggregate_filter.initialize_collections(manager) def _sanity_check_configuration(self): if not isinstance(self.aggregate_filter, AggregateFilter): raise TypeError(f"Expected a AggregateFilter (or a class derived " f"from it) in {self.name}.aggregate_filter.") def _loop_impl(self): if self.options.restart: self._disable_all_aggregates() self.options.restart = False self._enabled_count = 0 # Loop over all deactivated aggregates self._aggregate_loop(self._compounds, db.CompoundOrFlask.COMPOUND) if self.stop_at_next_break_point: return self._aggregate_loop(self._flasks, db.CompoundOrFlask.FLASK) if self.stop_at_next_break_point: return self._check_count() def _check_count(self) -> None: """ If the option to stop after no new enabled aggregates is set, count the number of enabled aggregates in the database and if the count is identical to the current count, stop the loop. """ selection = {"exploration_disabled": False} new_count = self._compounds.count(dumps(selection)) + self._flasks.count(dumps(selection)) if self.options.stop_if_no_new_aggregates_are_activated and self._enabled_count == new_count: self.stop_at_next_break_point = True else: self._enabled_count = new_count def _aggregate_loop(self, collection: db.Collection, agg_type: db.CompoundOrFlask) -> None: selection = {"exploration_disabled": True} if agg_type == db.CompoundOrFlask.COMPOUND: iterator = collection.iterate_compounds(dumps(selection)) elif agg_type == db.CompoundOrFlask.FLASK: iterator = collection.iterate_flasks(dumps(selection)) else: raise RuntimeError(f"Unknown aggregate type {agg_type}") for aggregate in stop_on_timeout(iterator): aggregate.link(collection) if self.stop_at_next_break_point: return str_id = str(aggregate.id()) try: # check if already filtered out once if not self._filtered_cache[str_id]: continue except KeyError: # KeyError means filter has not been evaluated for this aggregate valid_aggregate = self.aggregate_filter.filter(aggregate) self._filtered_cache[str_id] = valid_aggregate if not valid_aggregate: continue if self._aggregate_was_inserted_by_user(aggregate): aggregate.enable_exploration() elif self._filter(aggregate, self._aggregate_accessible_by_reaction(aggregate)): aggregate.enable_exploration() def _disable_all_aggregates(self): for compound in self._compounds.iterate_all_compounds(): compound.link(self._compounds) compound.disable_exploration() for flask in self._flasks.iterate_all_flasks(): flask.link(self._flasks) flask.disable_exploration() def _aggregate_was_inserted_by_user(self, aggregate: Union[db.Compound, db.Flask]) -> bool: """ Any structure of aggregates has a user label Parameters ---------- aggregate : Union[scine_database.Compound, scine_database.Flask] The aggregate that may have been inserted by user Returns ------- bool aggregate was inserted """ str_agg_id = str(aggregate.id()) structures = aggregate.get_structures() n_structures = len(structures) cache_entry = self._user_cache.get(str_agg_id, None) if cache_entry is not None and cache_entry[0] == n_structures: return cache_entry[1] # todo replace with a database library function user_labels = [db.Label.USER_OPTIMIZED, db.Label.USER_GUESS, db.Label.USER_COMPLEX_OPTIMIZED] for s_id in structures: structure = db.Structure(s_id, self._structures) if structure.get_label() in user_labels: self._user_cache[str_agg_id] = n_structures, True return True self._user_cache[str_agg_id] = n_structures, False return False def _aggregate_accessible_by_reaction(self, aggregate: Union[db.Compound, db.Flask]) \ -> List[Tuple[db.Reaction, db.Side]]: """ Aggregate is on a side of an explorable reaction with all compounds on the other side being explorable Parameters ---------- aggregate : Union[scine_database.Compound, scine_database.Flask] The aggregate to check to be accessible Returns ------- List[Tuple[db.Reaction, db.Side]] List of all viable reactions giving access and from the side they give access """ agg_id = aggregate.id() selection = {"$and": [ {"exploration_disabled": {"$ne": True}}, {"$or": [ {"lhs": {"$elemMatch": {"id": {"$oid": str(agg_id)}}}}, {"rhs": {"$elemMatch": {"id": {"$oid": str(agg_id)}}}}, ]}, ]} hits: List[Tuple[db.Reaction, db.Side]] = [] for hit in self._reactions.iterate_reactions(dumps(selection)): hit.link(self._reactions) lhs, rhs = hit.get_reactants(db.Side.BOTH) lhs_types, rhs_types = hit.get_reactant_types(db.Side.BOTH) # check lhs if agg_id in rhs and \ all(get_compound_or_flask(reactant_id, reactant_type, self._compounds, self._flasks).explore() for reactant_id, reactant_type in zip(lhs, lhs_types)): hits.append((hit, db.Side.LHS)) # check rhs elif agg_id in lhs and \ all(get_compound_or_flask(reactant_id, reactant_type, self._compounds, self._flasks).explore() for reactant_id, reactant_type in zip(rhs, rhs_types)): hits.append((hit, db.Side.RHS)) return hits @abstractmethod def _filter(self, aggregate: Union[db.Compound, db.Flask], access_reactions: List[Tuple[db.Reaction, db.Side]]) \ -> bool: raise NotImplementedError
[docs]class MinimalConnectivityKinetics(KineticsBase): """ This Gear enables the exploration of aggregates (Compounds and Flasks) if they were inserted by the user or created by a reaction that requires only explorable aggregates. This should be the case for any aggregates after a sufficient number of iterations and simply drive the exploration. Attributes ---------- options : MinimalConnectivityKinetics.Options The options for the MinimalConnectivityKinetics Gear. aggregate_filter : AggregateFilter An optional filter to limit the activated aggregates, by default none are filtered """
[docs] class Options(KineticsBase.Options): """ The options for the MinimalConnectivityKinetics Gear. """ __slots__ = "user_input_only" def __init__(self) -> None: super().__init__() self.user_input_only = False """ bool Option to only ever allow compounds that contain structures added to the database by a user. If enabled, no other compounds will ever be enabled irrespective of any connectivity. """
options: Options def _filter(self, aggregate: Union[db.Compound, db.Flask], access_reactions: List[Tuple[db.Reaction, db.Side]]) \ -> bool: if self.options.user_input_only: # relies on the fact that user inputs are activated in loop regardless return False return len(access_reactions) > 0
[docs]class BasicBarrierHeightKinetics(KineticsBase): """ This Gear enables the exploration of aggregates if they were inserted by the user or created by a reaction that requires only explorable aggregates and has a forward reaction barrier below a given threshold. Attributes ---------- options : BasicBarrierHeightKinetics.Options The options for the BasicBarrierHeightKinetics Gear. aggregate_filter : AggregateFilter An optional filter to limit the activated aggregates, by default none are filtered Notes ----- Checks for all aggregates that are accessed via a 'reaction'. Manually inserted Compounds/Flasks are always activated by this gear. """
[docs] class Options(KineticsBase.Options): """ The options for the BasicBarrierHeightKinetics Gear. """ __slots__ = ("max_allowed_barrier", "max_allowed_energy", "enforce_free_energies") def __init__(self) -> None: super().__init__() self.max_allowed_barrier = 1000.0 # kJ/mol """ float The maximum barrier height of the reaction resulting in the aggregate in kJ/mol to allow the aggregate to be further explored. """ self.max_allowed_energy: Optional[float] = float('inf') """ Optional[float] The maximum energy threshold for the reaction energies allowed in the aggregate in kJ/mol to allow the aggregate to be further explored. """ self.enforce_free_energies = False """ bool Whether the gear should only enable aggregates based on free energy barriers or can also enable based on electronic energies alone. Make sure to run a Thermo gear if you set this to ``True``. """
options: Options def __init__(self) -> None: super().__init__() self.__cache: Dict[str, dict] = {}
[docs] def clear_cache(self) -> None: super().clear_cache() self.__cache = {}
def _filter(self, aggregate: Union[db.Compound, db.Flask], access_reactions: List[Tuple[db.Reaction, db.Side]]) \ -> bool: agg_id = aggregate.id() for reaction, side in access_reactions: lhs, rhs = reaction.get_reactants(db.Side.BOTH) if agg_id not in lhs + rhs: continue if not self._reaction_barrier_too_high(reaction, side): if self.options.max_allowed_energy is not None and self.options.max_allowed_energy != float('inf'): if not self._reaction_energy_too_high(reaction, side): return True else: return True return False def _reaction_energy_too_high(self, reaction: db.Reaction, accessible_side: db.Side) -> bool: """ Whether the reaction energy of the reaction is too high Parameters ---------- reaction : scine_database.Reaction (Scine::Database::Reaction) The reaction accessible_side : scine_database.Side (Scine::Database::Side) The side of the reaction that is accessible """ reaction_id = reaction.get_id().string() es_list = sorted(i.string() for i in reaction.get_elementary_steps()) cache_entry = self.__cache.get(reaction_id, None) if cache_entry is not None: if cache_entry['accessible_side'] == accessible_side and cache_entry['es_ids'] == es_list: if self.options.enforce_free_energies and self.__cache[reaction_id]['dG'] is not None: return self.__cache[reaction_id]['dG'] > self.options.max_allowed_energy if not self.options.enforce_free_energies and self.__cache[reaction_id]['dE'] is not None: return self.__cache[reaction_id]['dE'] > self.options.max_allowed_energy return True else: # expect data in cache, # expect call of _reaction_barrier_too_high before _reaction_energy_too_high raise RuntimeError("Internal error in BasicBarrierHeightKinetics Gear.") def _reaction_barrier_too_high(self, reaction: db.Reaction, accessible_side: db.Side) -> bool: """ Whether the reaction barrier of the reaction is too high Parameters ---------- reaction : scine_database.Reaction (Scine::Database::Reaction) The reaction accessible_side : scine_database.Side (Scine::Database::Side) The side of the reaction that is accessible """ reaction_id = reaction.get_id().string() es_list = sorted(i.string() for i in reaction.get_elementary_steps()) cache_entry = self.__cache.get(reaction_id, None) if cache_entry is not None and cache_entry['accessible_side'] == accessible_side \ and cache_entry['es_ids'] == es_list: if self.options.enforce_free_energies and self.__cache[reaction_id]['ddG'] is not None: return self.__cache[reaction_id]['ddG'] > self.options.max_allowed_barrier if not self.options.enforce_free_energies and self.__cache[reaction_id]['ddE'] is not None: return self.__cache[reaction_id]['ddE'] > self.options.max_allowed_barrier barrier_heights = self._barrier_height(reaction, accessible_side) self.__cache[reaction_id] = { 'es_ids': es_list, 'accessible_side': accessible_side, 'ddE': barrier_heights[1][0], 'ddG': barrier_heights[0][0], 'dE': barrier_heights[1][1], 'dG': barrier_heights[0][1], } if barrier_heights[0][0] is None: if self.options.enforce_free_energies or barrier_heights[1][0] is None: # skip if free energy barrier not available yet and only free energies allowed or electronic energy also # not available (e.g., because of model network_refinement) return True return barrier_heights[1][0] > self.options.max_allowed_barrier return barrier_heights[0][0] > self.options.max_allowed_barrier def _barrier_height(self, reaction: db.Reaction, accessible_side: db.Side) -> Tuple[ Tuple[Union[float, None], Union[float, None]], Tuple[Union[float, None], Union[float, None]], ]: """ Gives the lowest barrier height of the reaction in kJ/mol out of all the elementary steps grouped into this reaction with the given access side. db.Side.LHS means that the lowest barriers from left to right are given. If db.Side.BOTH is given, the lowest barrier from either left to right or right to left is given, depending on which is lower. Barrier heights are given as Tuple with the first being gibbs free energy and second one the electronic energy. Returns None for not available energies Parameters ---------- reaction : scine_database.Reaction (Scine::Database::Reaction) The reaction we want the barrier height from accessible_side : scine_database.Side (Scine::Database::Side) The side of the reaction that is accessible Returns ------- Tuple[Union[float, None], Union[float, None]] barrier height in kJ/mol """ barriers: Dict[str, List[float]] = {"gibbs_free_energy": [], "electronic_energy": []} energies: Dict[str, List[float]] = {"gibbs_free_energy": [], "electronic_energy": []} for step_id in reaction.get_elementary_steps(): step = db.ElementaryStep(step_id, self._elementary_steps) for energy_type, values in barriers.items(): lhs, rhs = get_barriers_for_elementary_step_by_type(step, energy_type, self.options.model, self._structures, self._properties) if accessible_side == db.Side.LHS: ordered_values = lhs, rhs elif accessible_side == db.Side.RHS: ordered_values = rhs, lhs elif accessible_side == db.Side.BOTH: if lhs is not None and rhs is not None: ordered_values = min(lhs, rhs), max(lhs, rhs) elif lhs is not None: ordered_values = lhs, None elif rhs is not None: ordered_values = rhs, None else: ordered_values = None else: raise NotImplementedError(f"Unknown accessible side {accessible_side}") if ordered_values is not None and ordered_values[0] is not None: values.append(ordered_values[0]) if ordered_values[1] is not None: energies[energy_type].append(ordered_values[0] - ordered_values[1]) barrier_gibbs = None if not barriers["gibbs_free_energy"] else min(barriers["gibbs_free_energy"]) barrier_electronic = None if not barriers["electronic_energy"] else min(barriers["electronic_energy"]) # it seems incorrect to check for validity by checking the barriers list, # but unsure if this has downstream consequences # because `get_barriers_for_elementary_step_by_type` currently returns either float, float or None, None # this should be irrelevant energy_gibbs = None if not barriers["gibbs_free_energy"] else min(energies["gibbs_free_energy"]) energy_electronic = None if not barriers["electronic_energy"] else min(energies["electronic_energy"]) return (barrier_gibbs, energy_gibbs), (barrier_electronic, energy_electronic)
[docs]class MaximumFluxKinetics(KineticsBase): """ This Gear enables the exploration of compounds if they were inserted by the user, created by a reaction that requires only compounds with a concentration flux larger than a given threshold, has a forward reaction barrier below a given maximum, and has reached a minimum concentration larger than a given threshold during kinetic modeling. Attributes ---------- options : MaximumFluxKinetics.Options The options for the MaximumFluxKinetics Gear. aggregate_filter : AggregateFilter An optional filter to limit the activated aggregates, by default none are filtered Notes ----- Checks for all compounds that are accessed via a 'reaction'. Manually inserted Compounds are always activated by this gear. """
[docs] class Options(KineticsBase.Options): """ The options for the MaximumPopulationKinetics Gear. """ __slots__ = ("min_allowed_concentration", "property_label", "variance_label") def __init__(self) -> None: super().__init__() self.min_allowed_concentration = 1e-4 """ float The minimum allowed concentration property to be considered for further exploration. """ self.property_label = "max_concentration" """ float The minimum concentration flux that is required to consider the compound as accessible. """ self.variance_label: Optional[str] = "" """ float The label for the variance of the concentration property. """
options: Options def _filter(self, aggregate: Union[db.Compound, db.Flask], __: List[Tuple[db.Reaction, db.Side]]) -> bool: start_concentration = query_concentration_with_object("start_concentration", aggregate, self._properties, self._structures) max_concentration = query_concentration_with_model_object(self.options.property_label, aggregate, self._properties, self._structures, self.options.model) if self.options.variance_label is not None and self.options.variance_label: max_concentration += query_concentration_with_model_object(self.options.variance_label, aggregate, self._properties, self._structures, self.options.model) if self.options.min_allowed_concentration > max_concentration and start_concentration < 1e-10: return False return True def _aggregate_accessible_by_reaction(self, aggregate: Union[db.Compound, db.Flask]) \ -> List[Tuple[db.Reaction, db.Side]]: """ Assume every compound to be accessible. Compound elimination happens via the concentration. This function returns an empty list to accelerate the gear. Parameters ---------- _ : Union[db.Compound, db.Flask] The compound to check to be accessible Returns ------- Empty list. """ return []
[docs]class PathfinderKinetics(KineticsBase): """ This Gear enables the exploration of compounds if they were inserted by the user or have a compound cost lower than a given compound cost threshold. The compound costs are determined by running Pathfinder with the set options and the given starting conditions. The graph is built and the compound costs determined if the ratio of structures over compounds considered for building the last graph have changed by more than a given excess ratio, by default 10%. Attributes ---------- options : PathfinderKinetics.Options The options for the PathfinderKinetics Gear. """
[docs] class Options(KineticsBase.Options): """ The options for the PathfinderKinetics Gear. """ __slots__ = ("max_compound_cost", "restart_excess_ratio", "structure_model", "start_conditions", "barrierless_weight", "filter_negative_barriers", "allow_unsolved_compound_costs", "temperature", "store_pathfinder_output") def __init__(self) -> None: super().__init__() self.max_compound_cost = 10.0 """ float The maximal compound cost allowed for a compound to be enabled for exploration. """ self.restart_excess_ratio = 0.1 """ float Percentage of ratio (structure / compounds in graph) change to trigger pathfinder analysis. """ self.store_pathfinder_output = False """ bool Bool to indicate if graph and compound costs of run should be exported to files. """ self.structure_model: Optional[db.Model] = construct_place_holder_model() """ Optional[db.Model] Structure model for the structures to be considered. """ self.start_conditions: Dict[str, float] = {} """ Dict[str, float] Starting conditions for pathfinder with the compound IDs as string and the cost as float. """ self.barrierless_weight = 1e12 """ float Weight for barrierless reactions, by default 1e12. """ self.filter_negative_barriers = False """ bool Bool to indicate if negative barriers should be considered. """ self.allow_unsolved_compound_costs = False """ bool Bool to indicate if unsolved compound costs (compounds still having a +inf cost) are allowed. """ self.temperature = 298.15 """ float Temperature to calculate the rate constants from the barriers. """
options: Options def __init__(self) -> None: super().__init__() self.finder = None self._old_ratio = None self._trigger = 0.0 def _loop_impl(self): # # # Restart the gear if self.options.restart: self._disable_all_aggregates() self.options.restart = False self._enabled_count = 0 # # # Check if finder is setup if self.finder is None: self._setup_finder() # # # Get current count of structures with model if self.options.structure_model is None or isinstance(self.options.structure_model, PlaceHolderModelType): structure_count = self._count_structures(self.options.model) else: structure_count = self._count_structures(self.options.structure_model) # # # If a graph is there and compound costs set up, determine ratio to set trigger if self.finder.graph_handler is not None and len(self.finder.compound_costs) > 0: current_ratio = structure_count / len(self.finder.compound_costs) self._trigger = current_ratio / self._old_ratio - 1.0 # better to look at change of structures with model # # # Build the graph if not build yet or the current aggregate count exceeds the tolerated percentage if self.finder.graph_handler is None or \ self._trigger > self.options.restart_excess_ratio: self.finder.build_graph() print("Nodes:", len(self.finder.graph_handler.graph.nodes)) self.finder.set_start_conditions(self.options.start_conditions) print(self.finder.start_compounds) # # # Check if start compounds have compound cost in database if not self._start_compounds_have_compound_costs(): print("Start Compounds have no cost in DB") for key in self.finder.start_compounds: centroid = self._get_centroid_from_aggregate_id_string(key, db.CompoundOrFlask.COMPOUND) self._write_compound_cost(centroid, self.finder.compound_costs[key]) # # # Only calculate compound costs, if there are any nodes in the graph if len(self.finder.graph_handler.graph.nodes) > 0: # # # Safety check if start compounds are in graph if not set(self.finder.start_compounds).issubset(self.finder.graph_handler.graph.nodes): warn("Warning: Not all start compounds in graph.") # Reset graph handler self.finder.graph_handler = None return # # # Calculate the compound costs self.finder.calculate_compound_costs() # # # Write new ratio as old ratio self._old_ratio = structure_count / len(self.finder.compound_costs) # # # Write compound cost of compound to corresponding centroid for key in self.finder.compound_costs.keys(): centroid = self._get_centroid_from_aggregate_id_string(key) self._write_compound_cost(centroid, self.finder.compound_costs[key]) # # # Export graph and compound costs if requested if self.options.store_pathfinder_output: directory_name = "./pathfinder_outputs/" if not os.path.exists(directory_name): os.makedirs(directory_name) timestamp = datetime.datetime.now().strftime("%m%d-%H%M%S") self.finder.export_graph(directory_name + timestamp + ".kinetic.graph.json") self.finder.export_compound_costs(directory_name + timestamp + ".compound_costs.json") else: self.finder.graph_handler = None # # # If unsolved compound costs persist and they are not allowed, don't try to activate stuff if not self.finder.compound_costs_solved and not self.options.allow_unsolved_compound_costs: return # Loop over all deactivated aggregates selection = {"exploration_disabled": True} # # # Loop over deactivated compounds for compound in stop_on_timeout(self._compounds.iterate_compounds(dumps(selection))): compound.link(self._compounds) if self._aggregate_was_inserted_by_user(compound): compound.enable_exploration() elif self._filter(compound, []): compound.enable_exploration() # # # Loop over deactivated flasks for flask in stop_on_timeout(self._flasks.iterate_flasks(dumps(selection))): flask.link(self._flasks) if self._aggregate_was_inserted_by_user(flask): flask.enable_exploration() elif self._filter(flask, []): flask.enable_exploration() self._check_count() def _filter(self, aggregate: Union[db.Compound, db.Flask], __: List[Tuple[db.Reaction, db.Side]]) -> bool: assert self.finder aggregate_id = aggregate.id().string() if self.finder.compound_costs_solved or self.options.allow_unsolved_compound_costs: if aggregate_id in self.finder.compound_costs.keys() and \ self.finder.compound_costs[aggregate_id] < self.options.max_compound_cost: return True # # # else: return False # # # Finder complete but compound cost not solved, raise error else: raise RuntimeError("Pathfinder cannot find a solution under given starting conditions.") def _setup_finder(self): """ Sets up the pathfinder with the barrier handler and sets the options to the pathfinder.options """ self.finder = pf(self._manager) # # # Set correction options for BarrierBased Pathfinder self.finder.options.graph_handler = "barrier" self.finder.options.model = self.options.model self.finder.options.structure_model = self.options.structure_model self.finder.options.barrierless_weight = self.options.barrierless_weight self.finder.options.filter_negative_barriers = self.options.filter_negative_barriers def _count_structures(self, model: db.Model) -> int: """ Counts the optimized structures with the given model. Parameters ---------- model : db.Model Model for the structure query Returns ------- int Number of optimized structures with the given model. """ # todo replace with db method selection = {"$and": [{"label": {"$in": ["user_optimized", "minimum_optimized", "ts_optimized", "complex_optimized", "user_complex_optimized"]}}] + model_query(model)} return self._structures.count(dumps(selection)) def _start_compounds_have_compound_costs(self): """ Checks if all start compounds have the property compound cost in their centroid. Returns ------- bool Bool indicating if all start compounds have the property compound costs in their centroid. """ centroid_list = [self._get_centroid_from_aggregate_id_string( key, db.CompoundOrFlask.COMPOUND) for key in self.finder.start_compounds] return all(centroid.has_property("compound_cost") for centroid in centroid_list) def _get_centroid_from_aggregate_id_string( self, id: str, aggregate_type: Optional[db.CompoundOrFlask] = None) -> db.Structure: """ Get the centroid structure of an aggregate. Parameters ---------- id : str The ID of the aggregate as string aggregate_type : Optional[db.CompoundOrFlask], optional The aggregate type (COMPOUND or FLASK) if known, by default None Returns ------- centroid : db.Structure The centroid of the given aggregate. """ if aggregate_type is None: aggregate_type_determined = self._get_type(id) else: aggregate_type_determined = aggregate_type aggregate = get_compound_or_flask(db.ID(id), aggregate_type_determined, self._compounds, self._flasks) centroid = aggregate.get_centroid(self._manager) return centroid def _get_type(self, key: str) -> db.CompoundOrFlask: """ Determine the aggregate type based on the node information stored in the graph. This information is encoded in each node when constructing the graph, therefore it can always be retrieved. Parameters ---------- key : str The ID of the node, either a compound or flask ID as string. Returns ------- db.CompoundOrFlask The aggregate type of the queried node. """ assert self.finder type_string = self.finder.graph_handler.graph.nodes[key]["type"] if type_string == db.CompoundOrFlask.COMPOUND.name: return db.CompoundOrFlask.COMPOUND else: return db.CompoundOrFlask.FLASK def _write_compound_cost(self, centroid: db.Structure, compound_cost: float): """ Write a compound cost as property to the database and add this property to the centroid structure. Parameters ---------- centroid : db.Structure The centroid structure of a compound or flask to which the compound cost property should be appended. compound_cost : float The determined compound cost of the centroid's compound or flask. """ label = "compound_cost" prop = db.NumberProperty.make(label, self.options.model, compound_cost, self._properties) centroid.add_property(label, prop.id()) prop.set_structure(centroid.id())
[docs]class ReactionFilterBasedKinetics(KineticsBase): """ This class will activate all aggregates fulfilling the aggregate_filter (see KineticsBase) and accessible by a reaction that fulfills the reaction_filter. Attributes ---------- reaction_filter : ReactionFilter The reaction filter. """ def __init__(self) -> None: super().__init__() self.reaction_filter: ReactionFilter = ReactionFilter() def _filter(self, aggregate: Union[db.Compound, db.Flask], access_reactions: List[Tuple[db.Reaction, db.Side]]) \ -> bool: return any(self.reaction_filter.filter(reaction) for reaction, _ in access_reactions) def _propagate_db_manager(self, manager: db.Manager): self._sanity_check_configuration() self.aggregate_filter.initialize_collections(manager) self.reaction_filter.initialize_collections(manager)