Source code for cameo.flux_analysis.analysis

# coding=utf-8
# Copyright 2014 Novo Nordisk Foundation Center for Biosustainability, DTU.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import, print_function, division

import itertools
import logging
import re
from collections import OrderedDict
from functools import reduce
from operator import itemgetter

import numpy
import pandas
from cobra import Reaction, Metabolite
from cobra.core import get_solution
from cobra.util import fix_objective_as_constraint, get_context
from cobra.exceptions import OptimizationError
from numpy import trapz
from optlang.interface import UNBOUNDED, OPTIMAL
from optlang.symbolics import Zero

from cameo import config
from cameo.core.result import Result
from cameo.flux_analysis.util import remove_infeasible_cycles, fix_pfba_as_constraint
from cameo.parallel import SequentialView
from cameo.ui import notice
from cameo.util import partition, _BIOMASS_RE_
from cameo.core.utils import get_reaction_for

logger = logging.getLogger(__name__)

__all__ = ['find_blocked_reactions', 'flux_variability_analysis', 'phenotypic_phase_plane',
           'flux_balance_impact_degree']


def knock_out_metabolite(metabolite, force_steady_state=False):
    """'Knockout' a metabolite. This can be done in 2 ways:

    1. Implementation follows the description in [1] "All fluxes around
    the metabolite M should be restricted to only produce the
    metabolite, for which balancing constraint of mass conservation is
    relaxed to allow nonzero values of the incoming fluxes whereas all
    outgoing fluxes are limited to zero."

    2. Force steady state All reactions consuming the metabolite are
    restricted to only produce the metabolite. A demand reaction is
    added to sink the metabolite produced to keep the problem feasible
    under the S.v = 0 constraint.


    Knocking out a metabolite overrules the constraints set on the
    reactions producing the metabolite.

    Parameters
    ----------
    force_steady_state: bool
        If True, uses approach 2.

    References
    ----------
    .. [1] Kim, P.-J., Lee, D.-Y., Kim, T. Y., Lee, K. H., Jeong, H.,
    Lee, S. Y., & Park, S. (2007). Metabolite essentiality elucidates
    robustness of Escherichia coli metabolism. PNAS, 104(34), 13638-13642

    """
    # restrict reactions to produce metabolite
    for rxn in metabolite.reactions:
        if rxn.metabolites[metabolite] > 0:
            rxn.bounds = (0, 0) if rxn.upper_bound < 0 \
                else (0, rxn.upper_bound)
        elif rxn.metabolites[metabolite] < 0:
            rxn.bounds = (0, 0) if rxn.lower_bound > 0 \
                else (rxn.lower_bound, 0)
    if force_steady_state:
        metabolite._model.add_boundary(metabolite, type="knock-out",
                                       lb=0, ub=1000,
                                       reaction_id="KO_{}".format(metabolite.id))
    else:
        previous_bounds = metabolite.constraint.lb, metabolite.constraint.ub
        metabolite.constraint.lb, metabolite.constraint.ub = -1000, 1000
        context = get_context(metabolite)
        if context:
            def reset():
                metabolite.constraint.lb, metabolite.constraint.ub = previous_bounds

            context(reset)


def find_essential_metabolites(model, threshold=1e-6, force_steady_state=False):
    """Return a list of essential metabolites.

    This can be done in 2 ways:

    1. Implementation follows the description in [1]:
        "All fluxes around the metabolite M should be restricted to only produce the metabolite,
         for which balancing constraint of mass conservation is relaxed to allow nonzero values
         of the incoming fluxes whereas all outgoing fluxes are limited to zero."

    2. Force Steady State approach:
        All reactions consuming the metabolite are restricted to only produce the metabolite. A demand
        reaction is added to sink the metabolite produced to keep the problem feasible under
        the S.v = 0 constraint.

    Briefly, for each metabolite, all reactions that consume that metabolite are blocked and if that makes the
    model either infeasible or results in near-zero flux in the model objective, then the metabolite is
    considered essential.

    Parameters
    ----------
    model : cobra.Model
        The model to find the essential metabolites for.
    threshold : float (default 1e-6)
        Minimal objective flux to be considered viable.
    force_steady_state: bool
        If True, uses approach 2.

    References
    ----------
    .. [1] Kim, P.-J., Lee, D.-Y., Kim, T. Y., Lee, K. H., Jeong, H., Lee, S. Y., & Park, S. (2007).
     Metabolite essentiality elucidates robustness of Escherichia coli metabolism. PNAS, 104(34), 13638–13642
    """

    essential = []
    # Essential metabolites are only in reactions that carry flux.
    metabolites = set()
    solution = model.optimize(raise_error=True)
    for reaction_id, flux in solution.fluxes.items():
        if abs(flux) > 0:
            reaction = model.reactions.get_by_id(reaction_id)
            metabolites.update(reaction.metabolites.keys())

    for metabolite in metabolites:
        with model:
            knock_out_metabolite(metabolite, force_steady_state=force_steady_state)
            model.solver.optimize()
            if model.solver.status != OPTIMAL or model.objective.value < threshold:
                essential.append(metabolite)
    return essential


[docs]def find_blocked_reactions(model): """Determine reactions that cannot carry steady-state flux. Parameters ---------- model: cobra.Model Returns ------- list A list of reactions. """ with model: for exchange in model.boundary: exchange.bounds = (-9999, 9999) fva_solution = flux_variability_analysis(model) return frozenset( reaction for reaction in model.reactions if round( fva_solution.lower_bound(reaction.id), config.ndecimals) == 0 and round( fva_solution.upper_bound(reaction.id), config.ndecimals) == 0 )
[docs]def flux_variability_analysis(model, reactions=None, fraction_of_optimum=0., pfba_factor=None, remove_cycles=False, view=None): """Flux variability analysis. Parameters ---------- model : cobra.Model reactions: None or iterable The list of reaction whose lower and upper bounds should be determined. If `None`, all reactions in `model` will be assessed. fraction_of_optimum : float Fix the objective of the model to a fraction of it's max. Expected to be within [0;1]. Lower values increase variability. pfba_factor : float If not None, fix the total sum of reaction fluxes to its minimum times a factor. Expected to be within [ 1;inf]. Higher factors increase flux variability to a certain point since the bound for the objective is still fixed. remove_cycles : bool If true, apply the CycleFreeFlux algorithm to remove loops from each simulated flux distribution. view: cameo.parallel.SequentialView or cameo.parallel.MultiprocessingView or ipython.cluster.DirectView A parallelization view. Returns ------- FluxVariabilityResult FluxVariabilityResult with DataFrame data_frame property containing the results of the flux variability analysis. """ if view is None: view = config.default_view if reactions is None: reactions = model.reactions with model: if fraction_of_optimum > 0.: fix_objective_as_constraint(model, fraction=fraction_of_optimum) if pfba_factor is not None: # don't add the objective-constraint again so fraction_of_optimum=0 fix_pfba_as_constraint(model, multiplier=pfba_factor, fraction_of_optimum=0) reaction_chunks = (chunk for chunk in partition(reactions, len(view))) if remove_cycles: func_obj = _FvaFunctionObject(model, _cycle_free_fva) else: func_obj = _FvaFunctionObject(model, _flux_variability_analysis) chunky_results = view.map(func_obj, reaction_chunks) solution = pandas.concat(chunky_results) return FluxVariabilityResult(solution)
[docs]def phenotypic_phase_plane(model, variables, objective=None, source=None, points=20, view=None): """Phenotypic phase plane analysis [1]. Implements a phenotypic phase plan analysis with interpretation same as that presented in [1] but calculated by optimizing the model for all steps of the indicated variables (instead of using shadow prices). Parameters ---------- model: cobra.Model variables: str or reaction or iterable A reaction ID, reaction, or list of reactions to be varied. objective: str or reaction or optlang.Objective or Metabolite, optional An objective, a reaction's flux, or a metabolite's production to be minimized/maximized (defaults to the current model objective). source: Reaction or reaction identifier The reaction to use as the source when calculating mass and carbon yield. Set to the medium reaction with the highest input carbon flux if left None. points: int or iterable Number of points to be interspersed between the variable bounds. A list of same same dimensions as `variables` can be used to specify variable specific numbers of points. view: SequentialView or MultiprocessingView or ipython.cluster.DirectView A parallelization view. Returns ------- PhenotypicPhasePlaneResult The phenotypic phase plane with flux, mol carbon input / mol carbon output, gram input / gram output References ---------- [1] Edwards, J. S., Ramakrishna, R. and Palsson, B. O. (2002). Characterizing the metabolic phenotype: a phenotype phase plane analysis. Biotechnology and Bioengineering, 77(1), 27–36. doi:10.1002/bit.10047 """ if isinstance(variables, str): variables = [variables] elif isinstance(variables, Reaction): variables = [variables] variable_ids = [var if isinstance(var, str) else var.id for var in variables] if view is None: view = config.default_view with model: if objective is not None: if isinstance(objective, Metabolite): try: objective = model.reactions.get_by_id("DM_%s" % objective.id) except KeyError: objective = model.add_boundary(objective, type='demand') # try: # objective = model.reaction_for(objective, time_machine=tm) # except KeyError: # pass model.objective = objective if source is not None: source_reaction = get_reaction_for(model, source) else: source_reaction = get_c_source_reaction(model) variable_reactions = model.reactions.get_by_any(variables) variables_min_max = flux_variability_analysis(model, reactions=variable_reactions, view=SequentialView()) grid = [numpy.linspace(lower_bound, upper_bound, points, endpoint=True) for reaction_id, lower_bound, upper_bound in variables_min_max.data_frame.loc[variable_ids].itertuples()] grid_generator = itertools.product(*grid) chunks_of_points = partition(list(grid_generator), len(view)) evaluator = _PhenotypicPhasePlaneChunkEvaluator(model, variable_reactions, source_reaction) chunk_results = view.map(evaluator, chunks_of_points) envelope = reduce(list.__add__, chunk_results) nice_variable_ids = [_nice_id(reaction) for reaction in variable_reactions] variable_reactions_ids = [reaction.id for reaction in variable_reactions] phase_plane = pandas.DataFrame( envelope, columns=(variable_reactions_ids + [ 'objective_lower_bound', 'objective_upper_bound', 'c_yield_lower_bound', 'c_yield_upper_bound', 'mass_yield_lower_bound', 'mass_yield_upper_bound' ]) ) if objective is None: objective = model.objective nice_objective_id = _nice_id(objective) objective = objective.id if isinstance(objective, Reaction) else str(objective) return PhenotypicPhasePlaneResult(phase_plane, variable_reactions_ids, objective, nice_variable_ids=nice_variable_ids, source_reaction=_nice_id(source_reaction), nice_objective_id=nice_objective_id)
def _nice_id(reaction): if isinstance(reaction, Reaction): if hasattr(reaction, 'nice_id'): nice_id = reaction.nice_id elif len(reaction.metabolites) < 5: nice_id = reaction else: nice_id = reaction.id else: nice_id = str(reaction) return nice_id class _FvaFunctionObject(object): def __init__(self, model, fva): self.model = model self.fva = fva def __call__(self, reactions): return self.fva(self.model, reactions) def _flux_variability_analysis(model, reactions=None): if reactions is None: reactions = model.reactions else: reactions = model.reactions.get_by_any(reactions) fva_sol = OrderedDict() lb_flags = dict() with model: model.objective = Zero model.objective.direction = 'min' for reaction in reactions: lb_flags[reaction.id] = False fva_sol[reaction.id] = dict() model.solver.objective.set_linear_coefficients({reaction.forward_variable: 1., reaction.reverse_variable: -1.}) model.solver.optimize() if model.solver.status == OPTIMAL: fva_sol[reaction.id]['lower_bound'] = model.objective.value elif model.solver.status == UNBOUNDED: fva_sol[reaction.id]['lower_bound'] = -numpy.inf else: lb_flags[reaction.id] = True model.solver.objective.set_linear_coefficients({reaction.forward_variable: 0., reaction.reverse_variable: 0.}) assert model.objective.expression == 0, model.objective.expression model.objective.direction = 'max' for reaction in reactions: ub_flag = False model.solver.objective.set_linear_coefficients({reaction.forward_variable: 1., reaction.reverse_variable: -1.}) model.solver.optimize() if model.solver.status == OPTIMAL: fva_sol[reaction.id]['upper_bound'] = model.objective.value elif model.solver.status == UNBOUNDED: fva_sol[reaction.id]['upper_bound'] = numpy.inf else: ub_flag = True if lb_flags[reaction.id] is True and ub_flag is True: fva_sol[reaction.id]['lower_bound'] = 0 fva_sol[reaction.id]['upper_bound'] = 0 [lb_flags[reaction.id], ub_flag] = [False, False] elif lb_flags[reaction.id] is True and ub_flag is False: fva_sol[reaction.id]['lower_bound'] = fva_sol[reaction.id]['upper_bound'] lb_flags[reaction.id] = False elif lb_flags[reaction.id] is False and ub_flag is True: fva_sol[reaction.id]['upper_bound'] = fva_sol[reaction.id]['lower_bound'] ub_flag = False model.solver.objective.set_linear_coefficients({reaction.forward_variable: 0., reaction.reverse_variable: 0.}) assert model.objective.expression == 0, model.objective.expression assert lb_flags[reaction.id] is False and ub_flag is False, "Something is wrong with FVA (%s)" % reaction.id df = pandas.DataFrame.from_dict(fva_sol, orient='index') lb_higher_ub = df[df.lower_bound > df.upper_bound] # this is an alternative solution to what I did above with flags # Assert that these cases really only numerical artifacts try: assert ((lb_higher_ub.lower_bound - lb_higher_ub.upper_bound) < 1e-6).all() except AssertionError: logger.debug(list(zip(model.reactions, (lb_higher_ub.lower_bound - lb_higher_ub.upper_bound) < 1e-6))) df.lower_bound[lb_higher_ub.index] = df.upper_bound[lb_higher_ub.index] return df def get_c_source_reaction(model): """ carbon source reactions Returns ------- Reaction The medium reaction with highest input carbon flux """ try: model.slim_optimize(error_value=None) except OptimizationError: return None medium_reactions = model.reactions.get_by_any(list(model.medium)) medium_reactions_fluxes = [(rxn, total_carbon_flux(rxn, consumption=True)) for rxn in medium_reactions] source_reactions = [(rxn, c_flux) for rxn, c_flux in medium_reactions_fluxes if c_flux > 0] try: return max(source_reactions, key=itemgetter(1))[0] except ValueError: return None def total_carbon_flux(reaction, consumption=True): """summed product carbon flux for a reaction Parameters ---------- reaction : Reaction the reaction to carbon return flux for consumption : bool flux for consumed metabolite, else produced Returns ------- float reaction flux multiplied by number of carbon for the products of the reaction""" direction = 1 if consumption else -1 c_flux = [reaction.flux * coeff * met.elements.get('C', 0) * direction for met, coeff in reaction.metabolites.items()] return sum([flux for flux in c_flux if flux > 0]) def single_flux(reaction, consumption=True): """flux into single product for a reaction only defined for reactions with single products Parameters ---------- reaction : Reaction the reaction to product flux for consumption : bool flux for consumed metabolite, else produced Returns ------- tuple metabolite, flux for the metabolite""" if len(list(reaction.metabolites)) != 1: raise ValueError('product flux only defined for single metabolite reactions') met, coeff = list(reaction.metabolites.items())[0] direction = 1 if consumption else -1 return met, reaction.flux * coeff * direction def _cycle_free_fva(model, reactions=None, sloppy=True, sloppy_bound=666): """Cycle free flux-variability analysis. (http://cran.r-project.org/web/packages/sybilcycleFreeFlux/index.html) Parameters ---------- model : cobra.Model reactions : list List of reactions whose flux-ranges should be determined. sloppy : boolean, optional If true, only fluxes v with abs(v) > sloppy_bound are checked to be futile cycles (defaults to True). sloppy_bound : int, optional The threshold bound used by sloppy (defaults to the number of the beast). """ cycle_count = 0 if reactions is None: reactions = model.reactions else: reactions = model.reactions.get_by_any(reactions) fva_sol = OrderedDict() for reaction in reactions: fva_sol[reaction.id] = dict() model.objective = reaction model.objective.direction = 'min' model.solver.optimize() if model.solver.status == UNBOUNDED: fva_sol[reaction.id]['lower_bound'] = -numpy.inf continue elif model.solver.status != OPTIMAL: fva_sol[reaction.id]['lower_bound'] = 0 continue bound = model.objective.value if sloppy and abs(bound) < sloppy_bound: fva_sol[reaction.id]['lower_bound'] = bound else: logger.debug('Determine if {} with bound {} is a cycle'.format(reaction.id, bound)) solution = get_solution(model) v0_fluxes = solution.fluxes v1_cycle_free_fluxes = remove_infeasible_cycles(model, v0_fluxes) if abs(v1_cycle_free_fluxes[reaction.id] - bound) < 10 ** -6: fva_sol[reaction.id]['lower_bound'] = bound else: logger.debug('Cycle detected: {}'.format(reaction.id)) cycle_count += 1 v2_one_cycle_fluxes = remove_infeasible_cycles(model, v0_fluxes, fix=[reaction.id]) with model: for key, v1_flux in v1_cycle_free_fluxes.items(): if round(v1_flux, config.ndecimals) == 0 and round(v2_one_cycle_fluxes[key], config.ndecimals) != 0: knockout_reaction = model.reactions.get_by_id(key) knockout_reaction.knock_out() model.objective.direction = 'min' model.solver.optimize() if model.solver.status == OPTIMAL: fva_sol[reaction.id]['lower_bound'] = model.objective.value elif model.solver.status == UNBOUNDED: fva_sol[reaction.id]['lower_bound'] = -numpy.inf else: fva_sol[reaction.id]['lower_bound'] = 0 for reaction in reactions: model.objective = reaction model.objective.direction = 'max' model.solver.optimize() if model.solver.status == UNBOUNDED: fva_sol[reaction.id]['upper_bound'] = numpy.inf continue elif model.solver.status != OPTIMAL: fva_sol[reaction.id]['upper_bound'] = 0 continue bound = model.objective.value if sloppy and abs(bound) < sloppy_bound: fva_sol[reaction.id]['upper_bound'] = bound else: logger.debug('Determine if {} with bound {} is a cycle'.format(reaction.id, bound)) solution = get_solution(model) v0_fluxes = solution.fluxes v1_cycle_free_fluxes = remove_infeasible_cycles(model, v0_fluxes) if abs(v1_cycle_free_fluxes[reaction.id] - bound) < 1e-6: fva_sol[reaction.id]['upper_bound'] = v0_fluxes[reaction.id] else: logger.debug('Cycle detected: {}'.format(reaction.id)) cycle_count += 1 v2_one_cycle_fluxes = remove_infeasible_cycles(model, v0_fluxes, fix=[reaction.id]) with model: for key, v1_flux in v1_cycle_free_fluxes.items(): if round(v1_flux, config.ndecimals) == 0 and round(v2_one_cycle_fluxes[key], config.ndecimals) != 0: knockout_reaction = model.reactions.get_by_id(key) knockout_reaction.knock_out() model.objective.direction = 'max' model.solver.optimize() if model.solver.status == OPTIMAL: fva_sol[reaction.id]['upper_bound'] = model.objective.value elif model.solver.status == UNBOUNDED: fva_sol[reaction.id]['upper_bound'] = numpy.inf else: fva_sol[reaction.id]['upper_bound'] = 0 df = pandas.DataFrame.from_dict(fva_sol, orient='index') lb_higher_ub = df[df.lower_bound > df.upper_bound] # Assert that these cases really only numerical artifacts assert ((lb_higher_ub.lower_bound - lb_higher_ub.upper_bound) < 1e-6).all() df.lower_bound[lb_higher_ub.index] = df.upper_bound[lb_higher_ub.index] return df class _PhenotypicPhasePlaneChunkEvaluator(object): def __init__(self, model, variable_reactions, source): self.model = model self.source = source self.variable_reactions = variable_reactions objective_reactions = [reaction for reaction in model.reactions if reaction.objective_coefficient != 0] if len(objective_reactions) != 1: raise NotImplementedError('complex objectives not supported') self.product_reaction = objective_reactions[0] def carbon_yield(self): """ mol product per mol carbon input Returns ------- float the mol carbon atoms in the product (as defined by the model objective) divided by the mol carbon in the input reactions (as defined by the model medium) or zero in case of division by zero arises""" if self.source is None: return numpy.nan carbon_input_flux = total_carbon_flux(self.source, consumption=True) carbon_output_flux = total_carbon_flux(self.product_reaction, consumption=False) try: return carbon_output_flux / carbon_input_flux except ZeroDivisionError: return numpy.nan def mass_yield(self): """ gram product divided by gram of feeding source only defined when we have only one product (as defined by the model objective) and only one compound as carbon source (as defined by the model medium). Returns ------- float gram product per 1 g of feeding source or nan if more than one product or feeding source """ if self.source is None: return numpy.nan try: source, source_flux = single_flux(self.source, consumption=True) product, product_flux = single_flux(self.product_reaction, consumption=False) mol_prod_mol_src = product_flux / source_flux except (ValueError, ZeroDivisionError): return numpy.nan return (mol_prod_mol_src * product.formula_weight) / source.formula_weight def __call__(self, points): return [self._production_envelope_inner(point) for point in points] def _interval_estimates(self): self.model.solver.optimize() if self.model.solver.status == OPTIMAL: flux = self.model.solver.objective.value carbon_yield = self.carbon_yield() mass_yield = self.mass_yield() else: flux = numpy.nan carbon_yield = numpy.nan mass_yield = numpy.nan return flux, carbon_yield, mass_yield def _production_envelope_inner(self, point): original_direction = self.model.objective.direction with self.model: for (reaction, coordinate) in zip(self.variable_reactions, point): reaction.bounds = (coordinate, coordinate) interval = [] interval_carbon_yield = [] interval_mass_yield = [] self.model.objective.direction = 'min' flux, carbon_yield, mass_yield = self._interval_estimates() interval.append(flux) interval_carbon_yield.append(carbon_yield) interval_mass_yield.append(mass_yield) self.model.objective.direction = 'max' flux, carbon_yield, mass_yield = self._interval_estimates() interval.append(flux) interval_carbon_yield.append(carbon_yield) interval_mass_yield.append(mass_yield) self.model.objective.direction = original_direction intervals = tuple(interval) + tuple(interval_carbon_yield) + tuple(interval_mass_yield) return point + intervals
[docs]def flux_balance_impact_degree(model, knockouts, view=config.default_view, method="fva"): """ Flux balance impact degree by Zhao et al 2013 Parameters ---------- model: cobra.Model Wild-type model knockouts: list Reactions to knockout method: str The method to compute the perturbation. default is "fva" - Flux Variability Analysis. It can also be computed with "em" - Elementary modes (not implemented) Returns ------- FluxBalanceImpactDegreeResult: perturbation The changes in reachable reactions (reactions that can carry flux) """ if method == "fva": reachable_reactions, perturbed_reactions = _fbid_fva(model, knockouts, view) elif method == "em": raise NotImplementedError("Elementary modes approach is not implemented") else: raise ValueError("%s method is not valid to compute Flux Balance Impact Degree" % method) return FluxBalanceImpactDegreeResult(reachable_reactions, perturbed_reactions, method)
def _fbid_fva(model, knockouts, view): with model: for reaction in model.reactions: if reaction.reversibility: reaction.bounds = (-1, 1) else: reaction.bounds = (0, 1) wt_fva = flux_variability_analysis(model, view=view, remove_cycles=False) wt_fva._data_frame['upper_bound'] = wt_fva._data_frame.upper_bound.apply(numpy.round) wt_fva._data_frame['lower_bound'] = wt_fva._data_frame.lower_bound.apply(numpy.round) reachable_reactions = wt_fva.data_frame.query("lower_bound != 0 | upper_bound != 0") for reaction in model.reactions.get_by_any(knockouts): reaction.knock_out() mt_fva = flux_variability_analysis(model, reactions=reachable_reactions.index, view=view, remove_cycles=False) mt_fva._data_frame['upper_bound'] = mt_fva._data_frame.upper_bound.apply(numpy.round) mt_fva._data_frame['lower_bound'] = mt_fva._data_frame.lower_bound.apply(numpy.round) perturbed_reactions = [] for reaction in reachable_reactions.index: if wt_fva.upper_bound(reaction) != mt_fva.upper_bound(reaction) or wt_fva.lower_bound( reaction) != wt_fva.lower_bound(reaction): perturbed_reactions.append(reaction) return list(reachable_reactions.index), perturbed_reactions class PhenotypicPhasePlaneResult(Result): def __init__(self, phase_plane, variable_ids, objective, nice_variable_ids=None, nice_objective_id=None, source_reaction=None, *args, **kwargs): super(PhenotypicPhasePlaneResult, self).__init__(*args, **kwargs) self._phase_plane = phase_plane self.variable_ids = variable_ids self.nice_variable_ids = nice_variable_ids self.objective = objective self.nice_objective_id = nice_objective_id self.source_reaction = source_reaction @property def data_frame(self): return pandas.DataFrame(self._phase_plane) def plot(self, plotter, grid=None, width=None, height=None, title=None, axis_font_size=None, palette=None, points=None, points_colors=None, estimate='flux', **kwargs): """plot phenotypic phase plane result create a plot of a phenotypic phase plane analysis Parameters ---------- grid: plotting grid the grid for plotting width: int the width of the plot height: int the height of the plot title: string the height of the plot axis_font_size: int the font sizes for the axis palette: string name of color palette to use, e.g. RdYlBlu points: iterable of points additional points to plot as x, y iterable points_colors: iterable of strings iterable with colors for the points estimate: string either flux, mass_yield (g output / g output) or c_yield (mol carbon output / mol carbon input) """ possible_estimates = {'flux': ('objective_upper_bound', 'objective_lower_bound', 'flux', '[mmol gDW^-1 h^-1]'), 'mass_yield': ('mass_yield_upper_bound', 'mass_yield_lower_bound', 'mass yield, src={}'.format(self.source_reaction), '[g/g(src) h^-1]'), 'c_yield': ('c_yield_upper_bound', 'c_yield_lower_bound', 'carbon yield, src={}'.format(self.source_reaction), '[mmol(C)/mmol(C(src)) h^-1]')} if estimate not in possible_estimates: raise ValueError('estimate must be one of %s' % ', '.join(possible_estimates.keys())) upper, lower, description, unit = possible_estimates[estimate] if title is None: title = "Phenotypic Phase Plane ({})".format(description) if len(self.variable_ids) == 1: variable = self.variable_ids[0] y_axis_label = self._axis_label(self.objective, self.nice_objective_id, unit) x_axis_label = self._axis_label(variable, self.nice_variable_ids[0], '[mmol gDW^-1 h^-1]') dataframe = pandas.DataFrame(columns=["ub", "lb", "value", "strain"]) for _, row in self.iterrows(): _df = pandas.DataFrame([[row[upper], row[lower], row[variable], "WT"]], columns=dataframe.columns) dataframe = dataframe.append(_df) plot = plotter.production_envelope(dataframe, grid=grid, width=width, height=height, title=title, y_axis_label=y_axis_label, x_axis_label=x_axis_label, palette=palette, points=points, points_colors=points_colors) elif len(self.variable_ids) == 2: var_1 = self.variable_ids[0] var_2 = self.variable_ids[1] x_axis_label = self._axis_label(var_1, self.nice_variable_ids[0], '[mmol gDW^-1 h^-1]') y_axis_label = self._axis_label(var_2, self.nice_variable_ids[1], '[mmol gDW^-1 h^-1]') z_axis_label = self._axis_label(self.objective, self.nice_objective_id, unit) dataframe = pandas.DataFrame(columns=["ub", "lb", "value1", "value2", "strain"]) for _, row in self.iterrows(): _df = pandas.DataFrame([[row[upper], row[lower], row[var_1], row[var_2], "WT"]], columns=dataframe.columns) dataframe = dataframe.append(_df) plot = plotter.production_envelope_3d(dataframe, grid=grid, width=width, height=height, title=title, y_axis_label=y_axis_label, x_axis_label=x_axis_label, z_axis_label=z_axis_label, palette=palette, points=points, points_colors=points_colors) else: notice("Multi-dimensional plotting is not supported") return if grid is None: plotter.display(plot) @staticmethod def _axis_label(variable_id, nice_variable_id, unit): if re.search(_BIOMASS_RE_, variable_id): return '{} [h^-1]'.format(nice_variable_id) else: return '{} {}'.format(nice_variable_id, unit) def __getitem__(self, item): return self._phase_plane[item] def iterrows(self): return self._phase_plane.iterrows() @property def area(self): area = 0 for variable_id in self.variable_ids: area += self.area_for(variable_id) return area def area_for(self, variable_id): data_frame = self._phase_plane.sort_values(by=variable_id, ascending=True) auc_max = trapz(data_frame.objective_upper_bound.values, x=data_frame[variable_id]) auc_min = trapz(data_frame.objective_lower_bound.values, x=data_frame[variable_id]) return auc_max - auc_min class FluxVariabilityResult(Result): def __init__(self, data_frame, *args, **kwargs): super(FluxVariabilityResult, self).__init__(*args, **kwargs) self._data_frame = data_frame @property def data_frame(self): return self._data_frame def plot(self, plotter, index=None, grid=None, width=None, height=None, title=None, palette=None, **kwargs): if index is None: index = self.data_frame.index[0:10] fva_result = self.data_frame.loc[index] if title is None: title = "Flux Variability Analysis" dataframe = pandas.DataFrame(columns=["lb", "ub", "strain", "reaction"]) for reaction_id, row in fva_result.iterrows(): _df = pandas.DataFrame([[row['lower_bound'], row['upper_bound'], "WT", reaction_id]], columns=dataframe.columns) dataframe = dataframe.append(_df) plot = plotter.flux_variability_analysis(dataframe, grid=grid, width=width, height=height, title=title, y_axis_label="Reactions", x_axis_label="Flux limits", palette=palette) if grid is None: plotter.display(plot) def __getitem__(self, item): return self._data_frame[item] def upper_bound(self, item): if isinstance(item, Reaction): item = item.id return self['upper_bound'][item] def lower_bound(self, item): if isinstance(item, Reaction): item = item.id return self['lower_bound'][item] def iterrows(self): return self._data_frame.iterrows() class FluxBalanceImpactDegreeResult(Result): def __init__(self, reachable_reactions, perturbed_reactions, method, *args, **kwargs): super(FluxBalanceImpactDegreeResult, self).__init__(*args, **kwargs) self._method = method self._reachable_reactions = reachable_reactions self._perturbed_reactions = perturbed_reactions def __contains__(self, item): if isinstance(item, Reaction): item = item.id return item in self._reachable_reactions def _repr_html_(self): return """ <h3>Flux Balance Impact Degree</h3> <ul> <li> Degree: %i</li> <li> Reactions: %i</li> </ul> %s """ % (self.degree, len(self._reachable_reactions), self.data_frame._repr_html_()) @property def degree(self): return len(self._perturbed_reactions) @property def data_frame(self): data_frame = pandas.DataFrame(columns=["perturbed"]) for reaction in self._reachable_reactions: data_frame.loc[reaction] = [reaction in self._perturbed_reactions] return data_frame def plot(self, grid=None, width=None, height=None, title=None): pass def n_carbon(reaction): return sum(metabolite.elements.get('C', 0) for metabolite in reaction.metabolites)