# -*- coding: utf-8 -*-
"""
.. module:: pytfa
:platform: Unix, Windows
:synopsis: Thermodynamics-based Flux Analysis
.. moduleauthor:: pyTFA team
Relaxation of models with constraint too tight
"""
from copy import deepcopy
import optlang
import pandas as pd
import sympy
from cobra.core.solution import Solution
from sympy.core.numbers import Zero
from numbers import Number
from .constraints import GenericConstraint
from .variables import ForwardUseVariable, BackwardUseVariable
from .variables import GenericVariable
[docs]SYMPY_ADD_CHUNKSIZE = 100
[docs]INTEGER_VARIABLE_TYPES = ('binary','integer')
[docs]def get_all_subclasses(cls):
"""
Given a variable or constraint class, get all the subclassses
that inherit from it
:param cls:
:return:
"""
all_subclasses = []
for subclass in cls.__subclasses__():
all_subclasses.append(subclass)
all_subclasses.extend(get_all_subclasses(subclass))
return all_subclasses
def chunk_sum(variables):
"""
This functions handles the sum of many sympy variables by chunks, which
somehow increases the speed of the computation
You can test it in IPython:
```python
a = sympy.symbols('a0:100')
%timeit (sum(a))
# >>> 198 µs ± 11.4 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
b = sympy.symbols('b0:1000')
%timeit (sum(b))
# >>> 1.85 ms ± 356 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
c = sympy.symbols('c0:3000')
%timeit (sum(c))
# >>> 5min 7s ± 2.57 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
```
See the `github thread <https://github.com/sympy/sympy/issues/13945>`_
:param variables:
:return:
"""
partial_sums = []
for chunk_no in range(len(variables) % SYMPY_ADD_CHUNKSIZE):
first_index = chunk_no * SYMPY_ADD_CHUNKSIZE
last_index = (chunk_no + 1) * SYMPY_ADD_CHUNKSIZE
this_chunk = variables[first_index:last_index]
this_sum = sum(this_chunk)
partial_sums.append(this_sum)
return sum(partial_sums)
def symbol_sum(variables):
"""
``` python
a = symbols('a0:100')
%timeit Add(*a)
# >>> 10000 loops, best of 3: 34.1 µs per loop
b = symbols('b0:1000')
%timeit Add(*b)
# >>> 1000 loops, best of 3: 343 µs per loop
c = symbols('c0:3000')
%timeit Add(*c)
# >>> 1 loops, best of 3: 1.03 ms per loop
```
See the `github thread <https://github.com/sympy/sympy/issues/13945>`_
:param variables:
:return:
"""
from sympy import Add
k=0
# If we encounter a zero, which is a special type, increase k
while isinstance(variables[k], Zero) and k<len(variables):
k+=1
if k == len(variables):
# everything is 0
return 0
if k>len(variables): #it's only zeroes
return 0
if isinstance(variables[k], GenericVariable):
return Add(*[x.variable for x in variables])
elif isinstance(variables[k], optlang.interface.Variable) or \
isinstance(variables[k], sympy.Mul) or \
isinstance(variables[k], sympy.Add) or \
isinstance(variables[k], Number):
return Add(*variables)
else:
raise ValueError('Arguments should be of type Number, sympy.Add, or sympy.Mul, '
'or optlang.Variable, or GenericVariable')
def get_solution_value_for_variables(solution, these_vars, index_by_reaction = False):
if isinstance(these_vars[0],GenericVariable):
var_ids = [x.name for x in these_vars]
elif isinstance(these_vars[0],optlang.Variable):
var_ids = [x.name for x in these_vars]
elif isinstance(these_vars[0],str):
var_ids = these_vars
else:
raise TypeError('''<these_vars> should be of type pytfa.solving.Variable,
optlang.Variable, or str''')
if index_by_reaction:
var2rxn = {v.name:v.id for v in these_vars}
ret = solution.raw[var_ids]
ret = ret.index.replace(var2rxn)
return ret
else:
return solution.raw[var_ids]
[docs]def compare_solutions(models):
"""
returns the solution dictionnary for each cobra_model
:param (iterable (pytfa.thermo.ThermoModel)) models:
:return:
"""
return pd.concat([x.solution.raw for x in models], axis=1)
[docs]def evaluate_constraint_at_solution(constraint, solution):
"""
:param expression:
:param solution: pandas.DataFrame, with index as variable names
:return:
"""
if isinstance(solution,Solution):
solution = solution.raw
if isinstance(constraint, GenericConstraint):
constraint = constraint.constraint
# subs_dict = {x:solution.loc[x.name] for x in constraint.variables}
# return constraint.expression.subs(subs_dict)
coefs = constraint.get_linear_coefficients(constraint.expression.free_symbols)
values = {x:solution.loc[x.name] for x in constraint.expression.free_symbols}
return symbol_sum([coefs[x]*values[x] for x in coefs])
[docs]def get_active_use_variables(tmodel,solution):
"""
Returns the active use variables in a solution. Use Variables are binary
variables that control the directionality of the reaction
ex:
FU_ACALDt
BU_PFK
:type tmodel: pytfa.core.ThermoModel
:param tmodel:
:param solution:
:return:
"""
use_variables = tuple(tmodel.get_variables_of_type(BackwardUseVariable)) \
+ tuple(tmodel.get_variables_of_type(ForwardUseVariable))
epsilon = tmodel.solver.configuration.tolerances.integrality
return [x for x in use_variables if abs(solution.raw[x.name]-1)<epsilon]
[docs]def get_direction_use_variables(tmodel,solution):
"""
Returns the active use variables in a solution. Use Variables are binary
variables that control the directionality of the reaction
The difference with get_active_use_variables is that variables with both
UseVariables at 0 will return as going forwards. This is to ensure that the
output size of the function is equal to the number of FDPs
ex:
FU_ACALDt
BU_PFK
:type tmodel: pytfa.core.ThermoModel
:param tmodel:
:param solution:
:return:
"""
fwd_use_variables = tmodel.get_variables_of_type(ForwardUseVariable)
bwd_use_variables = tmodel.get_variables_of_type(BackwardUseVariable)
epsilon = tmodel.solver.configuration.tolerances.feasibility
return [fwd_use_variables.get_by_id(x.id) if solution.raw[x.id] > epsilon
else bwd_use_variables.get_by_id(x.id)
for x in tmodel.reactions ]
[docs]def get_primal(tmodel, vartype, index_by_reactions = False):
"""
Returns the primal value of the cobra_model for variables of a given type
:param tmodel:
:param vartype: Class of variable. Ex: pytfa.optim.variables.ThermoDisplacement
:param index_by_reactions: Set to true to get reaction names as index instead of
variables. Useful for Escher export
:return:
"""
the_vars = tmodel.get_variables_of_type(vartype)
if index_by_reactions:
return pd.Series({x.id:x.variable.primal
for x in the_vars})
else:
return pd.Series({x.name:x.variable.primal
for x in the_vars})
[docs]def strip_from_integer_variables(tmodel):
"""
Removes all integer and binary variables of a cobra_model, to make it sample-able
:param tmodel:
:return:
"""
continuous_model = tmodel.copy()
continuous_model.name = tmodel.name + ' - continuous'
integer_variables = set()
constraints_with_integer_variables = []
# We go through all the constraint descriptors and check if at least one of
# their variables is in the integer variable list
for this_cons in continuous_model._cons_dict.values():
has_integer_variable = False
for this_var in this_cons.constraint.variables:
if this_var.type in INTEGER_VARIABLE_TYPES:
has_integer_variable += True
this_var_descriptor = continuous_model._var_dict[this_var.name]
integer_variables.add(this_var_descriptor)
if has_integer_variable:
constraints_with_integer_variables.append(this_cons)
for this_cons in constraints_with_integer_variables:
continuous_model.remove_constraint(this_cons)
for this_var in integer_variables:
continuous_model.remove_variable(this_var)
continuous_model.solver.update()
# This will update the values =
print('Is the cobra_model still integer ? {}' \
.format(continuous_model.solver.is_integer))
return continuous_model
[docs]def copy_solver_configuration(source, target):
"""
Copies the solver configuration from a source model to a target model
:param source:
:param target:
:return:
"""
# LP method
try:
target.solver.configuration.lp_method = source.solver.configuration.lp_method
except AttributeError:
pass
# Presolve
target.solver.configuration.presolve = source.solver.configuration.presolve
# Timeout
target.solver.configuration.timeout = source.solver.configuration.timeout
# Tolerances
for tol_name in dir(source.solver.configuration.tolerances):
tol = getattr(source.solver.configuration.tolerances, tol_name)
setattr(target.solver.configuration.tolerances, tol_name, tol)
# Additionnal solver-specific settings
try:
# Gurobi
if source.solver.interface.__name__ == 'optlang.gurobi_interface':
from gurobipy import GurobiError
for k in dir(source.solver.problem.Params):
if not k.startswith('_'):
try:
v = getattr(source.solver.problem.Params, k)
setattr(target.solver.problem.Params, k, v)
except GurobiError:
pass
except ModuleNotFoundError:
pass
# Verbosity
target.solver.configuration.verbosity = source.solver.configuration.verbosity