Source code for bionetgen.core.tools.gdiff

from multiprocessing.sharedctypes import Value
import xmltodict, copy, os, json

from bionetgen.core.utils.logging import BNGLogger


class BNGGdiff:
    """
    Compare two contact maps stored as graphml files and build diff graphs.

    The graphml files are parsed with `xmltodict`; nodes are matched between
    the two graphs by their label path (the labels of the node and of all
    its ancestors), not by their graphml IDs.

    Usage: BNGGdiff(inp1, inp2).run()
           BNGGdiff(inp1, inp2, out1).run()
           BNGGdiff(inp1, inp2, out1, out2).run()
           BNGGdiff(inp1, inp2, out, mode="matrix").run()

    Arguments
    ---------
    inp1 : str
        path to the first contact map graphml file
    inp2 : str
        path to the second contact map graphml file
    out : str
        (optional) path to the output file for the inp1 - inp2 graph
        (in "union" mode, path of the union graph)
    out2 : str
        (optional) path to the second output file for the inp2 - inp1 graph,
        only used in "matrix" mode
    mode : str
        diff mode, currently available modes are "matrix" and "union"
    colors : dict, str or None
        (optional) dictionary with keys "g1", "g2" and "intersect", each
        mapping to a list of three hex color strings (indexed by node type,
        see `_get_color_id`); or a path to a JSON file holding such a
        dictionary; or None to use the built-in palette
    app : object
        (optional) forwarded to `BNGLogger`
    """

    # Palette used when no colors are supplied. Every list is indexed by the
    # value returned by `_get_color_id` (0: species, 1: component, 2: state).
    _DEFAULT_COLORS = {
        "g1": ["#dadbfd", "#e6e7fe", "#f3f3ff"],
        "g2": ["#ff9e81", "#ffbfaa", "#ffdfd4"],
        "intersect": ["#c4ed9e", "#d9f4be", "#ecf9df"],
    }

    # Fill colors of the *input* graphs; they are the only available marker
    # of the node type: grey -> species, white -> component, yellow -> state.
    _COLOR_IDS = {"#D2D2D2": 0, "#FFFFFF": 1, "#FFCC00": 2}

    def __init__(
        self,
        inp1,
        inp2,
        out=None,
        out2=None,
        mode="matrix",
        colors=None,
        app=None,
    ) -> None:
        self.app = app
        self.logger = BNGLogger(app=self.app)
        self.logger.debug(
            "Setting up BNGGdiff object", loc=f"{__file__} : BNGGdiff.__init__()"
        )
        self.input = inp1
        self.input2 = inp2
        self.output = out
        self.output2 = out2
        self.logger.debug(
            "Loading graph colors", loc=f"{__file__} : BNGGdiff.__init__()"
        )
        if colors is None:
            # private copy so that instances never share a mutable palette
            self.colors = copy.deepcopy(self._DEFAULT_COLORS)
        elif isinstance(colors, dict):
            self.colors = colors
        elif isinstance(colors, str):
            # a string is treated as the path of a JSON file
            with open(colors, "r") as f:
                self.colors = json.load(f)
        else:
            raise ValueError(f"Color type {type(colors)} not recognized")
        self.available_modes = ["matrix", "union"]
        if mode not in self.available_modes:
            raise ValueError(
                f"Mode {mode} is not a valid mode, please choose from {self.available_modes}"
            )
        self.mode = mode
        self.logger.debug(
            f"Loading graphml files {self.input} and {self.input2}",
            loc=f"{__file__} : BNGGdiff.__init__()",
        )
        with open(self.input, "r") as f:
            self.gdict_1 = xmltodict.parse(f.read())
        with open(self.input2, "r") as f:
            self.gdict_2 = xmltodict.parse(f.read())

    # ------------------------------------------------------------------
    # small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _as_list(value):
        """
        Normalize an xmltodict child value to a list.

        xmltodict yields a single dict for one child element and a list for
        several, so every consumer has to accept both. None becomes an empty
        list; an existing list is returned as-is (not copied).
        """
        if value is None:
            return []
        return value if isinstance(value, list) else [value]

    def _resolve_colors(self, colors):
        """
        Return the palette to use: the explicit argument if given, otherwise
        the palette of this instance, otherwise a copy of the built-in one.
        """
        if colors is not None:
            return colors
        own_colors = getattr(self, "colors", None)
        if own_colors is not None:
            return own_colors
        return copy.deepcopy(self._DEFAULT_COLORS)

    def _child_nodes(self, node):
        """Return the direct child nodes of a (group) node as a list."""
        if "graph" not in node:
            return []
        return self._as_list(node["graph"]["node"])

    def _walk_nodes(self, root):
        """
        Depth-first traversal below `root`.

        Yields `(names, node)` pairs where `names` is the list of labels
        leading from `root` (exclusive) to `node` (inclusive); `root` itself
        is yielded first with an empty list. A parent is always yielded
        before its children, and the children are only looked up after the
        consumer is done with the parent.
        """
        stack = [([], root)]
        while stack:
            names, node = stack.pop()
            yield names, node
            for child in self._child_nodes(node):
                stack.append((names + [self._get_node_name(child)], child))

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------
    def diff_graphs(self, g1, g2, colors=None):
        """
        Given two XML dictionaries (using xmltodict) of two graphml graphs,
        do the diff and return the resulting graphml dictionaries.

        In "matrix" mode four graphs are returned: g1-g2, recolored g1,
        recolored g2 and g2-g1. In "union" mode a single union graph is
        returned. Nodes only found in the first graph get the "g1" colors,
        nodes only found in the second graph the "g2" colors and common
        nodes the "intersect" colors.

        Usage: diff_graphs(g1_dict, g2_dict)
               diff_graphs(g1_dict, g2_dict, colors=color_dict)

        Arguments
        ---------
        g1 : dict
            input dictionary of the input XML file for the first contact map
        g2 : dict
            second input dictionary of the second input XML file.
        colors : dict
            (optional) A dictionary with keys "g1", "g2" and "intersect".
            Each value is a list of three hex color strings, one per node
            type (species, component, state). Defaults to the palette of
            this object.

        Returns
        -------
        diff : dict
            A dictionary of graphs, each of which is a dictionary for the
            XML file of a result graph and can be converted back to XML with
            `xmltodict.unparse`. Each key is the intended file name for that
            graph.

        Raises
        ------
        ValueError
            if `self.mode` is not one of the available modes
        """
        self.logger.debug(
            "Calculating diff for graphs", loc=f"{__file__} : BNGGdiff.diff_graphs()"
        )
        colors = self._resolve_colors(colors)
        base_1 = os.path.basename(self.input)
        base_2 = os.path.basename(self.input2)
        name_1 = base_1.replace(".graphml", "")
        name_2 = base_2.replace(".graphml", "")

        if self.mode == "matrix":
            self.logger.debug("Matrix mode", loc=f"{__file__} : BNGGdiff.diff_graphs()")
            # default output names are stored on the object
            if self.output is None:
                self.output = f"{name_1}_{name_2}_diff.graphml"
            if self.output2 is None:
                self.output2 = f"{name_2}_{name_1}_diff.graphml"
            graphs = {}
            diff_gml, _ = self._find_diff(g1, g2, colors=colors)
            graphs[self.output] = diff_gml
            # the recolored inputs are produced as a side effect of _find_diff
            graphs[base_1.replace(".graphml", "_recolored.graphml")] = (
                self.gdict_1_recolor
            )
            graphs[base_2.replace(".graphml", "_recolored.graphml")] = (
                self.gdict_2_recolor
            )
            # reverse diff: the roles (and therefore the colors) are swapped
            diff_gml_2, _ = self._find_diff(
                g2,
                g1,
                colors={
                    "g1": colors["g2"],
                    "g2": colors["g1"],
                    "intersect": colors["intersect"],
                },
            )
            graphs[self.output2] = diff_gml_2
            return graphs

        if self.mode == "union":
            self.logger.debug("Union mode", loc=f"{__file__} : BNGGdiff.diff_graphs()")
            if self.output is None:
                union_name = f"{name_1}_{name_2}_union.graphml"
            else:
                union_name = self.output
            return {union_name: self._find_diff_union(g1, g2, colors=colors)}

        raise ValueError(
            f"Mode {self.mode} is not a valid mode, please choose from {self.available_modes}"
        )

    def _find_diff_union(self, g1, g2, dg=None, colors=None):
        """
        Build the union of two graphs: the g1 - g2 diff graph plus every
        node and edge that only exists in g2.

        Arguments
        ---------
        g1 : dict
            input dictionary of the input XML file for the first contact map
        g2 : dict
            second input dictionary of the second input XML file.
        dg : dict
            (optional) dictionary to be modified with the difference. If not
            given it'll be a copy of g1 by default.
        colors : dict
            (optional) A dictionary with keys "g1", "g2" and "intersect",
            each holding a list of three hex color strings. Defaults to the
            palette of this object.

        Returns
        -------
        diff : dict
            A dictionary for the XML file of the union graph. Can be
            converted back to an XML file using `xmltodict.unparse`.
        """
        self.logger.debug(
            "Calculating union diff", loc=f"{__file__} : BNGGdiff._find_diff_union()"
        )
        colors = self._resolve_colors(colors)
        # regular diff first; rename_map starts as the identity on g1 IDs and
        # is overwritten below with "g2 ID -> ID in the union graph"
        dg, rename_map = self._find_diff(g1, g2, dg=dg, colors=colors)

        for names, g2_node in self._walk_nodes(g2["graphml"]):
            if not names:
                # the graphml root itself is not a node
                continue
            g1_node = self._get_node_from_names(g1, names)
            if g1_node is not None:
                # same node exists in g1, edges must point to the g1 ID
                rename_map[self._get_node_id(g2_node)] = self._get_node_id(g1_node)
                continue
            dg_node = self._get_node_from_names(dg, names)
            if dg_node is None:
                # g2-only node: copy it (with its whole subtree) into dg
                self._add_node_to_graph(
                    g2_node, dg, names, colors=colors, rmap=rename_map
                )
            else:
                # already copied as part of an ancestor's subtree
                rename_map[self._get_node_id(g2_node)] = self._get_node_id(dg_node)

        # now the edges; "edge" may be missing (no edges) or a single dict
        # (exactly one edge) instead of a list
        dg_graph = dg["graphml"]["graph"]
        dg_edges = self._as_list(dg_graph.get("edge"))
        # edges are compared without direction
        seen = {frozenset((e["@source"], e["@target"])) for e in dg_edges}
        edge_ctr = len(dg_edges)
        for edge in self._as_list(g2["graphml"]["graph"].get("edge")):
            copied_edge = copy.deepcopy(edge)
            copied_edge["@source"] = rename_map[edge["@source"]]
            copied_edge["@target"] = rename_map[edge["@target"]]
            endpoints = frozenset((copied_edge["@source"], copied_edge["@target"]))
            if endpoints in seen:
                continue
            seen.add(endpoints)
            copied_edge["@id"] = f"e{edge_ctr}"
            dg_edges.append(copied_edge)
            edge_ctr += 1
        if dg_edges:
            dg_graph["edge"] = dg_edges
        return dg

    def _find_diff(self, g1, g2, dg=None, colors=None):
        """
        Color a copy of g1 according to which of its nodes also exist in g2.

        Side effects: stores recolored copies of `self.gdict_1` and
        `self.gdict_2` in `self.gdict_1_recolor` / `self.gdict_2_recolor`
        (using `self.colors`) and sets the label font size of
        `self.gdict_1`, `self.gdict_2` and the diff graph to 20.

        Arguments
        ---------
        g1 : dict
            graph whose nodes end up in the diff graph
        g2 : dict
            graph to compare against
        dg : dict
            (optional) graph to color in place; it is walked in lockstep
            with g1. Defaults to a deep copy of g1.
        colors : dict
            (optional) palette with keys "g1", "g2" and "intersect".
            Defaults to the palette of this object.

        Returns
        -------
        (dg, rename_map) : tuple
            the colored diff graph and a dictionary mapping every node ID of
            g1 to itself (the graphml root contributes a None -> None entry)
        """
        self.logger.debug("Calculating diff", loc=f"{__file__} : BNGGdiff._find_diff()")
        colors = self._resolve_colors(colors)
        if dg is None:
            dg = copy.deepcopy(g1)
        rename_map = {}
        # FIXME: Check for single nodes before looping
        stack = [([], g1["graphml"], dg["graphml"])]
        while stack:
            names, node, dnode = stack.pop()
            node_id = self._get_node_id(node)
            rename_map[node_id] = node_id
            g2node = self._get_node_from_names(g2, names)
            if names:
                name = self._get_node_name(node)
                if g2node is not None:
                    g2name = None
                    if "data" in g2node:
                        g2name = self._get_node_name(g2node)
                    if g2name is not None or name is not None:
                        which = "intersect" if g2name == name else "g1"
                        self._color_node(
                            dnode, colors[which][self._get_color_id(dnode)]
                        )
                elif "data" in dnode:
                    # node is missing from g2
                    self._color_node(dnode, colors["g1"][self._get_color_id(dnode)])
            for child, dchild in zip(self._child_nodes(node), self._child_nodes(dnode)):
                stack.append((names + [self._get_node_name(child)], child, dchild))

        # recolor both input graphs (copies, the inputs keep their colors)
        self.gdict_1_recolor = self._recolor_graph(self.gdict_1, self.colors["g1"])
        self.gdict_2_recolor = self._recolor_graph(self.gdict_2, self.colors["g2"])
        # NOTE(review): despite the parameter name `add_to_font`, this sets
        # every label to font size 20, it does not add 20 to the current size
        self._resize_fonts(self.gdict_1, 20)
        self._resize_fonts(self.gdict_2, 20)
        self._resize_fonts(dg, 20)
        return dg, rename_map

    def _recolor_graph(self, g, color_list):
        """
        Return a deep copy of `g` in which every node is filled with the
        entry of `color_list` that matches its node type.
        """
        self.logger.debug(
            "Recoloring graphs", loc=f"{__file__} : BNGGdiff._recolor_graph()"
        )
        recol_g = copy.deepcopy(g)
        for names, node in self._walk_nodes(recol_g["graphml"]):
            if names:
                self._color_node(node, color_list[self._get_color_id(node)])
        return recol_g

    def _resize_fonts(self, g, add_to_font):
        """
        Set the label font size of every node of `g` (in place) to
        `add_to_font`.
        """
        self.logger.debug(
            "Resizing fonts", loc=f"{__file__} : BNGGdiff._resize_fonts()"
        )
        for names, node in self._walk_nodes(g["graphml"]):
            if names:
                self._resize_node_font(node, add_to_font)

    # ------------------------------------------------------------------
    # node lookup
    # ------------------------------------------------------------------
    def _get_node_from_names(self, g, names):
        """
        Find a node by its label path.

        Arguments
        ---------
        g : dict
            either a full graph dictionary (with a "graphml" key) or a group
            node that contains a "graph"
        names : list
            labels from the top level down to the wanted node

        Returns
        -------
        The matching node, the root (`g["graphml"]` or `g`) if `names` is
        empty, or None if the path does not exist.
        """
        root = g["graphml"] if "graphml" in g else g
        nodes = root["graph"]["node"]
        if len(names) == 0:
            return root
        node = None
        for key in names:
            node = None
            for candidate in self._as_list(nodes):
                if self._get_node_name(candidate) == key:
                    node = candidate
                    break
            if node is None:
                return None
            # A leaf has no children: any remaining name must fail instead of
            # being matched against the siblings of the leaf.
            nodes = node["graph"]["node"] if "graph" in node else []
        return node

    def _get_node_from_keylist(self, g, keylist):
        """
        Find a node by its ID path.

        `keylist` starts with the top-level key of `g` (e.g. "graphml")
        followed by the node IDs from the top level down. Returns the node,
        `g[keylist[0]]` if no IDs are given, or None if the path does not
        exist.
        """
        keys = list(keylist)
        root = g[keys.pop(0)]
        if not keys:
            # we only have "graphml" as key
            return root
        if "graph" not in root:
            # we are out of group nodes
            return None
        nodes = root["graph"]["node"]
        node = None
        for key in keys:
            node = next(
                (cand for cand in self._as_list(nodes) if cand["@id"] == key), None
            )
            if node is None:
                return None
            nodes = (node.get("graph") or {}).get("node")
        return node

    # ------------------------------------------------------------------
    # node properties
    # ------------------------------------------------------------------
    def _get_node_properties(self, node):
        """
        Return the yEd realizer dictionary (holding "y:Fill", "y:NodeLabel",
        ...) of a node.

        `node["data"]` can be a single dict or a list if there are multiple
        data elements; a "y:GroupNode" can likewise be a list, in which case
        the first realizer is used. If several data elements carry a
        realizer the last one wins.

        Raises
        ------
        RuntimeError
            if no "y:ProxyAutoBoundsNode" or "y:ShapeNode" is found
        """
        properties = None
        for datum in self._as_list(node["data"]):
            if not isinstance(datum, dict):
                # empty <data/> elements carry no realizer
                continue
            if "y:ProxyAutoBoundsNode" in datum:
                realizer = datum["y:ProxyAutoBoundsNode"]["y:Realizers"]["y:GroupNode"]
            elif "y:ShapeNode" in datum:
                realizer = datum["y:ShapeNode"]
            else:
                continue
            properties = realizer[0] if isinstance(realizer, list) else realizer
        if properties is None:
            raise RuntimeError("Can't find properties for nodes")
        return properties

    def _get_node_name(self, node):
        """Return the label text of a node."""
        return self._get_node_properties(node)["y:NodeLabel"]["#text"]

    def _get_node_fill(self, node):
        """Return the (mutable) "y:Fill" dictionary of a node."""
        return self._get_node_properties(node)["y:Fill"]

    def _get_node_color(self, node):
        """Return the fill color hex string of a node."""
        return self._get_node_fill(node)["@color"]

    def _resize_node_font(self, node, size):
        """Set the label font size of a node to `size`."""
        self._get_node_properties(node)["y:NodeLabel"]["@fontSize"] = str(size)

    def _get_font_size(self, node):
        """Return the label font size of a node as an integer."""
        return int(self._get_node_properties(node)["y:NodeLabel"]["@fontSize"])

    def _get_color_id(self, node):
        """
        Return the node type index (0: species, 1: component, 2: state)
        derived from the current fill color of the node.

        Raises
        ------
        RuntimeError
            if the fill color is none of the known input colors, which is
            also the case for a node that has already been recolored
        """
        # FIXME: This should be fixed at bng level by attaching
        # an attribute to graphml node stating the type of node
        # instead of using colors to check the type
        curr_color = self._get_node_color(node)
        try:
            return self._COLOR_IDS[curr_color]
        except KeyError:
            raise RuntimeError(
                f"Node color {curr_color} doesn't match known colors"
            ) from None

    def _color_node(self, node, color) -> bool:
        """
        This uses yEd attributes to change the fill color of a node

        Arguments
        ---------
        node : dict
            the dictionary version of the node XML
        color : str
            color hex string to fill the node with

        Returns
        -------
        bool
            True if colored correctly, False if not
        """
        try:
            fill = self._get_node_fill(node)
            fill["@color"] = color
            return True
        except Exception as e:
            # deliberate best effort: a node that can't be colored is
            # reported and skipped
            print(f"Couldn't color node, error: {e}")
            return False

    def _get_node_text(self, node):
        """
        Return the label text of the first "y:" realizer of a group node, or
        None if there is none.
        """
        noded = node["data"]["y:ProxyAutoBoundsNode"]["y:Realizers"]
        for key in noded.keys():
            if "y:" in key:
                return noded[key]["y:NodeLabel"]["#text"]
        return None

    def _get_node_id(self, node):
        """Return the "@id" of a node or None if it has none."""
        if "@id" in node:
            return node["@id"]
        return None

    def _set_node_id(self, node, idstr) -> bool:
        """
        Overwrite the "@id" of a node; returns False (and does nothing) if
        the node has no "@id".
        """
        if "@id" in node:
            node["@id"] = idstr
            return True
        return False

    def _get_id_list(self, idstr) -> list:
        """Convert an ID string like "n0::n3" to the integer list [0, 3]."""
        return [int(part[1:]) for part in idstr.split("::")]

    def _get_id_str(self, id_list) -> str:
        """Convert an integer list like [0, 3] to the ID string "n0::n3"."""
        return "::".join([f"n{i}" for i in id_list])

    def _add_node_to_graph(self, node, dg, names, colors=None, rmap=None) -> dict:
        """
        Copy `node` (with its whole subtree) into `dg` below the parent that
        is addressed by `names[:-1]`.

        The copy gets a fresh ID among its new siblings, its descendants are
        re-IDed accordingly and, if `colors` is given, the whole subtree is
        filled with the "g2" colors.

        Arguments
        ---------
        node : dict
            node to copy, it is not modified
        dg : dict
            graph to add the copy to, modified in place
        names : list
            label path of `node`
        colors : dict
            (optional) palette, no coloring is done if None
        rmap : dict
            (optional) rename map that receives "old ID -> new ID" entries
            for the node and all of its descendants

        Returns
        -------
        dict
            the copied node
        """
        if rmap is None:
            rmap = {}
        node_to_add_to = self._get_node_from_names(dg, names[:-1])
        copied_node = copy.deepcopy(node)
        if colors is not None:
            self._color_node(copied_node, colors["g2"][self._get_color_id(copied_node)])
        if "graph" in node_to_add_to:
            siblings = node_to_add_to["graph"]["node"]
            if isinstance(siblings, list):
                id_lists = [
                    self._get_id_list(self._get_node_id(sibling))
                    for sibling in siblings
                ]
                # prefix of the last sibling, index one past the highest
                # one in use so that the new ID can't collide
                new_id = id_lists[-1][:-1] + [max(ids[-1] for ids in id_lists) + 1]
                self._set_node_id(copied_node, self._get_id_str(new_id))
                siblings.append(copied_node)
            else:
                # TODO: check if this is done correctly
                # it's a single node and we need to turn
                # it into a list instead
                copied_original_node = copy.deepcopy(siblings)
                new_id = self._get_id_list(self._get_node_id(copied_original_node))
                new_id[-1] += 1
                self._set_node_id(copied_node, self._get_id_str(new_id))
                node_to_add_to["graph"]["node"] = [copied_original_node, copied_node]
        # add to rename map
        rmap[self._get_node_id(node)] = self._get_node_id(copied_node)
        if "graph" in copied_node:
            # let's rename the graph
            if "@id" in copied_node["graph"]:
                copied_node["graph"]["@id"] = self._get_node_id(copied_node) + ":"
            # recolor and re-ID every descendant; parents are visited before
            # their children so the parent already carries its new ID
            for sub_names, sub_node in self._walk_nodes(copied_node):
                if not sub_names:
                    continue
                parent_node = self._get_node_from_names(copied_node, sub_names[:-1])
                if colors is not None:
                    self._color_node(
                        sub_node, colors["g2"][self._get_color_id(sub_node)]
                    )
                old_id = self._get_id_list(self._get_node_id(sub_node))
                new_id = self._get_id_str(
                    self._get_id_list(self._get_node_id(parent_node)) + [old_id[-1]]
                )
                self._set_node_id(sub_node, new_id)
                rmap[self._get_id_str(old_id)] = new_id
        return copied_node

    def run(self) -> dict:
        """
        Diff the two loaded graphs and write every resulting graph as a
        graphml file, using the dictionary keys returned by `diff_graphs`
        as file paths.

        Returns
        -------
        dict
            the dictionary returned by `diff_graphs`
        """
        self.logger.debug("Running", loc=f"{__file__} : BNGGdiff.run()")
        graphs = self.diff_graphs(self.gdict_1, self.gdict_2, self.colors)
        for graph_name, graph in graphs.items():
            with open(graph_name, "w") as f:
                xmltodict.unparse(graph, output=f, pretty=True)
        return graphs