import marshal
import functools
from . import analyzeSBML
from collections import Counter, defaultdict
import itertools
from copy import deepcopy, copy
from bionetgen.atomizer.utils.util import logMess, memoize, memoizeMapped
from . import atomizationAux as atoAux
import bionetgen.atomizer.utils.pathwaycommons as pwcm
[docs]class SCTSolver:
def __init__(self, database, memoizedResolver=False):
self.database = database
self.memoizedResolver = memoizedResolver
self.graph_map = {}
self.dg = None
    def createSpeciesCompositionGraph(
        self,
        parser,
        configurationFile,
        namingConventions,
        speciesEquivalences=None,
        bioGridFlag=False,
    ):
        """
        Main method for the SCT creation.

        It first does stoichiometry analysis, then lexical analysis, and
        progressively refines ``self.database.dependencyGraph`` through a
        series of passes (binding stoichiometry, lexically induced bindings,
        user-defined equivalences, catalysis equivalences, forced
        modifications, string-similarity matches, annotation information and
        greedy lexical matching).  Each intermediate state is snapshotted in
        ``self.database.scts`` for downstream analysis, and the final,
        consolidated result is stored on ``self.database`` before it is
        returned.

        :param parser: SBML parser object providing reactions, species and
            annotations for the model.
        :param configurationFile: reaction-definition configuration handed to
            the ``SBMLAnalyzer``.
        :param namingConventions: naming-convention definitions handed to the
            ``SBMLAnalyzer``.
        :param speciesEquivalences: optional user-supplied species
            equivalences file.
        :param bioGridFlag: accepted for interface compatibility;
            not read anywhere in this method.
        :return: ``self.database`` with the SCT fields populated.
        """
        # Extract reactions and species from the model; only the rule list and
        # molecule names are used here, the remaining tuple entries are dropped.
        _, rules, _ = parser.getReactions(atomize=True, database=self.database)
        molecules, _, _, _, _, _ = parser.getSpecies()
        self.database.sbmlAnalyzer = analyzeSBML.SBMLAnalyzer(
            parser,
            configurationFile,
            namingConventions,
            speciesEquivalences,
            conservationOfMass=True,
        )
        # classify reactions
        (
            self.database.classifications,
            equivalenceTranslator,
            self.database.eequivalenceTranslator,
            indirectEquivalenceTranslator,
            adhocLabelDictionary,
            lexicalDependencyGraph,
            userEquivalenceTranslator,
        ) = self.database.sbmlAnalyzer.classifyReactions(rules, molecules, {})
        self.database.reactionProperties = (
            self.database.sbmlAnalyzer.getReactionProperties()
        )
        # Mark synthesis/degradation reactions with 1, everything else with 0.
        # NOTE(review): this list is never read later in this method.
        syndecs = [
            1 if i == "Generation" or i == "Decay" else 0
            for i in self.database.classifications
        ]
        # user defined and lexical analysis naming conventions are stored here
        self.database.reactionProperties.update(adhocLabelDictionary)
        (
            self.database.translator,
            self.database.userLabelDictionary,
            self.database.lexicalLabelDictionary,
            self.database.partialUserLabelDictionary,
        ) = self.database.sbmlAnalyzer.getUserDefinedComplexes()
        self.database.dependencyGraph = {}
        self.database.alternativeDependencyGraph = {}
        # fill in the annotation dictionary
        self.database.annotationDict = parser.getFullAnnotation()
        # just molecule names without parenthesis
        strippedMolecules = [x.strip("()") for x in molecules]
        # self.database.annotationDict = {}
        # ###dependency graph
        # binding reactions
        for reaction, classification in zip(rules, self.database.classifications):
            self.bindingReactionsAnalysis(
                self.database.dependencyGraph,
                list(atoAux.parseReactions(reaction)),
                classification,
            )
        # let's store each step separately for analysis downstream
        self.database.scts = {}
        self.database.scts["01_binding_sct"] = deepcopy(self.database.dependencyGraph)
        # lexical dependency graph contains lexically induced binding compositions. atomizer gives preference to binding obtained this way as opposed to stoichiometry
        # stronger bounds on stoichiometry based binding can be defined in
        # reactionDefinitions.json.
        for element in lexicalDependencyGraph:
            if (
                element in self.database.dependencyGraph
                and element not in self.database.userLabelDictionary
            ):
                if len(lexicalDependencyGraph[element]) == 0:
                    continue
                """
                oldDependency = self.database.dependencyGraph[element]
                if sorted(lexicalDependencyGraph[element][0]) in [sorted(x) for x in oldDependency]:
                    # if len(oldDependency) > 1:
                    #    logMess('DEBUG:Atomization', 'Species {0} was confirmed to be {1} based on lexical information'.format(element,lexicalDependencyGraph[element]))
                    self.database.dependencyGraph[
                        element] = lexicalDependencyGraph[element]
                else:
                    # logMess('INFO:Atomization', 'Species {0} was determined to be {1} instead of {2} based on \
                    # lexical information'.format(element,
                    # lexicalDependencyGraph[element], oldDependency))
                """
                # Conflict with the stoichiometry-derived entry: keep the
                # existing one and park the lexical candidate in the
                # alternative graph; only empty entries are filled directly.
                if self.database.dependencyGraph[element] != []:
                    self.database.alternativeDependencyGraph[
                        element
                    ] = lexicalDependencyGraph[element]
                else:
                    logMess(
                        "INFO:LAE009",
                        "{0}: being set to be a modification of constructed species {1}".format(
                            element, lexicalDependencyGraph[element][0]
                        ),
                    )
                    atoAux.addToDependencyGraph(
                        self.database.dependencyGraph,
                        element,
                        lexicalDependencyGraph[element][0],
                    )
            else:
                # Element is new (or user-labeled): record it, tracking names
                # that did not appear in the original model as "constructed".
                if element not in strippedMolecules:
                    self.database.constructedSpecies.add(element)
                self.database.dependencyGraph[element] = lexicalDependencyGraph[element]
                # Check if I'm using a molecule that hasn't been used yet
                for dependencyCandidate in self.database.dependencyGraph[element]:
                    for molecule in [
                        x
                        for x in dependencyCandidate
                        if x not in self.database.dependencyGraph
                    ]:
                        # this is a species that was not originally in the model. in case theres conflict later this is
                        # to indicate it is given less priority
                        self.database.dependencyGraph[molecule] = []
        # let's store each step separately for analysis downstream
        self.database.scts["02_post_lexical_sct"] = deepcopy(
            self.database.dependencyGraph
        )
        # user defined transformations
        for key in userEquivalenceTranslator:
            for namingEquivalence in userEquivalenceTranslator[key]:
                # Shorter name is taken as the base species, longer one as its
                # modified form.
                baseElement = min(namingEquivalence, key=len)
                modElement = max(namingEquivalence, key=len)
                if baseElement not in self.database.dependencyGraph:
                    self.database.dependencyGraph[baseElement] = []
                atoAux.addToDependencyGraph(
                    self.database.dependencyGraph, modElement, [baseElement]
                )
        # let's store each step separately for analysis downstream
        self.database.scts["03_post_user_sct"] = deepcopy(self.database.dependencyGraph)
        # self.database.eequivalence translator contains 1:1 equivalences
        # FIXME: do we need this update step or is it enough with the later one?
        # catalysis reactions
        """
        for key in self.database.eequivalenceTranslator:
            for namingEquivalence in self.database.eequivalenceTranslator[key]:
                baseElement = min(namingEquivalence, key=len)
                modElement = max(namingEquivalence, key=len)
                if key != 'Binding':
                    if baseElement not in self.database.dependencyGraph or self.database.dependencyGraph[baseElement] == []:
                        if modElement not in self.database.dependencyGraph or self.database.dependencyGraph[modElement] == []:
                            self.database.dependencyGraph[baseElement] = []
                        # do we have a meaningful reverse dependence?
                        # elif all([baseElement not in x for x in self.database.dependencyGraph[modElement]]):
                        #    atoAux.addToDependencyGraph(self.database.dependencyGraph,baseElement,[modElement])
                        #    continue
                    if baseElement in self.database.annotationDict and modElement in self.database.annotationDict:
                        baseSet = set([y for x in self.database.annotationDict[
                                      baseElement] for y in self.database.annotationDict[baseElement][x]])
                        modSet = set([y for x in self.database.annotationDict[
                                     modElement] for y in self.database.annotationDict[modElement][x]])
                        if len(baseSet.intersection(modSet)) > 0 or len(baseSet) == 0 or len(modSet) == 0:
                            atoAux.addToDependencyGraph(self.database.dependencyGraph, modElement,
                                                 [baseElement])
                        else:
                            logMess("ERROR:ANN201", "{0} and {1} have a direct correspondence according to reaction information however their annotations are completely different.".format(
                                baseElement, modElement))
                    else:
                        atoAux.addToDependencyGraph(self.database.dependencyGraph, modElement,
                                             [baseElement])
        """
        # include user label information.
        for element in self.database.userLabelDictionary:
            # User marked this species as the zero (null) element.
            if self.database.userLabelDictionary[element] in [0, [(0,)]]:
                self.database.dependencyGraph[element] = ["0"]
            elif (
                len(self.database.userLabelDictionary[element][0]) == 0
                or element == self.database.userLabelDictionary[element][0][0]
            ):
                self.database.dependencyGraph[element] = []
            else:
                self.database.dependencyGraph[element] = [
                    list(self.database.userLabelDictionary[element][0])
                ]
                # If the user is introducing a new molecule term, add it to the SCT
                if (
                    self.database.userLabelDictionary[element][0][0]
                    not in self.database.dependencyGraph
                ):
                    self.database.dependencyGraph[
                        self.database.userLabelDictionary[element][0][0]
                    ] = []
        # let's store each step separately for analysis downstream
        self.database.scts["04_post_label_sct"] = deepcopy(
            self.database.dependencyGraph
        )
        # add species elements defined by the user into the naming convention
        # definition
        molecules.extend(
            [
                "{0}()".format(x)
                for x in self.database.userLabelDictionary
                if "{0}()".format(x) not in molecules
            ]
        )
        # recalculate 1:1 equivalences now with binding information
        (
            _,
            _,
            self.database.eequivalenceTranslator2,
            _,
            adhocLabelDictionary,
            _,
            _,
        ) = self.database.sbmlAnalyzer.classifyReactions(
            rules, molecules, self.database.dependencyGraph
        )
        self.database.reactionProperties.update(adhocLabelDictionary)
        # update catalysis equivalences
        # catalysis reactions
        for key in self.database.eequivalenceTranslator2:
            for namingEquivalence in self.database.eequivalenceTranslator2[key]:
                baseElement = min(namingEquivalence, key=len)
                modElement = max(namingEquivalence, key=len)
                # dont overwrite user information
                if (
                    key != "Binding"
                    and modElement not in self.database.userLabelDictionary
                ):
                    if baseElement not in self.database.dependencyGraph:
                        self.database.constructedSpecies.add(baseElement)
                        self.database.dependencyGraph[baseElement] = []
                    # Only proceed when no existing multi-component (binding)
                    # entry for modElement already contains baseElement.
                    if modElement not in self.database.dependencyGraph or not [
                        True
                        for x in self.database.dependencyGraph[modElement]
                        if baseElement in x and len(x) > 1
                    ]:
                        if (
                            baseElement in self.database.annotationDict
                            and modElement in self.database.annotationDict
                        ):
                            # Flatten all annotation URIs for each species so
                            # their overlap can be compared.
                            baseSet = set(
                                [
                                    y
                                    for x in self.database.annotationDict[baseElement]
                                    for y in self.database.annotationDict[baseElement][
                                        x
                                    ]
                                ]
                            )
                            modSet = set(
                                [
                                    y
                                    for x in self.database.annotationDict[modElement]
                                    for y in self.database.annotationDict[modElement][x]
                                ]
                            )
                            if (
                                len(baseSet.intersection(modSet)) > 0
                                or len(baseSet) == 0
                                or len(modSet) == 0
                            ):
                                if modElement not in self.database.dependencyGraph:
                                    # if the entry doesnt exist from previous information accept this
                                    atoAux.addToDependencyGraph(
                                        self.database.dependencyGraph,
                                        modElement,
                                        [baseElement],
                                    )
                                else:
                                    # otherwise add it to the lexical repository
                                    atoAux.addToDependencyGraph(
                                        self.database.alternativeDependencyGraph,
                                        modElement,
                                        [baseElement],
                                    )
                            else:
                                # Annotations disagree; compare the databases
                                # (identifiers.org namespaces) they reference.
                                baseDB = set(
                                    [
                                        x.split("/")[-2]
                                        for x in baseSet
                                        if "identifiers.org" in x
                                    ]
                                )
                                modDB = set(
                                    [
                                        x.split("/")[-2]
                                        for x in modSet
                                        if "identifiers.org" in x
                                    ]
                                )
                                # it is still ok if they each refer to different self.databases
                                if len(baseDB.intersection(modDB)) == 0:
                                    if modElement not in self.database.dependencyGraph:
                                        # if the entry doesnt exist from previous information accept this
                                        atoAux.addToDependencyGraph(
                                            self.database.dependencyGraph,
                                            modElement,
                                            [baseElement],
                                        )
                                    else:
                                        # otherwise add it to the lexical repository
                                        atoAux.addToDependencyGraph(
                                            self.database.alternativeDependencyGraph,
                                            modElement,
                                            [baseElement],
                                        )
                                else:
                                    logMess(
                                        "WARNING:ANN201",
                                        "{0} and {1} have a direct correspondence according to reaction information however their annotations are completely different.".format(
                                            baseElement, modElement
                                        ),
                                    )
                        else:
                            # No annotation information available: accept the
                            # reaction-derived equivalence as-is.
                            atoAux.addToDependencyGraph(
                                self.database.dependencyGraph, modElement, [baseElement]
                            )
                    else:
                        logMess(
                            "WARNING:ATO114",
                            "Definition conflict between binding information {0} and lexical analyis {1} for molecule {2},\
                        choosing binding".format(
                                self.database.dependencyGraph[modElement],
                                baseElement,
                                modElement,
                            ),
                        )
        # let's store each step separately for analysis downstream
        self.database.scts["05_post_lex_catalysis_sct"] = deepcopy(
            self.database.dependencyGraph
        )
        # non lexical-analysis catalysis reactions
        if self.database.forceModificationFlag:
            for reaction, classification in zip(rules, self.database.classifications):
                preaction = list(atoAux.parseReactions(reaction))
                # Only unimolecular A -> B reactions are considered candidates
                # for a forced modification relationship.
                if len(preaction[0]) == 1 and len(preaction[1]) == 1:
                    if (preaction[0][0] in [0, "0"]) or (preaction[1][0] in [0, "0"]):
                        continue
                    # The shorter name (or the one contained in the other,
                    # case-insensitively) is taken as the base species.
                    if preaction[1][0].lower() in preaction[0][0].lower() or len(
                        preaction[1][0]
                    ) < len(preaction[0][0]):
                        base = preaction[1][0]
                        mod = preaction[0][0]
                    else:
                        mod = preaction[1][0]
                        base = preaction[0][0]
                    if (
                        self.database.dependencyGraph[mod] == []
                        and mod not in self.database.userLabelDictionary
                    ):
                        if (
                            base in self.database.userLabelDictionary
                            and self.database.userLabelDictionary[base] == 0
                        ):
                            continue
                        if (
                            mod in self.database.userLabelDictionary
                            and self.database.userLabelDictionary[mod] == 0
                        ):
                            continue
                        if [mod] in self.database.dependencyGraph[base]:
                            continue
                        # can we just match it up through existing species instead of forcing a modification?
                        greedyMatch = (
                            self.database.sbmlAnalyzer.greedyModificationMatching(
                                mod, self.database.dependencyGraph.keys()
                            )
                        )
                        if greedyMatch not in [-1, -2, []]:
                            self.database.dependencyGraph[mod] = [greedyMatch]
                            if mod in self.database.alternativeDependencyGraph:
                                del self.database.alternativeDependencyGraph[mod]
                            logMess(
                                "INFO:LAE006",
                                "{0}: Mapped to {1} using lexical analysis/greedy matching".format(
                                    mod, greedyMatch
                                ),
                            )
                            continue
                        # if the annotations have no overlap whatsoever don't force
                        # this modifications
                        if (
                            base in self.database.annotationDict
                            and mod in self.database.annotationDict
                        ):
                            baseSet = set(
                                [
                                    y
                                    for x in self.database.annotationDict[base]
                                    for y in self.database.annotationDict[base][x]
                                ]
                            )
                            modSet = set(
                                [
                                    y
                                    for x in self.database.annotationDict[mod]
                                    for y in self.database.annotationDict[mod][x]
                                ]
                            )
                            if (
                                (len(baseSet.intersection(modSet))) == 0
                                and len(baseSet) > 0
                                and len(modSet) > 0
                            ):
                                baseDB = set(
                                    [
                                        x.split("/")[-2]
                                        for x in baseSet
                                        if "identifiers.org" in x
                                    ]
                                )
                                modDB = set(
                                    [
                                        x.split("/")[-2]
                                        for x in modSet
                                        if "identifiers.org" in x
                                    ]
                                )
                                # we stil ahve to check that they both reference the same self.database
                                if len(baseDB.intersection(modDB)) > 0:
                                    logMess(
                                        "WARNING:ANN201",
                                        "{0} and {1} have a direct correspondence according to reaction information however their annotations are completely different.".format(
                                            base, mod
                                        ),
                                    )
                                    continue
                        self.database.dependencyGraph[mod] = [[base]]
        # let's store each step separately for analysis downstream
        self.database.scts["06_post_nonlex_catalysis_sct"] = deepcopy(
            self.database.dependencyGraph
        )
        """
        #complex catalysis reactions
        for key in indirectEquivalenceTranslator:
            #first remove these entries from the dependencyGraph since
            #they are not true bindingReactions
            for namingEquivalence in indirectEquivalenceTranslator[key]:
                removedElement = ''
                tmp3 = deepcopy(namingEquivalence[1])
                if tmp3 in self.database.dependencyGraph[namingEquivalence[0][0]]:
                    removedElement = namingEquivalence[0][0]
                elif tmp3 in self.database.dependencyGraph[namingEquivalence[0][1]]:
                    removedElement = namingEquivalence[0][1]
                else:
                    tmp3.reverse()
                    if tmp3 in self.database.dependencyGraph[namingEquivalence[0][0]]:
                        removedElement = namingEquivalence[0][0]
                    elif tmp3 in self.database.dependencyGraph[namingEquivalence[0][1]]:
                        removedElement = namingEquivalence[0][1]
                #then add the new, true dependencies
                #if its not supposed to be a basic element
                tmp = [x for x in namingEquivalence[1] if x not in namingEquivalence[2]]
                tmp.extend([x for x in namingEquivalence[2] if x not in namingEquivalence[1]])
                tmp2 = deepcopy(tmp)
                tmp2.reverse()
                ##TODO: map back for the elements in namingEquivalence[2]
                if tmp not in self.database.dependencyGraph[namingEquivalence[3][0]] \
                    and tmp2 not in self.database.dependencyGraph[namingEquivalence[3][0]]:
                    if sorted(tmp) == sorted(tmp3):
                        continue
                    if all(x in self.database.dependencyGraph for x in tmp):
                        if removedElement in self.database.dependencyGraph:
                            self.database.dependencyGraph[removedElement].remove(tmp3)
                        logMess('INFO:Atomization','Removing {0}={1} and adding {2}={3} instead\
 from the dependency list since we determined it is not a true binding reaction based on lexical analysis'\
                        .format(removedElement,tmp3,namingEquivalence[3][0],tmp))
                        self.database.dependencyGraph[namingEquivalence[3][0]] = [tmp]
                    else:
                        logMess('WARNING:Atomization','We determined that {0}={1} based on lexical analysis instead of \
{2}={3} (stoichiometry) but one of the constituent components in {1} is not a molecule so no action was taken'.format(namingEquivalence[3][0],
tmp,removedElement,tmp3))
        #user defined stuff
        """
        # stuff obtained from string similarity analysis
        for element in self.database.lexicalLabelDictionary:
            # similarity analysis has less priority than anything we discovered
            # before
            if (
                element in self.database.dependencyGraph
                and len(self.database.dependencyGraph[element]) > 0
            ):
                continue
            if (
                len(self.database.lexicalLabelDictionary[element][0]) == 0
                or element == self.database.lexicalLabelDictionary[element][0][0]
            ):
                self.database.constructedSpecies.add(element)
                atoAux.addToDependencyGraph(self.database.dependencyGraph, element, [])
            else:
                # logMess('INFO:Atomization', ' added induced speciesStructure {0}={1}'
                #        .format(element, self.database.lexicalLabelDictionary[element][0]))
                self.database.dependencyGraph[element] = [
                    list(self.database.lexicalLabelDictionary[element][0])
                ]
        # let's store each step separately for analysis downstream
        self.database.scts["07_post_similarity_sct"] = deepcopy(
            self.database.dependencyGraph
        )
        # Now let's go for annotation analysis and last resort stuff on the remaining orphaned molecules
        orphanedSpecies = [
            x
            for x in strippedMolecules
            if x not in self.database.dependencyGraph
            or self.database.dependencyGraph[x] == []
        ]
        orphanedSpecies.extend(
            [
                x
                for x in self.database.dependencyGraph
                if self.database.dependencyGraph[x] == [] and x not in orphanedSpecies
            ]
        )
        # Fill SCT with annotations for those species that still dont have any
        # mapping
        annotationDependencyGraph, _ = self.fillSCTwithAnnotationInformation(
            orphanedSpecies, self.database.annotationDict
        )
        # use an empty dictionary if we wish to turn off annotation information in atomization
        # annotationDependencyGraph = {}
        for annotatedSpecies in annotationDependencyGraph:
            if (
                len(annotationDependencyGraph[annotatedSpecies]) > 0
                and annotatedSpecies not in self.database.userLabelDictionary
            ):
                atoAux.addToDependencyGraph(
                    self.database.dependencyGraph,
                    annotatedSpecies,
                    annotationDependencyGraph[annotatedSpecies][0],
                )
                logMess(
                    "INFO:ANN004",
                    "Added equivalence from annotation information {0}={1}".format(
                        annotatedSpecies, annotationDependencyGraph[annotatedSpecies][0]
                    ),
                )
                for element in annotationDependencyGraph[annotatedSpecies][0]:
                    # in case one of the compositional elements is not yet in the
                    # dependency graph
                    if element not in self.database.dependencyGraph:
                        atoAux.addToDependencyGraph(
                            self.database.dependencyGraph, element, []
                        )
        # let's store each step separately for analysis downstream
        self.database.scts["08_post_annotation_sct"] = deepcopy(
            self.database.dependencyGraph
        )
        # can we now add information to the non orphaned species? maybe annotation tells me stuff that contradicts the reaction-network
        nonOrphanedSpecies = [x for x in strippedMolecules if x not in orphanedSpecies]
        # NOTE(review): this call passes self.database as a third positional
        # argument while the call above passes only two arguments — confirm
        # fillSCTwithAnnotationInformation's signature accepts both forms.
        # Its result is also never read afterwards — TODO confirm intended.
        annotationDependencyGraph, _ = self.fillSCTwithAnnotationInformation(
            nonOrphanedSpecies,
            self.database.annotationDict,
            self.database,
            tentativeFlag=True,
        )
        orphanedSpecies = [
            x
            for x in strippedMolecules
            if x not in self.database.dependencyGraph
            or self.database.dependencyGraph[x] == []
        ]
        orphanedSpecies.extend(
            [
                x
                for x in self.database.dependencyGraph
                if self.database.dependencyGraph[x] == [] and x not in orphanedSpecies
            ]
        )
        orphanedSpecies.extend(self.database.constructedSpecies)
        strippedMolecules.extend(
            [x.strip("()") for x in self.database.constructedSpecies]
        )
        # TODO: merge both lists and use them as a tiebreaker for consolidation
        # completeAnnotationDependencyGraph, completePartialMatches = fillSCTwithAnnotationInformation(strippedMolecules, annotationDict, self.database, False)
        # pure lexical analysis for the remaining orphaned molecules
        (
            tmpDependency,
            self.database.tmpEquivalence,
        ) = self.database.sbmlAnalyzer.findClosestModification(
            orphanedSpecies,
            strippedMolecules,
            self.database.annotationDict,
            self.database.dependencyGraph,
        )
        for species in tmpDependency:
            # user-defined labels always win over the lexical tiebreaker
            if species not in self.database.userLabelDictionary:
                if tmpDependency[species] == []:
                    atoAux.addToDependencyGraph(
                        self.database.dependencyGraph, species, []
                    )
                for instance in tmpDependency[species]:
                    atoAux.addToDependencyGraph(
                        self.database.dependencyGraph, species, instance
                    )
                    if (
                        len(instance) == 1
                        and instance[0] not in self.database.dependencyGraph
                    ):
                        atoAux.addToDependencyGraph(
                            self.database.dependencyGraph, instance[0], []
                        )
        # let's store each step separately for analysis downstream
        self.database.scts["09_post_tiebreaker_sct"] = deepcopy(
            self.database.dependencyGraph
        )
        orphanedSpecies = [
            x
            for x in strippedMolecules
            if x not in self.database.dependencyGraph
            or self.database.dependencyGraph[x] == []
        ]
        orphanedSpecies.extend(
            [
                x
                for x in self.database.dependencyGraph
                if self.database.dependencyGraph[x] == [] and x not in orphanedSpecies
            ]
        )
        orphanedSpecies.extend(self.database.constructedSpecies)
        # greedy lexical analysis for the remaining orhpaned species
        for reactant in orphanedSpecies:
            greedyMatch = self.database.sbmlAnalyzer.greedyModificationMatching(
                reactant, self.database.dependencyGraph.keys()
            )
            if greedyMatch not in [-1, -2, []]:
                atoAux.addToDependencyGraph(
                    self.database.dependencyGraph, reactant, greedyMatch
                )
                logMess(
                    "INFO:LAE006",
                    "Mapped {0} to {1} using lexical analysis/greedy matching".format(
                        reactant, greedyMatch
                    ),
                )
        # let's store each step separately for analysis downstream
        self.database.scts["10_post_greedy_lex_sct"] = deepcopy(
            self.database.dependencyGraph
        )
        # for key in self.database.scts:
        #     print(key)
        #     print(self.database.scts[key])
        if len(self.database.constructedSpecies) > 0:
            logMess(
                "INFO:SCT031",
                "The following species names do not appear in the original model but where created to have more appropiate naming conventions: [{0}]".format(
                    ",".join(self.database.constructedSpecies)
                ),
            )
        # initialize and remove zero elements
        (
            self.database.prunnedDependencyGraph,
            self.database.weights,
            unevenElementDict,
            self.database.artificialEquivalenceTranslator,
        ) = self.consolidateDependencyGraph(
            self.database.dependencyGraph,
            equivalenceTranslator,
            self.database.eequivalenceTranslator,
            self.database.sbmlAnalyzer,
        )
        return self.database
[docs] def bindingReactionsAnalysis(self, dependencyGraph, reaction, classification):
"""
adds addBond based reactions based dependencies to the dependency graph
>>> dg = dg2 = {}
>>> dummy = SCTSolver(None)
>>> dummy.bindingReactionsAnalysis(dg, [['A', 'B'], ['C']], 'Binding')
>>> dg == {'A': [], 'C': [['A', 'B']], 'B': []}
True
>>> dummy.bindingReactionsAnalysis(dg2, [['C'], ['A', 'B']], 'Binding')
>>> dg2 == {'A': [], 'C': [['A', 'B']], 'B': []}
True
"""
totalElements = [item for sublist in reaction for item in sublist]
for element in totalElements:
atoAux.addToDependencyGraph(dependencyGraph, element, [])
if classification == "Binding":
if len(reaction[1]) == 1 and element not in reaction[0]:
atoAux.addToDependencyGraph(dependencyGraph, element, reaction[0])
elif len(reaction[0]) == 1 and element not in reaction[1]:
atoAux.addToDependencyGraph(dependencyGraph, element, reaction[1])
[docs] def consolidateDependencyGraph(
self,
dependencyGraph,
equivalenceTranslator,
equivalenceDictionary,
sbmlAnalyzer,
loginformation=True,
):
"""
The second part of the Atomizer algorithm, once the lexical and stoichiometry information has been extracted
it is time to state all elements of the system in unequivocal terms of their molecule types
"""
equivalenceTranslator = {}
def selectBestCandidate(
reactant,
candidates,
dependencyGraph,
sbmlAnalyzer,
equivalenceTranslator=equivalenceTranslator,
equivalenceDictionary=equivalenceDictionary,
):
tmpCandidates = []
modifiedElementsPerCandidate = []
unevenElements = []
candidateDict = {}
for individualAnswer in candidates:
try:
tmpAnswer = []
flag = True
if len(individualAnswer) == 1 and individualAnswer[0] == reactant:
continue
modifiedElements = []
for chemical in individualAnswer:
# we cannot handle tuple naming conventions for now
if type(chemical) == tuple:
flag = False
continue
# associate elements in the candidate description with their
# modified version
rootChemical = self.resolveDependencyGraph(
dependencyGraph, chemical
)
mod = self.resolveDependencyGraph(
dependencyGraph, chemical, True
)
if mod != []:
modifiedElements.extend(mod)
for element in rootChemical:
if len(element) == 1 and type(element[0]) == tuple:
continue
if element == chemical:
tmpAnswer.append(chemical)
elif type(element) == tuple:
tmpAnswer.append(element)
else:
tmpAnswer.append(element[0])
modifiedElementsPerCandidate.append(modifiedElements)
if flag:
tmpAnswer = sorted(tmpAnswer)
tmpCandidates.append(tmpAnswer)
except atoAux.CycleError:
if loginformation:
logMess(
"ERROR:SCT221",
"{0}:{1}:Dependency cycle found when mapping molecule to candidate".format(
reactant, individualAnswer[0]
),
)
continue
# we cannot handle tuple naming conventions for now
if len(tmpCandidates) == 0:
# logMess('CRITICAL:Atomization', 'I dont know how to process these candidates and I have no \
# way to make an educated guess. Politely refusing to translate
# {0}={1}.'.format(reactant, candidates))
return None, None, None
originalTmpCandidates = deepcopy(tmpCandidates)
# if we have more than one modified element for a single reactant
# we can try to choose the one that is most similar to the original
# reactant
# FIXME:Fails if there is a double modification
newModifiedElements = {}
# modifiedElementsCounter = Counter()
modifiedElementsCounters = [Counter() for x in range(len(candidates))]
# keep track of how many times we need to modify elements in the candidate description
# FIXME: This only keeps track of the stuff in the fist candidates list
for idx, modifiedElementsInCandidate in enumerate(
modifiedElementsPerCandidate
):
for element in modifiedElementsInCandidate:
if element[0] not in newModifiedElements or element[1] == reactant:
newModifiedElements[element[0]] = element[1]
modifiedElementsCounters[idx][element[0]] += 1
# actually modify elements and store final version in tmpCandidates
# if tmpCandidates[1:] == tmpCandidates[:-1] or len(tmpCandidates) ==
# 1:
for tmpCandidate, modifiedElementsCounter in zip(
tmpCandidates, modifiedElementsCounters
):
flag = True
while flag:
flag = False
for idx, chemical in enumerate(tmpCandidate):
if modifiedElementsCounter[chemical] > 0:
modifiedElementsCounter[chemical] -= 1
tmpCandidate[idx] = newModifiedElements[chemical]
flag = True
break
candidateDict = {tuple(x): y for x, y in zip(tmpCandidates, candidates)}
bcan = []
btmp = []
borig = []
# filter out those dependencies to the 0 element
# if this is related to the zero element
if len(tmpCandidates) == 1 and tmpCandidates[0] == ["0"]:
return ["0"], None, None
for candidate, tmpcandidate, originaltmpcandidate in zip(
candidates, tmpCandidates, originalTmpCandidates
):
if originaltmpcandidate != ["0"]:
bcan.append(candidate)
btmp.append(tmpcandidate)
borig.append(originaltmpcandidate)
candidates = bcan
tmpCandidates = btmp
originalTmpCandidates = borig
if len(tmpCandidates) == 0:
return None, None, None
# FIXME: I have no idea wtf this is doing so im commenting it out. i
# think it's old code that is no longer ncessary
"""
# update candidate chemical references to their modified version if required
if len(tmpCandidates) > 1:
# temporal solution for defaulting to the first alternative
totalElements = [y for x in tmpCandidates for y in x]
elementDict = {}
for word in totalElements:
if word not in elementDict:
elementDict[word] = 0
elementDict[word] += 1
newTmpCandidates = [[]]
for element in elementDict:
if elementDict[element] % len(tmpCandidates) == 0:
newTmpCandidates[0].append(element)
#elif elementDict[element] % len(tmpCandidates) != 0 and re.search('(_|^){0}(_|$)'.format(element),reactant):
# newTmpCandidates[0].append(element)
# unevenElements.append([element])
else:
logMess('WARNING:Atomization', 'Are these actually the same? {0}={1}.'.format(reactant,candidates))
unevenElements.append(element)
flag = True
# FIXME:this should be done on newtmpCandidates instead of tmpcandidates
while flag:
flag = False
for idx, chemical in enumerate(tmpCandidates[0]):
if chemical in newModifiedElements: #and newModifiedElements[chemical] in reactant:
tmpCandidates[0][idx] = newModifiedElements[chemical]
flag = True
break
"""
# if all the candidates are about modification changes to a complex
# then try to do it through lexical analysis
if (
all([len(candidate) == 1 for candidate in candidates])
and candidates[0][0] != reactant
and len(tmpCandidates[0]) > 1
):
if reactant is not None:
pass
# analyze based on standard modifications
# lexCandidate, translationKeys, tmpequivalenceTranslator = sbmlAnalyzer.analyzeSpeciesModification(candidates[0][0], reactant, originalTmpCandidates[0])
# print '++++'
(
lexCandidate,
translationKeys,
tmpequivalenceTranslator,
) = sbmlAnalyzer.analyzeSpeciesModification2(
candidates[0][0], reactant, originalTmpCandidates[0]
)
# lexCandidate, translationKeys, tmpequivalenceTranslator = sbmlAnalyzer.analyzeSpeciesModification(candidates[0][0], reactant, tmpCandidates[0]) # FIXME: this is iffy. is it always an append modification? could be prepend
# lexCandidate = None
if lexCandidate is not None:
lexCandidate = tmpCandidates[0][
originalTmpCandidates[0].index(lexCandidate)
]
if translationKeys[0] + lexCandidate in dependencyGraph:
lexCandidateModification = translationKeys[0] + lexCandidate
else:
lexCandidateModification = lexCandidate + translationKeys[0]
for element in tmpequivalenceTranslator:
if element not in equivalenceTranslator:
equivalenceTranslator[element] = []
equivalenceTranslator[element].append(
(lexCandidate, lexCandidateModification)
)
while lexCandidate in tmpCandidates[0]:
tmpCandidates[0].remove(lexCandidate)
tmpCandidates[0].append(lexCandidateModification)
break
if lexCandidateModification not in dependencyGraph:
logMess(
"WARNING:SCT711",
"While analyzing {0}={1} we discovered equivalence {2}={3}, please verify \
this the correct behavior or provide an alternative for {0}".format(
reactant,
tmpCandidates[0],
lexCandidateModification,
lexCandidate,
),
)
dependencyGraph[lexCandidateModification] = [[lexCandidate]]
return [tmpCandidates[0]], unevenElements, candidates
else:
fuzzyCandidateMatch = None
"""
if nothing else works and we know the result is a bimolecular
complex and we know which are the basic reactants then try to
do fuzzy string matching between the two.
TODO: extend this to more than 2 molecule complexes.
"""
if len(tmpCandidates[0]) == 2:
tmpmolecules = []
tmpmolecules.extend(originalTmpCandidates[0])
tmpmolecules.extend(tmpCandidates[0])
# FIXME: Fuzzy artificial reaction is using old methods. Try to fix this
# or maybe not, no one was using it and when it was used it was wrong
# fuzzyCandidateMatch = sbmlAnalyzer.fuzzyArtificialReaction(originalTmpCandidates[0],[reactant],tmpmolecules)
fuzzyCandidateMatch = None
if fuzzyCandidateMatch is not None:
# logMess('INFO:Atomization', 'Used fuzzy string matching from {0} to {1}'.format(reactant, fuzzyCandidateMatch))
return [fuzzyCandidateMatch], unevenElements, candidates
else:
# map based on greedy matching
greedyMatch = sbmlAnalyzer.greedyModificationMatching(
reactant, dependencyGraph.keys()
)
if greedyMatch not in [-1, -2]:
return (
selectBestCandidate(
reactant,
[greedyMatch],
dependencyGraph,
sbmlAnalyzer,
)[0],
unevenElements,
candidates,
)
# last ditch attempt using straightforward lexical analysis
(
tmpDependency,
tmpEquivalence,
) = sbmlAnalyzer.findClosestModification(
[reactant],
dependencyGraph.keys(),
self.database.annotationDict,
self.database.dependencyGraph,
)
if (
reactant in tmpDependency
and tmpDependency[reactant] in tmpCandidates[0]
):
for element in tmpDependency:
if element not in dependencyGraph:
dependencyGraph[element] = tmpDependency[element]
for element in tmpEquivalence:
if element not in equivalenceDictionary:
equivalenceDictionary[element] = []
for equivalence in tmpEquivalence[element]:
if (
equivalence[0]
not in equivalenceDictionary[element]
):
equivalenceDictionary[element].append(
equivalence[0]
)
if len(tmpDependency.keys()) > 0:
return (
tmpDependency[reactant],
unevenElements,
candidates,
)
# XXX: be careful of this change. This basically forces changes to happen
# the ive no idea whats going on branch
# modificationCandidates = {}
# if modificationCandidates == {}:
activeCandidates = []
for individualCandidate in tmpCandidates:
for tmpCandidate in individualCandidate:
activeQuery = None
uniprotkey = atoAux.getURIFromSBML(
tmpCandidate, self.database.parser, ["uniprot"]
)
if len(uniprotkey) > 0:
uniprotkey = uniprotkey[0].split("/")[-1]
activeQuery = pwcm.queryActiveSite(uniprotkey, None)
if activeQuery and len(activeQuery) > 0:
activeCandidates.append(tmpCandidate)
# enter modification information to self.database
# logMess('INFO:SCT051', '{0}:Determined that {0} has an active site for modication'.format(reactant, tmpCandidate))
# return [individualCandidate], unevenElements, candidates
# we want relevant biological names, its useless if they are too short
elif len(tmpCandidate) >= 3:
# else:
individualMajorCandidates = [
y for x in candidates for y in x
]
activeQuery = pwcm.queryActiveSite(
tmpCandidate, None
)
if activeQuery and len(activeQuery) > 0:
otherMatches = [
x
for x in tmpCandidates[0]
if x in activeQuery
]
if any(
[
x
for x in otherMatches
if len(x) > len(tmpCandidate)
]
):
continue
activeCandidates.append(tmpCandidate)
# enter modification information to self.database
# logMess('INFO:SCT051', '{0}:Determined that {1} has an active site for modication'.format(reactant, tmpCandidate))
# return [individualCandidate], unevenElements, candidates
if len(activeCandidates) > 0:
if len(activeCandidates) == 1:
logMess(
"INFO:SCT051",
"{0}:Determined through uniprot active site query that {1} has an active site for modication".format(
reactant, activeCandidates[0]
),
)
if len(activeCandidates) > 1:
logMess(
"WARNING:SCT151",
"{0}:Determined through uniprot active site query that {1} have active site for modication. Defaulting to {2}".format(
reactant, activeCandidates, activeCandidates[0]
),
)
for tmpCandidate, candidate in zip(
tmpCandidates, candidates
):
fuzzyList = sbmlAnalyzer.processAdHocNamingConventions(
reactant,
candidate[0],
{},
False,
dependencyGraph.keys(),
)
if len(fuzzyList) > 0 and fuzzyList[0][1]:
if sbmlAnalyzer.testAgainstExistingConventions(
fuzzyList[0][1],
sbmlAnalyzer.namingConventions[
"modificationList"
],
):
self.database.eequivalenceTranslator2[
fuzzyList[0][1]
].append(
(
activeCandidates[0],
"{0}{1}".format(
activeCandidates, fuzzyList[0][1]
),
)
)
else:
self.database.eequivalenceTranslator2[
fuzzyList[0][1]
] = [
(
activeCandidates[0],
"{0}{1}".format(
activeCandidates[0], fuzzyList[0][1]
),
)
]
if (
"{0}{1}".format(
activeCandidates[0], fuzzyList[0][1]
)
not in dependencyGraph
):
dependencyGraph[
"{0}{1}".format(
activeCandidates[0], fuzzyList[0][1]
)
] = [[activeCandidates[0]]]
for idx, element in enumerate(tmpCandidate):
if element == activeCandidates[0]:
tmpCandidates[0][idx] = "{0}{1}".format(
activeCandidates[0], fuzzyList[0][1]
)
break
return (
[tmpCandidates[0]],
unevenElements,
candidates,
)
if len(tmpCandidates) != 1:
if not self.database.softConstraints:
if loginformation:
logMess(
"ERROR:SCT213",
"{0}:Atomizer needs user information to determine which element is being modified among components {1}={2}.".format(
reactant, candidates, tmpCandidates
),
)
# print self.database.userLabelDictionary
return None, None, None
else:
if not self.database.softConstraints:
if loginformation:
modification = (
sbmlAnalyzer.findMatchingModification(
reactant, candidates[0][0]
)
)
modification = (
modification[0] if modification else "mod"
)
logMess(
"ERROR:SCT212",
"{1}:{0}:Atomizer needs user information to determine which element is being modified among component species:{2}:{3}".format(
reactant,
candidates[0],
tmpCandidates[0],
modification,
),
)
return None, None, None
# return [tmpCandidates[0]], unevenElements
elif len(tmpCandidates) > 1:
# all candidates are equal/consistent
if all(sorted(x) == sorted(tmpCandidates[0]) for x in tmpCandidates):
tmpCandidates = [tmpCandidates[0]]
elif (
reactant in self.database.alternativeDependencyGraph
and loginformation
):
# candidates contradict each other but we have naming convention information in alternativeDependencyGraph
if not all(
sorted(x) == sorted(originalTmpCandidates[0])
for x in originalTmpCandidates
):
if loginformation:
logMess(
"INFO:SCT001",
"{0}:Using lexical analysis since stoichiometry gives non-consistent information naming({1})!=stoichiometry({2})".format(
reactant,
self.database.alternativeDependencyGraph[reactant][
0
],
tmpCandidates,
),
)
# else:
# print self.database.alternativeDependencyGraph[reactant],tmpCandidates,reactant
# logMess('INFO:Atomization', 'Using lexical analysis for species {0} = {1} since stoichiometry gave conflicting information {2}'.format(reactant,
# self.database.alternativeDependencyGraph[reactant][0],
# tmpCandidates))
# fallback to naming conventions
candidate = self.database.alternativeDependencyGraph[reactant]
# resolve naming convention candidate to its basic components
# (molecule types)
namingTmpCandidates = selectBestCandidate(
reactant, [candidate[0]], dependencyGraph, sbmlAnalyzer
)[0]
if not namingTmpCandidates:
logMess(
"ERROR:SCT211",
"{0}:{1}:{2}:Cannot converge to solution, conflicting definitions".format(
reactant, tmpCandidates, originalTmpCandidates
),
)
return None, None, None
if not any(
[
sorted(subcandidate) == sorted(namingTmpCandidates[0])
for subcandidate in tmpCandidates
]
):
if loginformation:
logMess(
"WARNING:SCT112",
"{0}:Stoichiometry analysis:{1}:results in non self-consistent definitions and conflicts with lexical analysis:{2}:Selecting lexical analysis".format(
reactant, tmpCandidates, namingTmpCandidates
),
)
atoAux.addAssumptions(
"lexicalVsstoch",
(
reactant,
("lexical", str(namingTmpCandidates)),
("stoch", str(tmpCandidates)),
("original", str(originalTmpCandidates)),
),
self.database.assumptions,
)
tmpCandidates = namingTmpCandidates
if loginformation:
self.database.alternativeDependencyGraph[
reactant
] = tmpCandidates
elif all(
sorted(x) == sorted(originalTmpCandidates[0])
for x in originalTmpCandidates
):
# the basic elements are the same but we are having trouble matching modifications together
sortedCandidates = sorted(
[
([y for y in x if y in reactant], i)
for i, x in enumerate(tmpCandidates)
],
key=lambda z: [len(z[0]), sum([len(w) for w in z[0]])],
reverse=True,
)
if loginformation:
logMess(
"WARNING:SCT113",
"{0}:candidates:{1}:agree on the basic components but naming conventions cannot determine specific modifications. Selecting:{2}:based on longest partial match".format(
reactant,
tmpCandidates,
tmpCandidates[sortedCandidates[0][1]],
),
)
replacementCandidate = [tmpCandidates[sortedCandidates[0][1]]]
atoAux.addAssumptions(
"lexicalVsstoch",
(
reactant,
("current", str(replacementCandidate)),
(
"alternatives",
str(
[
x
for x in tmpCandidates
if x != replacementCandidate[0]
]
),
),
("original", str(originalTmpCandidates)),
),
self.database.assumptions,
)
tmpCandidates = replacementCandidate
else:
tmpCandidates2 = [
x
for x in tmpCandidates
if all(y not in x for y in self.database.constructedSpecies)
]
# if we had constructed species disregard those since they are introducing noise
if len(tmpCandidates2) > 0 and len(tmpCandidates) != len(
tmpCandidates2
):
return selectBestCandidate(
reactant, tmpCandidates2, dependencyGraph, sbmlAnalyzer
)
elif len(tmpCandidates2) == 0:
# the differences is between species that we created so its the LAE fault. Just choose one.
tmpCandidates.sort(key=len)
tmpCandidates = [tmpCandidates[0]]
else:
if loginformation:
logMess(
"ERROR:SCT211",
"{0}:{1}:{2}:Cannot converge to solution, conflicting definitions".format(
reactant, tmpCandidates, originalTmpCandidates
),
)
return None, None, None
elif (
reactant in self.database.alternativeDependencyGraph and loginformation
):
# there is one stoichiometry candidate but the naming convention
# and the stoichiometry don't agree
if (
tmpCandidates[0]
!= self.database.alternativeDependencyGraph[reactant][0]
):
# make sure the naming convention is resolved to basic
# components
candidate = self.database.alternativeDependencyGraph[reactant]
# this is to avoid recursion
if loginformation:
del self.database.alternativeDependencyGraph[reactant]
namingtmpCandidates = selectBestCandidate(
reactant, [candidate[0]], dependencyGraph, sbmlAnalyzer
)[0]
# if they still disagree print error and use stoichiometry
if (
namingtmpCandidates
and tmpCandidates[0] != namingtmpCandidates[0]
):
if loginformation:
if (
namingtmpCandidates[0][0]
in self.database.constructedSpecies
):
namingTmpCandidates = tmpCandidates
else:
self.database.alternativeDependencyGraph[
reactant
] = namingtmpCandidates
logMess(
"WARNING:SCT111",
"{0}:stoichiometry analysis:{1}:conflicts with and naming conventions:{2}:Selecting lexical analysis".format(
reactant,
tmpCandidates,
self.database.alternativeDependencyGraph[
reactant
],
),
)
tmpCandidates = namingtmpCandidates
atoAux.addAssumptions(
"lexicalVsstoch",
(
reactant,
("stoch", str(tmpCandidates)),
("lexical", str(namingtmpCandidates)),
("original", str(originalTmpCandidates)),
),
self.database.assumptions,
)
for element in tmpCandidates[0]:
if element not in prunnedDependencyGraph:
# elemental species that were not used anywhere
# else but for those entries discovered through
# naming conventions
prunnedDependencyGraph[element] = []
elif not namingtmpCandidates:
if loginformation:
logMess(
"WARNING:SCT121",
"{0}:could not resolve naming({1}) into a viable compositional candidate. choosing stoichiometry({2})".format(
reactant, candidate, tmpCandidates[0]
),
)
originalCandidateName = (
candidateDict[tuple(tmpCandidates[0])]
if tuple(tmpCandidates[0]) in candidateDict
else None
)
return [tmpCandidates[0]], unevenElements, originalCandidateName
prunnedDependencyGraph = deepcopy(dependencyGraph)
tempMergedDependencyGraph = deepcopy(prunnedDependencyGraph)
for element in self.database.alternativeDependencyGraph:
if element in tempMergedDependencyGraph:
tempMergedDependencyGraph[element].extend(
self.database.alternativeDependencyGraph[element]
)
weights = self.weightDependencyGraph(tempMergedDependencyGraph)
# raise Exception
unevenElementDict = {}
for element in weights:
candidates = [x for x in prunnedDependencyGraph[element[0]]]
if len(candidates) == 1 and type(candidates[0][0]) == tuple:
prunnedDependencyGraph[element[0]] = []
if len(candidates) >= 1:
candidates, uneven, originalCandidate = selectBestCandidate(
element[0], candidates, prunnedDependencyGraph, sbmlAnalyzer
)
# except CycleError:
# candidates = None
# uneven = []
if uneven != []:
unevenElementDict[element[0]] = uneven
if candidates is None:
prunnedDependencyGraph[element[0]] = []
else:
prunnedDependencyGraph[element[0]] = [sorted(x) for x in candidates]
weights = self.weightDependencyGraph(prunnedDependencyGraph)
return prunnedDependencyGraph, weights, unevenElementDict, equivalenceTranslator
[docs] @memoize
def measureGraph(self, element, path):
"""
Calculates the weight of individual paths as the sum of the weights of the individual candidates plus the number of
candidates. The weight of an individual candidate is equal to the sum of strings contained in that candidate different
from the original reactant
>>> dummy = SCTSolver(None)
>>> dummy.measureGraph('Trash',['0'])
1
>>> dummy.measureGraph('EGF',[['EGF']])
2
>>> dummy.measureGraph('EGFR_P',[['EGFR']])
3
>>> dummy.measureGraph('EGF_EGFR', [['EGF', 'EGFR']])
4
>>> dummy.measureGraph('A_B_C',[['A', 'B_C'], ['A_B', 'C']])
7
"""
counter = 1
for x in path:
if type(x) == list or type(x) == tuple:
counter += self.measureGraph(element, x)
elif x != "0" and x != element:
counter += 1
return counter
# ASS: From my testing the iterative version is not only identical
# but also significantly faster for most models since measure
# graph doesn't get the same inputs, memoization doesn't pay off.
[docs] def measureGraph2(self, element, path):
"""
Identical to previous function but iterative instead of
recursive
"""
counter = 1
if len(path) == 1:
if type(path[0]) == list or type(path[0]) == tuple:
counter += 1
# check inside
for x in path[0]:
if x != "0" and x != element:
counter += 1
else:
if path[0] != "0" and path[0] != element:
counter += 1
else:
# it's a longer thing
counter += len(path)
# flatten and check
flat = [i for sb in path for i in sb if i]
for x in flat:
if x != "0" and x != element:
counter += 1
return counter
[docs] def weightDependencyGraph(self, dependencyGraph):
"""
Given a dependency Graph it will return a list indicating the weights of its elements
a path is calculated
>>> dummy = SCTSolver(None)
>>> dummy.weightDependencyGraph({'EGF_EGFR_2':[['EGF_EGFR','EGF_EGFR']],'EGF_EGFR':[['EGF','EGFR']],'EGFR':[],'EGF':[],\
'EGFR_P':[['EGFR']],'EGF_EGFR_2_P':[['EGF_EGFR_2']]})
[['EGF', 2], ['EGFR', 2], ['EGFR_P', 4], ['EGF_EGFR', 5], ['EGF_EGFR_2', 9], ['EGF_EGFR_2_P', 10]]
>>> dependencyGraph2 = {'A':[],'B':[],'C':[],'A_B':[['A','B']],'B_C':[['B','C']],'A_B_C':[['A_B','C'],['B_C','A']]}
>>> dummy.weightDependencyGraph(dependencyGraph2)
[['A', 2], ['C', 2], ['B', 2], ['B_C', 5], ['A_B', 5], ['A_B_C', 13]]
"""
weights = []
for element in dependencyGraph:
path = self.resolveDependencyGraph(dependencyGraph, element)
try:
path2 = self.resolveDependencyGraph(dependencyGraph, element, True)
except atoAux.CycleError:
path2 = []
# ASS: Swapping to iterative version of the function
# weight = self.measureGraph(element, path) + len(path2)
weight = self.measureGraph2(element, path) + len(path2)
weights.append([element, weight])
weights = sorted(weights, key=lambda rule: (rule[1], len(rule[0])))
return weights
# ASS: New method to make hashes from graphs. Some key points
# 1) sorting is done to ensure same graph gives the same key, consistently
# 2) python internal hashing function is used for the hashing
# 3) should be very collision proof
[docs] def make_key_from_graph(self, graph):
hashable_tuples = []
# If graph is empty just return the empty tuple result
if len(graph) == 0:
return marshal.dumps(hashable_tuples)
# So we don't modify original graph
tmpGraph = deepcopy(graph)
# This turns the graph into a traditional graph implementation
# where there are no edges that go to nodes that do not exist in the
# graph, I'm making sure every node exists in the graph itself
all_elems = set(
[item[0] for sublist in tmpGraph.values() for item in sublist if item]
)
for elem in all_elems:
try:
a = tmpGraph[elem]
except KeyError:
tmpGraph[elem] = []
# Now we should have a traditional graph implementation
# I also want to unroll every element to turn this into a hashable
# tuple of tuples type deal
for key in sorted(tmpGraph):
tmpGraph[key] = functools.reduce(
lambda x, y: x + y, sorted(tmpGraph[key]), []
)
# Now we can turn this into a proper hashable object
for key in sorted(tmpGraph):
hashable_tuples.append((key, tuple(tmpGraph[key])))
# Turn the list into tuples to it's hashable
hashable_tuples = tuple(hashable_tuples)
# return hash
return hashable_tuples.__hash__()
[docs] def resolveDependencyGraph(
self, dependencyGraph, reactant, withModifications=False
):
"""
Given a full species composition table and a reactant it will return an unrolled list of the molecule types
(elements with no dependencies that define this reactant). The classification to the original candidates is lost
since elements are fully unrolled. For getting dependencies keeping candidate consistency use consolidateDependencyGraph
instead
Args:
withModifications (bool): returns a list of the 1:1 transformation relationships found in the path to this graph
>>> dummy = SCTSolver(None)
>>> dependencyGraph = {'EGF_EGFR_2':[['EGF_EGFR','EGF_EGFR']],'EGF_EGFR':[['EGF','EGFR']],'EGFR':[],'EGF':[],\
'EGFR_P':[['EGFR']],'EGF_EGFR_2_P':[['EGF_EGFR_2']]}
>>> dependencyGraph2 = {'A':[],'B':[],'C':[],'A_B':[['A','B']],'B_C':[['B','C']],'A_B_C':[['A_B','C'],['B_C','A']]}
>>> dummy.resolveDependencyGraph(dependencyGraph, 'EGFR')
[['EGFR']]
>>> dummy.resolveDependencyGraph(dependencyGraph, 'EGF_EGFR')
[['EGF'], ['EGFR']]
>>> sorted(dummy.resolveDependencyGraph(dependencyGraph, 'EGF_EGFR_2_P'))
[['EGF'], ['EGF'], ['EGFR'], ['EGFR']]
>>> sorted(dummy.resolveDependencyGraph(dependencyGraph, 'EGF_EGFR_2_P', withModifications=True))
[('EGF_EGFR_2', 'EGF_EGFR_2_P')]
>>> sorted(dummy.resolveDependencyGraph(dependencyGraph2,'A_B_C'))
[['A'], ['A'], ['B'], ['B'], ['C'], ['C']]
"""
gkey = self.make_key_from_graph(dependencyGraph)
try:
self.dg = self.graph_map[gkey]
except KeyError:
self.graph_map[gkey] = dependencyGraph
self.dg = dependencyGraph
if self.memoizedResolver:
topCandidate = self.resolveDependencyGraphHelper(
gkey, reactant, [], withModifications
)
else:
topCandidate = self.unMemoizedResolveDependencyGraphHelper(
self.dg, reactant, [], withModifications
)
return topCandidate
[docs] @memoizeMapped
def resolveDependencyGraphHelper(
self, gkey, reactant, memory, withModifications=False
):
"""
Helper function for resolveDependencyGraph that adds a memory field to resolveDependencyGraphHelper to avoid
cyclical definitions problems
>>> dummy = SCTSolver(None)
>>> dependencyGraph = {'EGF_EGFR_2':[['EGF_EGFR','EGF_EGFR']],'EGF_EGFR':[['EGF','EGFR']],'EGFR':[],'EGF':[],\
'EGFR_P':[['EGFR']],'EGF_EGFR_2_P':[['EGF_EGFR_2']]}
>>> dependencyGraph2 = {'A':[],'B':[],'C':[],'A_B':[['A','B']],'B_C':[['B','C']],'A_B_C':[['A_B','C'],['B_C','A']]}
>>> sorted(dummy.resolveDependencyGraphHelper(dependencyGraph, 'EGF_EGFR_2_P',[]))
[['EGF'], ['EGF'], ['EGFR'], ['EGFR']]
>>> sorted(dummy.resolveDependencyGraphHelper(dependencyGraph, 'EGF_EGFR_2_P', [], withModifications=True))
[('EGF_EGFR_2', 'EGF_EGFR_2_P')]
>>> sorted(dummy.resolveDependencyGraphHelper(dependencyGraph2, 'A_B_C', []))
[['A'], ['A'], ['B'], ['B'], ['C'], ['C']]
>>> dependencyGraph3 = {'C1': [['C2']],'C2':[['C3']],'C3':[['C1']]}
>>> resolveDependencyGraphHelper(dummy.dependencyGraph3, 'C3', [], withModifications=True)
Traceback (innermost last):
File "<stdin>", line 1, in ?
CycleError
"""
result = []
# if type(reactant) == tuple:
# return []
if (
reactant not in self.dg
or self.dg[reactant] == []
or self.dg[reactant] == [[reactant]]
):
if not withModifications:
result.append([reactant])
else:
for option in self.dg[reactant]:
tmp = []
for element in option:
if element in memory and not withModifications:
result.append([element])
continue
elif element in memory:
# logMess(
# 'ERROR:SCT201', 'dependency cycle detected on {0}'.format(element))
raise atoAux.CycleError(memory)
baseElement = self.resolveDependencyGraphHelper(
gkey, element, memory + [element], withModifications
)
if baseElement is not None:
tmp.extend(baseElement)
# if not withModifications:
result.extend(tmp)
if len(option) == 1 and withModifications and option[0] != reactant:
result.append((option[0], reactant))
return result
[docs] def unMemoizedResolveDependencyGraphHelper(
self, dependencyGraph, reactant, memory, withModifications=False
):
"""
Helper function for resolveDependencyGraph that adds a memory field to resolveDependencyGraphHelper to avoid
cyclical definitions problems
>>> dummy = SCTSolver(None)
>>> dependencyGraph = {'EGF_EGFR_2':[['EGF_EGFR','EGF_EGFR']],'EGF_EGFR':[['EGF','EGFR']],'EGFR':[],'EGF':[],\
'EGFR_P':[['EGFR']],'EGF_EGFR_2_P':[['EGF_EGFR_2']]}
>>> dependencyGraph2 = {'A':[],'B':[],'C':[],'A_B':[['A','B']],'B_C':[['B','C']],'A_B_C':[['A_B','C'],['B_C','A']]}
>>> sorted(dummy.resolveDependencyGraphHelper(dependencyGraph, 'EGF_EGFR_2_P',[]))
[['EGF'], ['EGF'], ['EGFR'], ['EGFR']]
>>> sorted(dummy.resolveDependencyGraphHelper(dependencyGraph, 'EGF_EGFR_2_P', [], withModifications=True))
[('EGF_EGFR_2', 'EGF_EGFR_2_P')]
>>> sorted(dummy.resolveDependencyGraphHelper(dependencyGraph2, 'A_B_C', []))
[['A'], ['A'], ['B'], ['B'], ['C'], ['C']]
>>> dependencyGraph3 = {'C1': [['C2']],'C2':[['C3']],'C3':[['C1']]}
>>> resolveDependencyGraphHelper(dummy.dependencyGraph3, 'C3', [], withModifications=True)
Traceback (innermost last):
File "<stdin>", line 1, in ?
CycleError
"""
result = []
# if type(reactant) == tuple:
# return []
if (
reactant not in dependencyGraph
or dependencyGraph[reactant] == []
or dependencyGraph[reactant] == [[reactant]]
):
if not withModifications:
result.append([reactant])
else:
for option in dependencyGraph[reactant]:
tmp = []
for element in option:
if element in memory and not withModifications:
result.append([element])
continue
elif element in memory:
# logMess(
# 'ERROR:SCT201', 'dependency cycle detected on {0}'.format(element))
raise atoAux.CycleError(memory)
baseElement = self.unMemoizedResolveDependencyGraphHelper(
dependencyGraph, element, memory + [element], withModifications
)
if baseElement is not None:
tmp.extend(baseElement)
# if not withModifications:
result.extend(tmp)
if len(option) == 1 and withModifications and option[0] != reactant:
result.append((option[0], reactant))
return result