Source code for scine_chemoton.gears.reaction

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__copyright__ = """ This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Laboratory of Physical Chemistry, Reiher Group.
See LICENSE.txt for details.
"""

# Standard library imports
from json import dumps
from typing import List, Union
from warnings import warn
import numpy as np
from copy import copy

# Third party imports
import scine_database as db
import scine_utilities as utils

# Local application imports
from . import Gear
from ..utilities.queries import identical_reaction, stop_on_timeout, _verify_identical_reaction
from ..utilities.energy_query_functions import get_energy_for_structure


[docs]class BasicReactionHousekeeping(Gear): """ This Gear updates all Elementary Steps stored in the database such that they are added to an existing Reaction or that a new Reaction is created if the Step does not fit an existing one. Attributes ---------- options :: BasicReactionHousekeeping.Options The options for the BasicReactionHousekeeping Gear. Notes ----- Checks for all Elementary Steps without a 'reaction'. """ def __init__(self): super().__init__() self.options = self.Options() self._required_collections = ["compounds", "elementary_steps", "flasks", "reactions", "properties", "structures"]
[docs] class Options: """ The options for the BasicReactionHousekeeping Gear. """ __slots__ = ("cycle_time", "use_structure_deduplication", "use_energy_deduplication", "use_rmsd_deduplication", "energy_tolerance", "rmsd_tolerance") def __init__(self): self.cycle_time = 10 """ int The minimum number of seconds between two cycles of the Gear. Cycles are finished independent of this option, thus if a cycle takes longer than the cycle_time will effectively lead to longer cycle times and not cause multiple cycles of the same Gear. """ self.use_structure_deduplication = True """ bool If true duplicated elementary steps are not sorted into reactions. The criterion is that all structures on the lhs of the elementary step have to be identical. This criterion can be combined with other selection criteria. Steps are only eliminated if all selection criteria signal identical steps. """ self.use_energy_deduplication = True """ bool If true duplicated elementary steps are not sorted into reactions. The criterion is that the total electronic energy of the transition state (if available) has to be within the threshold limits of `energy_tolerance`. This criterion can be combined with other selection criteria. Steps are only eliminated if all selection criteria signal identical steps. """ self.use_rmsd_deduplication = True """ bool If true duplicated elementary steps are not sorted into reactions. The criterion is that the RMSD of the transition state coordinates (if available) have to be within the threshold limits of `rmsd_tolerance`. This criterion can be combined with other selection criteria. Steps are only eliminated if all selection criteria signal identical steps. Note that this deduplication strategy may fail to eliminate elementary steps if the atom ordering the transition state differs. """ self.energy_tolerance = 1e-5 """ float The energy tolerance for the transition state energy deduplication. """ self.rmsd_tolerance = 1e-3 """ float The RMSD tolerance for the transition state RMSD deduplication. """
def _loop_impl(self): # Setup query for elementary steps without reactions selection = {"$and": [ {"reaction": ""}, {"analysis_disabled": {"$ne": True}} ]} uses_elementary_step_deduplication =\ self.options.use_structure_deduplication or self.options.use_energy_deduplication or \ self.options.use_rmsd_deduplication # Loop over all results for step in stop_on_timeout(self._elementary_steps.iterate_elementary_steps(dumps(selection))): step.link(self._elementary_steps) reactants = step.get_reactants(db.Side.BOTH) # Determine structure types types = [[], []] aggregates = [[], []] all_structures_have_aggregates = True for i, side in enumerate(reactants): if not all_structures_have_aggregates: # we are breaking here to avoid rhs evaluation if lhs is already incomplete break for structure_id in side: structure = db.Structure(structure_id, self._structures) # check aggregate if not structure.has_aggregate(): all_structures_have_aggregates = False break else: aggregates[i].append(structure.get_aggregate()) # check label label = structure.get_label() if label in [db.Label.MINIMUM_OPTIMIZED, db.Label.USER_OPTIMIZED, db.Label.DUPLICATE]: types[i].append(db.CompoundOrFlask.COMPOUND) elif label == db.Label.COMPLEX_OPTIMIZED: types[i].append(db.CompoundOrFlask.FLASK) else: raise RuntimeError(f"Structure label '{str(label)}' is not supported for aggregation.") if not all_structures_have_aggregates: continue self._replace_duplicate_structures(step) # Check for a reactions with the same structures/compounds true_hit = identical_reaction(aggregates[0], aggregates[1], types[0], types[1], self._reactions) if true_hit is not None: is_parallel = _verify_identical_reaction(aggregates[0], aggregates[1], true_hit) if not is_parallel: self._invert_elementary_step(step) # Add elementary step to reaction if it is not duplicated. if uses_elementary_step_deduplication and self._is_duplicate(step, true_hit): step.disable_analysis() step.disable_exploration() continue reaction = true_hit # check for regular vs barrierless self._disable_barrierless_if_mixed_types_in_reaction(reaction, step) reaction.add_elementary_step(step.id()) step.set_reaction(reaction.id()) continue # Generate new reaction reaction = db.Reaction(db.ID(), self._reactions) reaction.create(aggregates[0], aggregates[1], types[0], types[1]) reaction.add_elementary_step(step.id()) step.set_reaction(reaction.id()) self._add_reaction_to_aggregate(aggregates[0] + aggregates[1], types[0] + types[1], reaction.id()) def _replace_duplicate_structures(self, elementary_step: db.ElementaryStep): reactants = elementary_step.get_reactants(db.Side.BOTH) unique_lhs = self._make_unique_structure_id_list(reactants[0]) unique_rhs = self._make_unique_structure_id_list(reactants[1]) elementary_step.set_reactants(unique_lhs, db.Side.LHS) elementary_step.set_reactants(unique_rhs, db.Side.RHS) def _make_unique_structure_id_list(self, id_list): unique_list: List[db.ID] = list() for s_id in id_list: structure = db.Structure(s_id, self._structures) if structure.get_label() == db.Label.DUPLICATE: unique_list.append(structure.is_duplicate_of()) else: unique_list.append(s_id) return unique_list @staticmethod def _invert_elementary_step(elementary_step: db.ElementaryStep): reactants = copy(elementary_step.get_reactants(db.Side.BOTH)) elementary_step.set_reactants(reactants[1], db.Side.LHS) elementary_step.set_reactants(reactants[0], db.Side.RHS) if elementary_step.has_spline(): spline = elementary_step.get_spline() knots = np.flipud(np.asarray([1 - x for x in spline.knots])) trajectory = np.flipud(spline.data) ts_position = 1 - spline.ts_position elements = spline.elements inverted_spline = utils.bsplines.TrajectorySpline(elements, knots, trajectory, ts_position) elementary_step.set_spline(inverted_spline) def _disable_barrierless_if_mixed_types_in_reaction(self, reaction: db.Reaction, new_step: db.ElementaryStep): """ If we have a regular elementary step and a barrierless elementary step for an identical reaction, we are for now distrusting the barrierless steps and disable them, but keep them in the reaction. NOTE: This will not work with old databases that have not worked with this method before out of the box. To ensure backwards applicability, the content of the reaction collection has to be deleted first. NOTE: This method assumes there are only TWO types of elementary steps. """ # we have two possible procedures here: # 1) check new step type: # - if regular, loop over all steps of reaction and disable all barrierless steps # - if barrierless, loop over steps of reaction, if we find a regular step, break and disable new # 2) check type of first enabled step in reaction: # - if different to new step type, check new type # - if regular, loop over all steps of reaction and disable barrierless # - if barrierless, only disable new # - if identical do nothing # case 2 seems to be more efficient, works on the assumption that newly added entry has always been checked, # hence added note in doc string step_type = None steps = reaction.get_elementary_steps() # find step type of first enabled step in reaction for step_id in steps: reaction_step = db.ElementaryStep(step_id, self._elementary_steps) if reaction_step.explore() and reaction_step.analyze(): step_type = reaction_step.get_type() break # we have a mismatch between the first activate step in reaction and the new step if step_type is not None and step_type != new_step.get_type(): warn(f"Found barrierless and regular elementary step for identical reaction {str(reaction.id())}. " f"We are now disabling all barrierless steps in this reaction.") new_type = new_step.get_type() if new_type == db.ElementaryStepType.BARRIERLESS: # new is barrierless, first enabled was regular, hence we should not need to check all steps of reaction new_step.disable_exploration() new_step.disable_analysis() elif new_type == db.ElementaryStepType.REGULAR: # new step is regular, and first enabled was barrierless, disable all barrierless in reaction for step_id in steps: reaction_step = db.ElementaryStep(step_id, self._elementary_steps) if reaction_step.get_type() == db.ElementaryStepType.BARRIERLESS: reaction_step.disable_exploration() reaction_step.disable_analysis() else: raise TypeError(f"Unsupported elementary step type for elementary step {str(new_step.id())}") def _add_reaction_to_aggregate(self, aggregates_to_change: List[db.ID], aggregate_types: List[db.CompoundOrFlask], reaction_id: db.ID): for aggregate_id, aggregate_type in zip(aggregates_to_change, aggregate_types): if aggregate_type == db.CompoundOrFlask.COMPOUND: compound = db.Compound(aggregate_id, self._compounds) compound.add_reaction(reaction_id) elif aggregate_type == db.CompoundOrFlask.FLASK: flask = db.Flask(aggregate_id, self._flasks) flask.add_reaction(reaction_id) else: raise RuntimeError(f"Requested aggregate type '{aggregate_type}' is not supported.") def _same_energy(self, ts_new_energy: Union[float, None], ts_old: db.Structure, model: db.Model) -> bool: ts_old_energy = get_energy_for_structure(ts_old, "electronic_energy", model, self._structures, self._properties) # Check transition state energy. if ts_old_energy is None or ts_new_energy is None: return False abs_difference = abs(ts_old_energy - ts_new_energy) return abs_difference <= self.options.energy_tolerance def _same_coordinates(self, ts_positions_new: np.ndarray, ts_elements: List[utils.ElementType], ts_old: db.Structure) -> bool: fit = utils.QuaternionFit(ts_positions_new, ts_old.get_atoms().positions, ts_elements) rmsd = fit.get_weighted_rmsd() # Check structure RMSD # TODO It may be possible to reorder the atoms of both transition states to be in identical # order before checking the RMSD using atom-index maps. return rmsd <= self.options.rmsd_tolerance def _same_starting_structures(self, lhs_new_step: List[db.ID], old_step: db.ElementaryStep) -> bool: lhs_old_step = old_step.get_reactants(db.Side.LHS)[0] if len(lhs_old_step) != len(lhs_new_step): return False return sorted(lhs_old_step) == sorted(lhs_new_step) def _is_duplicate(self, elementary_step: db.ElementaryStep, reaction: db.Reaction) -> bool: lhs_new_step = elementary_step.get_reactants(db.Side.LHS)[0] if not elementary_step.has_transition_state(): return False ts_new = db.Structure(elementary_step.get_transition_state(), self._structures) model = ts_new.model ts_new_energy = get_energy_for_structure(ts_new, "electronic_energy", model, self._structures, self._properties) ts_positions_new = ts_new.get_atoms().positions ts_elements = ts_new.get_atoms().elements for old_step_id in reaction.get_elementary_steps(): old_step = db.ElementaryStep(old_step_id, self._elementary_steps) if self.options.use_structure_deduplication: if not self._same_starting_structures(lhs_new_step, old_step): continue if not old_step.has_transition_state(): continue ts_old = db.Structure(old_step.get_transition_state(), self._structures) if self.options.use_energy_deduplication: if not self._same_energy(ts_new_energy, ts_old, model): continue if self.options.use_rmsd_deduplication: if not self._same_coordinates(ts_positions_new, ts_elements, ts_old): continue # If all checks signal identical structures. This is a duplicate! return True return False