Source code for pytfa.core.model

# -*- coding: utf-8 -*-
"""
.. module:: pytfa
   :platform: Unix, Windows
   :synopsis: Thermodynamics-based Flux Analysis

.. moduleauthor:: pyTFA team

Model class
"""

from abc import ABC, abstractmethod
from collections import defaultdict

import pandas as pd
from numpy import empty
import optlang
from optlang.exceptions import SolverError
from cobra import DictList, Model
from cobra.core.solution import Solution

from ..utils.str import camel2underscores
from ..optim.variables import GenericVariable, ReactionVariable, MetaboliteVariable
from ..optim.constraints import ReactionConstraint, MetaboliteConstraint
from ..optim.utils import get_primal, get_all_subclasses

import time

def timeit(method):
    """
    Adapted from Andreas Jung's `blog
    <https://www.zopyx.com/andreas-jung/contents/a-python-decorator-for-measuring-the-execution-time-of-methods>`_

    :param method: The method to time
    :return:
    """


    def timed(self, *args, **kw):
        ts = time.time()
        result = method(self, *args, **kw)
        te = time.time()

        message = '%r (%r, %r) %2.2f sec' % (method.__name__, args, kw, te-ts)

        try:
            self.logger.debug(message)
        except AttributeError:
            print(message)
        return result

    return timed

[docs]class LCSBModel(ABC): # @abstractmethod def __init__(self, model, name, sloppy=False): """ Very much model specific """ Model.__init__(self, model.copy(), name) self._cons_queue = list() self._var_queue = list() self._var_dict = dict() self._cons_dict = dict() self.sloppy=sloppy @abstractmethod def copy(self): """ Needs to be reimplemented, as our objects have complicated hierarchy :return: """ def print_info(self): """ Print information and counts for the cobra_model :return: """ n_metabolites = len(self.metabolites) n_reactions = len(self.reactions) n_constraints = len(self.constraints) n_variables = len(self.variables) info = pd.DataFrame(columns=['value']) info.loc['name'] = self.name info.loc['description'] = self.description info.loc['num constraints'] = n_constraints info.loc['num variables'] = n_variables info.loc['num metabolites'] = n_metabolites info.loc['num reactions'] = n_reactions info.index.name = 'key' print(info) def add_variable(self, kind, hook, queue=False, **kwargs): """ Add a new variable to a COBRApy cobra_model. :param kind: :param string,cobra.Reaction hook: Either a string representing the name of the variable to add to the cobra_model, or a reaction object if the kind allows it :returns: The created variable :rtype: optlang.interface.Variable """ # Initialisation links to the cobra_model var = kind(hook, # lb=lower_bound if lower_bound != float('-inf') else None, # ub=upper_bound if upper_bound != float('inf') else None, queue=queue, **kwargs) self._var_dict[var.name] = var self.logger.debug('Added variable: {}'.format(var.name)) # self.add_cons_vars(var.variable) return var def add_constraint(self, kind, hook, expr, queue=False,**kwargs): """ Add a new constraint to a COBRApy cobra_model :param kind: :param string,cobra.Reaction hook: Either a string representing the name of the variable to add to the cobra_model, or a reaction object if the kind allows it :param sympy.thermo.expr.Expr expr: The expression of the constraint :returns: The created constraint :rtype: optlang.interface.Constraint """ if isinstance(expr, GenericVariable): # make sure we actually pass the optlang variable expr = expr.variable # Initialisation links to the cobra_model cons = kind(hook, expr, # problem = self.problem, # lb=lower_bound if lower_bound != float('-inf') else None, # ub=upper_bound if upper_bound != float('inf') else None, queue=queue, **kwargs) self._cons_dict[cons.name] = cons self.logger.debug('Added constraint: {}'.format(cons.name)) # self.add_cons_vars(cons.constraint) return cons def remove_reactions(self, reactions, remove_orphans=False): # Remove the constraints and variables associated to these reactions all_cons_subclasses = get_all_subclasses(ReactionConstraint) all_var_subclasses = get_all_subclasses(ReactionVariable) self._remove_associated_consvar(all_cons_subclasses, all_var_subclasses, reactions) Model.remove_reactions(self,reactions,remove_orphans) def remove_metabolites(self, metabolite_list, destructive=False): # Remove the constraints and variables associated to these reactions all_cons_subclasses = get_all_subclasses(MetaboliteConstraint) all_var_subclasses = get_all_subclasses(MetaboliteVariable) self._remove_associated_consvar(all_cons_subclasses, all_var_subclasses, metabolite_list) Model.remove_metabolites(self, metabolite_list, destructive) def _remove_associated_consvar(self, all_cons_subclasses, all_var_subclasses, collection): """ Removes both the constraints and variables associated to an element, as long as it was used as a hook in the cons/var declaration. For example, upon removing a reaction, also removes its associated deltaG variables and coupling constraints """ if not hasattr(collection, '__iter__'): collection = [collection] strfy = lambda x:x if isinstance(x, str) else x.id for cons_type in all_cons_subclasses: for element in collection: try: cons = self._cons_kinds[cons_type.__name__].get_by_id(strfy(element)) self.remove_constraint(cons) except KeyError as e: pass for var_type in all_var_subclasses: for element in collection: try: var = self._var_kinds[var_type.__name__].get_by_id(strfy(element)) self.remove_variable(var) except KeyError as e: pass def remove_variable(self, var): """ Removes a variable :param var: :return: """ # Get the pytfa var object if an optlang variable is passed if isinstance(var,optlang.Variable): var = self._var_dict[var.name] self._var_dict.pop(var.name) self.remove_cons_vars(var.variable) self.logger.debug('Removed variable {}'.format(var.name)) def remove_constraint(self, cons): """ Removes a constraint :param cons: :return: """ # Get the pytfa var object if an optlang variable is passed if isinstance(cons,optlang.Constraint): cons = self._cons_dict[cons.name] self._cons_dict.pop(cons.name) self.remove_cons_vars(cons.constraint) self.logger.debug('Removed constraint {}'.format(cons.name)) def _push_queue(self): """ updates the constraints and variables of the model with what's in the queue :return: """ self.add_cons_vars(self._var_queue, sloppy=self.sloppy) self.add_cons_vars(self._cons_queue, sloppy = self.sloppy) if len(self._var_queue) > 0: self.regenerate_variables() if len(self._cons_queue) > 0: self.regenerate_constraints() self._var_queue = list() self._cons_queue = list() def regenerate_variables(self): """ Generates references to the cobra_model's constraints in self._var_dict as tab-searchable attributes of the thermo cobra_model :return: """ # Let us not forget to remove fields that might be empty by now if hasattr(self, '_var_kinds'): for k in self._var_kinds: attrname = camel2underscores(k) try: delattr(self, attrname) except AttributeError: pass # The attribute may not have been set up yet _var_kinds = defaultdict(DictList) for k, v in self._var_dict.items(): _var_kinds[v.__class__.__name__].append(v) for k in _var_kinds: attrname = camel2underscores(k) setattr(self, attrname, _var_kinds[k]) self._var_kinds = _var_kinds def regenerate_constraints(self): """ Generates references to the cobra_model's constraints in self._cons_dict as tab-searchable attributes of the thermo cobra_model :return: """ # Let us not forget to remove fields that migh be empty by now if hasattr(self, '_cons_kinds'): for k in self._cons_kinds: attrname = camel2underscores(k) try: delattr(self, attrname) except AttributeError: pass # The attribute may not have been set up yet _cons_kinds = defaultdict(DictList) for k, v in self._cons_dict.items(): _cons_kinds[v.__class__.__name__].append(v) for k in _cons_kinds: attrname = camel2underscores(k) setattr(self, attrname, _cons_kinds[k]) self._cons_kinds = _cons_kinds def repair(self): """ Updates references to variables and constraints :return: """ # self.add_cons_vars([x.constraint for x in self._cons_dict.values()]) # self.add_cons_vars([x.variable for x in self._var_dict.values()]) self._push_queue() Model.repair(self) self.regenerate_constraints() self.regenerate_variables() def get_primal(self, vartype, index_by_reactions=False): """ Returns the primal value of the cobra_model for variables of a given type :param index_by_reactions: :param vartype: Class of variable. Ex: pytfa.optim.variables.ThermoDisplacement :return: """ return get_primal(self, vartype, index_by_reactions) def get_solution(self): """ Overrides the cobra.thermo.solution method, to also get the supplementary variables we added to the cobra_model * :code:`solution.fluxes` in `cobrapy` is a transformed version of the solver output, as it actually calculates the _net_ flux of each reaction by substracting the reverse variable value to the forward variable value. This should be used anytime one needs the actual flux value * :code:`solution.raw` is a clear copy of the solver output. From there one can access the value at solution for all the variables of the problem. However, looking for a reaction ID in there will only give the _forward_ flux. This should be used for any other variable than fluxes. * :code:`solution.values` yields variables multiplied by their scaling factor (1 by default). Useful if you operated scaling on your equations for numerical reasons. This does _not_ include fluxes :return: """ objective_value = self.solver.objective.value status = self.solver.status variables = pd.Series(data=self.solver.primal_values) fluxes = empty(len(self.reactions)) rxn_index = list() var_primals = self.solver.primal_values for (i, rxn) in enumerate(self.reactions): rxn_index.append(rxn.id) fluxes[i] = var_primals[rxn.id] - var_primals[rxn.reverse_id] fluxes = pd.Series(index=rxn_index, data=fluxes, name="fluxes") solution = Solution(objective_value=objective_value, status=status, fluxes=fluxes) self.solution = solution self.solution.raw = variables self.\ solution.values = pd.DataFrame.from_dict({k:v.unscaled for k,v in self._var_dict.items()}, orient = 'index') return solution def optimize(self, objective_sense=None, **kwargs): """ Call the Model.optimize function (which really is but an interface to the solver's. Catches SolverError in the case of no solutions. Passes down supplementary keyword arguments (see cobra.thermo.Model.optimize) :type objective_sense: 'min' or 'max' """ if objective_sense: self.objective.direction = objective_sense try: # self._hidden_optimize_call(kwargs) Model.optimize(self, **kwargs) solution = self.get_solution() self.solution = solution return solution except SolverError as SE: status = self.solver.status self.logger.error(SE) self.logger.warning('Solver status: {}'.format(status)) raise (SE) # @timeit # def _hidden_optimize_call(self, kwargs): # return Model.optimize(self, **kwargs) @timeit def slim_optimize(self, *args, **kwargs): return Model.slim_optimize(self, *args, **kwargs) def get_constraints_of_type(self, constraint_type): """ Convenience function that takes as input a constraint class and returns all its instances within the cobra_model :param constraint_type: :return: """ if isinstance(constraint_type,str): constraint_key = constraint_type else: #it is a class constraint_key = constraint_type.__name__ return self._cons_kinds[constraint_key] def get_variables_of_type(self, variable_type): """ Convenience function that takes as input a variable class and returns all its instances within the cobra_model :param variable_type: :return: """ if isinstance(variable_type,str): variable_key = variable_type else: #it is a class variable_key = variable_type.__name__ return self._var_kinds[variable_key]