Source code for pytfa.io.base

"""
.. module:: pytfa
   :platform: Unix, Windows
   :synopsis: Thermodynamics-based Flux Analysis

.. moduleauthor:: pyTFA team

Input/Output tools to import or export pytfa models


"""

import pickle
import zlib
import numpy as np
import re

from cobra import Model, Reaction, Metabolite
from cobra.io import load_matlab_model
from scipy.io import loadmat, savemat

from ..utils.numerics import BIGM_DG

from warnings import warn

try:
    from scipy.sparse import dok_matrix, lil_matrix
except ImportError:
    dok_matrix, lil_matrix = None, None


[docs]def import_matlab_model(path, variable_name=None): """ Convert at matlab cobra_model to a pyTFA cobra_model, with Thermodynamic values :param variable_name: :param string path: The path of the file to import :returns: The converted cobra_model :rtype: cobra.thermo.model.Model """ # We're going to import the Matlab cobra_model and convert it to COBApy mat_data = loadmat(path) if variable_name: mat_model = mat_data[variable_name][0, 0] else: meta_vars = {"__globals__", "__header__", "__version__"} possible_keys = sorted(i for i in mat_data if i not in meta_vars) if len(possible_keys) == 1: variable_name = possible_keys[0] else: raise Exception( 'Need to specify a variable name if several ' + 'variables are contained int the mat file') mat_model = mat_data[variable_name][0, 0] # cobra_model = Model(mat_model['description'][0]) cobra_model = load_matlab_model(path) cobra_model.description = mat_model['description'][0] cobra_model.name = mat_model['description'][0] metabolites = cobra_model.metabolites ## METABOLITES # In the Matlab cobra_model, the corresponding components are : # * mets = Identifiers # * metNames = Names # * metFormulas = formulas # * metCompSymbol = compartments # def read_mat_model(mat_struct, field_name, index): # try: # return mat_struct[field_name][index,0][0] # except IndexError: # return None # metabolites = [Metabolite( read_mat_model(mat_model,'mets',i), # formula=read_mat_model(mat_model,'metFormulas',i), # name=read_mat_model(mat_model,'metNames',i), # compartment=read_mat_model(mat_model,'metCompSymbol',i)) # for i in range(len(mat_model['metNames']))] # Get the metSEEDID seed_id = mat_model['metSEEDID'] for i, _ in enumerate(metabolites): metabolites[i].annotation = {"seed_id": seed_id[i, 0][0]} # ## REACTIONS # # In the Matlab cobra_model, the corresponding components are : # # * rxns = Names # # * rev = Reversibility (not used, see https://cobrapy.readthedocs.io/en/0.5.11/faq.html#How-do-I-change-the-reversibility-of-a-Reaction?) # # * lb = Lower bounds # # * ub = Upper bounds # # * subSystems = subsystem names # # * S : Reactions matrix # # 1 line = 1 metabolite # # 1 column = 1 reaction # # * c : Objective coefficient # # * genes : Genes names # # * rules : Genes rules # # Some utilities for gene generation rules (convert the IDs to the names of the genes) # gene_pattern = re.compile(r'x\([0-9]+\)') # def gene_id_to_name(match): # id_ = int(match.group()[2:-1]) # # /!\ These are indexed from 1, while python indexes from 0 # return mat_model['genes'][id_ - 1, 0][0] # # Add each reaction # for i in range(mat_model['S'].shape[1]): # # Name of the reaction # reaction = Reaction(str(mat_model['rxns'][i, 0][0])) # # Add the reaction to the cobra_model # cobra_model.add_reactions([reaction]) # # NOTE : The str() conversion above is needed, otherwise the CPLEX solver # # does not work : "Invalid matrix input type --" # # Reaction description # # reaction.name not set # # Subsystem (only if set in the Matlab cobra_model) # if (len(mat_model['subSystems'][i, 0])): # reaction.subsystem = mat_model['subSystems'][i, 0][0] # # Lower bound # reaction.lower_bound = float(mat_model['lb'][i, 0]) # # Upper bound # reaction.upper_bound = float(mat_model['ub'][i, 0]) # # Objective coefficient # reaction.objective_coefficient = float(mat_model['c'][i, 0]) # # Metabolites # react_mets = {} # # Iterate over each metabolite and see if it is part of the reaction # # (stoechiomectric coefficient not equal to 0) # for j, _ in enumerate(metabolites): # if (mat_model['S'][j, i] != 0): # react_mets[metabolites[j]] = mat_model['S'][j, i] # reaction.add_metabolites(react_mets) # # Genes # try: # if len(mat_model['rules'][i, 0]): # rule = mat_model['rules'][i, 0][0] # # Call the regex magic to convert IDs to gene names # rule = gene_pattern.sub(gene_id_to_name, rule) # # Add the data to the reaction # reaction.gene_reaction_rule = rule # except ValueError: # pass # except IndexError: # # The gene number is higher than the length of the gene list # warn('The gene reaction rule {} appears to be misaligned with ' # 'the gene list'.format(rule)) Compartments = dict() CompartmentDB = mat_model['CompartmentData'][0] num_comp = len(CompartmentDB['pH'][0][0]) comps = [{} for i in range(num_comp)] for i in range(num_comp): comp = comps[i] comp['pH'] = CompartmentDB['pH'][0][0][i] comp['ionicStr'] = CompartmentDB['ionicStr'][0][0][i] comp['symbol'] = CompartmentDB['compSymbolList'][0][0, i][0] comp['name'] = CompartmentDB['compNameList'][0][0, i][0] comp['c_max'] = CompartmentDB['compMaxConc'][0][0][i] comp['c_min'] = CompartmentDB['compMinConc'][0][0][i] # Register our compartment by its name if comp['symbol'] in Compartments: raise Exception("Duplicate compartment ID : " + comp['symbol']) Compartments[comp['symbol']] = comp # We need to iterate first once to set the names of the compartments, so that we have the keys for our dictionnary... for i in range(num_comp): comp = comps[i] comp['membranePot'] = {} for j in range(num_comp): comp['membranePot'][comps[j]['symbol']] = float(CompartmentDB['membranePot'][0][i, j]) cobra_model.compartments = Compartments recover_compartments(cobra_model, list(Compartments.keys())) return cobra_model
[docs]def recover_compartments(model, compartments_list): for met in model.metabolites: if met.compartment is None: comp_candidate = met.id.split('_')[-1] if comp_candidate in compartments_list: met.compartment = comp_candidate else: model.logger.info('No compartment data found for metabolite {}'.format(met.id)) else: continue
[docs]def write_matlab_model(tmodel, path, varname='tmodel'): """ Writes the Thermo Model to a Matlab-compatible structure :param varname: :param tmodel: :param path: :return: None """ from cobra.io.mat import create_mat_dict mat = create_mat_dict(tmodel) mat.update(create_thermo_dict(tmodel)) mat.update(create_problem_dict(tmodel)) savemat(path, {varname: mat}, appendmat=True, oned_as="column")
[docs]def create_thermo_dict(tmodel): """ Dumps the thermodynamic information in a mat-compatible dictionary (similar to the output of cobra.io.mat.create_mat_dict) :param tmodel: pytfa.thermo.tmodel.ThermoModel :return: dict object """ mat = {} met_map = { 'metSEEDID':('id',''), 'metDeltaGFstd':('deltaGf_std',BIGM_DG), 'metDeltaGFerr':('deltaGf_err',BIGM_DG), 'metMass':('mass',BIGM_DG), 'metCharge':('charge_std',BIGM_DG), 'metDelGFtr':('deltaGf_tr',BIGM_DG), 'metCompSymbol':('compartment','') } for column,(key,default_value) in met_map.items(): the_data = np.full(len(tmodel.metabolites),default_value) for e,the_met in enumerate(tmodel.metabolites): try: the_value = the_met.thermo[key] the_data[e] = the_value except KeyError: continue if column == 'metSEEDID': mat[column] = np.array(the_data, dtype=np.object) else: mat[column] = np.array(the_data) rxn_map = { 'rxnDeltaGR':('DeltaGR',BIGM_DG), 'rxnDeltaGRerr':('DeltaGRerr',BIGM_DG), 'rxnThermo':('computed',None), 'isTrans':('isTrans',None), } for column,(key,default_value) in rxn_map.items(): the_data = np.full(len(tmodel.reactions),default_value) for e,the_rxn in enumerate(tmodel.reactions): try: the_value = the_rxn.thermo[key] the_data[e] = the_value except KeyError: continue mat[column] = np.array(the_data) # Adding compartment data CompartmentDB = {} compartments = [v for v in tmodel.compartments.values()] CompartmentDB['pH'] = np.array([x['pH'] for x in compartments]) CompartmentDB['ionicStr'] = np.array([x['ionicStr'] for x in compartments]) CompartmentDB['compMaxConc'] = np.array([x['c_max'] for x in compartments]) CompartmentDB['compMinConc'] = np.array([x['c_min'] for x in compartments]) #Write symbols and names in collumn cell arrays CompartmentDB['compSymbolList'] = np.zeros((1, len(compartments)), dtype=np.object) CompartmentDB['compNameList'] = np.zeros((1, len(compartments)), dtype=np.object) mat_to_python_string = [('compSymbolList', 'symbol'), ('compNameList', 'name')] for i, _ in enumerate(compartments): for mat_name, py_name in mat_to_python_string: CompartmentDB[mat_name][0, i] = compartments[i][py_name] # The membrane potential is an NxN matrix in the matlab format membrane_pot = np.zeros((len(compartments),len(compartments))) for i, _ in enumerate(compartments): for j, _ in enumerate(compartments): symbol_j = compartments[j]['symbol'] membrane_pot[i,j] = compartments[i]['membranePot'][symbol_j] CompartmentDB['membranePot'] = membrane_pot mat['CompartmentData'] = CompartmentDB return mat
[docs]def varnames2matlab(name, tmodel): """ Transforms reaction variable pairs from `('ACALD','ACALD_reverse_xxxxx')` to `('F_ACALD','B_ACALD')` if it is a reaction, else leaves is as is :return: """ reverse_regex = re.compile(r'(.+_reverse)_[a-f0-9]{5}') new_name = name if new_name in tmodel.reactions: new_name = 'F_' + new_name else: test = reverse_regex.match(new_name) if test: new_name = 'R_' + test.groups()[0] return new_name
[docs]def create_problem_dict(tmodel): """ Dumps the the MILP formulation for TFA in a mat-compatible dictionary (similar to the output of cobra.io.mat.create_mat_dict) :param tmodel: pytfa.thermo.tmodel.ThermoModel :ret """ vartype_map = { 'continuous':'C', 'binary':'B', 'integer':'B' # FIXME Waiting for optlang fix on binary vars } mat= {} mat['A'] = create_generalized_matrix(tmodel) # Variables mat['var_lb'] = np.array([x.lb for x in tmodel.variables]) * 1. mat['var_ub'] = np.array([x.ub for x in tmodel.variables]) * 1. vname = np.full(len(tmodel.variables),'', dtype=np.object) vtype = np.full(len(tmodel.variables),'', dtype=np.object) for e, this_var in enumerate(tmodel.variables): this_name = this_var.name new_name = varnames2matlab(this_name, tmodel) vname[e] = new_name vtype[e] = vartype_map[this_var.type] mat['varNames'] = vname mat['vartypes'] = vtype obj = tmodel.objective mat['objtype'] = -1 if tmodel.objective.direction.startswith('max') else 1 mat['f'] = np.full(len(tmodel.variables), 0, dtype = np.double) * 1.0 for this_var,this_coeff in obj.get_linear_coefficients(obj.variables).items(): matlab_name = varnames2matlab(this_var.name, tmodel) mat['f'][np.where(mat['varNames']==matlab_name)[0][0]] = this_coeff # Constraints rhs = np.empty(len(tmodel.constraints)) ctype = np.full(len(tmodel.constraints),'', dtype=np.object) cname = np.full(len(tmodel.constraints),'', dtype=np.object) for e,this_cons in enumerate(tmodel.constraints): if this_cons.lb is None and this_cons.ub is not None: this_type = '<' this_rhs = this_cons.ub elif this_cons.ub is None and this_cons.lb is not None: this_type = '>' this_rhs = this_cons.lb elif this_cons.ub is not None and this_cons.lb is not None: if this_cons.lb == this_cons.ub: this_type = '=' this_rhs = this_cons.ub else: raise(Exception('Matlab formulation does not support ' 'constraints under the form: lb<x<ub')) else: raise(ValueError('Constraint type not recognized')) rhs[e] = this_rhs ctype[e] = this_type cname[e] = this_var.name mat['rhs'] = rhs *1 mat['constraintType'] = ctype mat['constraintNames'] = cname return mat
[docs]def create_generalized_matrix(tmodel, array_type = 'dense'): """ Returns the generalized stoichiomatric matrix used for TFA :param array_type: :param tmodel: pytfa.ThermoModel :returns: matrix. """ if array_type not in ('DataFrame', 'dense') and not dok_matrix: raise ValueError('Sparse matrices require scipy') dtype = np.float64 array_constructor = {'dense': np.zeros, 'dok': dok_matrix, 'lil': lil_matrix, 'DataFrame': np.zeros, } n_constraints = len(tmodel.constraints) n_variables = len(tmodel.variables) array = array_constructor[array_type]((n_constraints, n_variables), dtype=dtype) c_ind = {x:e for e,x in enumerate(tmodel.constraints)} v_ind = {x:e for e,x in enumerate(tmodel.variables)} for this_cons in tmodel.constraints: var_coeff_dict = this_cons.get_linear_coefficients(this_cons.variables) for this_var,coeff in var_coeff_dict.items(): array[c_ind[this_cons], v_ind[this_var]] = coeff if array_type == 'DataFrame': metabolite_ids = [met.id for met in tmodel.constraints] reaction_ids = [rxn.id for rxn in tmodel.variables] return pd.DataFrame(array, index=metabolite_ids, columns=reaction_ids) else: return array
[docs]def load_thermoDB(path): """ Load a thermodynamic database :param string path: The path of the file to load :returns: The thermodynamic database :rtype: dict """ with open(path, 'rb') as file: ReactionDB = pickle.loads(zlib.decompress(file.read())) return ReactionDB
[docs]def printLP(model): """ Print the LP file corresponding to the cobra_model :param cobra.thermo.model.Model model: The cobra_model to output the LP file for :returns: The content of the LP file :rtype: str Usually, you pass the result of this function to :py:func:`file.write` to write it on disk. If you prefer, you can use :func:`pytfa.io.writeLP` to write the result directly to a file. """ # It's much more efficient to allocate the file in memory then write it as # a whole to the disk, rather than making multiple disk calls res = '' # Write the problem name res += '\\Problem name: {}_LP\n\nMaximize\n obj: '.format(model.description) # Write the objective for rxn in model.reactions: if rxn.objective_coefficient != 0: if rxn.objective_coefficient != 1: res += rxn.objective_coefficient + ' ' res += rxn.id # Constraints res += '\nSubject To\n' for cons in model.constraints: # Name of the constraint : res += cons.name + ': ' # Write the lower bound if applicable... if cons.lb != cons.ub: if cons.lb is not None: res += str(cons.lb) + ' < ' # Write the bound res += str(cons.expression) # Write the upper bound if cons.lb == cons.ub: res += ' = ' + str(cons.ub) elif cons.ub is not None: res += ' < ' + str(cons.ub) # Next line res += '\n' # Variables res += 'Bounds\n' for var in model.variables: # optlang already does the hard job for us, yay ! res += str(var) + '\n' # Binrary constraints res += 'Binaries\n' # Number of variables on the current line we're writing count = 1 for var in model.variables: # FIXME using the integer trick to be able to constraint binary variables # if var.type=='binary': if var.type in ['binary', 'integer']: res += var.name + '\t' count += 1 # Print at most 7 variables per line if count == 7: res += '\n' count = 1 # Done ! res += '\nEnd' return res
[docs]def writeLP(model, path=None): """ Write the LP file of the specified cobra_model to the file indicated by path. :param cobra.thermo.model.Model model: The COBRApy cobra_model to write the LP file for :param string path: `Optional` The path of the file to be written. If not specified, the name of the COBRApy cobra_model will be used. """ if not path: path = model.description + '.lp' with open(path, 'w') as file: file.write(printLP(model))