Source code for pyvipr.pysb_viz.dynamic_viz

import networkx as nx
import sympy
import numpy as np
from pysb.simulator import SimulationResult
from pyvipr.pysb_viz.static_viz import PysbStaticViz
import pyvipr.util as hf
from pyvipr.util_networkx import from_networkx


[docs]class PysbDynamicViz(object): """ Class to visualize the dynamics of systems biology models defined in PySB format. Parameters ---------- simulation : pysb SimulationResult A SimulationResult instance of the model that is going to be visualized. sim_idx : Index of simulation to be visualized cmap : str or Colormap instance The colormap used to map the reaction rate values to RGBA colors. For more information visit: https://matplotlib.org/3.1.0/tutorials/colors/colormaps.html """ mach_eps = np.finfo(float).eps def __init__(self, simulation, sim_idx=0, cmap='RdBu_r'): if not isinstance(simulation, SimulationResult): raise TypeError('Argument must be a pysb SimulationResult object') self.model = simulation._model self.nsims = simulation.nsims if self.nsims == 1: self.y = simulation.all else: self.y = simulation.all[sim_idx] self.tspan = simulation.tout[sim_idx] param_values = simulation.param_values[sim_idx] self.param_dict = dict((p.name, param_values[i]) for i, p in enumerate(self.model.parameters)) self.sp_graph = None self.type_viz = '' self.cmap = cmap
[docs] def dynamic_sp_view(self, type_viz='consumption'): """ Generates a dictionary with the model dynamics data that can be converted in the Cytoscape.js JSON format Parameters ---------- type_viz : str Type of the dynamic visualization, it can be 'consumption' or 'production' Examples -------- >>> from pysb.examples.earm_1_0 import model >>> from pysb.simulator import ScipyOdeSimulator >>> import pyvipr.pysb_viz.dynamic_viz as viz >>> import numpy as np >>> tspan = np.linspace(0, 20000) >>> sim = ScipyOdeSimulator(model, tspan).run() >>> dyn_viz = viz.PysbDynamicViz(sim) >>> data = dyn_viz.dynamic_sp_view() Returns ------- dict A Dictionary Object with all nodes and edges information that can be converted into Cytoscape.js JSON to be visualized """ self.type_viz = type_viz self.sp_graph = PysbStaticViz(self.model).species_graph() self.sp_graph.graph['nsims'] = self.nsims self.sp_graph.graph['tspan'] = self.tspan.tolist() self._add_edge_node_dynamics() data = from_networkx(self.sp_graph) return data
[docs] def dynamic_sp_comp_view(self, type_viz='consumption'): """ Same as :py:meth:`dynamic_view` but the species nodes are grouped by the compartments they belong to """ self.type_viz = type_viz self.sp_graph = PysbStaticViz(self.model).compartments_data_graph() self.sp_graph.graph['nsims'] = self.nsims self.sp_graph.graph['tspan'] = self.tspan.tolist() self._add_edge_node_dynamics() data = from_networkx(self.sp_graph) return data
[docs] def dynamic_sp_comm_view(self, type_viz='consumption', random_state=None): """ Same as :py:meth:`dynamic_view` but the species nodes are grouped by the communities they belong to. Communities are obtained using the Louvain algorithm. Parameters ---------- type_viz: str Type of visualization. It can be `consumption` to see how species are being consumed or `production` to see how the species are being produced. random_state: int Seed used by the random generator in community detection Returns ------- dict A Dictionary Object with all nodes and edges information that can be converted into Cytoscape.js JSON to be visualized """ self.type_viz = type_viz self.sp_graph = PysbStaticViz(self.model).species_graph() hf.add_louvain_communities(self.sp_graph, all_levels=False, random_state=random_state) self.sp_graph.graph['nsims'] = self.nsims self.sp_graph.graph['tspan'] = self.tspan.tolist() self._add_edge_node_dynamics() data = from_networkx(self.sp_graph) return data
# def dynamic_node_dynamics(self, node): ## Node centric dynamics # all_rate_colors = {} # all_rate_sizes = {} # all_rate_abs_val = {} # # rxns_idxs = [idx for idx, rxn in enumerate(self.model.reactions_bidirectional) # if node in rxn['reactants'] or node in rxn['products']] # rxns_matrix = self.matrix_bidirectional_rates(rxns_idxs=rxns_idxs)- # for idx, reaction in enumerate(self.model.reactions_bidirectional[rxns_idxs]): # reactants = set(reaction['reactants']) # products = set(reaction['products']) # for s in reactants: # for p in products: # edges_id = ('s{0}'.format(s), 's{0}'.format(p)) # all_rate_colors[edges_id] = rate_colors # all_rate_sizes[edges_id] = rate_sizes.tolist() # all_rate_abs_val[edges_id] = rxns_matrix[idx].tolist() def _add_edge_node_dynamics(self): """ Add the edge size and color data as well as node color and values data Returns ------- """ edge_sizes, edge_colors, edge_qtips = self.edges_colors_sizes() nx.set_edge_attributes(self.sp_graph, edge_colors, 'edge_color') nx.set_edge_attributes(self.sp_graph, edge_sizes, 'edge_size') nx.set_edge_attributes(self.sp_graph, edge_qtips, 'edge_qtip') node_abs, node_rel = self.node_data() nx.set_node_attributes(self.sp_graph, node_abs, 'abs_value') nx.set_node_attributes(self.sp_graph, node_rel, 'rel_value')
[docs] def matrix_bidirectional_rates(self, rxns_idxs=None): """ Obtains the values of the reaction rates at all the time points of the simulation Returns ------- np.ndarray Array with the reaction rates values """ if rxns_idxs is not None: rxns_bidirectional = self.model.reactions_bidirectional[rxns_idxs] else: rxns_bidirectional = self.model.reactions_bidirectional rxns_matrix = np.zeros((len(rxns_bidirectional), len(self.tspan))) # Calculates matrix of bidirectional reaction rates for idx, reac in enumerate(rxns_bidirectional): rate_reac = reac['rate'] for p in self.param_dict: rate_reac = rate_reac.subs(p, self.param_dict[p]) variables = [atom for atom in rate_reac.atoms(sympy.Symbol)] args = [0] * len(variables) # arguments to put in the lambdify function for idx2, va in enumerate(variables): if str(va).startswith('__'): args[idx2] = self.y[str(va)] else: args[idx2] = self.param_dict[va.name] func = sympy.lambdify(variables, rate_reac, modules=dict(sqrt=np.lib.scimath.sqrt)) react_rate = func(*args) rxns_matrix[idx] = react_rate return rxns_matrix
[docs] def edges_colors_sizes(self): """ This function obtains values for the size and color of the edges in the network. The color is a representation of the percentage of flux going through an edge. The edge size is a representation of the relative value of the reaction normalized to the maximum value that the edge can attain during the whole simulation. Returns ------- tuple Three dictionaries. The first one contains the information of the edge sizes at all time points. The second one contains the information of the edge colors at all time points. The third one contains the values of the reaction rates at all time points. """ all_rate_colors = {} all_rate_sizes = {} all_rate_abs_val = {} rxns_matrix = self.matrix_bidirectional_rates() all_products = [rx['products'] for rx in self.model.reactions_bidirectional] all_reactants = [rx['reactants'] for rx in self.model.reactions_bidirectional] sp_imp = range(len(self.model.species)) # species indices # TODO: Make sure that product nodes that also have more than one incoming node, don't overwrite # previous nodes edges attrs for sp in sp_imp: # Getting the indices of the reactions_bidirectional in which sp is either # a reactant or a product, respectively rxns_idx_reactant = [i for i, rx in enumerate(all_reactants) if sp in rx] rxns_idx_product = [i for i, rx in enumerate(all_products) if sp in rx] repeated_rxn = set(rxns_idx_reactant) & set(rxns_idx_product) # Obtain indices of reactions in which sp is only a product rxns_idx_product = [c for c in rxns_idx_product if c not in repeated_rxn] # Getting arrays to obtain producing and consuming reactions rxn_val_pos = np.where( np.concatenate((rxns_matrix[rxns_idx_reactant] > 0, rxns_matrix[rxns_idx_product] < 0)), rxns_matrix[rxns_idx_reactant + rxns_idx_product], 0) # Reactants are consumed rxn_val_neg = np.where( np.concatenate((rxns_matrix[rxns_idx_reactant] < 0, rxns_matrix[rxns_idx_product] > 0)), rxns_matrix[rxns_idx_reactant + rxns_idx_product], 0) # Reactants are produced rxn_val_pos = np.abs(rxn_val_pos) rxn_val_neg = np.abs(rxn_val_neg) # An array with the total of reaction rates at each time point rxn_val_pos_total = rxn_val_pos.sum(axis=0) rxn_val_neg_total = rxn_val_neg.sum(axis=0) # The max and min of the group of reaction rates. It can be used to assign size to edges # based in the max and min of the groups of reaction rates across all time points # if rxn_val_pos.size != 0: # rxn_val_pos_max = np.amax(rxn_val_pos) + self.mach_eps # # if rxn_val_neg.size != 0: # rxn_val_neg_max = np.amax(rxn_val_neg) + self.mach_eps sp_rr_dom = rxns_matrix[rxns_idx_reactant] sp_prr_dom = rxns_matrix[rxns_idx_product] if self.type_viz == 'consumption': for rx_idx, rx in enumerate(rxns_idx_reactant): products = all_products[rx] for p in products: # Normalizing by the total flux in a node # We ignore division by zero and invalid value in less than function warnings # as they are handled later on by converting nan values to 0 with np.errstate(divide='ignore', invalid='ignore'): react_rate_color = rxns_matrix[rx] / rxn_val_pos_total rxn_neg_idx = np.where(react_rate_color < 0) react_rate_color[rxn_neg_idx] = ( rxns_matrix[rx][rxn_neg_idx] / rxn_val_neg_total[rxn_neg_idx]) np.nan_to_num(react_rate_color, copy=False) rate_colors = hf.f2hex_edges(react_rate_color, cmap=self.cmap) rxn_eps = sp_rr_dom[rx_idx] + self.mach_eps # rxns_matrix[rx] + self.mach_eps react_rate_size = np.zeros(len(rxn_eps)) tro_pos_idx = np.where(rxn_eps > 0)[0] tro_neg_idx = np.where(rxn_eps < 0)[0] if tro_neg_idx.size != 0: rxn_neg_max = np.amax(np.abs(rxn_eps[tro_neg_idx])) react_rate_size[tro_neg_idx] = np.array([rxn_neg/rxn_neg_max for rxn_neg in np.abs(rxn_eps[tro_neg_idx])]) if tro_pos_idx.size != 0: rxn_pos_max = np.amax(rxn_eps[tro_pos_idx]) react_rate_size[tro_pos_idx] = np.array([rxn_pos/rxn_pos_max for rxn_pos in rxn_eps[tro_pos_idx]]) rate_sizes = hf.range_normalization(react_rate_size, min_x=0, max_x=1) edges_id = ('s{0}'.format(sp), 's{0}'.format(p)) all_rate_colors[edges_id] = rate_colors all_rate_sizes[edges_id] = rate_sizes.tolist() all_rate_abs_val[edges_id] = rxns_matrix[rx].tolist() elif self.type_viz == 'production': for rp_idx, rp in enumerate(rxns_idx_product): reactants = all_reactants[rp] for r in reactants: with np.errstate(divide='ignore', invalid='ignore'): preact_rate_color = rxns_matrix[rp] / rxn_val_neg_total prxns_neg_idx = np.where(preact_rate_color < 0) preact_rate_color[prxns_neg_idx] = ( rxns_matrix[rp][prxns_neg_idx] / rxn_val_pos_total[prxns_neg_idx]) np.nan_to_num(preact_rate_color, copy=False) prate_colors = hf.f2hex_edges(preact_rate_color, cmap=self.cmap) prxn_eps = sp_prr_dom[rp_idx] + self.mach_eps preact_rate_size = np.zeros(len(prxn_eps)) ptro_pos_idx = np.where(prxn_eps > 0)[0] ptro_neg_idx = np.where(prxn_eps < 0)[0] if ptro_neg_idx.size != 0: prxn_neg_max = np.amax(np.abs(prxn_eps[ptro_neg_idx])) preact_rate_size[ptro_neg_idx] = np.array([prxn_neg/prxn_neg_max for prxn_neg in np.abs(prxn_eps[ptro_neg_idx])]) if ptro_pos_idx.size != 0: prxn_pos_max = np.amax(prxn_eps[ptro_pos_idx]) preact_rate_size[ptro_pos_idx] = np.array([prxn_pos/prxn_pos_max for prxn_pos in prxn_eps[ptro_pos_idx]]) prate_sizes = hf.range_normalization(preact_rate_size, min_x=0, max_x=1) edges_id = ('s{0}'.format(r), 's{0}'.format(sp)) all_rate_colors[edges_id] = prate_colors all_rate_sizes[edges_id] = prate_sizes.tolist() all_rate_abs_val[edges_id] = rxns_matrix[rp].tolist() else: raise ValueError('The type of process can only be `consumption` or `production`') return all_rate_sizes, all_rate_colors, all_rate_abs_val
[docs] def node_data(self): """ Obtains the species concentration values and the relative concentration compared with the maximum concentration across all time points Returns ------- tuple Two dictionaries. The first one has the species concentration. The second one has the relative species concentrations """ node_absolute = {} node_relative = {} for sp in range(len(self.model.species)): sp_absolute = np.absolute(self.y['__s{}'.format(sp)]) sp_relative = (sp_absolute / sp_absolute.max()) * 100 node_absolute['s{0}'.format(sp)] = sp_absolute.tolist() node_relative['s{0}'.format(sp)] = sp_relative.tolist() # all_nodes_values = pandas.DataFrame(all_rate_colors) return node_absolute, node_relative