Source code for cameo.strain_design.pathway_prediction.pathway_predictor

# Copyright 2014 Novo Nordisk Foundation Center for Biosustainability, DTU.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import, print_function

import logging
import re
import warnings
from collections import Counter
from functools import partial
from math import ceil
from copy import copy

from cobra import DictList
from sympy import Add, Mul, RealNumber

from cobra import Model, Metabolite, Reaction
from cobra.util import SolverNotFound
from cobra.exceptions import OptimizationError

from cameo import fba
from cameo import models, phenotypic_phase_plane
from cameo.config import non_zero_flux_threshold
from cameo.core.pathway import Pathway
from cameo.core.result import Result, MetaInformation
from cameo.core.strain_design import StrainDesignMethodResult, StrainDesign, StrainDesignMethod
from cameo.core.target import ReactionKnockinTarget
from cameo.data import metanetx
from cameo.strain_design.pathway_prediction import util
from cameo.util import TimeMachine

__all__ = ['PathwayPredictor']

logger = logging.getLogger(__name__)

add = Add._from_args
mul = Mul._from_args


class PathwayResult(Pathway, Result, StrainDesign):
    def __init__(self, reactions, exchanges, adapters, product, *args, **kwargs):
        self._meta_information = MetaInformation()
        self.reactions = reactions
        self.exchanges = exchanges
        self.adapters = adapters
        self.product = product
        self.targets = self._build_targets()

    def _replace_adapted_metabolites(self, reaction):
        """
        Replace adapted metabolites by model metabolites

        Parameters
        ----------
        reaction: cameo.core.reaction.Reaction

        Returns
        -------
        cameo.core.reaction.Reaction
        """
        stoichiometry = {}

        for metabolite, coefficient in reaction.metabolites.items():
            found = False
            for adapter in self.adapters:
                if metabolite == adapter.products[0]:
                    metabolite = adapter.reactants[0]
                    found = False
                    break
            if not found:
                metabolite = metabolite

            stoichiometry[metabolite] = coefficient

        reaction = Reaction(id=reaction.id,
                            name=reaction.name,
                            lower_bound=reaction.lower_bound,
                            upper_bound=reaction.upper_bound)
        reaction.add_metabolites(stoichiometry)

        return reaction

    def _build_targets(self):
        targets = DictList()
        for reaction in self.reactions:
            reaction = self._replace_adapted_metabolites(reaction)
            if reaction.id in metanetx.mnx2all:
                target = ReactionKnockinTarget(reaction.id, reaction, accession_id=reaction.id, accession_db='metanetx')
            else:
                target = ReactionKnockinTarget(reaction.id, reaction)
            targets.append(target)

        for reaction in self.exchanges:
            reaction = self._replace_adapted_metabolites(reaction)
            targets.append(ReactionKnockinTarget(reaction.id, reaction))

        product = self._replace_adapted_metabolites(self.product)
        product.lower_bound = 0
        targets.append(ReactionKnockinTarget(product.id, product))

        return targets

    def plot(self, **kwargs):
        pass

    def needs_optimization(self, model, objective=None):
        area = self.production_envelope(model, objective).area
        return area > 1e-5

    def production_envelope(self, model, objective=None):
        with model:
            self.apply(model)
            return phenotypic_phase_plane(model, variables=[objective], objective=self.product.id)

    def plug_model(self, model, adapters=True, exchanges=True):
        warnings.warn("The 'plug_model' method as been deprecated. Use apply instead.", DeprecationWarning)
        model.add_reactions(self.reactions)
        if adapters:
            model.add_reactions(self.adapters)
        if exchanges:
            model.add_reactions(self.exchanges)
        try:
            model.add_reaction(self.product)
        except Exception:
            logger.warning("Exchange %s already in model" % self.product.id)
            pass
        self.product.lower_bound = 0


class PathwayPredictions(StrainDesignMethodResult):
    __method_name__ = "PathwayPredictor"

    def __init__(self, pathways, *args, **kwargs):
        super(PathwayPredictions, self).__init__(pathways, *args, **kwargs)

    @property
    def pathways(self):
        return self._designs

    def plug_model(self, model, index):
        warnings.warn("The 'plug_model' method as been deprecated. You can use result[i].apply instead",
                      DeprecationWarning)
        self.pathways[index].plug_model(model)

    def __getitem__(self, item):
        return self.pathways[item]

    def __str__(self):
        string = str()
        for i, pathway in enumerate(self.pathways):
            string += 'Pathway No. {}'.format(i + 1)
            for reaction in pathway.reactions:
                string += '{}, {}: {}'.format(
                    reaction.id, reaction.name,
                    reaction.build_reaction_string(use_metabolite_names=True)
                )
        return string

    def plot(self, grid=None, width=None, height=None, title=None):
        # TODO: small pathway visualizations would be great.
        raise NotImplementedError

    def plot_production_envelopes(
        self, plotter, model, objective=None, title=None
    ):
        rows = int(ceil(len(self.pathways) / 2.0))
        title = "Production envelops for %s" % self.pathways[0].product.name if title is None else title
        grid = plotter.grid(n_rows=rows, title=title)
        with grid:
            for i, pathway in enumerate(self.pathways):
                ppp = pathway.production_envelope(model, objective=objective)
                ppp.plot(grid=grid, width=450, title="Pathway %i" % (i + 1))


[docs]class PathwayPredictor(StrainDesignMethod): """Pathway predictions from a universal set of reaction. Parameters ---------- model : cobra.Model The model that represents the host organism. universal_model : cobra.Model, optional The model that represents the universal set of reactions. A default model will be used if omitted. mapping : dict, optional A dictionary that contains a mapping between metabolite identifiers in `model` and `universal_model` compartment_regexp : str, optional A regular expression that matches host metabolites' compartments that should be connected to the universal reaction model. If not provided, the compartment containing most metabolites will be chosen. Attributes ---------- model : cobra.Model The provided model + universal_model + adapter reactions Examples -------- Determine production pathways for propane-1,3-diol (MNXM2861 in the metanetx namespace) >>> from cameo.api import hosts >>> pathway_predictor = PathwayPredictor(hosts.ecoli.iJO1366) >>> pathway_predictor.run(product=pathway_predictor.model.metabolites.MNXM2861) """ def __init__(self, model, universal_model=None, mapping=None, compartment_regexp=None): """""" self.original_model = model if compartment_regexp is None: compartments_tally = Counter(metabolite.compartment for metabolite in self.original_model.metabolites) most_common_compartment = compartments_tally.most_common(n=1)[0][0] compartment_regexp = re.compile('^' + most_common_compartment + '$') else: compartment_regexp = re.compile(compartment_regexp) if universal_model is None: logger.debug("Loading default universal model.") self.universal_model = models.universal.metanetx_universal_model_bigg elif isinstance(universal_model, Model): self.universal_model = universal_model else: raise ValueError('Provided universal_model %s is not a model.' % universal_model) self.products = self.universal_model.metabolites if mapping is None: self.mapping = metanetx.all2mnx else: self.mapping = mapping self.model = model.copy() try: logger.info('Trying to set solver to cplex to speed up pathway predictions.') self.model.solver = 'cplex' except SolverNotFound: logger.info('cplex not available for pathway predictions.') self.new_reactions = self._extend_model(model.boundary) logger.debug("Adding adapter reactions to connect model with universal model.") self.adpater_reactions = util.create_adapter_reactions(model.metabolites, self.universal_model, self.mapping, compartment_regexp) self.model.add_reactions(self.adpater_reactions) self._add_switches(self.new_reactions)
[docs] def run(self, product=None, max_predictions=float("inf"), min_production=.1, timeout=None, callback=None, silent=False, allow_native_exchanges=False): """Run pathway prediction for a desired product. Parameters ---------- product : Metabolite, str Metabolite or id or name of metabolite to find production pathways for. max_predictions : int, optional The maximum number of predictions to compute. min_production : float The minimum acceptable production flux to product. timeout : int The time limit [seconds] per attempted prediction. callback : function A function that takes a successfully predicted pathway. silent : bool If True will print the pathways and max flux values. allow_native_exchanges: bool If True, exchange reactions for native metabolites will be allowed. Returns ------- PathwayPredictions The predicted pathways. """ product = self._find_product(product) pathways = list() with TimeMachine() as tm, self.model: tm(do=partial(setattr, self.model.solver.configuration, 'timeout', timeout), undo=partial(setattr, self.model.solver.configuration, 'timeout', self.model.solver.configuration.timeout)) try: product_reaction = self.model.reactions.get_by_id('DM_' + product.id) except KeyError: product_reaction = self.model.add_boundary(product, type='demand') product_reaction.lower_bound = min_production pathway_counter = 1 integer_cut_counter = 1 while pathway_counter <= max_predictions: logger.debug('Predicting pathway No. %d' % pathway_counter) try: self.model.slim_optimize(error_value=None) except OptimizationError as err: logger.error('No pathway could be predicted. Terminating pathway predictions.') logger.error(err) break vars_to_cut = list() for i, y_var_id in enumerate(self._y_vars_ids): y_var = self.model.solver.variables[y_var_id] if y_var.primal == 1.0: vars_to_cut.append(y_var) logger.info(vars_to_cut) if len(vars_to_cut) == 0: # no pathway found: logger.info("It seems %s is a native product in model %s. " "Let's see if we can find better heterologous pathways.", product, self.model) # knockout adapter with native product for adapter in self.adpater_reactions: if product in adapter.metabolites: logger.info('Knocking out adapter reaction %s ' 'containing native product.', adapter) adapter.knock_out() continue pathway = [self.model.reactions.get_by_id(y_var.name[2:]) for y_var in vars_to_cut] pathway_metabolites = set([m for pathway_reaction in pathway for m in pathway_reaction.metabolites]) logger.info('Pathway predicted: %s', '\t'.join( [r.build_reaction_string(use_metabolite_names=True) for r in pathway])) pathway_metabolites.add(product) # Figure out adapter reactions to include adapters = [adapter for adapter in self.adpater_reactions if adapter.products[0] in pathway_metabolites] # Figure out exchange reactions to include exchanges = [exchange for exchange in self._exchanges if abs(exchange.flux) > non_zero_flux_threshold and exchange.id != product_reaction.id] if allow_native_exchanges: exchanges = [exchange for exchange in exchanges if list(exchange.metabolites)[0] in pathway_metabolites] pathway = PathwayResult(pathway, exchanges, adapters, product_reaction) if not silent: util.display_pathway(pathway, pathway_counter) integer_cut = self.model.solver.interface.Constraint(Add(*vars_to_cut), name="integer_cut_" + str(integer_cut_counter), ub=len(vars_to_cut) - 1) logger.debug('Adding integer cut.') tm( do=partial(self.model.solver.add, integer_cut), undo=partial(self.model.solver.remove, integer_cut)) # Test pathway in the original model with self.original_model: pathway.apply(self.original_model) self.original_model.objective = pathway.product.id try: production_flux = self.original_model.slim_optimize(error_value=None) except OptimizationError as err: logger.error(err) logger.error( "Addition of pathway %r made the model unsolvable. " "Skipping pathway.", pathway) continue else: if production_flux > non_zero_flux_threshold: pathways.append(pathway) logger.info("Max flux: %.5G", production_flux) pathway_counter += 1 if callback is not None: callback(pathway) else: logger.warning( "Pathway %r could not be verified. Production " "flux %.5G is below the requirement %.5G. " "Skipping.", pathway, production_flux, non_zero_flux_threshold) finally: integer_cut_counter += 1 return PathwayPredictions(pathways)
def _add_switches(self, reactions): logger.info("Adding switches.") y_vars = list() switches = list() self._exchanges = list() for reaction in reactions: if reaction.id.startswith('DM_'): # demand reactions don't need integer switches self._exchanges.append(reaction) continue y = self.model.solver.interface.Variable('y_' + reaction.id, lb=0, ub=1, type='binary') y_vars.append(y) # The following is a complicated but efficient way to write the following constraints # switch_lb = self.model.solver.interface.Constraint(y * reaction.lower_bound - reaction.flux_expression, # name='switch_lb_' + reaction.id, ub=0) # switch_ub = self.model.solver.interface.Constraint(y * reaction.upper_bound - reaction.flux_expression, # name='switch_ub_' + reaction.id, lb=0) forward_term = mul((RealNumber(-1), reaction.forward_variable)) reverse_term = mul((RealNumber(-1), reaction.reverse_variable)) switch_lb_term = mul((RealNumber(reaction.lower_bound), y)) switch_ub_term = mul((RealNumber(reaction.upper_bound), y)) switch_lb = self.model.solver.interface.Constraint(add((switch_lb_term, forward_term, reverse_term)), name='switch_lb_' + reaction.id, ub=0, sloppy=True) switch_ub = self.model.solver.interface.Constraint(add((switch_ub_term, forward_term, reverse_term)), name='switch_ub_' + reaction.id, lb=0, sloppy=True) switches.extend([switch_lb, switch_ub]) self.model.solver.add(y_vars) self.model.solver.add(switches, sloppy=True) logger.info("Setting minimization of switch variables as objective.") self.model.objective = self.model.solver.interface.Objective(Add(*y_vars), direction='min') self._y_vars_ids = [var.name for var in y_vars] def _extend_model(self, original_exchanges): for exchange in self.model.boundary: if len(exchange.reactants) > 0 >= exchange.lower_bound: exchange.upper_bound = 999999. logger.info("Adding reactions from universal model to host model.") new_reactions = list() original_model_metabolites = [self.mapping.get('bigg:' + m.id[0:-2], m.id) for r in original_exchanges for m, coeff in r.metabolites.items() if len(r.metabolites) == 1 and coeff < 0 < r.upper_bound] universal_exchanges = self.universal_model.boundary for reaction in self.universal_model.reactions: if reaction in self.model.reactions: continue if reaction in universal_exchanges: metabolite = list(reaction.metabolites.keys())[0] if metabolite.id in original_model_metabolites: continue new_reactions.append(copy(reaction)) self.model.add_reactions(new_reactions) return new_reactions def _find_product(self, product): if isinstance(product, str): for metabolite in self.model.metabolites: if metabolite.id == product: return metabolite if metabolite.name == product: return metabolite raise ValueError( "Specified product '{product}' could not be found. " "Try searching pathway_predictor_obj.universal_model.metabolites".format(product=product)) elif isinstance(product, Metabolite): try: return self.model.metabolites.get_by_id(product.id) except KeyError: raise ValueError('Provided product %s cannot be found in universal reaction database.' % product) else: raise ValueError('Provided product %s is neither a metabolite nor an ID or name.' % product)
if __name__ == '__main__': from cameo.api import hosts pathway_predictor = PathwayPredictor(hosts.ecoli.models.EcoliCore) print(pathway_predictor.run(product=pathway_predictor.model.metabolites.MNXM53)) # MNXM53 = L-serine