#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__copyright__ = """ This code is licensed under the 3-clause BSD license.
Copyright ETH Zurich, Laboratory of Physical Chemistry, Reiher Group.
See LICENSE.txt for details.
"""
# Standard library imports
from typing import Iterator, List, Tuple, Union, Dict
import networkx as nx
import numpy as np
from itertools import islice
# Third party imports
import scine_database as db
# Local application imports
from ..utilities.energy_query_functions import get_energy_for_structure, get_barriers_for_elementary_step_by_type, \
get_elementery_step_with_min_ts_energy, rate_constant_from_barrier
from ..utilities.get_molecular_formula import get_molecular_formula_of_aggregate
[docs]class Pathfinder:
"""
A class to represent a list of reactions as a graph and query this graph for simple paths between two nodes.
In a simple path, every node part of the path is visited only once.
Attributes
----------
_calculations :: db.Collection
Collection of the calculations of the connected database.
_compounds :: db.Collection
Collection of the compounds of the connected database.
_flasks :: db.Collection
Collection of the flasks of the connected database.
_reactions :: db.Collection
Collection of the reactions of the connected database.
_elementary_steps :: db.Collection
Collection of the elementary steps of the connected database.
_structures :: db.Collection
Collection of the structures of the connected database.
_properties :: db.Collection
Collection of the properties of the connected database.
graph_handler
A class handling the construction of the graph. Can be adapted to one's needs.
_use_old_iterator :: bool
Bool to indicate if the old iterator shall be used querying for paths between a source - target pair.
_unique_iterator_memory :: Tuple[Tuple[List[str], float], Iterator]
Memory of iterator with the corresponding path and its length as well as the iterator.
start_compounds :: List[str]
A list containing the compounds which are present at the start.
start_compounds_set :: bool
Bool to indicate if start_compounds are set.
_pseudo_inf :: float
Float for edges with infinite weight.
compound_costs :: Dict[str, float]
A dictionary containing the cost of the compounds with the compounds as keys.
compound_costs_solved :: bool
Bool to indicate if all compounds have a compound cost.
"""
def __init__(self, db_manager: db.Manager):
self.options = self.Options()
self.manager = db_manager
# Get required collections
self._calculations = db_manager.get_collection('calculations')
self._compounds = db_manager.get_collection('compounds')
self._flasks = db_manager.get_collection("flasks")
self._reactions = db_manager.get_collection('reactions')
self._elementary_steps = db_manager.get_collection('elementary_steps')
self._structures = db_manager.get_collection('structures')
self._properties = db_manager.get_collection('properties')
self.graph_handler: Union[Pathfinder.BasicHandler, Pathfinder.BarrierBasedHandler, None] = None
# attribute to store iterator employed in find_unique_paths; path_object, iterator
self._use_old_iterator = False
self._unique_iterator_memory: Union[Tuple[Tuple[List[str], float],
Iterator], None] = None
# Compound costs
self.start_compounds: List[str] = []
self.start_compounds_set = False
self._pseudo_inf = 1e12
self.compound_costs: Dict[str, float] = {}
self.compound_costs_solved = False
[docs] class Options:
"""
A class to vary the setup of Pathfinder.
"""
__slots__ = {"graph_handler", "barrierless_weight", "model", "use_energy_threshold", "energy_threshold"}
def __init__(self):
self.graph_handler: str = "basic" # pylint: disable=no-member
"""
A string indicating which graph handler to be used.
"""
self.barrierless_weight: float = 1.0 # 0.01
"""
The weight for barrierless reactions (basic) and rate constant (barrier), respectively.
"""
self.model: Union[None, db.Model] = None
"""
The model for the compounds to be included.
"""
# in kJ / mol
self.use_energy_threshold: bool = False
self.energy_threshold: float = 100.0
[docs] @staticmethod
def get_valid_graph_handler_options() -> List[str]:
return ["basic", "barrier"]
def _construct_graph_handler(self):
"""
Constructor for the graph handler.
Transfers pathfinder.options to graph handler.
Raises
------
RuntimeError
Invalid options for graph handler.
"""
if not self.graph_handler:
if self.options.graph_handler not in self.get_valid_graph_handler_options():
raise RuntimeError("Invalid graph handler option.")
if self.options.graph_handler == "basic":
self.graph_handler = self.BasicHandler(self.manager)
self.graph_handler.barrierless_weight = self.options.barrierless_weight
self.graph_handler.model = self.options.model
elif self.options.graph_handler == "barrier":
self.graph_handler = self.BarrierBasedHandler(self.manager, self.options.model)
self.graph_handler.barrierless_weight = self.options.barrierless_weight
self.graph_handler.model = self.options.model
self.graph_handler._map_elementary_steps_to_reactions()
self.graph_handler._calculate_rate_constant_normalization()
def _reset_iterator_memory(self):
"""
Reset memory for unique memory.
"""
self._unique_iterator_memory = None
# Build graph function from reaction list
[docs] def build_graph(self):
"""
Build the nx.DiGraph() from a list of filtered reactions.
"""
self._reset_iterator_memory()
self._construct_graph_handler()
assert self.graph_handler
for rxn_id in self.graph_handler.get_valid_reaction_ids():
rxn = db.Reaction(rxn_id, self._reactions)
self.graph_handler.add_reaction(rxn)
[docs] def find_paths(self, source: str, target: str, n_requested_paths: int = 3,
n_skipped_paths: int = 0) -> List[Tuple[List[str], float]]:
"""
Query the build graph for simple paths between a source and target node.
Notes
-----
Requires a built graph
Parameters
----------
source : str
The ID of the starting compound as string.
target : str
The ID of the targeted compound as string.
n_requested_paths : int
Number of requested paths, by default 3
n_skipped_paths : int
Number of skipped paths from, by default 0. For example, when four paths are found (``n_requested_paths=4``)
and ``n_skipped_paths=2``, the third, fourth, fifth and sixth path are returned. Therefore, this allows to
set the starting point of the query.
Returns
-------
found_paths :: List[Tuple[List[str] float]]
List of paths where each item (path) consists of the list of nodes of the path and its length.
"""
assert self.graph_handler
found_paths = []
for path in self._k_shortest_paths(self.graph_handler.graph, source, target, n_requested_paths,
weight="weight", path_start=n_skipped_paths):
path_length = nx.path_weight(self.graph_handler.graph, path, "weight")
found_paths.append((path, path_length))
return found_paths
[docs] def find_unique_paths(self, source: str, target: str, number: int = 3) -> List[Tuple[List[str], float]]:
"""
Find a unique number of paths from a given source node to a given target node.
Paths can have the same total length (in terms of sum over edge weights),
but if one is solely interested in one path of paths with identical length,
the shortest (in terms of length) longest (in terms of number of nodes) path is returned.
This is called the unique path (shortest longest path).
Notes
-----
| - Checks if a stored iterator for the given source-target pair should be used.
| - Maximal ten paths with identical length are compared.
Parameters
----------
source : str
The ID of the starting compound as string.
target : str
The ID of the targeted compound as string.
number : int
The number of unique paths to be returned. Per default, 3 paths are returned.
Returns
-------
path_tuple_list :: List[Tuple[List[str], float]]
List of paths where each item (path) consists the list of nodes of the path and its length.
"""
assert self.graph_handler
counter = 0
path_tuple_list = list()
# # # Initialise iterator over shortest simple paths if it is either not set or source/target do not match
if not self._use_old_iterator or \
self._unique_iterator_memory is None or \
self._unique_iterator_memory[0][0][0] != source or \
self._unique_iterator_memory[0][0][-1] != target:
path_iterator = iter(nx.shortest_simple_paths(self.graph_handler.graph, source, target, weight="weight"))
# # # Find first path and its cost
old_path = next(path_iterator)
old_path_cost = nx.path_weight(self.graph_handler.graph, old_path, weight="weight")
# # # Load old iterator
else:
path_iterator = self._unique_iterator_memory[1]
old_path = self._unique_iterator_memory[0][0]
old_path_cost = self._unique_iterator_memory[0][1]
while counter < number:
same_cost = True
tmp_path_list: List[List[str]] = list()
# # # Collect all paths with same cost
n_max_collected_paths = 10
while same_cost and len(tmp_path_list) < n_max_collected_paths:
# # # Append old path to tmp_path list
tmp_path_list.append(old_path)
# # # Get next path and its cost
new_path = next(path_iterator, None)
# # # Break loop if no path is returned
if new_path is None:
break
new_path_cost = nx.path_weight(self.graph_handler.graph, new_path, weight="weight")
# # # Check if new cost different to old cost
if abs(old_path_cost - new_path_cost) > 1e-12:
same_cost = False
# # # Overwrite old path with new path
old_path = new_path
# # # Append path with most nodes to tuple list and its cost
path_tuple_list.append((max(tmp_path_list, key=lambda x: len(x)), # pylint: disable=unnecessary-lambda
old_path_cost))
# # # Break counter loop if no more paths to target are found
if new_path is None:
break
old_path_cost = new_path_cost
counter += 1
# # # Store iterator and path info (list of nodes and length)
if new_path is not None:
self._unique_iterator_memory = ((new_path, new_path_cost), path_iterator)
return path_tuple_list
[docs] def get_elementary_step_sequence(self, path: List[str]) -> str:
"""
Prints the sequence of elementary steps of a path with the compounds written as molecular formulas
with multiplicity and charge as well as the final cost of the path.
Reactant node is printed in red, product node in blue to enhance readability.
Parameters
----------
path :: Tuple[List[str] float]
Path containing a list of the traversed nodes and the cost of this path.
"""
sequence_string = ""
assert self.graph_handler
# # # Loop over elementary steps by dissecting path
for i in np.arange(0, len(path) - 2, 2):
step = path[i:i + 3]
# # # Count Reactants
reactants = [step[0]]
reactants += self.graph_handler.graph.edges[step[0], step[1]]['required_compounds']
# # # Count Products
products = [step[2]]
products += self.graph_handler.graph.edges[step[1], step[2]]['required_compounds']
rxn_eq = ""
for i, side in enumerate([reactants, products]):
for j, aggregate_id in enumerate(side):
# # # Identify Compound or Flask
if self.graph_handler.graph.nodes[aggregate_id]['type'] == db.CompoundOrFlask.COMPOUND.name:
aggregate_type = db.CompoundOrFlask.COMPOUND
elif self.graph_handler.graph.nodes[aggregate_id]['type'] == db.CompoundOrFlask.FLASK.name:
aggregate_type = db.CompoundOrFlask.FLASK
aggregate_str = get_molecular_formula_of_aggregate(
db.ID(aggregate_id), aggregate_type, self._compounds, self._flasks, self._structures)
# # # Color reactant node
if j == 0 and i == 0:
aggregate_str = '\033[31m' + aggregate_str + '\033[0m'
# # # Color product node
elif j == 0 and i == 1:
aggregate_str = '\033[36m' + aggregate_str + '\033[0m'
rxn_eq += aggregate_str
if j < len(side) - 1:
rxn_eq += " + "
if i == 0:
rxn_eq += " -> "
sequence_string += rxn_eq + "\n"
return sequence_string
@staticmethod
def _k_shortest_paths(graph: nx.DiGraph, source: str, target: str, n_paths: int, weight: Union[str, None] = None,
path_start: int = 0) -> List[List[str]]:
"""
Finding k shortest paths between a source and target node in a given graph.
The length of the returned paths increases.
Notes
-----
* This procedure is based on the algorithm by Jin Y. Yen [1]. Finding the first k paths requires O(k*nodes^3)
operations.
* Implemented as given in the documentation of NetworkX:
https://networkx.org/documentation/stable/reference/algorithms/generated/ \
networkx.algorithms.simple_paths.shortest_simple_paths.html
* [1]: Jin Y. Yen, “Finding the K Shortest Loopless Paths in a Network”, Management Science, Vol. 17, No. 11,
Theory Series (Jul., 1971), pp. 712-716.
Parameters
----------
graph :: nx.DiGraph
The graph to be queried.
source :: str
The ID of the starting compound as string.
target :: str
The ID of the targeted compound as string.
n_paths :: int
The number of paths to be returned.
weight :: Union[str, None], optional
The key for the weight encoded in the edges to be used.
path_start :: int, optional
An index of the first returned path, by default 0
Returns
-------
List[List[str]]
List of paths, each path consisting of a list of nodes.
"""
return list(
islice(nx.shortest_simple_paths(graph, source, target, weight=weight), path_start, path_start + n_paths)
)
[docs] def set_start_conditions(self, conditions: Dict[str, float]):
"""
Add the IDs of the start compounds to self.start_compounds and add entries for cost in self.compound_cost.
Parameters
----------
conditions :: Dict[str float]
The IDs of the compounds as keys and its given cost as values.
"""
# # # Reset Start conditions, if already set previously
if self.start_compounds_set:
self.compound_costs = {}
self.start_compounds = []
# # # Loop over conditions and set them
for cmp_id, cmp_cost in conditions.items():
self.compound_costs[cmp_id] = cmp_cost
self.start_compounds.append(cmp_id)
self.start_compounds_set = True
def __weight_func(self, u: str, _: str, d: Dict):
"""
Calculates the weight of the edge d connecting u with _ (directional!).
Only consider the costs of the required compounds if the edge d is from a compound node to a reaction node.
If the edge d connects a reaction node with a compound node, the returned weight should be 0.
Parameters
----------
u :: str
The ID of start node.
_ :: str
The ID of end node.
d :: Dict
The edge connecting u and _ as dictionary
Returns
-------
float
Sum over the edge weight and the costs of the required compounds.
"""
# # # Weight of edge
edge_wt = d.get("weight", 0)
# # # List of required compounds
tmp_required_compounds = d.get("required_compounds", None)
# # # Sum over costs of required compounds.
# # # Only for edges from compound node to rxn node
if ';' not in u and tmp_required_compounds is not None:
required_compound_costs = np.sum([self.compound_costs[n] for n in tmp_required_compounds])
else:
required_compound_costs = 0.0
return edge_wt + required_compound_costs
[docs] def calculate_compound_costs(self, recursive: bool = True):
"""
Determine the cost for all compounds via determining their shortest paths from the ``start_compounds``.
If this succeeds, set ``compound_costs_solved`` to ``True``. Otherwise it stays ``False``.
The algorithm works as follows:
Given the starting conditions, one loops over the individual starting compounds as long as:\n
- the ``_pseudo_inf`` entries in ``compound_costs`` are reduced
- for any entry in ``compounds_cost`` a lower cost is found
With each starting compound, one loops over compounds which have yet no cost assigned.
For each start - target compound pair, the shortest path is determined employing Dijkstra's algorithm.
The weight function checks the ``weight`` of the edges as well as the costs of the required compounds listed in
the ``required_compounds`` of the traversed edges.
If the path exceeds the length of self._pseudo_inf, this path is not considered for further evaluation.
The weight of the starting compound is added to the tmp_cost.
If the target compound has no weight assigned yet in ``compound_costs`` OR
if the target compound has a weight assigned which is larger (in ``compound_costs`` as well as in
``tmp_compound_costs``) than the current ``tmp_cost`` is written to the temporary storage of
``tmp_compound_costs``.
After the loop over all starting compounds completes, the collected costs for the found targets are written to
``compound_costs``.
The convergence variables are updated and the while loop continues.
Notes
-----
* Checks if the start compounds are set.
* Checks if the graph contains any nodes.
Parameters
----------
recursive :: bool
All compounds are checked for shorter paths, True by default.
If set to False, compounds for which a cost has been determined are not checked in the next loop.
"""
assert self.graph_handler
if not self.start_compounds_set:
raise RuntimeError("No start conditions given")
if len(self.graph_handler.graph.nodes) == 0:
raise RuntimeError("No nodes in graph")
cmps_to_check = [n for n in self.graph_handler.graph.nodes if ';' not in n
and n not in self.start_compounds]
# # # Set all compounds to be checked to pseudo inf cost
for cmp_id in cmps_to_check:
self.compound_costs[cmp_id] = self._pseudo_inf
# # # Dictionary for current run
tmp_compound_costs: Dict[str, float] = {}
# # # Determine convergence variables for while loop
current_inf_count = sum(value == self._pseudo_inf for value in self.compound_costs.values())
prev_inf_count = current_inf_count
# # # Convergence criteria
compound_costs_change = None
compound_costs_opt_change = 1
converged = False
# # # Find paths until either no costs are unknown or the None count has not changed
while (not converged):
compound_costs_opt_change = 0
print("Remaining Nodes:", len(cmps_to_check))
for tmp_start_node in self.start_compounds:
# # # Loop over all nodes to be checked starting from start nodes
for target in cmps_to_check:
# # # Determine cost and path with dijkstra's algorithm
tmp_cost, _ = nx.single_source_dijkstra(self.graph_handler.graph, tmp_start_node, target,
cutoff=None, weight=self.__weight_func)
# # # Check if the obtained cost is larger than infinity (pseudo_inf)
# # # and continue with the next target if this is the case
if (tmp_cost - self._pseudo_inf) > 0.0:
continue
# # # Add cost of the starting node
tmp_cost += self.compound_costs[tmp_start_node]
# # # Check for value in compound_costs dict and
if (self.compound_costs[target] != self._pseudo_inf and
10e-6 < self.compound_costs[target] - tmp_cost):
compound_costs_opt_change += 1
# # # Not already set check
if self.compound_costs[target] == self._pseudo_inf or (
self.compound_costs != self._pseudo_inf and 10e-6 < self.compound_costs[target] - tmp_cost):
# # # Not discovered in current run OR new tmp cost lower than stored cost
if (target not in tmp_compound_costs.keys()):
tmp_compound_costs[target] = tmp_cost
elif (tmp_cost < tmp_compound_costs[target]):
tmp_compound_costs[target] = tmp_cost
# # # Write obtained compound_costs to overall dictionary
for key, value in tmp_compound_costs.items():
self.compound_costs[key] = value
# # # Remove found nodes if recursive is false
if not recursive:
cmps_to_check.remove(key)
# # # Reset tmp_pr_cost
tmp_compound_costs = {}
# # # Convergence Check
current_inf_count = sum(value == self._pseudo_inf for value in self.compound_costs.values())
compound_costs_change = prev_inf_count - current_inf_count
prev_inf_count = current_inf_count
print(50 * '-')
print("Current None:", current_inf_count)
print("PR Change:", compound_costs_change)
print("PR Opt Change:", compound_costs_opt_change)
# # # Convergence Check
if (compound_costs_change == 0 and compound_costs_opt_change == 0):
converged = True
# # # Final check if all compound costs are solved
if current_inf_count == 0:
self.compound_costs_solved = True
[docs] def update_graph_compound_costs(self):
"""
Update the 'weight' of edges from compound to reaction nodes by adding the compound costs.
The compound costs are the sum over the costs stored in self.compound_costs of the required compounds.
The edges of the resulting graph contain a weight including the required_compound_costs based on the starting
conditions.
All analysis of the graph therefore depend on the starting conditions.
Notes
-----
* Checks if the compound costs have successfully been determined.
"""
# # # Check if all costs are available
if not self.compound_costs_solved:
unsolved_cmp = [key for key, _ in self.compound_costs.items()]
raise RuntimeError("The following cmp have no cost assigned:\n" + str(unsolved_cmp) +
"\nReconsider the starting conditions.")
# # # Reset unique_iterator_list as graph changes
self._reset_iterator_memory()
for node in self.compound_costs.keys():
# # # Loop over all edges of compound and manipulate weight
for target_node, attributes in self.graph_handler.graph[node].items():
required_compound_costs = np.asarray([self.compound_costs[k] for k in attributes['required_compounds']])
tot_required_compound_costs = np.sum(required_compound_costs)
# # # Set required compound costs in edge
self.graph_handler.graph.edges[node,
target_node]['required_compound_costs'] = tot_required_compound_costs
# # # Add required compound costs to weight
self.graph_handler.graph.edges[node, target_node]['weight'] += tot_required_compound_costs
# Base Class for adding a reaction; up weight to rxn node is just one per default, no further info
[docs] class BasicHandler:
"""
A basic class to handle the construction of the nx.DiGraph.
A list of reactions can be added differently, depending on the implementation of ``_get_weight`` and
``get_valid_reaction_ids``.
Attributes
----------
graph :: nx.DiGraph
The directed graph.
db_manager :: db.Manager
The database manager for the connected database.
barrierless_weight :: float
The weight to be set for barrierless reactions.
model :: Union[None, db.Model]
A model for filtering the valid reactions.
Per default None, reactions are included regardless of the model.
"""
def __init__(self, manager: db.Manager, model: db.Model = None):
self.graph = nx.DiGraph()
self.db_manager = manager
self.barrierless_weight = 1.0
self.model: Union[None, db.Model] = model
# Collections
self._compounds = self.db_manager.get_collection("compounds")
self._flasks = self.db_manager.get_collection("flasks")
self._structures = self.db_manager.get_collection("structures")
self._properties = self.db_manager.get_collection("properties")
self._reactions = self.db_manager.get_collection("reactions")
self._elementary_steps = self.db_manager.get_collection("elementary_steps")
[docs] def add_reaction(self, reaction: db.Reaction):
"""
Add a reaction to the graph.
Each reaction node represents the LHS and RHS.
Hence every reagent of a reaction is connected to every product of a reaction via one reaction node.
For instance:\n
| A + B = C + D, reaction R\n
| A -> R\\ :sub:`1` -> C
| A -> R\\ :sub:`1` -> D
| B -> R\\ :sub:`1` -> C
| B -> R\\ :sub:`1` -> D
|
| C -> R\\ :sub:`2` -> A
| C -> R\\ :sub:`2` -> B
| D -> R\\ :sub:`2` -> A
| D -> R\\ :sub:`2` -> B
Representing this reaction in the graph yields 4 compound nodes,
2 reaction nodes (same reaction) and 16 edges (2*2*2*2).
The weights assigned to the edges depends on the ``_get_weight`` implementation.
The edges from a compound node to a reaction node contain several information:
:weight: the weight of the edge
1.0 if the reaction is not barrierless, otherwise it is set to ``barrierless_weight``
:required_compounds: the IDs of the other reagents of this side of the reaction in a list
:required_compound_costs: the sum over all compound costs of the compounds in the ``required_compounds``
list.
None by default.
The edges from a reaction node to a compound node contain several information:
:weight: the weight of the edge, set to 0.0
:required_compounds: the IDs of the other products emerging;
added for easier information extraction during the path analysis
Parameters
----------
reaction :: db.Reaction
The reaction to be added to the graph.
"""
# Add two rxn nodes
rxn_nodes = []
reaction_id = reaction.id().string()
for i in range(0, 2):
# Add rxn node between lhs and rhs compound
rxn_node = ';'.join([reaction_id, str(i)])
rxn_node += ';'
self.graph.add_node(rxn_node, type='rxn_node')
rxn_nodes.append(rxn_node)
# Convert to strings
reactants = reaction.get_reactants(db.Side.BOTH)
reactant_types = reaction.get_reactant_types(db.Side.BOTH)
weights = self._get_weight(reaction)
# Add lhs aggregates and connect
for lhs_cmp, lhs_type in zip([i.string() for i in reactants[0]],
[i.name for i in reactant_types[0]]):
if lhs_cmp not in self.graph:
self.graph.add_node(lhs_cmp, type=lhs_type)
required_cmps_lhs = [s.string() for s in reactants[0]]
required_cmps_lhs.remove(lhs_cmp)
self.graph.add_edge(lhs_cmp, rxn_nodes[0], weight=weights[0], required_compounds=required_cmps_lhs,
required_compound_costs=None)
self.graph.add_edge(rxn_nodes[1], lhs_cmp, weight=0.0, required_compounds=None)
# Add rhs aggregates and connect
for rhs_cmp, rhs_type in zip([i.string() for i in reactants[1]],
[i.name for i in reactant_types[1]]):
if rhs_cmp not in self.graph:
self.graph.add_node(rhs_cmp, type=rhs_type)
required_cmps_rhs = [s.string() for s in reactants[1]]
required_cmps_rhs.remove(rhs_cmp)
self.graph.add_edge(rhs_cmp, rxn_nodes[1], weight=weights[1], required_compounds=required_cmps_rhs,
required_compound_costs=None)
self.graph.add_edge(rxn_nodes[0], rhs_cmp, weight=0.0, required_compounds=None)
# # # Loop over reaction nodes to add required compounds info to downwards edges; might be unnecessary
node_index = 1
for node in rxn_nodes:
for key in self.graph[node].keys():
self.graph.edges[node, key]['required_compounds'] = \
self.graph.edges[key, rxn_nodes[node_index]]['required_compounds']
node_index -= 1
def _get_weight(self, reaction: db.Reaction) -> Tuple[float, float]:
"""
Determining the weights for the edges of the given reaction.
Parameters
----------
reaction :: db.Reaction
Reaction of interest
Returns
-------
Tuple[float, float]
Weight for connections to the LHS reaction node, weight for connections to the RHS reaction node
"""
for step in reaction.get_elementary_steps(self.db_manager):
# # # Barrierless weights for barrierless reactions
if step.get_type() == db.ElementaryStepType.BARRIERLESS:
return self.barrierless_weight, self.barrierless_weight
return 1.0, 1.0
[docs] def get_valid_reaction_ids(self) -> List[db.ID]:
"""
Basic filter function for reactions.
Per default it returns all reactions.
Returns
-------
List[db.ID]
List of IDs of the filtered reactions.
"""
valid_ids: List[db.ID] = list()
for reaction in self._reactions.iterate_all_reactions():
if self._valid_reaction(reaction):
valid_ids.append(reaction.id())
return valid_ids
def _valid_reaction(self, reaction: db.Reaction) -> bool:
"""
Checks if a given reaction is valid.
A reaction is considered valid if at least one elementary step of the reaction has an electronic energy
assigned to its first structure, if required calculated with the set db.Model.
Parameters
----------
reaction :: db.Reaction
_description_
Returns
-------
bool
Bool indicating if the reaction is valid.
"""
reaction.link(self._reactions)
for es_id in reaction.get_elementary_steps():
es = db.ElementaryStep(es_id, self._elementary_steps)
first_structure_lhs = db.Structure(es.get_reactants()[0][0])
# # # Model Check
# Check if model is not specified (None)
if self.model is None:
first_structure_lhs.link(self._structures)
if len(first_structure_lhs.get_properties("electronic_energy")) > 0:
return True
# Check, if energy of this structure with specified model exists
else:
first_structure_lhs_e = get_energy_for_structure(
first_structure_lhs, "electronic_energy", self.model, self._structures, self._properties)
if first_structure_lhs_e is not None:
return True
# If all elementary steps of this reaction fail the checks, this reaction is not valid
return False
[docs] class BarrierBasedHandler(BasicHandler):
"""
A class derived from the basic graph handler class to encode the reaction barrier information in the edges.
The barriers of the elementary step with the minimal TS energy of a reaction are employed.
The barriers are converted to rate constants, normalized over all rate constants in the graph and then the cost
function :math:`|log(normalized\\ rate\\ constant)|` is applied to obtain the weight.
Attributes
----------
temperature :: float
The temperature for calculating the rate constants from the barriers. Default is 298.15 K.
_rate_constant_normalization :: float
The factor to normalize the rate constant.
_rxn_to_es_map :: Dict[str, db.ID]
A dictionary holding the ID of the elementary step with the lowest TS energy for each reaction.
"""
def __init__(self, db_manager: db.Manager, model: db.Model):
super().__init__(db_manager, model)
self.temperature = 298.15
self._rate_constant_normalization = 1.0
self._rxn_to_es_map: Dict[str, db.ID] = {}
[docs] def set_temperature(self, temperature: float):
"""
Setting the temperature for determining the rate constants.
Parameters
----------
temperature :: float
The temperature in Kelvin.
"""
self.temperature = temperature
[docs] def get_temperature(self):
"""
Gets the set temperature.
Returns
-------
self.temperature :: float
The set temperature.
"""
return self.temperature
def _map_elementary_steps_to_reactions(self):
"""
Loop over all reactions to get the elementary step with the lowest TS energy for each reaction.
The corresponding db.ID of the elementary step is written to the _rxn_to_es_map.
"""
for reaction_id in self.get_valid_reaction_ids():
reaction = db.Reaction(reaction_id, self._reactions)
# # # Go for gibbs energy first
es_id = get_elementery_step_with_min_ts_energy(
reaction,
"gibbs_free_energy",
self.model,
self._elementary_steps,
self._structures,
self._properties)
# # # If unsuccessful, attempt electronic energy
if es_id is None:
es_id = get_elementery_step_with_min_ts_energy(
reaction,
"electronic_energy",
self.model,
self._elementary_steps,
self._structures,
self._properties)
# # # Write elementary step to map
self._rxn_to_es_map[reaction_id.string()] = es_id
def _calculate_rate_constant_normalization(self):
"""
Determine the rate constant normalization factor for calculating the edge weights.
Loops over the _rxn_to_es_map, converts every barrier to the rate constant and adds it to the sum of rate
constants.
For barrierless elementary steps, twice the barrierless_weight is added.
The rate constant normalization is then the inverse of the final sum.
"""
k_sum = 0.0
for es_id in self._rxn_to_es_map.values():
es = db.ElementaryStep(es_id, self._elementary_steps)
if es.get_type() == db.ElementaryStepType.BARRIERLESS:
k_sum += self.barrierless_weight * 2
else:
# # # Retrieve barriers of elementary step in kJ/mol
barriers = get_barriers_for_elementary_step_by_type(
es, "gibbs_free_energy", self.model, self._structures, self._properties)
if None in barriers:
barriers = get_barriers_for_elementary_step_by_type(
es, "electronic_energy", self.model, self._structures, self._properties)
k_lhs = rate_constant_from_barrier(barriers[0], self.temperature)
k_rhs = rate_constant_from_barrier(barriers[1], self.temperature)
k_sum += k_lhs + k_rhs
self._rate_constant_normalization = 1 / k_sum
def _get_weight(self, reaction: db.Reaction) -> Tuple[float, float]:
"""
Determines the weight for a given reaction.
The weight is calculated by determining the rate constant from the barrier, normalizing the constant with
the rate_constant_normalization and taking the abs(log()) of the corresponding product.
For barrierless reactions, the barrierless_weight is taken as rate constants.
Parameters
----------
reaction :: db.Reaction
The reaction for which the weights should be determined.
Returns
-------
weights :: Tuple(float, float)
The weight for the LHS -> RxnNode and for the RHS -> RxnNode.
"""
# # # Look up elementary step id for reaction
es_id = self._rxn_to_es_map[reaction.id().string()]
es_step = db.ElementaryStep(es_id, self._elementary_steps)
# # # Barrierless weights for barrierless reactions
if es_step.get_type() == db.ElementaryStepType.BARRIERLESS:
k_lhs = self.barrierless_weight
k_rhs = self.barrierless_weight
else:
assert self.model
# # # Retrieve barriers of elementary step in kJ/mol
barriers = get_barriers_for_elementary_step_by_type(
es_step, "gibbs_free_energy", self.model, self._structures, self._properties)
if None in barriers:
barriers = get_barriers_for_elementary_step_by_type(
es_step, "electronic_energy", self.model, self._structures, self._properties)
assert barriers[0]
k_lhs = rate_constant_from_barrier(barriers[0], self.temperature)
k_rhs = rate_constant_from_barrier(barriers[1], self.temperature)
return abs(np.log(k_lhs * self._rate_constant_normalization)), \
abs(np.log(k_rhs * self._rate_constant_normalization))