import numpy as np
import tellurium
import networkx as nx
from pyvipr.tellurium_viz.static_viz import TelluriumStaticViz
from pyvipr.util_networkx import from_networkx
import pyvipr.util as hf
try:
import tesbml as libsbml
except ImportError:
import libsbml
[docs]class TelluriumDynamicViz(object):
"""
class to visualize the dynamics of systems biology models defined in sbml or antimony format
Parameters
----------
sim_model: tellurium roadrunner
A roadrunner instance after a simulation
cmap : str or Colormap instance
The colormap used to map the reaction rate values to RGBA colors. For more information
visit: https://matplotlib.org/3.1.0/tutorials/colors/colormaps.html
"""
mach_eps = np.finfo(float).eps
def __init__(self, sim_model, cmap='RdBu_r'):
if isinstance(sim_model, tellurium.roadrunner.extended_roadrunner.ExtendedRoadRunner):
model_sbml = sim_model.getSBML()
self.doc = libsbml.readSBMLFromString(model_sbml)
self.model = self.doc.getModel()
else:
raise Exception('Model must be a roadrunner instance')
required_selections = ['time'] + [s.getId() for s in self.model.species] + \
[r.getId() for r in self.model.reactions]
sim_selections = sim_model.selections
# Check that the simulation selections include all species and reactions rate values
if not all(x in required_selections for x in sim_selections):
raise ValueError('To use this visualization you must use '
'this simulator selection \n {0}'.format(required_selections))
self.y = sim_model.getSimulationData()
self.sp_graph = None
self.type_viz = ''
self.cmap = cmap
[docs] def dynamic_sp_view(self, type_viz='consumption'):
"""
Generates a dictionary with the model dynamics data that can be converted in the Cytoscape.js JSON format
Parameters
----------
type_viz : str
Type of the dynamic visualization, it can be 'consumption' or 'production'
Returns
-------
dict
A Dictionary Object with all nodes and edges information that
can be converted into Cytoscape.js JSON to be visualized
"""
self.type_viz = type_viz
self.sp_graph = TelluriumStaticViz(self.model).species_graph()
self.sp_graph.graph['nsims'] = 1
self.sp_graph.graph['tspan'] = self.y['time'].tolist()
self._add_edge_node_dynamics()
data = from_networkx(self.sp_graph)
return data
def matrix_reaction_rates(self):
rxns_matrix = np.zeros((len(self.model.reactions), len(self.y['time'])))
# Calculates matrix of bidirectional reaction rates
for idx, reac in enumerate(self.model.reactions):
rxns_matrix[idx] = self.y[reac.getId()]
return rxns_matrix
[docs] def edges_colors_sizes(self):
"""
This function obtains values for the size and color of the edges in the network.
The color is a representation of the percentage of flux going through an edge.
The edge size is a representation of the relative value of the reaction normalized to the maximum value that
the edge can attain during the whole simulation.
Returns
-------
tuple
Three dictionaries. The first one contains the information of the edge sizes at all time points.
The second one contains the information of the edge colors at all time points.
The third one contains the values of the reaction rates at all time points.
"""
all_rate_colors = {}
all_rate_sizes = {}
all_rate_abs_val = {}
rxns_matrix = self.matrix_reaction_rates()
all_products = [[s.getSpecies() for s in rx.getListOfProducts()] for rx in self.model.reactions]
all_reactants = [[s.getSpecies() for s in rx.getListOfReactants()] for rx in self.model.reactions]
for sp in self.model.species:
rxns_idx_reactant = [i for i, rx in enumerate(all_reactants) if sp.getId() in rx]
rxns_idx_product = [i for i, rx in enumerate(all_products) if sp.getId() in rx]
repeated_rxn = set(rxns_idx_reactant) & set(rxns_idx_product)
# Obtain indices of reactions in which sp is only a product
rxns_idx_product = [c for c in rxns_idx_product if c not in repeated_rxn]
# Getting arrays to obtain producing and consuming reactions
rxn_val_pos = np.where(
np.concatenate((rxns_matrix[rxns_idx_reactant] > 0, rxns_matrix[rxns_idx_product] < 0)),
rxns_matrix[rxns_idx_reactant + rxns_idx_product], 0) # Reactants are consumed
rxn_val_neg = np.where(
np.concatenate((rxns_matrix[rxns_idx_reactant] < 0, rxns_matrix[rxns_idx_product] > 0)),
rxns_matrix[rxns_idx_reactant + rxns_idx_product], 0) # Reactants are produced
rxn_val_pos = np.abs(rxn_val_pos)
rxn_val_neg = np.abs(rxn_val_neg)
# An array with the total of reaction rates at each time point
rxn_val_pos_total = rxn_val_pos.sum(axis=0)
rxn_val_neg_total = rxn_val_neg.sum(axis=0)
# The max and min of the group of reaction rates. It can be used to assign size to edges
# based in the max and min of the groups of reaction rates across all time points
# if rxn_val_pos.size != 0:
# rxn_val_pos_max = np.amax(rxn_val_pos) + self.mach_eps
#
# if rxn_val_neg.size != 0:
# rxn_val_neg_max = np.amax(rxn_val_neg) + self.mach_eps
if self.type_viz == 'consumption':
sp_rr_dom = rxns_matrix[rxns_idx_reactant]
for rx_idx, rx in enumerate(rxns_idx_reactant):
products = all_products[rx]
for p in products:
# Normalizing by the total flux in a node
# We ignore division by zero and invalid value in less than function warnings
# as they are handled later on by converting nan values to 0
with np.errstate(divide='ignore', invalid='ignore'):
react_rate_color = rxns_matrix[rx] / rxn_val_pos_total
rxn_neg_idx = np.where(react_rate_color < 0)
react_rate_color[rxn_neg_idx] = (
rxns_matrix[rx][rxn_neg_idx] / rxn_val_neg_total[rxn_neg_idx])
np.nan_to_num(react_rate_color, copy=False)
rate_colors = hf.f2hex_edges(react_rate_color, cmap=self.cmap)
rxn_eps = sp_rr_dom[rx_idx] + self.mach_eps # rxns_matrix[rx] + self.mach_eps
react_rate_size = np.zeros(len(rxn_eps))
tro_pos_idx = np.where(rxn_eps > 0)[0]
tro_neg_idx = np.where(rxn_eps < 0)[0]
if tro_neg_idx.size != 0:
rxn_neg_max = np.amax(np.abs(rxn_eps[tro_neg_idx]))
react_rate_size[tro_neg_idx] = np.array([rxn_neg/rxn_neg_max for
rxn_neg in np.abs(rxn_eps[tro_neg_idx])])
if tro_pos_idx.size != 0:
rxn_pos_max = np.amax(rxn_eps[tro_pos_idx])
react_rate_size[tro_pos_idx] = np.array([rxn_pos/rxn_pos_max for
rxn_pos in rxn_eps[tro_pos_idx]])
rate_sizes = hf.range_normalization(react_rate_size, min_x=0, max_x=1)
edges_id = (sp.getId(), p)
all_rate_colors[edges_id] = rate_colors
all_rate_sizes[edges_id] = rate_sizes.tolist()
all_rate_abs_val[edges_id] = rxns_matrix[rx].tolist()
elif self.type_viz == 'production':
sp_prr_dom = rxns_matrix[rxns_idx_product]
for rp_idx, rp in enumerate(rxns_idx_product):
reactants = all_reactants[rp]
for r in reactants:
with np.errstate(divide='ignore', invalid='ignore'):
preact_rate_color = rxns_matrix[rp] / rxn_val_neg_total
prxns_neg_idx = np.where(preact_rate_color < 0)
preact_rate_color[prxns_neg_idx] = (
rxns_matrix[rp][prxns_neg_idx] / rxn_val_pos_total[prxns_neg_idx])
np.nan_to_num(preact_rate_color, copy=False)
prate_colors = hf.f2hex_edges(preact_rate_color, cmap=self.cmap)
prxn_eps = sp_prr_dom[rp_idx] + self.mach_eps
preact_rate_size = np.zeros(len(prxn_eps))
ptro_pos_idx = np.where(prxn_eps > 0)[0]
ptro_neg_idx = np.where(prxn_eps < 0)[0]
if ptro_neg_idx.size != 0:
prxn_neg_max = np.amax(np.abs(prxn_eps[ptro_neg_idx]))
preact_rate_size[ptro_neg_idx] = np.array([prxn_neg/prxn_neg_max for
prxn_neg in np.abs(prxn_eps[ptro_neg_idx])])
if ptro_pos_idx.size != 0:
prxn_pos_max = np.amax(prxn_eps[ptro_pos_idx])
preact_rate_size[ptro_pos_idx] = np.array([prxn_pos/prxn_pos_max for
prxn_pos in prxn_eps[ptro_pos_idx]])
prate_sizes = hf.range_normalization(preact_rate_size, min_x=0, max_x=1)
edges_id = (r, sp.getId())
all_rate_colors[edges_id] = prate_colors
all_rate_sizes[edges_id] = prate_sizes.tolist()
all_rate_abs_val[edges_id] = rxns_matrix[rp].tolist()
else:
raise ValueError('The type of process can only be `consumption` or `production`')
return all_rate_sizes, all_rate_colors, all_rate_abs_val
[docs] def node_data(self):
"""
Obtains the species concentration values and the relative concentration
compared with the maximum concentration across all time points
Returns
-------
tuple
Two dictionaries. The first one has the species concentration. The
second one has the relative species concentrations
"""
node_absolute = {}
node_relative = {}
for sp in self.model.species:
sp_absolute = np.absolute(self.y[sp.getId()])
sp_relative = (sp_absolute / sp_absolute.max()) * 100
node_absolute[sp.getId()] = sp_absolute.tolist()
node_relative[sp.getId()] = sp_relative.tolist()
# all_nodes_values = pandas.DataFrame(all_rate_colors)
return node_absolute, node_relative
def _add_edge_node_dynamics(self):
"""
Add the edge size and color data as well as node color and values data
Returns
-------
"""
edge_sizes, edge_colors, edge_qtips = self.edges_colors_sizes()
nx.set_edge_attributes(self.sp_graph, edge_colors, 'edge_color')
nx.set_edge_attributes(self.sp_graph, edge_sizes, 'edge_size')
nx.set_edge_attributes(self.sp_graph, edge_qtips, 'edge_qtip')
node_abs, node_rel = self.node_data()
nx.set_node_attributes(self.sp_graph, node_abs, 'abs_value')
nx.set_node_attributes(self.sp_graph, node_rel, 'rel_value')