Source code for bionetgen.atomizer.atomizer.detectOntology

# -*- coding: utf-8 -*-
"""
Created on Sat Oct 19 15:19:35 2013

@author: proto
"""
import pprint
import difflib
from collections import Counter
import json
import ast
import pickle
import os
from os import listdir
from os.path import isfile, join
import numpy as np

try:
    from utils.util import pmemoize as memoize
    import libsbml
except ModuleNotFoundError:
    import sys

    sys.path.append("..")
    from bionetgen.atomizer.utils.util import pmemoize as memoize
    import libsbml


[docs]@memoize def levenshtein(s1, s2): l1 = len(s1) l2 = len(s2) # matrix = [range(l1 + 1)] * (l2 + 1) # for zz in range(l2 + 1): # matrix[zz] = range(zz,zz + l1 + 1) matrix = [list(range(x, x + l1 + 1)) for x in range(0, l2 + 1)] for zz in range(0, l2): for sz in range(0, l1): z = matrix[zz][sz] if s1[sz] == s2[zz] else matrix[zz][sz] + 1 matrix[zz + 1][sz + 1] = min( matrix[zz + 1][sz] + 1, matrix[zz][sz + 1] + 1, z ) return matrix[l2][l1]
[docs]def getDifferences(scoreMatrix, speciesName, threshold): """ given a list of strings and a scoreMatrix, return the list of difference between those strings with a levenshtein difference of less than threshold returns: namePairs: list of tuples containing strings with distance <2 differenceList: list of differences between the tuples in namePairs """ differenceList = [] namePairs = [] for idx in range(0, len(scoreMatrix)): for idx2 in range(0, len(scoreMatrix[idx])): if scoreMatrix[idx][idx2] <= threshold and idx < idx2: if len(speciesName[idx]) < len(speciesName[idx2]): namePairs.append([speciesName[idx], speciesName[idx2]]) else: namePairs.append([speciesName[idx2], speciesName[idx]]) for pair in namePairs: if len(pair[1]) < len(pair[0]): difference = difflib.ndiff(pair[1].lower(), pair[0].lower()) else: difference = difflib.ndiff(pair[0].lower(), pair[1].lower()) tmp = [] for diff in difference: if diff[0] == " ": continue tmp.append(diff) differenceList.append(tuple(tmp)) return namePairs, differenceList
[docs]def loadOntology(ontologyFile): if os.path.isfile(ontologyFile): tmp = {} with open(ontologyFile, "r") as fp: ontology = json.load(fp) for element in ontology["patterns"]: tmp[ast.literal_eval(element)] = ontology["patterns"][element] ontology["patterns"] = tmp return ontology else: tmp = {} ontology = { "modificationList": ["Phosporylation", "Double-Phosporylation"], "reactionSite": ["phospho"], "reactionState": ["P", "PP"], "definitions": [{"rsi": 0, "rst": 0}, {"rsi": 0, "rst": 1}], "patterns": { "('+ _', '+ P', '+ P')": "Double-Phosporylation", "('+ P', '+ P', '+ _')": "Double-Phosporylation", "('+ p', '+ p')": "Double-Phosporylation", }, } for element in ontology["patterns"]: tmp[ast.literal_eval(element)] = ontology["patterns"][element] ontology["patterns"] = tmp return ontology
[docs]def findLongestSubstring(speciesA, speciesB): sm = difflib.SequenceMatcher(a=speciesA, b=speciesB) longestMatch = sm.find_longest_match(0, len(speciesA), 0, len(speciesB)) longestMatchStr = speciesA[longestMatch[0] : longestMatch[0] + longestMatch[2]] # trim around '_' separators for more natural name cuts trimIndex = [0, 0] _index = [i for i, x in enumerate(speciesA) if x == "_"] if "_" in speciesA: if not any([longestMatch[0] + longestMatch[2] == x for x in _index]): for idx in range(0, len(_index)): if idx + 1 == len(_index): trimIndex[1] = _index[idx] elif idx == 0: continue elif _index[idx] - longestMatch[0] > 0: trimIndex[1] = _index[idx - 1] longestMatchStr = speciesA[trimIndex[0] : trimIndex[1]] return longestMatchStr
# import concurrent.futures
[docs]def stringToSet(species, idx, scoreRow, speciesName): for idx2, species2 in enumerate(speciesName): if species == species2 or scoreRow[idx2] != 0: continue scoreRow[idx2] = levenshtein(species, speciesName[idx2]) return idx, scoreRow
[docs]def defineEditDistanceMatrix3(speciesName, similarityThreshold=4, parallel=False): namePairs = [] differenceList = [] for species in speciesName: closeMatches = difflib.get_close_matches(species, speciesName) closeMatches = [x for x in closeMatches if len(x) < len(species)] for match in closeMatches: difference = [x for x in difflib.ndiff(match, species)] if len([x for x in difference if "-" in x]) == 0: namePairs.append([match, species]) differenceList.append(tuple([x for x in difference if "+" in x])) return namePairs, differenceList, ""
[docs]def defineEditDistanceMatrix(speciesName, similarityThreshold=4, parallel=False): """ obtains a distance matrix and a pairs of elements that are close in distance, along with the proposed differences """ differenceCounter = Counter() futures = [] scoreMatrix = np.zeros((len(speciesName), len(speciesName))) scoreMatrix2 = np.zeros((len(speciesName), len(speciesName))) counter = range(0, len(speciesName)) """ for idx,species in enumerate(speciesName): futures = [] with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: futures.append(executor.submit(stringToSet, species,idx,scoreMatrix[idx],speciesName)) for future in concurrent.futures.as_completed(futures): idx3,row = future.result() counter.remove(idx3) print(len(counter)) scoreMatrix[idx3] = row """ for idx, species in enumerate(speciesName): for idx2, species2 in enumerate(speciesName): if species == species2 or scoreMatrix2[idx][idx2] != 0: continue # comparison = difflib.SequenceMatcher(None,speciesName[idx],speciesName[idx2]).quick_ratio() comparison = levenshtein(speciesName[idx], speciesName[idx2]) scoreMatrix2[idx][idx2] = comparison scoreMatrix2[idx2][idx] = scoreMatrix2[idx][idx2] namePairs, differenceList = getDifferences( scoreMatrix2, speciesName, similarityThreshold ) differenceCounter.update(differenceList) return namePairs, differenceList, differenceCounter
[docs]def analyzeNamingConventions( speciesName, ontologyFile, ontologyDictionary={}, similarityThreshold=4 ): patternClassification = {} pairClassification = {} # ontology = loadOntology(ontologyFile) ontology = ontologyFile finalDifferenceCounter = Counter() namePairs, differenceList, differenceCounter = defineEditDistanceMatrix( speciesName, similarityThreshold ) for element in differenceCounter: if element in ontology["patterns"]: finalDifferenceCounter[element] = differenceCounter[element] patternClassification[element] = ontology["patterns"][element] for pair, difference in zip(namePairs, differenceList): if difference in patternClassification: if patternClassification[difference] not in pairClassification: pairClassification[patternClassification[difference]] = [] pairClassification[patternClassification[difference]].append(tuple(pair)) keys = finalDifferenceCounter.keys() tmp = {key: "" for key in keys} for key in tmp: tmp[key] = ontology["patterns"][key] keys = ["".join(x).replace("+ ", "") for x in keys] # print(ontology) return pairClassification, keys, tmp
[docs]def main(fileName): reader = libsbml.SBMLReader() document = reader.readSBMLFromFile(fileName) model = document.getModel() speciesName = [] for species in model.getListOfSpecies(): speciesName.append(species.getName()) return analyzeNamingConventions( speciesName, "reactionDefinitions/namingConventions.json" )
[docs]def databaseAnalysis(directory, outputFile): xmlFiles = [ f for f in listdir("./" + directory) if isfile(join("./" + directory, f)) and f.endswith("xml") ] differenceCounter = Counter() fileDict = {} for xml in xmlFiles: print(xml) reader = libsbml.SBMLReader() document = reader.readSBMLFromFile(directory + xml) model = document.getModel() if model == None: continue speciesName = [] for species in model.getListOfSpecies(): speciesName.append(species.getName()) scoreMatrix = np.zeros((len(speciesName), len(speciesName))) for idx, species in enumerate(speciesName): for idx2, species2 in enumerate(speciesName): if species == species2 or scoreMatrix[idx][idx2] != 0: continue scoreMatrix[idx][idx2] = levenshtein( speciesName[idx], speciesName[idx2] ) scoreMatrix[idx2][idx] = scoreMatrix[idx][idx2] namePairs, differenceList = getDifferences(scoreMatrix, speciesName, 3) differenceCounter.update(differenceList) for key, element in zip(differenceList, namePairs): if key == (): continue if key not in fileDict: # differenceDict[tuple(key)] = set() fileDict[tuple(key)] = set() # differenceDict[tuple(key)].add((xml,tuple(element))) fileDict[tuple(key)].add(xml) fileCounter = Counter() for element in fileDict: fileCounter[element] = len(fileDict[element]) with open(outputFile, "wb") as f: pickle.dump(differenceCounter, f) # pickle.dump(differenceDict,f) pickle.dump(fileCounter, f)
""" try: import pandas as pd except ImportError: pd = None def analyzeTrends(inputFile): with open(inputFile,'rb') as f: counter = pickle.load(f) #dictionary = pickle.load(f) fileCounter = pickle.load(f) totalCounter = Counter() for element in counter: totalCounter[element] = counter[element] * fileCounter[element]/469.0 keys = totalCounter.most_common(200) #keys = keys[1:] pp = pprint.PrettyPrinter(indent=4) pp.pprint(keys) data = pd.DataFrame(keys) #print(data.to_excel('name.xls')) #for element in keys: # print('------------------') # print(element) # pp.pprint(dictionary[element[0]]) """ if __name__ == "__main__": bioNumber = 19 main("XMLExamples/curated/BIOMD%010i.xml" % bioNumber) # databaseAnalysis('XMLExamples/non_curated/', 'non_ontologies.dump') # analyzeTrends('ontologies.dump')