# -*- coding: utf-8 -*-
from __future__ import annotations
__copyright__ = """ This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Department of Chemistry and Applied Biosciences, Reiher Group.
See LICENSE.txt for details.
"""
import sys
from copy import deepcopy
from typing import TYPE_CHECKING, Dict
from scine_puffin.config import Configuration
from .templates.job import breakable, calculation_context, job_configuration_wrapper
from .templates.scine_react_job import ReactJob
from typing import Optional, List
from scine_puffin.utilities.scine_helper import SettingsManager
from scine_puffin.utilities.imports import module_exists, MissingDependency
if module_exists("scine_database") or TYPE_CHECKING:
import scine_database as db
else:
db = MissingDependency("scine_database")
if module_exists("scine_utilities") or TYPE_CHECKING:
import scine_utilities as utils
else:
utils = MissingDependency("scine_database")
[docs]class ScineBsplineOptimization(ReactJob):
"""
The job interpolated between two structures. Generates a transition state guess,
optimizes the transition state, and verifies the IRC.
The job performs the following steps:
1. Optimize reactant and product structure.
If reactant and product converge to structures with the same graph. Stop and add
barrier-less reactions if at least one of the original structures was a flask
(+ geometry optimization of the separated structures).
Else:
2. Run a B-spline interpolation and optimization to extract an TS guess.
3. Optimize the transition state.
4. See @scine_react_complex_nt2.py for all following steps (Hessian, IRC, ...).
**Order Name**
``scine_bspline_optimization``
**Required Input**
The first structures corresponds to the reactants, the second structure
to the product.
**Optional Settings**
Optional settings are read from the ``settings`` field, which is part of
any ``Calculation`` stored in a SCINE Database.
All possible settings for this job are based on those available in SCINE
ReaDuct. For a complete list see the
`ReaDuct manual <https://scine.ethz.ch/download/readuct>`_
Given that this job does more than one, in fact many separate calculations
it is possible to target each individually with the settings. In order to
achieve this each regular setting, such as ``convergence_max_iterations``
has to be prepended with a tag, identifying which part of the job it is
meant to impact. If the setting is meant to be added to the IRC scan at
the end of this job ``irc_convergence_max_iterations`` should be used.
Note that this may include a doubling of this style of flags, as Readuct
uses a similar way of sorting options. Hence, choosing a none default
``irc_mode`` in this task has to be done using ``irc_irc_mode``.
The complete list prefixes for specific settings for the steps listed at
the start of this section is:
1. TS optimization: ``tsopt_*``
2. Validation using an IRC scan: ``irc_*``
3. Optimization of the structures obtained with the IRC scan : ``ircopt_*``
4. Optimization of the products and reactants: ``opt_*``
The following settings are recognized without a prepending flag:
add_based_on_distance_connectivity : bool
Whether to add the connectivity (i.e. add bonds) as derived from
atomic distances when graphs are generated. (default: True)
sub_based_on_distance_connectivity : bool
Whether to subtract the connectivity (i.e. remove bonds) as derived from
atomic distances when graphs are generated. (default: True)
only_distance_connectivity : bool
Whether to impose the connectivity solely from distances. (default: False)
imaginary_wavenumber_threshold : float
Threshold value in inverse centimeters below which a wavenumber
is considered as imaginary when the transition state is analyzed.
Negative numbers are interpreted as imaginary. (default: 0.0)
spin_propensity_check : int
The range to check for possible multiplicities for products. A value
of 2 (default) will check triplet and quintet for a singlet
and will check singlet, quintet und septet for triplet.
Additionally, all settings that are recognized by the SCF program chosen.
are also available. These settings are not required to be prepended with
any flag.
Common examples are:
max_scf_iterations : int
The number of allowed SCF cycles until convergence.
**Required Packages**
- SCINE: Database (present by default)
- SCINE: molassembler (present by default)
- SCINE: Readuct (present by default)
- SCINE: Utils (present by default)
- A program implementing the SCINE Calculator interface, e.g. Sparrow
**Generated Data**
If successful (technically and chemically) the following data will be
generated and added to the database:
Elementary Steps
If found, a single new elementary step with the associated transition
state will be added to the database.
Structures
The transition state (TS) and also the separated products and reactants
will be added to the database.
Properties
The ``hessian`` (``DenseMatrixProperty``), ``frequencies``
(``VectorProperty``), ``normal_modes`` (``DenseMatrixProperty``),
``gibbs_energy_correction`` (``NumberProperty``) and
``gibbs_free_energy`` (``NumberProperty``) of the TS will be
provided. The ``electronic_energy`` associated with the TS structure and
each of the products will be added to the database.
"""
def __init__(self) -> None:
super().__init__()
self.name = "Scine double ended transition state optimization from b-splines"
self.exploration_key = "bspline"
tsopt_defaults = {
"output": ["ts"],
"optimizer": "bofill",
"convergence_max_iterations": 200,
}
irc_defaults = {
"output": ["irc_forward", "irc_backward"],
"convergence_max_iterations": 50,
"irc_initial_step_size": 0.1,
"sd_factor": 1.0,
"stop_on_error": False,
}
ircopt_defaults = {"stop_on_error": True, "convergence_max_iterations": 200}
opt_defaults = {
"convergence_max_iterations": 500,
}
bspline_defaults = {
"output": ["tsguess"],
"extract_ts_guess": True,
"optimize": True,
}
self.settings = {
**self.settings,
"bspline": bspline_defaults,
"tsopt": tsopt_defaults,
"irc": irc_defaults,
"ircopt": ircopt_defaults,
self.opt_key: opt_defaults,
}
[docs] @job_configuration_wrapper
def run(self, manager: db.Manager, calculation: db.Calculation, config: Configuration) -> bool:
import scine_readuct as readuct
import scine_molassembler as masm
# Everything that calls SCINE is enclosed in a try/except block
with breakable(calculation_context(self)):
if len(calculation.get_structures()) != 2:
self.raise_named_exception(f"{self.name} requires 2 input structures.")
r_structure = db.Structure(calculation.get_structures()[0], self._structures)
p_structure = db.Structure(calculation.get_structures()[1], self._structures)
if len(r_structure.get_atoms()) != len(p_structure.get_atoms()):
self.raise_named_exception(f"{self.name} requires that the input structures are the same molecule.")
if r_structure.get_model() != p_structure.get_model():
self.raise_named_exception(f"{self.name} requires that the input structures have the same model.")
if r_structure.get_multiplicity() != p_structure.get_multiplicity() or \
r_structure.get_charge() != p_structure.get_charge():
self.raise_named_exception(f"{self.name} requires that the input structures have the same "
f"molecular charge and spin multiplicity.")
settings_manager, program_helper = self.create_helpers(r_structure)
settings_manager.separate_settings(self._calculation.get_settings())
settings_manager.update_calculator_settings(r_structure, self._calculation.get_model(),
self.config["resources"])
self.sort_settings(settings_manager.task_settings)
self.ref_structure = r_structure
# Prepare the structures by
# 1. optimizing both spline ends.
# 2. set attributes of parent class start_graph, start_charges etc.
reactant_name, product_name, opt_r_graph, opt_p_graph = self.__prepare_structures(settings_manager,
r_structure, p_structure)
# Stop the calculation if both spline ends collapsed to the same species.
if masm.JsonSerialization.equal_molecules(opt_r_graph, opt_p_graph):
self.__check_barrierless_reactions(settings_manager, reactant_name, product_name, r_structure,
p_structure)
calculation.set_comment(self.name + " Spline ends transform barrier-less!")
self.capture_raw_output()
raise breakable.Break
# self.save_initial_graphs_and_charges(settings_manager, [opt_reactant])
# The spline ends are true minima. Therefore, their must be a transition state between them.
""" B-Spline Optimization """
inputs = [reactant_name, product_name]
print("\nBspline Settings:")
print(self.settings["bspline"], "\n")
self.systems, success = readuct.run_bspline_task(
self.systems, inputs, **self.settings["bspline"]
)
self.throw_if_not_successful(
success,
self.systems,
inputs,
["energy"],
"B-Spline optimization failed:\n",
)
""" TSOPT-Hess-IRC """
inputs = self.output("bspline")
product_names, start_names = self._tsopt_hess_irc_ircopt(inputs[0], settings_manager)
""" Store new starting material conformer(s) """
r_tuple = None
# This may be re-enabled in the future. Therefore, I would like to keep it as a comment.
# p_tuple = self.__check_barrierless_alternative_reactions(settings_manager, product_name, p_structure,
# product_names, "product_00")
if start_names is not None:
if not self.no_irc_structure_matches_start:
r_tuple = self.__check_barrierless_alternative_reactions(settings_manager, reactant_name,
r_structure, start_names, "reactant_00")
start_structures = self.store_start_structures(
start_names, program_helper, "tsopt", [r_structure.id()])
else:
if r_structure.get_model() == self._calculation.get_model():
start_structures = [self._calculation.get_structures()[0]]
else:
start_structures = self.store_start_structures(
[reactant_name], program_helper, "tsopt", [r_structure.id()])
# If the lhs or rhs of the reaction decomposes into fragment through a barrier-less reaction and these
# fragments are different from the fragments of the original lhs or rhs, e.g,
# final: A + B --> AB --> C
# orig: D + E --> DE --> F optimized into D' + E' --> AB --> C
# we have to add a barrier-less reaction transforming between the fragments. A simple example for such
# a situation is a barrier-less protonation that is only barrier-less with the "new" electronic structure
# model.
# Note that this logic only applies if the individual endpoints used as input for the spline, are
# rediscovered by the IRC. Since this is only checked for the lhs, the corresponding fragmentation
# embedding for the rhs is disabled at the moment.
lhs, _, _ = self.react_postprocessing(product_names, program_helper, "tsopt", start_structures)
if r_tuple is not None:
self.__add_barrierless_reaction(r_tuple[0], r_tuple[1], r_tuple[2], lhs, r_tuple[3])
# if p_tuple is not None:
# self.__add_barrierless_reaction(p_tuple[0], p_tuple[1], p_tuple[2], rhs, p_tuple[3])
return self.postprocess_calculation_context()
def __set_up_calculator(self, structure: db.Structure, settings_manager: SettingsManager, name: str):
"""
Create a calculator for the given structure.
"""
xyz_name = name + ".xyz"
utils.io.write(xyz_name, structure.get_atoms())
structure_calculator_settings = deepcopy(settings_manager.calculator_settings)
structure_calculator_settings[utils.settings_names.molecular_charge] = structure.get_charge()
structure_calculator_settings[
utils.settings_names.spin_multiplicity] = structure.get_multiplicity()
reactant = utils.core.load_system_into_calculator(
xyz_name,
self._calculation.get_model().method_family,
**structure_calculator_settings,
)
self.systems[name] = reactant
fragment_atoms, graph_string, charges, multiplicities, decision_lists = self.get_graph_charges_multiplicities(
name, structure.get_charge())
if structure.has_graph("masm_cbor_graph") and structure.has_graph("masm_decision_list"):
structure.set_graph("masm_cbor_graph", graph_string)
structure.set_graph("masm_decision_list", ";".join(decision_lists))
return fragment_atoms, graph_string, charges, multiplicities, decision_lists
def __check_barrierless_alternative_reactions(self, settings_manager: SettingsManager, opt_name_reactant: str,
r_structure: db.Structure, opt_reactant_fragment_names: List[str],
r_name: str):
"""
Check if there are multiple plausible decomposition paths for the given complex or structure, i.e, the structure
reacted barrier-less leading to a different fragmentation path as for the original elementary step.
"""
r_fragments, _, r_charges, r_multi, _ = self.__set_up_calculator(r_structure, settings_manager, r_name)
if len(r_fragments) == 1:
return None
opt_r_orig_names, opt_r_orig_fragment_graphs, _ = self.__optimize_and_get_graphs_and_energies(
"opt_" + r_name + "_orig_fragments", r_fragments, r_charges, r_multi, settings_manager
)
if not self.__same_molecules(opt_r_orig_names, opt_reactant_fragment_names):
opt_r_graph, self.systems = self.make_graph_from_calc(self.systems, opt_name_reactant)
return opt_name_reactant, opt_r_orig_fragment_graphs, opt_r_graph, opt_r_orig_names
return None
def __check_barrierless_reactions(self, settings_manager: SettingsManager, opt_name_reactant: str,
opt_name_product: str, r_structure: db.Structure, p_structure: db.Structure):
"""
Check if both input structures collapsed to the same molecule. We will compare the optimized structures of
the input's lhs and rhs.
"""
import scine_molassembler as masm
results = self._calculation.get_results()
results.clear()
self._calculation.set_results(results)
charge = r_structure.get_charge()
r_name = "reactant_00"
p_name = "product_00"
r_fragments, r_graph, r_charges, r_multi, _ = self.__set_up_calculator(
r_structure, settings_manager, r_name)
p_fragments, p_graph, p_charges, p_multi, _ = self.__set_up_calculator(
p_structure, settings_manager, p_name)
# check graph of spline ends
opt_r_fragments, opt_r_graph, opt_r_charges, opt_r_multiplicities, _ = \
self.get_graph_charges_multiplicities(opt_name_reactant, charge)
opt_p_fragments, opt_p_graph, opt_p_charges, opt_p_multiplicities, _ = \
self.get_graph_charges_multiplicities(opt_name_product, charge)
# create structures for optimized ends
if ";" in opt_r_graph:
opt_r_fragment_names, opt_r_fragment_graphs, _ = self.__optimize_and_get_graphs_and_energies(
"opt_r_fragments", opt_r_fragments, opt_r_charges, opt_r_multiplicities, settings_manager)
opt_reactant_structure_ids = self.__add_barrierless_reaction(opt_name_reactant, opt_r_fragment_graphs,
opt_r_graph, None, opt_r_fragment_names)
# optimization changed the initial complex. Add barrier-less step between previous fragments and the
# optimized fragment
if ";" in r_graph and not masm.JsonSerialization.equal_molecules(r_graph, opt_r_graph):
opt_r_orig_framnet_names, opt_r_orig_fragment_graphs, _ = self.__optimize_and_get_graphs_and_energies(
"opt_r_orig_fragments", r_fragments, r_charges, r_multi, settings_manager
)
if not self.__same_molecules(opt_r_orig_framnet_names, opt_r_fragment_names):
self.__add_barrierless_reaction(opt_name_reactant, opt_r_orig_fragment_graphs, opt_r_graph,
opt_reactant_structure_ids, opt_r_orig_framnet_names)
if ";" in opt_p_graph and not masm.JsonSerialization.equal_molecules(opt_p_graph, opt_r_graph):
opt_p_fragment_names, opt_p_frgagment_graphs, _ = self.__optimize_and_get_graphs_and_energies(
"opt_p_fragments",
opt_p_fragments,
opt_p_charges,
opt_p_multiplicities,
settings_manager)
opt_structure_ids = self.__add_barrierless_reaction(opt_name_product, opt_p_frgagment_graphs, opt_p_graph,
None, opt_p_fragment_names)
if ";" in p_graph and not masm.JsonSerialization.equal_molecules(p_graph, opt_p_graph):
opt_p_orig_framnet_names, opt_p_orig_fragment_graphs, _ = self.__optimize_and_get_graphs_and_energies(
"opt_p_orig_fragments", p_fragments, p_charges, p_multi, settings_manager
)
if not self.__same_molecules(opt_p_orig_framnet_names, opt_p_fragment_names):
self.__add_barrierless_reaction(opt_name_product, opt_p_orig_fragment_graphs, opt_p_graph,
opt_structure_ids, opt_p_orig_framnet_names)
def __prepare_structures(self, settings_manager, reactant_structure, products_structure):
"""
Optimize the input structures + generate graphs.
"""
# optimize spline ends
""" Reactant """
opt_name_reactant, self.systems = self.optimize_structures("opt_reactant", self.systems,
[reactant_structure.get_atoms()],
[reactant_structure.get_charge()],
[reactant_structure.get_multiplicity()],
deepcopy(
settings_manager.calculator_settings.as_dict()))
if len(opt_name_reactant) != 1:
self.raise_named_exception("The optimization of the reactant structure failed.")
raise RuntimeError("Unreachable")
lowest_r_name, _ = self._get_propensity_names_within_range(
opt_name_reactant[0], self.systems, self.settings[self.propensity_key]["energy_range_to_optimize"]
)
if lowest_r_name is None:
self.raise_named_exception("No reactant optimization was successful.")
raise RuntimeError("Unreachable")
if lowest_r_name != opt_name_reactant[0]:
sys.stderr.write(f"Warning: Detected a lower energy spin multiplicity of "
f"{self.get_multiplicity(self.get_system(lowest_r_name))} for the reactant.")
opt_name_reactant = [lowest_r_name]
""" Product """
opt_name_product, self.systems = self.optimize_structures("opt_product", self.systems,
[products_structure.get_atoms()],
[products_structure.get_charge()],
[products_structure.get_multiplicity()],
deepcopy(
settings_manager.calculator_settings.as_dict()))
if len(opt_name_product) != 1:
self.raise_named_exception("The optimization of the product structure failed.")
raise RuntimeError("Unreachable")
lowest_p_name, _ = self._get_propensity_names_within_range(
opt_name_product[0], self.systems, self.settings[self.propensity_key]["energy_range_to_optimize"]
)
if lowest_p_name is None:
self.raise_named_exception("No product optimization was successful.")
raise RuntimeError("Unreachable")
if lowest_p_name != opt_name_product[0]:
sys.stderr.write(f"Warning: Detected a lower energy spin multiplicity of "
f"{self.get_multiplicity(self.get_system(lowest_p_name))} for the product.")
opt_name_product = [lowest_p_name]
if self.get_multiplicity(self.get_system(opt_name_reactant[0])) != \
self.get_multiplicity(self.get_system(opt_name_product[0])):
self.raise_named_exception("The optimized reactant and product have different spin multiplicities.")
raise RuntimeError("Unreachable")
_, opt_r_graph, opt_r_charges, opt_r_multiplicities, opt_r_decision_list = \
self.get_graph_charges_multiplicities(opt_name_reactant[0], products_structure.get_charge())
_, opt_p_graph, _, _, _ = \
self.get_graph_charges_multiplicities(opt_name_product[0], products_structure.get_charge())
self.start_charges = opt_r_charges
self.start_multiplicities = opt_r_multiplicities
self.start_graph = opt_r_graph
self.start_decision_lists = opt_r_decision_list
self.rc_opt_system_name = opt_name_reactant[0]
return opt_name_reactant[0], opt_name_product[0], opt_r_graph, opt_p_graph
def __create_complex_or_minimum(self, graph: str, calculator_name: str):
"""
Create a structure with the correct label according to its Molassembler serialization (graph).
"""
label = db.Label.MINIMUM_OPTIMIZED if ";" not in graph else db.Label.COMPLEX_OPTIMIZED
new_structure = self.create_new_structure(self.systems[calculator_name], label)
if self.ref_structure is None:
self.raise_named_exception("The reference structure is not set.")
raise RuntimeError("Unreachable") # for mypy
self.transfer_properties(self.ref_structure, new_structure)
bond_orders, self.systems = self.make_bond_orders_from_calc(self.systems, calculator_name)
self.store_energy(self.get_system(calculator_name), new_structure)
self.store_bond_orders(bond_orders, new_structure)
self.store_property(
self._properties,
"atomic_charges",
"VectorProperty",
self.get_system(calculator_name).get_results().atomic_charges,
self._calculation.get_model(),
self._calculation,
new_structure,
)
self.add_graph(new_structure, bond_orders)
results = self._calculation.get_results()
results.add_structure(new_structure.id())
self._calculation.set_results(self._calculation.get_results() + results)
return new_structure
def __optimize_and_get_graphs_and_energies(self, fragment_base_name: str, fragments: List[utils.AtomCollection],
charges: List[int], multiplicities: List[int],
settings_manager: SettingsManager):
"""
Optimize molecular fragments and return their names, graphs, and energies.
"""
opt_fragment_names, self.systems = (
self.optimize_structures(fragment_base_name, self.systems, fragments,
charges, multiplicities,
deepcopy(settings_manager.calculator_settings.as_dict()))
)
opt_f_graphs = []
fragment_energies = []
for name, charge in zip(opt_fragment_names, charges):
_, opt_f_graph, _, _, _ = self.get_graph_charges_multiplicities(name, charge)
if ";" in opt_f_graph:
self.raise_named_exception(
"Fragments in flask keep dissociating in the job "
+ self.name
)
opt_f_graphs.append(opt_f_graph)
fragment_energies.append(self.get_system(name).get_results().energy)
return opt_fragment_names, opt_f_graphs, fragment_energies
def __add_barrierless_reaction(self, opt_name: str, opt_f_graphs: List[str], opt_graph: str,
opt_structure_ids: Optional[List[db.ID]], opt_fragment_names: List[str]):
"""
Add a barrier-less reaction to the database between the optimized structure and its fragments.
"""
self.__assert_conserved_atom(opt_fragment_names, [opt_name])
db_results = self._calculation.get_results()
db_results.clear()
fragment_structures = []
for name, graph in zip(opt_fragment_names, opt_f_graphs):
fragment_structures.append(self.__create_complex_or_minimum(graph, name))
if opt_structure_ids is None:
opt_structure_ids = [self.__create_complex_or_minimum(opt_graph, opt_name).id()]
new_step = db.ElementaryStep()
new_step.link(self._elementary_steps)
new_step.create([s.id() for s in fragment_structures], opt_structure_ids)
new_step.set_type(db.ElementaryStepType.BARRIERLESS)
db_results.add_elementary_step(new_step.id())
self._calculation.set_comment(self.name + ": Barrier-less reaction found.")
self._calculation.set_results(self._calculation.get_results() + db_results)
return opt_structure_ids
def __assert_conserved_atom(self, lhs_names: List[str], rhs_names: List[str]):
"""
Assert that the number of atoms did not change between the calculators.
"""
lhs_atoms = [self.get_system(name).structure for name in lhs_names]
rhs_atoms = [self.get_system(name).structure for name in rhs_names]
lhs_counts = self.__get_elements_in_atom_collections(lhs_atoms)
rhs_counts = self.__get_elements_in_atom_collections(rhs_atoms)
print("Atom counts lhs", lhs_counts)
print("Atom counts rhs", rhs_counts)
if lhs_counts != rhs_counts:
raise RuntimeError("Error: Non stoichiometric elementary step detected. The structures are likely wrong.")
@staticmethod
def __get_elements_in_atom_collections(atom_collections: List[utils.AtomCollection]) -> Dict[str, int]:
"""
Builds a dictionary containing the element symbols and the number of their occurrence in a given atom
collection.
"""
elements: List[str] = []
for atom_collection in atom_collections:
elements += [str(e) for e in atom_collection.elements]
return {e: elements.count(e) for e in elements}
def __same_molecules(self, names_one, names_two):
"""
Check if two molecules/calculators are the same according to charges, Molassembler (graphs), and multiplicities.
"""
import scine_molassembler as masm
graphs_one, charges_one, multies_one = self.__get_sorted_graphs_charges_multiplicities(names_one)
graphs_two, charges_two, multies_two = self.__get_sorted_graphs_charges_multiplicities(names_two)
total_graph_one = ";".join(graphs_one)
total_graph_two = ";".join(graphs_two)
return charges_one == charges_two and multies_one == multies_two and masm.JsonSerialization.equal_molecules(
total_graph_one, total_graph_two)
def __get_sorted_graphs_charges_multiplicities(self, names_one: List[str]):
"""
Get the sorted Molassembler serializations (graphs), charges, and multiplicites of the calculators corresponding
to the given names
"""
charges_one = []
multies_one = []
graphs_one = []
for name in names_one:
system = self.get_system(name)
charges_one.append(system.settings[utils.settings_names.molecular_charge])
multies_one.append(system.settings[utils.settings_names.spin_multiplicity])
graphs_one.append(self.make_graph_from_calc(self.systems, name)[0])
graphs, charges, multiplicities = (
list(start_val)
for start_val in zip(*sorted(zip(
graphs_one,
charges_one,
multies_one)))
)
return graphs, charges, multiplicities