#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__copyright__ = """ This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Department of Chemistry and Applied Biosciences, Reiher Group.
See LICENSE.txt for details.
"""
# Standard library imports
from json import dumps
from typing import List, Union, Optional, Tuple
from warnings import warn
import numpy as np
from copy import copy
# Third party imports
import scine_database as db
from scine_database.queries import stop_on_timeout
from scine_database.energy_query_functions import get_energy_for_structure
from scine_database.compound_and_flask_creation import get_compound_or_flask
import scine_utilities as utils
# Local application imports
from . import Gear
[docs]class BasicReactionHousekeeping(Gear):
"""
This Gear updates all Elementary Steps stored in the database such that they
are added to an existing Reaction or that a new Reaction is created if the
Step does not fit an existing one.
Attributes
----------
options : BasicReactionHousekeeping.Options
The options for the BasicReactionHousekeeping Gear.
Notes
-----
Checks for all Elementary Steps without a 'reaction'.
"""
def __init__(self) -> None:
super().__init__()
self.options = self.Options()
self._required_collections = ["compounds", "elementary_steps", "flasks", "reactions", "properties",
"structures"]
self._reaction_cache: Optional[dict] = None
self._model_is_required = False
[docs] class Options(Gear.Options):
"""
The options for the BasicReactionHousekeeping Gear.
"""
__slots__ = ("use_structure_deduplication",
"use_energy_deduplication",
"use_rmsd_deduplication",
"energy_tolerance",
"rmsd_tolerance")
def __init__(self) -> None:
super().__init__()
self.use_structure_deduplication = True
"""
bool
If true duplicated elementary steps are not sorted into reactions.
The criterion is that all structures on the lhs of the elementary step
have to be identical. This criterion can be combined with other selection criteria.
Steps are only eliminated if all selection criteria signal identical steps.
"""
self.use_energy_deduplication = True
"""
bool
If true duplicated elementary steps are not sorted into reactions.
The criterion is that the total electronic energy of the transition state
(if available) has to be within the threshold limits of `energy_tolerance`.
This criterion can be combined with other selection criteria. Steps are only eliminated
if all selection criteria signal identical steps.
"""
self.use_rmsd_deduplication = True
"""
bool
If true duplicated elementary steps are not sorted into reactions.
The criterion is that the RMSD of the transition state coordinates
(if available) have to be within the threshold limits of `rmsd_tolerance`.
This criterion can be combined with other selection criteria. Steps are only eliminated
if all selection criteria signal identical steps. Note that this deduplication
strategy may fail to eliminate elementary steps if the atom ordering the transition
state differs.
"""
self.energy_tolerance = 1e-5
"""
float
The energy tolerance for the transition state energy deduplication.
"""
self.rmsd_tolerance = 1e-3
"""
float
The RMSD tolerance for the transition state RMSD deduplication.
"""
options: Options
def _loop_impl(self):
# Setup query for elementary steps without reactions
selection = {"$and": [
{"reaction": ""},
{"analysis_disabled": {"$ne": True}}
]}
uses_elementary_step_deduplication =\
self.options.use_structure_deduplication or self.options.use_energy_deduplication or \
self.options.use_rmsd_deduplication
# Loop over all results
for step in stop_on_timeout(self._elementary_steps.iterate_elementary_steps(dumps(selection))):
step.link(self._elementary_steps)
reactants = step.get_reactants(db.Side.BOTH)
if self.stop_at_next_break_point:
return
# Determine structure types
types = [[], []]
aggregates = [[], []]
all_structures_have_aggregates = True
for i, side in enumerate(reactants):
if not all_structures_have_aggregates:
# we are breaking here to avoid rhs evaluation if lhs is already incomplete
break
for structure_id in side:
structure = db.Structure(structure_id, self._structures)
# check aggregate
if not structure.has_aggregate():
all_structures_have_aggregates = False
break
if not structure.has_graph("masm_cbor_graph"):
raise RuntimeError(f"The Structure {str(structure.id())} has an aggregate, but no graph")
graph = structure.get_graph("masm_cbor_graph")
agg_type = db.CompoundOrFlask.FLASK if ";" in graph else db.CompoundOrFlask.COMPOUND
types[i].append(agg_type)
aggregates[i].append(structure.get_aggregate())
if not all_structures_have_aggregates:
continue
self._replace_duplicate_structures(step)
# Check for a reactions with the same structures/compounds
true_hit, is_parallel = self._check_cached_reactions(aggregates[0], aggregates[1])
if true_hit is not None:
if not is_parallel:
self._invert_elementary_step(step)
# Add elementary step to reaction if it is not duplicated.
if uses_elementary_step_deduplication and self._is_duplicate(step, true_hit):
step.disable_analysis()
step.disable_exploration()
reaction = true_hit
# check for regular vs barrierless
self._disable_barrierless_if_mixed_types_in_reaction(reaction, step)
reaction.add_elementary_step(step.id())
step.set_reaction(reaction.id())
reaction.enable_exploration()
reaction.enable_analysis()
continue
# Generate new reaction
reaction = db.Reaction(db.ID(), self._reactions)
reaction.create(aggregates[0], aggregates[1], types[0], types[1])
reaction.add_elementary_step(step.id())
step.set_reaction(reaction.id())
self._add_reaction_to_aggregate(aggregates[0] + aggregates[1], types[0] + types[1], reaction.id())
self._add_to_cache(reaction, aggregates[0], aggregates[1])
def _check_cached_reactions(
self, lhs_aggregates: List[db.ID], rhs_aggregates: List[db.ID]
) -> Tuple[Optional[db.Reaction], bool]:
self._rebuild_cache()
assert self._reaction_cache is not None
key = ','.join(sorted([x.string() for x in lhs_aggregates]))
key += '<->'
key += ','.join(sorted([x.string() for x in rhs_aggregates]))
if key in self._reaction_cache:
reaction = db.Reaction(db.ID(self._reaction_cache[key]))
reaction.link(self._reactions)
return reaction, True
key = ','.join(sorted([x.string() for x in rhs_aggregates]))
key += '<->'
key += ','.join(sorted([x.string() for x in lhs_aggregates]))
if key in self._reaction_cache:
reaction = db.Reaction(db.ID(self._reaction_cache[key]))
reaction.link(self._reactions)
return reaction, False
return None, True
def _rebuild_cache(self) -> None:
if self._reaction_cache is None:
self._reaction_cache = {}
for r in self._reactions.iterate_reactions('{}'):
r.link(self._reactions)
reactants = r.get_reactants(db.Side.BOTH)
key = ','.join(sorted([x.string() for x in reactants[0]]))
key += '<->'
key += ','.join(sorted([x.string() for x in reactants[1]]))
self._reaction_cache[key] = r.get_id().string()
def _add_to_cache(
self, reaction: db.Reaction, lhs_aggregates: List[db.ID], rhs_aggregates: List[db.ID]
) -> None:
assert self._reaction_cache is not None
key = ','.join(sorted([x.string() for x in lhs_aggregates]))
key += '<->'
key += ','.join(sorted([x.string() for x in rhs_aggregates]))
assert key not in self._reaction_cache
self._reaction_cache[key] = reaction.get_id().string()
def _replace_duplicate_structures(self, elementary_step: db.ElementaryStep):
reactants = elementary_step.get_reactants(db.Side.BOTH)
unique_lhs = self._make_unique_structure_id_list(reactants[0])
unique_rhs = self._make_unique_structure_id_list(reactants[1])
elementary_step.set_reactants(unique_lhs, db.Side.LHS)
elementary_step.set_reactants(unique_rhs, db.Side.RHS)
def _make_unique_structure_id_list(self, id_list: List[db.ID]) -> List[db.ID]:
unique_list: List[db.ID] = list()
for s_id in id_list:
structure = db.Structure(s_id, self._structures)
if structure.get_label() == db.Label.DUPLICATE:
unique_list.append(structure.get_original())
else:
unique_list.append(s_id)
return unique_list
@staticmethod
def _invert_elementary_step(elementary_step: db.ElementaryStep):
reactants = copy(elementary_step.get_reactants(db.Side.BOTH))
elementary_step.set_reactants(reactants[1], db.Side.LHS)
elementary_step.set_reactants(reactants[0], db.Side.RHS)
if elementary_step.has_spline():
spline = elementary_step.get_spline()
knots = np.flipud(np.asarray([1 - x for x in spline.knots]))
trajectory = np.flipud(spline.data)
ts_position = 1 - spline.ts_position
elements = spline.elements
inverted_spline = utils.bsplines.TrajectorySpline(elements, knots, trajectory, ts_position)
elementary_step.set_spline(inverted_spline)
def _disable_barrierless_if_mixed_types_in_reaction(self, reaction: db.Reaction, new_step: db.ElementaryStep):
"""
If we have a regular elementary step and a barrierless elementary step for an identical reaction,
we are for now distrusting the barrierless steps and disable them, but keep them in the reaction.
NOTE: This will not work with old databases that have not worked with this method before out of the box.
To ensure backwards applicability, the content of the reaction collection has to be deleted first.
NOTE: This method assumes there are only TWO types of elementary steps.
"""
# we have two possible procedures here:
# 1) check new step type:
# - if regular, loop over all steps of reaction and disable all barrierless steps
# - if barrierless, loop over steps of reaction, if we find a regular step, break and disable new
# 2) check type of first enabled step in reaction:
# - if different to new step type, check new type
# - if regular, loop over all steps of reaction and disable barrierless
# - if barrierless, only disable new
# - if identical do nothing
# case 2 seems to be more efficient, works on the assumption that newly added entry has always been checked,
# hence added note in doc string
step_type = None
steps = reaction.get_elementary_steps()
# find step type of first enabled step in reaction
for step_id in steps:
reaction_step = db.ElementaryStep(step_id, self._elementary_steps)
if reaction_step.explore() and reaction_step.analyze():
step_type = reaction_step.get_type()
break
# we have a mismatch between the first activate step in reaction and the new step
if step_type is not None and step_type != new_step.get_type():
warn(f"Found barrierless and regular elementary step for identical reaction {str(reaction.id())}. "
f"We are now disabling all barrierless steps in this reaction.")
new_type = new_step.get_type()
barrierless_types = [db.ElementaryStepType.BARRIERLESS]
if new_type in barrierless_types:
# new is barrierless, first enabled was regular, hence we should not need to check all steps of reaction
new_step.disable_exploration()
new_step.disable_analysis()
elif new_type == db.ElementaryStepType.REGULAR:
# new step is regular, and first enabled was barrierless, disable all barrierless in reaction
for step_id in steps:
reaction_step = db.ElementaryStep(step_id, self._elementary_steps)
if reaction_step.get_type() in barrierless_types:
reaction_step.disable_exploration()
reaction_step.disable_analysis()
else:
raise TypeError(f"Unsupported elementary step type for elementary step {str(new_step.id())}")
def _add_reaction_to_aggregate(self, aggregates_to_change: List[db.ID], aggregate_types: List[db.CompoundOrFlask],
reaction_id: db.ID):
for aggregate_id, aggregate_type in zip(aggregates_to_change, aggregate_types):
flask_or_compound = get_compound_or_flask(aggregate_id, aggregate_type, self._compounds, self._flasks)
flask_or_compound.add_reaction(reaction_id)
def _same_energy(self, ts_new_energy: Union[float, None], ts_old: db.Structure, model: db.Model) -> bool:
ts_old_energy = get_energy_for_structure(ts_old, "electronic_energy", model, self._structures,
self._properties)
# Check transition state energy.
if ts_old_energy is None or ts_new_energy is None:
return False
abs_difference = abs(ts_old_energy - ts_new_energy)
return abs_difference <= self.options.energy_tolerance
def _same_coordinates(self, ts_positions_new: np.ndarray, ts_elements: List[utils.ElementType],
ts_old: db.Structure) -> bool:
fit = utils.QuaternionFit(ts_positions_new, ts_old.get_atoms().positions, ts_elements)
rmsd = fit.get_weighted_rmsd()
# Check structure RMSD
# TODO It may be possible to reorder the atoms of both transition states to be in identical
# order before checking the RMSD using atom-index maps.
return rmsd <= self.options.rmsd_tolerance
def _same_starting_structures(self, lhs_new_step: List[db.ID], old_step: db.ElementaryStep) -> bool:
lhs_old_step = old_step.get_reactants(db.Side.LHS)[0]
if len(lhs_old_step) != len(lhs_new_step):
return False
return sorted(lhs_old_step) == sorted(lhs_new_step)
def _is_duplicate(self, elementary_step: db.ElementaryStep, reaction: db.Reaction) -> bool:
lhs_new_step = elementary_step.get_reactants(db.Side.LHS)[0]
if not elementary_step.has_transition_state():
return False
ts_new = db.Structure(elementary_step.get_transition_state(), self._structures)
model = ts_new.model
ts_new_energy = get_energy_for_structure(
ts_new, "electronic_energy", model, self._structures, self._properties)
ts_positions_new = ts_new.get_atoms().positions
ts_elements = ts_new.get_atoms().elements
for old_step_id in reaction.get_elementary_steps():
old_step = db.ElementaryStep(old_step_id, self._elementary_steps)
if self.options.use_structure_deduplication:
if not self._same_starting_structures(lhs_new_step, old_step):
continue
if not old_step.has_transition_state():
continue
ts_old = db.Structure(old_step.get_transition_state(), self._structures)
if self.options.use_energy_deduplication:
if not self._same_energy(ts_new_energy, ts_old, model):
continue
if self.options.use_rmsd_deduplication:
if not self._same_coordinates(ts_positions_new, ts_elements, ts_old):
continue
# If all checks signal identical structures. This is a duplicate!
return True
return False