Source code for pytfa.thermo.tmodel

# -*- coding: utf-8 -*-
"""
.. module:: pytfa
   :platform: Unix, Windows
   :synopsis: Thermodynamics-based Flux Analysis

.. moduleauthor:: pyTFA team

Thermodynamic cobra_model class and methods definition


"""

import re
from copy import deepcopy
from math import log

import pandas as pd
from cobra import Model

from ..core.model import LCSBModel
from . import std
from .metabolite import MetaboliteThermo
from .reaction import calcDGtpt_rhs, calcDGR_cues, get_debye_huckel_b
from .utils import (
    check_reaction_balance,
    check_transport_reaction,
    find_transported_mets,
    get_reaction_compartment,
)
from ..optim.constraints import (
    SimultaneousUse,
    NegativeDeltaG,
    BackwardDeltaGCoupling,
    ForwardDeltaGCoupling,
    BackwardDirectionCoupling,
    ForwardDirectionCoupling,
    ReactionConstraint,
    MetaboliteConstraint,
    DisplacementCoupling,
)
from ..optim.variables import (
    ThermoDisplacement,
    DeltaGstd,
    DeltaG,
    ForwardUseVariable,
    BackwardUseVariable,
    LogConcentration,
    ReactionVariable,
    MetaboliteVariable,
)
from ..utils import numerics
from ..utils.logger import get_bistream_logger

[docs]BIGM = numerics.BIGM
[docs]BIGM_THERMO = numerics.BIGM_THERMO
[docs]BIGM_DG = numerics.BIGM_DG
[docs]BIGM_P = numerics.BIGM_P
[docs]EPSILON = numerics.EPSILON
[docs]MAX_STOICH = 10
[docs]class ThermoModel(LCSBModel, Model): """ A class representing a cobra_model with thermodynamics information """ def __init__(self, thermo_data=None, model=Model(), name=None, temperature=std.TEMPERATURE_0, min_ph=std.MIN_PH, max_ph=std.MAX_PH): """ :param float temperature: the temperature (K) at which to perform the calculations :param dict thermo_data: The thermodynamic database :type temperature: float """ LCSBModel.__init__(self, model, name) self.logger = get_bistream_logger('ME model' + str(self.name)) self.TEMPERATURE = temperature self.thermo_data = thermo_data self.parent = model # CONSTANTS self.MAX_pH = max_ph self.MIN_pH = min_ph self._init_thermo() self.logger.info('# Model initialized with units {} and temperature {} K' \ .format(self.thermo_unit, self.TEMPERATURE)) def _init_thermo(self): self.thermo_unit = self.thermo_data['units'] self.reaction_cues_data = self.thermo_data['cues'] self.compounds_data = self.thermo_data['metabolites'] self.Debye_Huckel_B = get_debye_huckel_b(self.TEMPERATURE) self.logger = get_bistream_logger('thermomodel_' + str(self.name)) # Compute internal values to adapt the the thermo_unit provided if self.thermo_unit == "kJ/mol": self.GAS_CONSTANT = 8.314472 / 1000 # kJ/(K mol) self.Adjustment = 1 else: self.GAS_CONSTANT = 1.9858775 / 1000 # Kcal/(K mol) self.Adjustment = 4.184 self.RT = self.GAS_CONSTANT * self.TEMPERATURE def normalize_reactions(self): """ Find reactions with important stoichiometry and normalizes them :return: """ self.logger.info("# Model normalization") for this_reaction in self.reactions: metabolites = this_reaction.metabolites max_stoichiometry = max(metabolites.values()) if max_stoichiometry > MAX_STOICH: new_metabolites = { k: -v + v / max_stoichiometry for k, v in metabolites.items() } this_reaction.add_metabolites(new_metabolites) else: continue def _prepare_metabolite(self, met): """ :param met: :return: """ # Get the data about the compartment of the enzyme if not met.compartment in self.compartments: raise Exception( "Compartment not found in cobra_model : " + met.compartment ) CompartmentpH = self.compartments[met.compartment]["pH"] CompartmentionicStr = self.compartments[met.compartment]["ionicStr"] # Which index of the reaction DB do you correspond to ? if not "seed_id" in met.annotation: # raise Exception("seed_id missing for " + met.name) self.logger.debug( "Metabolite {} ({}) has no seed_id".format(met.id, met.name) ) metData = None elif not met.annotation["seed_id"] in self.compounds_data: self.logger.debug( "Metabolite {} ({}) not present in thermoDB".format( met.annotation["seed_id"], met.name ) ) metData = None else: metData = self.compounds_data[met.annotation["seed_id"]] # Override the formula met.formula = metData["formula"] met.thermo = MetaboliteThermo( metData, CompartmentpH, CompartmentionicStr, self.TEMPERATURE, self.MIN_pH, self.MAX_pH, self.Debye_Huckel_B, self.thermo_unit, ) def _prepare_reaction(self, reaction, null_error_override=2): """ :param reaction: :param null_error_override: overrides DeltaG when it is 0 to allow flexibility. 2kcal/mol is standard in estimation frameworks like GCM. :return: """ DeltaGrxn = 0 DeltaGRerr = 0 proton_of = self._proton_of # identifying the reactants if len(reaction.metabolites) == 1: NotDrain = False else: NotDrain = True # Initialize a dictionnary where we will put our data - FIXME : Create a thermo object ? reaction.thermo = {"isTrans": False} # also check if rxn and enzyme compartments match reaction.compartment = get_reaction_compartment(reaction) # Make sure the reaction is balanced... balanceResult = check_reaction_balance( reaction, ( proton_of[reaction.compartment] if reaction.compartment in proton_of else None ), ) # Also test if this is a transport reaction reaction.thermo["isTrans"] = check_transport_reaction(reaction) # Make sure we have correct thermo values for each metabolites correctThermoValues = True for met in reaction.metabolites: if met.thermo.deltaGf_std > 0.9 * BIGM_DG: correctThermoValues = False break if ( not NotDrain or not correctThermoValues or len(reaction.metabolites) >= 100 or balanceResult in ["missing atoms", "drain flux"] ): self.logger.debug( "{} : thermo constraint NOT created".format(reaction.id) ) reaction.thermo["computed"] = False reaction.thermo["deltaGR"] = BIGM_DG reaction.thermo["deltaGRerr"] = BIGM_DG else: self.logger.debug( "{} : thermo constraint created".format(reaction.id) ) reaction.thermo["computed"] = True if reaction.thermo["isTrans"]: (rhs, breakdown) = calcDGtpt_rhs( reaction, self.compartments, self.thermo_unit ) reaction.thermo["deltaGR"] = rhs reaction.thermo["deltaGrxn"] = breakdown["sum_deltaGFis"] else: for met in reaction.metabolites: if met.formula != "H" or ( "seed_id" in met.annotation # That's H+ and met.annotation["seed_id"] != "cpd00067" ): DeltaGrxn += ( reaction.metabolites[met] * met.thermo.deltaGf_tr ) DeltaGRerr += abs( reaction.metabolites[met] * met.thermo.deltaGf_err ) reaction.thermo["deltaGR"] = DeltaGrxn (tmp1, DeltaGRerr, tmp2, tmp3) = calcDGR_cues( reaction, self.reaction_cues_data ) if DeltaGRerr == 0 and null_error_override: DeltaGRerr = null_error_override # default value for DeltaGRerr reaction.thermo["deltaGRerr"] = DeltaGRerr def prepare(self, null_error_override=2): """ Prepares a COBRA toolbox cobra_model for TFBA analysis by doing the following: 1. checks if a reaction is a transport reaction 2. checks the ReactionDB for Gibbs energies of formation of metabolites 3. computes the Gibbs energies of reactions :param null_error_override: overrides DeltaG when it is 0 to allow flexibility. 2kcal/mol is standard in estimation frameworks like GCM. """ self.logger.info("# Model preparation starting...") # Number of reactions num_rxns = len(self.reactions) # Number of metabolites num_mets = len(self.metabolites) for i in range(num_mets): met = self.metabolites[i] self._prepare_metabolite(met) # And now, reactions ! self.logger.debug("computing reaction thermodynamic data") # Look for the proton enzyme... proton = {} for i in range(num_mets): if self.metabolites[i].formula == "H" or ( "seed_id" in self.metabolites[i].annotation and self.metabolites[i].annotation["seed_id"] == "cpd00067" ): proton[self.metabolites[i].compartment] = self.metabolites[i] if len(proton) == 0: raise Exception("Cannot find proton") else: self._proton_of = proton # Iterate over each reaction for i in range(num_rxns): reaction = self.reactions[i] self._prepare_reaction(reaction, null_error_override) self.logger.info("# Model preparation done.") def _convert_metabolite(self, met, add_potentials, verbose): """ Given a enzyme, proceeds to create the necessary variables and constraints for thermodynamics-based modeling :param met: :return: """ if add_potentials: P_lb = -BIGM_P # kcal/mol P_ub = BIGM_P # kcal/mol # exclude protons and water and those without deltaGF # P_met: P_met - RT*LC_met = DGF_met metformula = met.formula metDeltaGF = met.thermo.deltaGf_tr metComp = met.compartment metLConc_lb = log(self.compartments[metComp]["c_min"]) metLConc_ub = log(self.compartments[metComp]["c_max"]) Comp_pH = self.compartments[metComp]["pH"] LC = None if metformula == "H2O": LC = self.add_variable(LogConcentration, met, lb=0, ub=0) elif metformula == "H": LC = self.add_variable( LogConcentration, met, lb=log(10 ** -Comp_pH), ub=log(10 ** -Comp_pH), ) elif ( "seed_id" in met.annotation and met.annotation["seed_id"] == "cpd11416" ): # we do not create the thermo variables for biomass enzyme pass elif metDeltaGF < 10 ** 6: if verbose: self.logger.debug( "generating thermo variables for {}".format(met.id) ) LC = self.add_variable( LogConcentration, met, lb=metLConc_lb, ub=metLConc_ub ) if add_potentials: P = self.add_variable("P_" + met.id, P_lb, P_ub) self.P_vars[met] = P # Formulate the constraint expr = P - self.RT * LC self.add_constraint( LogConcentration, "P_" + met.id, expr, metDeltaGF, metDeltaGF, ) else: self.logger.debug( "NOT generating thermo variables for {}".format(met.id) ) if LC != None: # Register the variable to find it more easily self.LC_vars[met] = LC def _convert_reaction(self, rxn, add_potentials, add_displacement, verbose): """ :param rxn: :param add_potentials: :param add_displacement: :param verbose: :return: """ RT = self.RT DGR_lb = -BIGM_THERMO # kcal/mol DGR_ub = BIGM_THERMO # kcal/mol epsilon = self.solver.configuration.tolerances.feasibility # Is it a water transport reaction ? H2OtRxns = False if rxn.thermo["isTrans"] and len(rxn.reactants) == 1: try: if rxn.reactants[0].annotation["seed_id"] == "cpd00001": H2OtRxns = True except KeyError: pass # Is it a drain reaction ? NotDrain = len(rxn.metabolites) > 1 # if the reaction is flagged with rxnThermo, and it's not a H2O # transport, we will add thermodynamic constraints if rxn.thermo["computed"] and not H2OtRxns: if verbose: self.logger.debug( "generating thermo constraint for {}".format(rxn.id) ) # add the delta G as a variable DGR = self.add_variable(DeltaG, rxn, lb=DGR_lb, ub=DGR_ub) # add the delta G naught as a variable RxnDGerror = rxn.thermo["deltaGRerr"] DGoR = self.add_variable( DeltaGstd, rxn, lb=rxn.thermo["deltaGR"] - RxnDGerror, ub=rxn.thermo["deltaGR"] + RxnDGerror, ) # Initialization of indices and coefficients for all possible # scenaria: LC_TransMet = 0 LC_ChemMet = 0 P_expr = 0 if rxn.thermo["isTrans"]: # calculate the DG component associated to transport of the # enzyme. This will be added to the constraint on the Right # Hand Side (RHS) transportedMets = find_transported_mets(rxn) # Chemical coefficient, it is the enzyme's coefficient... # + transport coeff for reactants # - transport coeff for products chem_stoich = rxn.metabolites.copy() # Adding the terms for the transport part for seed_id, trans in transportedMets.items(): for type_ in ["reactant", "product"]: if trans[type_].formula != "H": LC_TransMet += ( self.LC_vars[trans[type_]] * RT * trans["coeff"] * (-1 if type_ == "reactant" else 1) ) chem_stoich[trans[type_]] += trans["coeff"] * ( 1 if type_ == "reactant" else -1 ) # Also add the chemical reaction part chem_stoich = { met: val for met, val in chem_stoich.items() if val != 0 } for met in chem_stoich: metFormula = met.formula if metFormula not in ["H", "H2O"]: LC_ChemMet += self.LC_vars[met] * RT * chem_stoich[met] else: # if it is just a regular chemical reaction if add_potentials: RHS_DG = 0 for met in rxn.metabolites: metformula = met.formula metDeltaGFtr = met.thermo.deltaGf_tr if metformula == "H2O": RHS_DG = ( RHS_DG + rxn.metabolites[met] * metDeltaGFtr ) elif metformula != "H": P_expr += self.P_vars[met] * rxn.metabolites[met] else: # RxnDGnaught on the right hand side RHS_DG = rxn.thermo["deltaGR"] for met in rxn.metabolites: metformula = met.formula if metformula not in ["H", "H2O"]: # we use the LC here as we already accounted for the # changes in deltaGFs in the RHS term LC_ChemMet += ( self.LC_vars[met] * RT * rxn.metabolites[met] ) # G: - DGR_rxn + DGoRerr_Rxn # + RT * StoichCoefProd1 * LC_prod1 # + RT * StoichCoefProd2 * LC_prod2 # + RT * StoichCoefSub1 * LC_subs1 # + RT * StoichCoefSub2 * LC_subs2 # - ... # = 0 # Formulate the constraint CLHS = DGoR - DGR + LC_TransMet + LC_ChemMet self.add_constraint(NegativeDeltaG, rxn, CLHS, lb=0, ub=0) if add_displacement: lngamma = self.add_variable( ThermoDisplacement, rxn, lb=-BIGM_P, ub=BIGM_P ) # ln(Gamma) = +DGR/RT (DGR < 0 , rxn is forward, ln(Gamma) < 0d expr = lngamma - 1 / RT * DGR self.add_constraint(DisplacementCoupling, rxn, expr, lb=0, ub=0) # Create the use variables constraints and connect them to the # deltaG if the reaction has thermo constraints # FU_rxn: 1000 FU_rxn + DGR_rxn < 1000 - epsilon FU_rxn = self.add_variable(ForwardUseVariable, rxn) CLHS = DGR + FU_rxn * BIGM_THERMO self.add_constraint( ForwardDeltaGCoupling, rxn, CLHS, ub=BIGM_THERMO - epsilon ) # BU_rxn: 1000 BU_rxn - DGR_rxn < 1000 - epsilon BU_rxn = self.add_variable(BackwardUseVariable, rxn) CLHS = BU_rxn * BIGM_THERMO - DGR self.add_constraint( BackwardDeltaGCoupling, rxn, CLHS, ub=BIGM_THERMO - epsilon ) else: if not NotDrain: self.logger.debug( "generating only use constraints for drain reaction" + rxn.id ) else: self.logger.debug( "generating only use constraints for reaction" + rxn.id ) FU_rxn = self.add_variable(ForwardUseVariable, rxn) BU_rxn = self.add_variable(BackwardUseVariable, rxn) # create the prevent simultaneous use constraints # SU_rxn: FU_rxn + BU_rxn <= 1 CLHS = FU_rxn + BU_rxn self.add_constraint(SimultaneousUse, rxn, CLHS, ub=1) # create constraints that control fluxes with their use variables # UF_rxn: F_rxn - M FU_rxn < 0 F_rxn = rxn.forward_variable CLHS = F_rxn - FU_rxn * BIGM self.add_constraint(ForwardDirectionCoupling, rxn, CLHS, ub=0) # UR_rxn: R_rxn - M RU_rxn < 0 R_rxn = rxn.reverse_variable CLHS = R_rxn - BU_rxn * BIGM self.add_constraint(BackwardDirectionCoupling, rxn, CLHS, ub=0) def convert( self, add_potentials=False, add_displacement=False, verbose=True ): """ Converts a cobra_model into a tFBA ready cobra_model by adding the thermodynamic constraints required .. warning:: This function requires you to have already called :func:`~.pytfa.ThermoModel.prepare`, otherwise it will raise an Exception ! """ self.logger.info("# Model conversion starting...") ########################################### # CONSTANTS & PARAMETERS for tFBA problem # ########################################### # value for the bigM in big M constraints such as: # UF_rxn: F_rxn - M*FU_rxn < 0 bigM = BIGM # Check each reactions' bounds for reaction in self.reactions: if ( reaction.lower_bound < -bigM - EPSILON or reaction.upper_bound > bigM + EPSILON ): raise Exception("flux bounds too wide or big M not big enough") if reaction.lower_bound < -bigM: reaction.lower_bound = -bigM if reaction.upper_bound > bigM: reaction.upper_bound = bigM ################### # INPUTS & CHECKS # ################### # check if cobra_model reactions has been checked if they are transport reactions try: for reaction in self.reactions: if not "isTrans" in reaction.thermo: reaction.thermo["isTrans"] = check_transport_reaction( reaction ) except: raise Exception( "Reaction thermo data missing. " + "Please run ThermoModel.prepare()" ) # FIXME Use generalized rule (ext to the function) # formatting the enzyme and reaction names to remove brackets replacements = {"_": re.compile(r"[\[\(]"), "": re.compile(r"[\]\)]")} for items in [self.metabolites, self.reactions]: for item in items: for rep in replacements: item.name = re.sub(replacements[rep], rep, item.name) self.LC_vars = {} self.P_vars = {} for met in self.metabolites: self._convert_metabolite(met, add_potentials, verbose) ## For each reaction... for rxn in self.reactions: self._convert_reaction( rxn, add_potentials, add_displacement, verbose ) # CONSISTENCY CHECKS # Creating the objective if len(self.objective.variables) == 0: self.logger.warning("Objective not found") self.logger.info("# Model conversion done.") self.logger.info("# Updating cobra_model variables...") self.repair() self.logger.info("# cobra_model variables are up-to-date") def print_info(self, specific=False): """ Print information and counts for the cobra_model :return: """ if not specific: LCSBModel.print_info(self) n_metabolites = len(self.metabolites) n_reactions = len(self.reactions) n_metabolites_thermo = len( # [ # x # for x in self.metabolites # if hasattr(x, "thermo") and x.thermo["id"] # ] self._var_kinds[LogConcentration.__name__] ) n_reactions_thermo = len( # [ # x # for x in self.reactions # if x.id is not None # and hasattr(x, "thermo") # and x.thermo["computed"] # ] self._cons_kinds[ForwardDeltaGCoupling.__name__] ) info = pd.DataFrame(columns=["value"]) info.loc["num metabolites(thermo)"] = n_metabolites_thermo info.loc["num reactions(thermo)"] = n_reactions_thermo info.loc["pct metabolites(thermo)"] = ( n_metabolites_thermo / n_metabolites * 100 ) info.loc["pct reactions(thermo)"] = ( n_reactions_thermo / n_reactions * 100 ) info.index.name = "key" print(info) def __deepcopy__(self, memo): """ :param memo: :return: """ return self.copy() def copy(self): from ..io.dict import model_from_dict, model_to_dict from ..optim.utils import copy_solver_configuration dictmodel = model_to_dict(self) new = model_from_dict(dictmodel) copy_solver_configuration(self, new) return new