Source code for pytfa.optim.relaxation

# -*- coding: utf-8 -*-
"""
.. module:: pytfa
   :platform: Unix, Windows
   :synopsis: Thermodynamics-based Flux Analysis

.. moduleauthor:: pyTFA team

Relaxation of models with constraint too tight

"""

from collections import OrderedDict
from copy import deepcopy

from tqdm import tqdm
import pandas as pd
from cobra.util.solver import set_objective
from optlang.exceptions import SolverError

from .constraints import NegativeDeltaG
from .config import dg_relax_config
from .utils import get_solution_value_for_variables, chunk_sum, symbol_sum
from .variables import PosSlackVariable, NegSlackVariable, DeltaGstd, \
    LogConcentration, NegSlackLC, PosSlackLC
from ..utils import numerics

[docs]BIGM = numerics.BIGM
[docs]BIGM_THERMO = numerics.BIGM_THERMO
[docs]BIGM_DG = numerics.BIGM_DG
[docs]BIGM_P = numerics.BIGM_P
[docs]EPSILON = numerics.EPSILON
[docs]def relax_dgo_gurobi(model, relax_obj_type = 0): the_cons = [x.constraint._internal_constraint for x in model.get_constraints_of_type(NegativeDeltaG)] cons_penalities = [1]*len(the_cons) # the_vars = [x._internal_variable for x in model.variables] # vars_penalities = [1]*len(the_vars) grm = model.solver.problem.feasRelax(relaxobjtype=relax_obj_type, minrelax=True, constrs=the_cons, # vars=the_vars, # lbpen=vars_penalities, # ubpen=vars_penalities, vars=None, lbpen=None, ubpen=None, rhspen=cons_penalities) return grm
[docs]def relax_dgo(tmodel, reactions_to_ignore=(), solver=None, in_place = False): """ :param t_tmodel: :type t_tmodel: pytfa.thermo.ThermoModel: :param reactions_to_ignore: Iterable of reactions that should not be relaxed :param solver: solver to use (e.g. 'optlang-glpk', 'optlang-cplex', 'optlang-gurobi' :return: a cobra_model with relaxed bounds on standard Gibbs free energy """ if solver is None: solver = tmodel.solver.interface # Create a copy of the cobra_model on which we will perform the slack addition slack_model = deepcopy(tmodel) slack_model.solver = solver slack_model.name = 'SlackModel '+tmodel.name slack_model.id = 'SlackModel_'+tmodel.id # Ensure the lazy updates are all done slack_model.repair() if not in_place: # Create a copy that will receive the relaxation relaxed_model = deepcopy(tmodel) relaxed_model.solver = solver relaxed_model.name = 'RelaxedModel '+tmodel.name relaxed_model.id = 'RelaxedModel_'+tmodel.id relaxed_model.repair() else: relaxed_model = slack_model dg_relax_config(slack_model) original_objective = relaxed_model.objective # Do not relax if cobra_model is already optimal # try: # solution = tmodel.optimize() # except SolverError as SE: # status = tmodel.solver.status # tmodel.logger.error(SE) # tmodel.logger.warning('Solver status: {}'.format(status)) # if tmodel.solver.status == OPTIMAL: # raise Exception('Model is already optimal') # Find variables that represent standard Gibbs Energy change my_dgo = relaxed_model.get_variables_of_type(DeltaGstd) # Find constraints that represent negativity of Gibbs Energy change my_neg_dg = slack_model.get_constraints_of_type(NegativeDeltaG) changes = OrderedDict() objective_symbols = [] slack_model.logger.info('Adding slack constraints') slack_model.solver.update() for this_neg_dg in tqdm(my_neg_dg, desc='adding slacks'): # If there is no thermo, or relaxation forbidden, pass if this_neg_dg.id in reactions_to_ignore or this_neg_dg.id not in my_dgo: continue # Create the negative and positive slack variables # We can't queue them because they will be in an expression to declare # the constraint neg_slack = slack_model.add_variable(NegSlackVariable, this_neg_dg.reaction, lb=0, ub=BIGM_DG, queue=False) pos_slack = slack_model.add_variable(PosSlackVariable, this_neg_dg.reaction, lb=0, ub=BIGM_DG, queue=False) # Create the new constraint by adding the slack variables to the # negative delta G constraint (from the initial cobra_model) new_expr = this_neg_dg.constraint.expression new_expr += (pos_slack - neg_slack) this_reaction = this_neg_dg.reaction # # Remove former constraint to override it # slack_model.remove_constraint(slack_model._cons_dict[this_neg_dg.name]) # # # Add the new variant # slack_model.add_constraint(NegativeDeltaG, # this_reaction, # expr=new_expr, # lb=0, # ub=0, # queue=True) this_neg_dg.change_expr(new_expr) # Update the objective with the new variables objective_symbols += [neg_slack, pos_slack] # objective = chunk_sum(objective_symbols) objective = symbol_sum(objective_symbols) # Change the objective to minimize slack set_objective(slack_model, objective) # Update variables and constraints references slack_model.repair() slack_model.logger.info('Optimizing slack model') # Relax slack_model.objective.direction = 'min' relaxation = slack_model.optimize() # Extract the relaxation values from the solution, by type relaxed_model.logger.info('Extracting relaxation') my_neg_slacks = slack_model.get_variables_of_type(NegSlackVariable) my_pos_slacks = slack_model.get_variables_of_type(PosSlackVariable) neg_slack_values = get_solution_value_for_variables(relaxation, my_neg_slacks) pos_slack_values = get_solution_value_for_variables(relaxation, my_pos_slacks) epsilon = relaxed_model.solver.configuration.tolerances.feasibility relaxed_model.repair() relaxed_model.solver.update() # Apply reaction delta G standard bound change for this_reaction in tqdm(relaxed_model.reactions, desc = 'applying slack'): # No thermo, or relaxation forbidden if this_reaction.id in reactions_to_ignore or this_reaction.id not in my_dgo: continue # Get the standard delta G variable the_dgo = my_dgo.get_by_id(this_reaction.id) # Get the relaxation dgo_delta_lb = \ neg_slack_values[my_neg_slacks \ .get_by_id(this_reaction.id).name] dgo_delta_ub = \ pos_slack_values[my_pos_slacks \ .get_by_id(this_reaction.id).name] if in_place: the_neg_slack = my_neg_slacks.get_by_id(this_reaction.id) the_neg_slack_value = slack_model.solution.raw[the_neg_slack.name] the_neg_slack.variable.lb = the_neg_slack_value - epsilon the_neg_slack.variable.ub = the_neg_slack_value + epsilon the_pos_slack = my_pos_slacks.get_by_id(this_reaction.id) the_pos_slack_value = slack_model.solution.raw[the_pos_slack.name] the_pos_slack.variable.lb = the_pos_slack_value - epsilon the_pos_slack.variable.ub = the_pos_slack_value + epsilon # Apply reaction delta G standard bound change if dgo_delta_lb > 0 or dgo_delta_ub > 0: # Store previous values previous_dgo_lb = the_dgo.variable.lb previous_dgo_ub = the_dgo.variable.ub if not in_place: # Apply change the_dgo.variable.lb -= (dgo_delta_lb + epsilon) the_dgo.variable.ub += (dgo_delta_ub + epsilon) # If needed, store that in a report table changes[this_reaction.id] = [ previous_dgo_lb, previous_dgo_ub, dgo_delta_lb, dgo_delta_ub, the_dgo.variable.lb, the_dgo.variable.ub] relaxed_model.repair() relaxed_model.logger.info('Testing relaxation') relaxed_model.objective = original_objective relaxed_model.objective.direction = 'max' relaxed_model.optimize() if len(changes) == 0: # The model is infeasible or something went wrong tmodel.logger.error('Relaxation could not complete ' '(no DeltaG relaxation found)') return relaxed_model, slack_model, None # Format relaxation relax_table = pd.DataFrame.from_dict(changes, orient = 'index') relax_table.columns = [ 'lb_in', 'ub_in', 'lb_change', 'ub_change', 'lb_out', 'ub_out'] return relaxed_model, slack_model, relax_table
[docs]def relax_lc(tmodel, metabolites_to_ignore = (), solver = None): """ :param metabolites_to_ignore: :param in_tmodel: :type in_tmodel: pytfa.thermo.ThermoModel: :param min_objective_value: :return: """ if solver is None: solver = tmodel.solver # Create a copy of the cobra_model on which we will perform the slack addition slack_model = deepcopy(tmodel) slack_model.solver = solver # Create a copy that will receive the relaxation relaxed_model = deepcopy(tmodel) relaxed_model.solver = solver # Do not relax if cobra_model is already optimal try: solution = tmodel.optimize() except SolverError as SE: status = tmodel.solver.status tmodel.logger.error(SE) tmodel.logger.warning('Solver status: {}'.format(status)) # if tmodel.solver.status == OPTIMAL: # raise Exception('Model is already optimal') # Find variables that represent standard Gibbs Energy change my_lc = relaxed_model.get_variables_of_type(LogConcentration) # Find constraints that represent negativity of Gibbs Energy change my_neg_dg = slack_model.get_constraints_of_type(NegativeDeltaG) changes = OrderedDict() objective = 0 pos_slack = dict() neg_slack = dict() for this_lc in my_lc: if this_lc.name in metabolites_to_ignore: continue neg_slack[this_lc.name] = slack_model.add_variable(NegSlackLC, this_lc, lb= 0, ub= BIGM_DG) pos_slack[this_lc.name] = slack_model.add_variable(PosSlackLC, this_lc, lb= 0, ub= BIGM_DG) # Update the objective with the new variables objective += (neg_slack[this_lc.name] + pos_slack[this_lc.name]) for this_neg_dg in my_neg_dg: # If there is no thermo, or relaxation forbidden, pass if this_neg_dg.id not in my_neg_dg: continue subs_dict = {k: slack_model.variables.get(k.name) \ for k in this_neg_dg.constraint.variables} # Create the new constraint by adding the slack variables to the # negative delta G constraint (from the initial cobra_model) new_expr = this_neg_dg.constraint.expression.subs(subs_dict) for this_var in this_neg_dg.constraint.variables: if not this_var.name in neg_slack: continue met_id = pos_slack[this_var.name].id the_met = slack_model.metabolites.get_by_id(met_id) stoich = this_neg_dg.reaction.metabolites[the_met] new_expr += slack_model.RT * stoich \ * \ (pos_slack[this_var.name] - neg_slack[this_var.name]) # Remove former constraint to override it slack_model.remove_constraint(this_neg_dg) # Add the new variant slack_model.add_constraint(NegativeDeltaG, this_neg_dg.reaction, expr=new_expr, lb=0, ub=0) # Change the objective to minimize slack set_objective(slack_model, objective) # Update variables and constraints references slack_model.repair() # Relax slack_model.objective.direction = 'min' relaxation = slack_model.optimize() # Extract the relaxation values from the solution, by type my_neg_slacks = slack_model.get_variables_of_type(NegSlackLC) my_pos_slacks = slack_model.get_variables_of_type(PosSlackLC) neg_slack_values = get_solution_value_for_variables(relaxation, my_neg_slacks) pos_slack_values = get_solution_value_for_variables(relaxation, my_pos_slacks) epsilon = relaxed_model.solver.configuration.tolerances.feasibility for this_met in relaxed_model.metabolites: # No thermo, or relaxation forbidden if this_met.id in metabolites_to_ignore or this_met.id not in my_lc: continue # Get the standard delta G variable the_lc = my_lc.get_by_id(this_met.id) # Get the relaxation lc_delta_lb = neg_slack_values[ my_neg_slacks \ .get_by_id(this_met.id).name] lc_delta_ub = \ pos_slack_values[my_pos_slacks \ .get_by_id(this_met.id).name] # Apply reaction delta G standard bound change if lc_delta_lb > 0 or lc_delta_ub > 0: # Store previous values previous_lc_lb = the_lc.variable.lb previous_lc_ub = the_lc.variable.ub # Apply change the_lc.variable.lb -= (lc_delta_lb + epsilon) the_lc.variable.ub += (lc_delta_ub + epsilon) # If needed, store that in a report table changes[this_met.id] = [ previous_lc_lb, previous_lc_ub, lc_delta_lb, lc_delta_ub, the_lc.variable.lb, the_lc.variable.ub] # Obtain relaxation relaxed_model.optimize() # Format relaxation relax_table = pd.DataFrame.from_dict(changes, orient = 'index') relax_table.columns = [ 'lb_in', 'ub_in', 'lb_change', 'ub_change', 'lb_out', 'ub_out'] tmodel.logger.info('\n' + relax_table.__str__()) relaxed_model.relaxation = relax_table return relaxed_model, slack_model, relax_table