Source code for cameo.core.strain_design

# Copyright 2015 Novo Nordisk Foundation Center for Biosustainability, DTU.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Core implementation of strain design. It contains core structures.
Targets (and subclasses) are identified by strain design methods.
"""

import sys

from cobra import DictList
from pandas import DataFrame

from cameo.core.result import Result
from cameo.core.target import EnsembleTarget, Target

from gnomic import Genotype

__all__ = ['StrainDesign', 'StrainDesignMethod', 'StrainDesignMethodResult']


[docs]class StrainDesignMethod(object): def __init__(self, *args, **kwargs): super(StrainDesignMethod, self).__init__(*args, **kwargs) def __call__(self, *args, **kwargs): self.run(*args, **kwargs)
[docs] def run(self, *args, **kwargs): raise NotImplementedError
[docs]class StrainDesign(object): """ A StrainDesign is a collection of targets in a COBRA model. The targets, identified by a StrainDesignMethod, map elements in the model that need to be modified to achieve the objective of the design method. """ def __init__(self, targets): self.targets = DictList(targets) def __str__(self): return ", ".join(str(t) for t in self.targets) def __repr__(self): return "<StrainDesign [" + ";".join(repr(t) for t in self.targets) + "]>" def __iter__(self): return iter(self.targets) def __len__(self): return len(self.targets) def __contains__(self, item): if isinstance(item, Target): if item in self.targets: return item == self.targets.get_by_id(item.id) else: return False elif isinstance(item, str): return item in self.targets else: return False def __eq__(self, other): if isinstance(other, StrainDesign): if len(self) != len(other): return False else: return all(t in other for t in self.targets) and all(t in self for t in other.targets) else: return False
[docs] def apply(self, model): for target in self.targets: target.apply(model)
def __add__(self, other): if not isinstance(other, StrainDesign): raise AssertionError("Only instances of StrainDesign can be added together") targets = {} for target in self.targets: if target.id not in targets: targets[target.id] = set() targets[target.id].add(target) for target in other.targets: if target.id not in targets: targets[target.id] = set() targets[target.id].add(target) targets = [next(iter(t)) if len(t) == 1 else EnsembleTarget(id, t) for id, t in targets.items()] return StrainDesign(targets) def __iadd__(self, other): if not isinstance(other, StrainDesign): raise AssertionError("Only instances of StrainDesign can be added together") targets = {} for target in self.targets: if target.id not in targets: targets[target.id] = set() targets[target.id].add(target) for target in other.targets: if target.id not in targets: targets[target.id] = set() targets[target.id].add(target) targets = [next(iter(t)) if len(t) == 1 else EnsembleTarget(id, t) for id, t in targets.items()] self.targets = DictList(targets) return self def _repr_html_(self): return " ".join(t._repr_html_() for t in self.targets)
[docs] def to_gnomic(self): return Genotype([target.to_gnomic() for target in self.targets])
[docs]class StrainDesignMethodResult(Result): __method_name__ = None _aggreate_functions_ = { "method": lambda series: tuple(sum(series.values.tolist(), [])) } def __init__(self, designs, *args, **kwargs): super(StrainDesignMethodResult, self).__init__(*args, **kwargs) self._designs = designs def __len__(self): """ Return the number of designs found. """ return len(self._designs) def __iter__(self): """ Returns an iterator that yields StrainDesign objects. """ return iter(self._designs) @property def data_frame(self): df = DataFrame(columns=['targets']) for i, design in enumerate(self._designs): df.loc[i] = [design.targets] return df
[docs] def display_on_map(self, index, map_name): raise NotImplementedError
def __add__(self, other): df = DataFrame([design.targets for design in self._designs], columns=["targets"]) df['method'] = self.__method_name__ i = len(df) for j, design in enumerate(other): df.loc[i + j] = [design, [self.__method_name__]] df = df.groupby(["design"]).aggregate(self._aggreate_functions_) return StrainDesignMethodEnsemble([StrainDesign(targets) for targets in df.targets], df.method.tolist()) def _repr_html_(self): return self.data_frame._repr_html_()
[docs] def plot(self, plotter, grid=None, width=None, height=None, title=None, *args, **kwargs): if title is None: title = "Target frequency plot for %s result" % self.__method_name__ counts = DataFrame(columns=['count']) for design in self._designs: for target in design: if target.id not in counts.index: counts.loc[target.id, 'count'] = 0 counts.loc[target.id, 'count'] += 1 counts['frequency'] = counts['count'].apply(lambda c: c / len(self._designs)) plotter.frequency(counts, title=title, width=width, height=height, **kwargs)
class StrainDesignMethodEnsemble(StrainDesignMethodResult): def __init__(self, designs, methods, *args, **kwargs): super(StrainDesignMethodEnsemble, self).__init__(designs, *args, **kwargs) self._methods = methods @property def data_frame(self): data = [] for design, method in zip(self._designs, self._methods): data.append([design.targets, method]) return DataFrame(data, columns=["design", "method"]) def __add__(self, other): df = self.data_frame i = len(df) for j, design in enumerate(other): if isinstance(other, StrainDesignMethodEnsemble): df.loc[i + j] = [design, self._methods] else: df.loc[i + j] = [design, [self.__method_name__]] df = df.groupby("design").aggregate(self._aggreate_functions_) return StrainDesignMethodEnsemble([StrainDesign(targets) for targets in df.targets], df['method'].tolist())