Source code for holoviews.element.graphs

from types import FunctionType

import param
import numpy as np

from ..core import Dimension, Dataset, Element2D
from ..core.dimension import redim
from ..core.util import max_range
from ..core.operation import Operation
from .chart import Points
from .path import Path
from .util import split_path, pd

try:
    from datashader.layout import LayoutAlgorithm as ds_layout
except:
    ds_layout = None


[docs]class redim_graph(redim): """ Extension for the redim utility that allows re-dimensioning Graph objects including their nodes and edgepaths. """ def __call__(self, specs=None, **dimensions): redimmed = super(redim_graph, self).__call__(specs, **dimensions) new_data = (redimmed.data,) if self.parent.nodes: new_data = new_data + (self.parent.nodes.redim(specs, **dimensions),) if self.parent._edgepaths: new_data = new_data + (self.parent.edgepaths.redim(specs, **dimensions),) return redimmed.clone(new_data)
def circular_layout(nodes): N = len(nodes) circ = np.pi/N*np.arange(N)*2 x = np.cos(circ) y = np.sin(circ) return (x, y, nodes)
[docs]class layout_nodes(Operation): """ Accepts a Graph and lays out the corresponding nodes with the supplied networkx layout function. If no layout function is supplied uses a simple circular_layout function. Also supports LayoutAlgorithm function provided in datashader layouts. """ only_nodes = param.Boolean(default=False, doc=""" Whether to return Nodes or Graph.""") layout = param.Callable(default=None, doc=""" A NetworkX layout function""") kwargs = param.Dict(default={}, doc=""" Keyword arguments passed to the layout function.""") def _process(self, element, key=None): if self.p.layout and isinstance(self.p.layout, FunctionType): import networkx as nx graph = nx.from_edgelist(element.array([0, 1])) positions = self.p.layout(graph, **self.p.kwargs) nodes = [tuple(pos)+(idx,) for idx, pos in sorted(positions.items())] else: source = element.dimension_values(0, expanded=False) target = element.dimension_values(1, expanded=False) nodes = np.unique(np.concatenate([source, target])) if self.p.layout: import pandas as pd df = pd.DataFrame({'index': nodes}) nodes = self.p.layout(df, element.dframe(), **self.p.kwargs) nodes = nodes[['x', 'y', 'index']] else: nodes = circular_layout(nodes) if self.p.only_nodes: return Nodes(nodes) return element.clone((element.data, nodes))
[docs]class Graph(Dataset, Element2D): """ Graph is high-level Element representing both nodes and edges. A Graph may be defined in an abstract form representing just the abstract edges between nodes and optionally may be made concrete by supplying a Nodes Element defining the concrete positions of each node. If the node positions are supplied the EdgePaths (defining the concrete edges) can be inferred automatically or supplied explicitly. The constructor accepts regular columnar data defining the edges or a tuple of the abstract edges and nodes, or a tuple of the abstract edges, nodes, and edgepaths. """ group = param.String(default='Graph', constant=True) kdims = param.List(default=[Dimension('start'), Dimension('end')], bounds=(2, 2)) def __init__(self, data, kdims=None, vdims=None, **params): if isinstance(data, tuple): data = data + (None,)* (3-len(data)) edges, nodes, edgepaths = data else: edges, nodes, edgepaths = data, None, None if nodes is not None: node_info = None if isinstance(nodes, Nodes): pass elif not isinstance(nodes, Dataset) or nodes.ndims == 3: nodes = Nodes(nodes) else: node_info = nodes nodes = None else: node_info = None if edgepaths is not None and not isinstance(edgepaths, EdgePaths): edgepaths = EdgePaths(edgepaths) self._nodes = nodes self._edgepaths = edgepaths super(Graph, self).__init__(edges, kdims=kdims, vdims=vdims, **params) if self._nodes is None and node_info: nodes = self.nodes.clone(datatype=['pandas', 'dictionary']) for d in node_info.dimensions(): nodes = nodes.add_dimension(d, len(nodes.vdims), node_info.dimension_values(d), vdim=True) self._nodes = nodes self._validate() self.redim = redim_graph(self, mode='dataset') def _validate(self): if self._edgepaths is None: return mismatch = [] for kd1, kd2 in zip(self.nodes.kdims, self.edgepaths.kdims): if kd1 != kd2: mismatch.append('%s != %s' % (kd1, kd2)) if mismatch: raise ValueError('Ensure that the first two key dimensions on ' 'Nodes and EdgePaths match: %s' % ', '.join(mismatch)) npaths = len(self._edgepaths.data) nedges = len(self) if nedges != npaths: mismatch = True if npaths == 1: edges = self.edgepaths.split()[0] vals = edges.dimension_values(0) npaths = len(np.where(np.isnan(vals))[0]) if not np.isnan(vals[-1]): npaths += 1 mismatch = npaths != nedges if mismatch: raise ValueError('Ensure that the number of edges supplied ' 'to the Graph (%d) matches the number of ' 'edgepaths (%d)' % (nedges, npaths)) def clone(self, data=None, shared_data=True, new_type=None, *args, **overrides): if data is None: data = (self.data, self.nodes) if self._edgepaths: data = data + (self.edgepaths,) overrides['plot_id'] = self._plot_id elif not isinstance(data, tuple): data = (data, self.nodes) if self._edgepaths: data = data + (self.edgepaths,) return super(Graph, self).clone(data, shared_data, new_type, *args, **overrides)
[docs] def select(self, selection_specs=None, selection_mode='edges', **selection): """ Allows selecting data by the slices, sets and scalar values along a particular dimension. The indices should be supplied as keywords mapping between the selected dimension and value. Additionally selection_specs (taking the form of a list of type.group.label strings, types or functions) may be supplied, which will ensure the selection is only applied if the specs match the selected object. Selecting by a node dimensions selects all edges and nodes that are connected to the selected nodes. To select only edges between the selected nodes set the selection_mode to 'nodes'. """ selection = {dim: sel for dim, sel in selection.items() if dim in self.dimensions('ranges')+['selection_mask']} if (selection_specs and not any(self.matches(sp) for sp in selection_specs) or not selection): return self index_dim = self.nodes.kdims[2].name dimensions = self.kdims+self.vdims node_selection = {index_dim: v for k, v in selection.items() if k in self.kdims} nodes = self.nodes.select(**dict(selection, **node_selection)) selection = {k: v for k, v in selection.items() if k in dimensions} # Compute mask for edges if nodes were selected on nodemask = None if len(nodes) != len(self.nodes): xdim, ydim = dimensions[:2] indices = list(nodes.dimension_values(2, False)) if selection_mode == 'edges': mask1 = self.interface.select_mask(self, {xdim.name: indices}) mask2 = self.interface.select_mask(self, {ydim.name: indices}) nodemask = (mask1 | mask2) nodes = self.nodes else: nodemask = self.interface.select_mask(self, {xdim.name: indices, ydim.name: indices}) # Compute mask for edge selection mask = None if selection: mask = self.interface.select_mask(self, selection) # Combine masks if nodemask is not None: if mask is not None: mask &= nodemask else: mask = nodemask # Apply edge mask if mask is not None: data = self.interface.select(self, mask) if not np.all(mask): new_graph = self.clone((data, nodes)) source = new_graph.dimension_values(0, expanded=False) target = new_graph.dimension_values(1, expanded=False) unique_nodes = np.unique(np.concatenate([source, target])) nodes = new_graph.nodes[:, :, list(unique_nodes)] paths = None if self._edgepaths: edgepaths = self._split_edgepaths paths = edgepaths.clone(edgepaths.interface.select_paths(edgepaths, mask)) if len(self._edgepaths.data) == 1: paths = paths.clone([paths.dframe() if pd else paths.array()]) else: data = self.data paths = self._edgepaths return self.clone((data, nodes, paths))
@property def _split_edgepaths(self): if len(self) == len(self._edgepaths.data): return self._edgepaths else: return self._edgepaths.clone(split_path(self._edgepaths)) def range(self, dimension, data_range=True): if self.nodes and dimension in self.nodes.dimensions(): node_range = self.nodes.range(dimension, data_range) if self._edgepaths: path_range = self._edgepaths.range(dimension, data_range) return max_range([node_range, path_range]) return node_range return super(Graph, self).range(dimension, data_range) def dimensions(self, selection='all', label=False): dimensions = super(Graph, self).dimensions(selection, label) if selection == 'ranges': if self._nodes: node_dims = self.nodes.dimensions(selection, label) else: node_dims = Nodes.kdims+Nodes.vdims if label in ['name', True, 'short']: node_dims = [d.name for d in node_dims] elif label in ['long', 'label']: node_dims = [d.label for d in node_dims] return dimensions+node_dims return dimensions @property def nodes(self): """ Computes the node positions the first time they are requested if no explicit node information was supplied. """ if self._nodes is None: self._nodes = layout_nodes(self, only_nodes=True) return self._nodes @property def edgepaths(self): """ Returns the fixed EdgePaths or computes direct connections between supplied nodes. """ if self._edgepaths: return self._edgepaths paths = [] for start, end in self.array(self.kdims): start_ds = self.nodes[:, :, start] end_ds = self.nodes[:, :, end] if not len(start_ds) or not len(end_ds): raise ValueError('Could not find node positions for all edges') sx, sy = start_ds.array(start_ds.kdims[:2]).T ex, ey = end_ds.array(end_ds.kdims[:2]).T paths.append([(sx[0], sy[0]), (ex[0], ey[0])]) return EdgePaths(paths, kdims=self.nodes.kdims[:2])
[docs] @classmethod def from_networkx(cls, G, layout_function, nodes=None, **kwargs): """ Generate a HoloViews Graph from a networkx.Graph object and networkx layout function. Any keyword arguments will be passed to the layout function. """ positions = layout_function(G, **kwargs) edges = G.edges() if nodes: idx_dim = nodes.kdims[-1].name xs, ys = zip(*[v for k, v in sorted(positions.items())]) indices = list(nodes.dimension_values(idx_dim)) edges = [(src, tgt) for (src, tgt) in edges if src in indices and tgt in indices] nodes = nodes.select(**{idx_dim: [eid for e in edges for eid in e]}).sort() nodes = nodes.add_dimension('x', 0, xs) nodes = nodes.add_dimension('y', 1, ys).clone(new_type=Nodes) else: nodes = Nodes([tuple(pos)+(idx,) for idx, pos in sorted(positions.items())]) return cls((edges, nodes))
[docs]class Nodes(Points): """ Nodes is a simple Element representing Graph nodes as a set of Points. Unlike regular Points, Nodes must define a third key dimension corresponding to the node index. """ kdims = param.List(default=[Dimension('x'), Dimension('y'), Dimension('index')], bounds=(3, 3)) group = param.String(default='Nodes', constant=True)
[docs]class EdgePaths(Path): """ EdgePaths is a simple Element representing the paths of edges connecting nodes in a graph. """ group = param.String(default='EdgePaths', constant=True)