Source code for scine_puffin.jobs.kinetx_kinetic_modeling

# -*- coding: utf-8 -*-
from __future__ import annotations
__copyright__ = """ This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Department of Chemistry and Applied Biosciences, Reiher Group.
See LICENSE.txt for details.
"""

from typing import Any, TYPE_CHECKING, List, Dict

from .templates.job import breakable, calculation_context, job_configuration_wrapper
from .templates.kinetic_modeling_jobs import KineticModelingJob
from ..utilities.compound_and_flask_helpers import get_compound_or_flask
from scine_puffin.config import Configuration
from scine_puffin.utilities.imports import module_exists, requires, MissingDependency

if module_exists("scine_database") or TYPE_CHECKING:
    import scine_database as db
else:
    db = MissingDependency("scine_database")
if module_exists("scine_utilities") or TYPE_CHECKING:
    import scine_utilities as utils
else:
    utils = MissingDependency("scine_utilities")


[docs]class KinetxKineticModeling(KineticModelingJob): """ A job that performs the kinetic modeling using KiNetX given a set of reactions and an electronic structure model. The reaction rates are calculated from transition state theory. The final concentration and the maximum concentration reached (for at least a given set of time steps) is added as properties to the centroids of each compound. As starting concentrations, the sum of all "start_concentration" properties of each structure for the given compounds is used. **Order Name** ``kinetx_kinetic_modeling`` **Required Input** model : db.Model The electronic structure model to flag the new properties with. **Required Settings** aggregate_ids : List[str] The aggregate IDs (as strings). reaction_ids : List[str] The reaction IDs (as strings). aggregate_types : List[int] The aggregate types. 0 for compounds, 1 for flasks. lhs_rates : List[float] The reaction rates for the forward reactions. rhs_rates : List[float] The reaction rates for the backward reactions. **Optional Settings** Optional settings are read from the ``settings`` field, which is part of any ``Calculation`` stored in a SCINE Database. The following options are available: time_step : float The integration time step. solver : str The name of the numerical differential equation solver. Options are "CashKarp5" (default), "ImplicitEuler", and "ExplicitEuler". batch_interval : int The numerical integration is done in batches of time steps. After each step the maximum concentration for each compound is updated. This is the size of each time-step batch. n_batches : int The numerical integration is done in batches of time steps. After each step the maximum concentration for each compound is updated. This is the number of time-step batches. energy_model_program : str The program with which the electronic structure model should be flagged. Default any. convergence : float Stop the numerical integration if the concentrations do not change more than this threshold between intervals. concentration_label_postfix : str Post fix to the property label. Default "". **Required Packages** - SCINE: Database (present by default) - SCINE: Utils (present by default) - SCINE: KiNetX **Generated Data** If successful (technically and chemically) the following data will be generated and added to the database: Properties The maximum and final concentration, and the vertex flux of each aggregate is added to its centroid. The edge flux, forward + backward flux for each reaction is added to the centroid of the first aggregate on the reaction's LHS. Note, that the properties are NOT listed in the results to avoid large DB documents. """ def __init__(self) -> None: super().__init__() self.name = "KiNetX kinetic modeling job" self.settings: Dict[str, Any] = { "energy_label": "electronic_energy", "time_step": 1e-8, "solver": "cash_karp_5", "batch_interval": 1000, "n_batches": 1000, "convergence": 1e-10, "energy_model_program": "any", "concentration_label_postfix": "" } self.model = db.Model("PM6", "PM6", "")
[docs] @job_configuration_wrapper def run(self, manager: db.Manager, calculation: db.Calculation, config: Configuration) -> bool: import scine_kinetx as kinetx with breakable(calculation_context(self)): self.settings.update(calculation.get_settings()) aggregate_id_list = [db.ID(c_id_str) for c_id_str in self._get_setting_value_list("aggregate_ids")] aggregate_type_list = [db.CompoundOrFlask(a_type) for a_type in self._get_setting_value_list("aggregate_types")] reaction_ids = [db.ID(r_id_str) for r_id_str in self._get_setting_value_list("reaction_ids")] lhs_rates_per_reaction = self._get_setting_value_list("lhs_rates") rhs_rates_per_reaction = self._get_setting_value_list("rhs_rates") concentrations = self._get_setting_value_list("start_concentrations") self.model = calculation.get_model() n_reactions = len(reaction_ids) n_aggregates = len(aggregate_id_list) if len(reaction_ids) != len(lhs_rates_per_reaction) or len(reaction_ids) != len(rhs_rates_per_reaction): raise AssertionError("The number of reaction rates differs from the number of reactions.") network_builder = kinetx.NetworkBuilder() # Prepare the data arrays / network network_builder.reserve(n_compounds=n_aggregates, n_reactions=n_reactions, n_channels_per_reaction=1) # Add compounds and reactions self._add_all_aggregates(aggregate_id_list, aggregate_type_list, network_builder) self._add_all_reactions(reaction_ids, network_builder, aggregate_id_list, lhs_rates_per_reaction, rhs_rates_per_reaction, aggregate_type_list) network = network_builder.generate() # Solve network time_step = self.settings["time_step"] solver = kinetx.get_integrator(self.settings["solver"]) batch_interval = self.settings["batch_interval"] n_batches = self.settings["n_batches"] convergence = self.settings["convergence"] if "max_time" in self.settings: concentration_data, reaction_flux, reaction_flux_forward, reaction_flux_backward = kinetx.integrate( network, concentrations, 0.0, time_step, solver, batch_interval, n_batches, convergence, True, self.settings["max_time"]) else: concentration_data, reaction_flux, reaction_flux_forward, reaction_flux_backward = kinetx.integrate( network, concentrations, 0.0, time_step, solver, batch_interval, n_batches, convergence) # Save the concentrations final_concentrations = concentration_data[:, 0] max_concentrations = concentration_data[:, 1] flux_concentrations = concentration_data[:, 2] results = calculation.get_results() self.model.program = self.settings["energy_model_program"] self._write_concentrations_to_centroids(aggregate_id_list, aggregate_type_list, reaction_ids, [max_concentrations, final_concentrations, flux_concentrations], [reaction_flux, reaction_flux_forward, reaction_flux_backward], [self.c_max_label, self.c_final_label, self.c_flux_label], [self.r_flux_label, self.r_forward_label, self.r_backward_label], results, self.settings["concentration_label_postfix"]) # calculation.set_results(results) self._disable_all_aggregates() self.complete_job() return self.postprocess_calculation_context()
def _get_setting_value_list(self, name: str) -> List[Any]: val = self.settings[name] if not isinstance(val, list): raise RuntimeError(f"Setting {name} must be a list.") return val def _add_all_aggregates(self, aggregate_id_list: List[db.ID], aggregate_type_list: List[db.CompoundOrFlask], network_builder) -> None: """ Add all aggregates to the kinetic model. """ for a_id, a_type in zip(aggregate_id_list, aggregate_type_list): aggregate = get_compound_or_flask(a_id, a_type, self._compounds, self._flasks) centroid = aggregate.get_centroid() mass = self._calculate_weight(centroid) network_builder.add_compound(mass, a_id.string()) @requires("utilities") def _calculate_weight(self, structure_id: db.ID) -> float: """ Calculates the molecular weight, given a DB structure id. Attributes ---------- structure_id : db.ID The structure of which to calculate the molecular weight. Returns ------- weight : float The molecular weight in a.u. . """ structure = db.Structure(structure_id, self._structures) atoms = structure.get_atoms() weight = 0.0 for e in atoms.elements: weight += utils.ElementInfo.mass(e) return weight def _add_all_reactions(self, reaction_ids, network_builder, aggregate_id_list, lhs_rates_per_reaction, rhs_rates_per_reaction, aggregate_type_list) -> None: """ Add all reactions to the kinetic model. """ for i, reaction_id in enumerate(reaction_ids): reaction = db.Reaction(reaction_id, self._reactions) lhs_rates = [lhs_rates_per_reaction[i]] rhs_rates = [rhs_rates_per_reaction[i]] lhs_rhs_compound_or_flask_ids = reaction.get_reactants(db.Side.BOTH) lhs_stoichiometry = [(aggregate_id_list.index(c_id), 1) for c_id in lhs_rhs_compound_or_flask_ids[0]] rhs_stoichiometry = [(aggregate_id_list.index(c_id), 1) for c_id in lhs_rhs_compound_or_flask_ids[1]] self.check_mass_balance(lhs_stoichiometry, rhs_stoichiometry, aggregate_id_list, aggregate_type_list) network_builder.add_reaction(lhs_rates, rhs_rates, lhs_stoichiometry, rhs_stoichiometry)
[docs] def check_mass_balance(self, lhs_stoichiometry, rhs_stoichiometry, aggregate_id_list, aggregate_type_list): lhs_mass = 0.0 for a_index, n in lhs_stoichiometry: a_id = aggregate_id_list[a_index] a_type = aggregate_type_list[a_index] aggregate = get_compound_or_flask(a_id, a_type, self._compounds, self._flasks) centroid = aggregate.get_centroid() lhs_mass += n * self._calculate_weight(centroid) rhs_mass = 0.0 for a_index, n in rhs_stoichiometry: a_id = aggregate_id_list[a_index] a_type = aggregate_type_list[a_index] aggregate = get_compound_or_flask(a_id, a_type, self._compounds, self._flasks) centroid = aggregate.get_centroid() rhs_mass += n * self._calculate_weight(centroid) if abs(rhs_mass - lhs_mass) > 1e-6: raise AssertionError("Unbalanced masses in reaction. You are destroying/creating atoms!")
[docs] @staticmethod def required_programs() -> List[str]: return ["database", "kinetx", "utils"]