Source code for bionetgen.core.utils.utils

import os, subprocess
from bionetgen.core.exc import BNGPerlError
from distutils import spawn

from bionetgen.core.utils.logging import BNGLogger


[docs]class ActionList: """ Class to store everything related to BioNetGen actions. This class stores information about the list of BNG actions, their arguments as well as their syntax information. The class also provides an argument parser using pyparsing. Usage: ActionList() Attributes ---------- normal_types : list no_setter_syntax : list actions without => syntax square_braces : list actions that use [] syntax in them before_model : list actions that are supposed to come before the model possible_types : list the list of all possible actions arg_dict : dict dictionary that contains every argument of every action irregular_args : dict actions that have arguments that aren't simply arg=>val Methods --------- is_before_model : bool checks if a given action is supposed to come before `begin model` define_parser : None sets ActionList.action_parser to a pyparsing parser that's capable of splitting up any BNG action into parts """ def __init__(self): # these are all the action types, categorized # by their argument syntax self.normal_types = [ "generate_network", "generate_hybrid_model", "simulate", "simulate_ode", "simulate_ssa", "simulate_pla", "simulate_nf", "parameter_scan", "bifurcate", "readFile", "writeFile", "writeModel", "writeNetwork", "writeXML", "writeSBML", "writeMfile", "writeCPYfile", "writeMexfile", "writeMDL", "visualize", ] self.no_setter_syntax = [ "setConcentration", "addConcentration", "setParameter", "quit", "setModelName", "substanceUnits", "version", "setOption", ] self.square_braces = [ "saveConcentrations", "resetConcentrations", "resetParameters", "saveParameters", ] # remember what's written before models self.before_model = [ "setModelName", "substanceUnits", "version", "setOption", ] self.possible_types = ( self.normal_types + self.no_setter_syntax + self.square_braces ) # Use dictionary to keep track of all possible args (and types?) for each action self.arg_dict = {} # arg_dict["action"] = ["arg1", "arg2", "etc."] # normal_types self.arg_dict["generate_network"] = [ "prefix", "suffix", "verbose", "overwrite", "print_iter", "max_agg", "max_iter", "max_stoich", "TextReaction", "TextSpecies", ] self.arg_dict["generate_hybrid_model"] = [ "prefix", "suffix", "verbose", "overwrite", "actions", "execute", "safe", ] self.arg_dict["simulate"] = [ "prefix", "suffix", "verbose", "method", "argfile", "continue", "t_start", "t_end", "n_steps", "n_output_steps", "sample_times", "output_step_interval", "max_sim_steps", "stop_if", "print_on_stop", "print_end", "print_net", "save_progress", "print_CDAT", "print_functions", "netfile", "seed", # TODO: arguments for a method called "psa" that is not documented in # https://docs.google.com/spreadsheets/d/1Co0bPgMmOyAFxbYnGCmwKzoEsY2aUCMtJXQNpQCEUag/ "poplevel", "check_product_scale", ] self.arg_dict["simulate_ode"] = [ "prefix", "suffix", "verbose", "argfile", "continue", "t_start", "t_end", "n_steps", "n_output_steps", "sample_times", "output_step_interval", "max_sim_steps", "stop_if", "print_on_stop", "print_end", "print_net", "save_progress", "print_CDAT", "print_functions", "netfile", "seed", "atol", "rtol", "sparse", "steady_state", ] self.arg_dict["simulate_ssa"] = [ "prefix", "suffix", "verbose", "argfile", "continue", "t_start", "t_end", "n_steps", "n_output_steps", "sample_times", "output_step_interval", "max_sim_steps", "stop_if", "print_on_stop", "print_end", "print_net", "save_progress", "print_CDAT", "print_functions", "netfile", "seed", ] self.arg_dict["simulate_pla"] = [ "prefix", "suffix", "verbose", "argfile", "continue", "t_start", "t_end", "n_steps", "n_output_steps", "sample_times", "output_step_interval", "max_sim_steps", "stop_if", "print_on_stop", "print_end", "print_net", "save_progress", "print_CDAT", "print_functions", "netfile", "seed", "pla_config", "pla_output", ] self.arg_dict["simulate_nf"] = [ "prefix", "suffix", "verbose", "argfile", "continue", "t_start", "t_end", "n_steps", "n_output_steps", "sample_times", "output_step_interval", "max_sim_steps", "stop_if", "print_on_stop", "print_end", "print_net", "save_progress", "print_CDAT", "print_functions", "netfile", "seed", "complex", "nocslf", "notf", "binary_output", "gml", "equil", "get_final_state", "utl", "param", ] self.arg_dict["simulate"] = list( set( self.arg_dict["simulate"] + self.arg_dict["simulate_ode"] + self.arg_dict["simulate_ssa"] + self.arg_dict["simulate_pla"] + self.arg_dict["simulate_nf"] ) ) self.arg_dict["parameter_scan"] = [ "prefix", "suffix", "verbose", "method", "argfile", "continue", "t_start", "t_end", "n_steps", "n_output_steps", "sample_times", "output_step_interval", "max_sim_steps", "stop_if", "print_on_stop", "print_end", "print_net", "save_progress", "print_CDAT", "print_functions", "netfile", "seed", "parameter", "par_min", "par_max", "n_scan_pts", "log_scale", "par_scan_vals", "reset_conc", ] self.arg_dict["parameter_scan"] = list( set(self.arg_dict["parameter_scan"] + self.arg_dict["simulate"]) ) self.arg_dict["bifurcate"] = [ "prefix", "suffix", "verbose", "method", "argfile", "continue", "t_start", "t_end", "n_steps", "n_output_steps", "sample_times", "output_step_interval", "max_sim_steps", "stop_if", "print_on_stop", "print_end", "print_net", "save_progress", "print_CDAT", "print_functions", "netfile", "seed", "parameter", "par_min", "par_max", "n_scan_pts", "log_scale", "par_scan_vals", ] self.arg_dict["bifurcate"] = list( set(self.arg_dict["bifurcate"] + self.arg_dict["parameter_scan"]) ) self.arg_dict["bifurcate"].remove("reset_conc") self.arg_dict["readFile"] = ["file", "blocks", "atomize", "skip_actions"] self.arg_dict["writeFile"] = [ "format", "prefix", "suffix", "evaluate_expressions", "include_model", "include_network", "overwrite", "pretty_formatting", "TextReaction", "TextSpecies", ] self.arg_dict["writeModel"] = [ "format", "prefix", "suffix", "evaluate_expressions", "include_model", "include_network", "overwrite", "pretty_formatting", "TextReaction", "TextSpecies", ] self.arg_dict["writeNetwork"] = [ "format", "prefix", "suffix", "evaluate_expressions", "include_model", "include_network", "overwrite", "pretty_formatting", "TextReaction", "TextSpecies", ] self.arg_dict["writeXML"] = [ "format", "prefix", "suffix", "evaluate_expressions", "include_model", "include_network", "overwrite", "pretty_formatting", "TextReaction", "TextSpecies", ] self.arg_dict["writeSBML"] = ["prefix", "suffix"] self.arg_dict["writeMfile"] = [ "prefix", "suffix", "t_start", "t_end", "n_steps", "atol", "rtol", "max_step", "bdf", "maxOrder", "stats", ] self.arg_dict["writeCPYfile"] = [ "prefix", "suffix", "t_start", "t_end", "n_steps", "atol", "rtol", "max_step", "bdf", "maxOrder", "stats", ] self.arg_dict["writeMexfile"] = [ "prefix", "suffix", "t_start", "t_end", "n_steps", "atol", "rtol", "max_step", "max_num_steps", "max_err_test_fails", "max_conv_fails", "stiff", "sparse", ] self.arg_dict["writeMDL"] = ["prefix", "suffix"] self.arg_dict["visualize"] = [ "type", "help", "suffix", "each", "background", "groups", "collapse", "filter", "level", "textonly", "opts", "ruleNames", ] # no_setter_syntax self.arg_dict["setConcentration"] = [] self.arg_dict["addConcentration"] = [] self.arg_dict["setParameter"] = [] self.arg_dict["saveParameters"] = [] self.arg_dict["quit"] = None self.arg_dict["setModelName"] = [] self.arg_dict["substanceUnits"] = [] self.arg_dict["version"] = [] self.arg_dict["setOption"] = [] # square_braces self.arg_dict["saveConcentrations"] = [] self.arg_dict["resetConcentrations"] = [] self.arg_dict["resetParameters"] = [] # irregular arg types self.irregular_args = {} self.irregular_args["max_stoich"] = "dict" self.irregular_args["actions"] = "list" self.irregular_args["sample_times"] = "list" self.irregular_args["par_scan_vals"] = "list" self.irregular_args["blocks"] = "list" self.irregular_args["opts"] = "list"
[docs] def is_before_model(self, action_name): if action_name in self.before_model: return True return False
[docs] def define_parser(self): ## Define action grammar import pyparsing as pp # base_name = pp.Word(pp.alphas, pp.alphanums + "_") action_name = base_name # dquote_word = pp.dblQuotedString squote_word = pp.sglQuotedString quote_word = dquote_word ^ squote_word # all action argument types # TODO: deal w/ zero argument list list_arg = "[" + pp.delimitedList(quote_word) + "]" # arg_type_bool = pp.Word("0") ^ pp.Word("1") arg_type_int = pp.Word(pp.nums) arg_type_float = pp.Word(pp.nums + ".") arg_type_expr = pp.Word( pp.nums + "." + "+" + "-" + "e" + "E" + "(" + ")" + "/" + "*" + "^" ) arg_type_list = "[" + pp.delimitedList((quote_word ^ arg_type_float)) + "]" arg_type_string = quote_word # curly_arg_token = quote_word + "=>" + arg_type_int # TODO: handle 0 case arg_type_curly = "{" + pp.delimitedList(curly_arg_token) + "}" arg_types = ( arg_type_bool ^ arg_type_int ^ arg_type_float ^ arg_type_list ^ arg_type_list ^ arg_type_string ^ arg_type_curly ^ arg_type_expr ) # one_arg = quote_word two_arg = quote_word + "," + (arg_type_expr ^ quote_word) # single_arg = base_name + "=>" + arg_types # reg_arg_full = "{" + pp.Optional(pp.delimitedList(single_arg)) + "}" # reg_action_tk = ( action_name + "(" + reg_arg_full + ")" + pp.Optional(";") + pp.stringEnd ) two_arg_action_tk = ( action_name + "(" + quote_word + "," + pp.SkipTo(")" + pp.Optional(";") + pp.stringEnd) + ")" + pp.Optional(";") + pp.stringEnd ) one_arg_action_tk = ( action_name + "(" + pp.Optional(one_arg) + ")" + pp.Optional(";") + pp.stringEnd ) list_arg_action_tk = ( action_name + "(" + list_arg + ")" + pp.Optional(";") + pp.stringEnd ) full_action_tk = ( reg_action_tk ^ list_arg_action_tk ^ two_arg_action_tk ^ one_arg_action_tk ) ## Action grammar done self.action_parser = full_action_tk
[docs]def find_BNG_path(BNGPATH=None): """ A simple function finds the path to BNG2.pl from * Environment variable * Assuming it's under PATH * Given optional path as argument Usage: test_bngexec(path) test_bngexec() Arguments --------- BNGPATH : str (optional) path to the folder that contains BNG2.pl """ # TODO: Figure out how to use the BNG2.pl if it's set # in the PATH variable. Solution: set os.environ BNGPATH # and make everything use that route # Let's keep up the idea we pull this path from the environment if BNGPATH is None: try: BNGPATH = os.environ["BNGPATH"] except: pass # if still none, try pulling it from cmd line if BNGPATH is None: bngexec = "BNG2.pl" if test_bngexec(bngexec): # print("BNG2.pl seems to be working") # get the source of BNG2.pl BNGPATH = spawn.find_executable("BNG2.pl") BNGPATH, _ = os.path.split(BNGPATH) else: bngexec = os.path.join(BNGPATH, "BNG2.pl") if not test_bngexec(bngexec): RuntimeError("BNG2.pl is not working") return BNGPATH, bngexec
[docs]def test_perl(app=None, perl_path=None): """ Test if perl is working Arguments --------- perl_path : str (optional) path to the folder that contains perl """ logger = BNGLogger(app=app) logger.debug("Checking if perl is installed.", loc=f"{__file__} : test_perl()") # find path to perl binary if perl_path is None: perl_path = spawn.find_executable("perl") if perl_path is None: raise BNGPerlError # check if perl is actually working command = [perl_path, "-v"] rc, _ = run_command(command) if rc != 0: raise BNGPerlError
[docs]def test_bngexec(bngexec): """ A simple function that test if BNG2.pl given runs Usage: test_bngexec(path) Arguments --------- bngexec : str path to BNG2.pl to test """ command = ["perl", bngexec] rc, _ = run_command(command) if rc == 0: return True else: return False
[docs]def run_command(command, suppress=True, timeout=None): """ A convenience function to run a given command. The command should be given as a list of values e.g. ['command', 'arg1', 'arg2'] etc. Suppress kwarg suppresses all output from the command and timeout kwarg allows you to set a time period in seconds after which the command will be killed. """ if timeout is not None: if suppress: # I am unsure how to do both timeout and the live polling of stdo rc = subprocess.run( command, timeout=timeout, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) return rc.returncode, rc else: # I am unsure how to do both timeout and the live polling of stdo rc = subprocess.run(command, timeout=timeout, capture_output=True) return rc.returncode, rc else: if suppress: process = subprocess.Popen( command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, bufsize=-1, ) rc = process.wait() return rc, process else: process = subprocess.Popen(command, stdout=subprocess.PIPE, encoding="utf8") out = [] while True: output = process.stdout.readline() if output == "" and process.poll() is not None: break if output: o = output.strip() out.append(o) print(o) rc = process.wait() return rc, out