# Source code for holoviews.core.data.dictionary

from collections import OrderedDict
from itertools import compress

try:
    # Python 2 compatibility: alias the lazy izip as zip. The original
    # `import itertools.izip as zip` was the submodule-import form, which
    # fails even on Python 2 (izip is an attribute, not a submodule), so
    # the aliasing never took effect. On Python 3 the import fails and
    # the (already lazy) builtin zip is kept.
    from itertools import izip as zip
except ImportError:
    pass

import numpy as np

from .interface import Interface, DataError
from ..dimension import Dimension
from ..element import Element
from ..dimension import OrderedDict as cyODict
from ..ndmapping import NdMapping, item_check
from .. import util


class DictInterface(Interface):
    """
    Interface for simple dictionary-based dataset format. The dictionary
    keys correspond to the column (i.e dimension) names and the values
    are collections representing the values in that column.
    """

    # Dictionary types this interface will accept as raw data
    types = (dict, OrderedDict, cyODict)

    datatype = 'dictionary'

    @classmethod
    def dimension_type(cls, dataset, dim):
        """Return the type of the values along the supplied dimension.

        For scalar columns this is the Python type of the scalar,
        otherwise the NumPy dtype's scalar type.
        """
        name = dataset.get_dimension(dim, strict=True).name
        values = dataset.data[name]
        return type(values) if np.isscalar(values) else values.dtype.type

    @classmethod
    def init(cls, eltype, data, kdims, vdims):
        """Coerce the supplied data into the dictionary-of-columns format.

        Accepts tuples of columns, dataframes, NumPy arrays, lists of
        scalars or rows, and dictionaries (including the deprecated
        key/value-tuple mapping form). Returns the normalized data along
        with the kdims/vdims parameters.

        Raises ValueError if the data cannot be interpreted or columns
        are not of uniform shape.
        """
        odict_types = (OrderedDict, cyODict)
        if kdims is None:
            kdims = eltype.kdims
        if vdims is None:
            vdims = eltype.vdims

        dimensions = [d.name if isinstance(d, Dimension) else
                      d for d in kdims + vdims]
        if isinstance(data, tuple):
            data = {d: v for d, v in zip(dimensions, data)}
        elif util.is_dataframe(data) and all(d in data for d in dimensions):
            data = {d: data[d] for d in dimensions}
        elif isinstance(data, np.ndarray):
            if data.ndim == 1:
                # Auto-index 1D arrays when the element expects more than
                # one dimension, otherwise treat as a single column
                if eltype._auto_indexable_1d and len(kdims)+len(vdims)>1:
                    data = np.column_stack([np.arange(len(data)), data])
                else:
                    data = np.atleast_2d(data).T
            data = {k: data[:,i] for i,k in enumerate(dimensions)}
        elif isinstance(data, list) and data == []:
            data = OrderedDict([(d, []) for d in dimensions])
        elif isinstance(data, list) and np.isscalar(data[0]):
            # List of scalars: generate an integer index as the key column
            data = {dimensions[0]: np.arange(len(data)),
                    dimensions[1]: data}
        elif (isinstance(data, list) and isinstance(data[0], tuple) and len(data[0]) == 2
              and any(isinstance(v, tuple) for v in data[0])):
            # List of (key-tuple, value-tuple) pairs
            dict_data = zip(*((util.wrap_tuple(k)+util.wrap_tuple(v))
                              for k, v in data))
            data = {k: np.array(v) for k, v in zip(dimensions, dict_data)}
        # Ensure that interface does not consume data of other types
        # with an iterator interface
        elif not any(isinstance(data, tuple(t for t in interface.types if t is not None))
                     for interface in cls.interfaces.values()):
            data = {k: v for k, v in zip(dimensions, zip(*data))}
        elif (isinstance(data, dict) and
              not any(isinstance(v, np.ndarray) for v in data.values()) and
              not any(d in data or any(d in k for k in data if isinstance(k, tuple))
                      for d in dimensions)):
            # For data where both keys and values are dimension values
            # e.g. {('A', 'B'): (1, 2)} (should consider deprecating)
            dict_data = sorted(data.items())
            k, v = dict_data[0]
            if len(util.wrap_tuple(k)) != len(kdims) or len(util.wrap_tuple(v)) != len(vdims):
                raise ValueError("Dictionary data not understood, should contain a column "
                                 "per dimension or a mapping between key and value dimension "
                                 "values.")
            dict_data = zip(*((util.wrap_tuple(k)+util.wrap_tuple(v))
                              for k, v in dict_data))
            data = {k: np.array(v) for k, v in zip(dimensions, dict_data)}

        if not isinstance(data, cls.types):
            raise ValueError("DictInterface interface couldn't convert data.")
        elif isinstance(data, dict):
            unpacked = []
            for d, vals in data.items():
                if isinstance(d, tuple):
                    vals = np.asarray(vals)
                    # NOTE: fixed precedence bug — the original
                    # `not vals.ndim == 2 and vals.shape[1] == len(d)`
                    # bound `not` first, so mismatched 2D arrays passed
                    # silently and 1D arrays raised IndexError on shape[1]
                    if not (vals.ndim == 2 and vals.shape[1] == len(d)):
                        raise ValueError("Values for %s dimensions did not have "
                                         "the expected shape." % (d,))
                    for i, sd in enumerate(d):
                        unpacked.append((sd, vals[:, i]))
                else:
                    unpacked.append((d, vals if np.isscalar(vals) else np.asarray(vals)))
            if not cls.expanded([d[1] for d in unpacked if not np.isscalar(d[1])]):
                raise ValueError('DictInterface expects data to be of uniform shape.')
            if isinstance(data, odict_types):
                data.update(unpacked)
            else:
                data = OrderedDict(unpacked)
        return data, {'kdims':kdims, 'vdims':vdims}, {}

    @classmethod
    def validate(cls, dataset, vdims=True):
        """Check that all declared dimensions are present as columns and
        that the columns have equal (or scalar) lengths.

        If vdims is True all dimensions are validated, otherwise only
        the key dimensions. Raises DataError on failure.
        """
        # NOTE: fixed inverted conditional — the original selected 'key'
        # when vdims was True, skipping value-dimension validation
        dim_types = 'all' if vdims else 'key'
        dimensions = dataset.dimensions(dim_types, label='name')
        not_found = [d for d in dimensions if d not in dataset.data]
        if not_found:
            raise DataError('Following columns specified as dimensions '
                            'but not found in data: %s' % not_found, cls)
        lengths = [(dim, 1 if np.isscalar(dataset.data[dim]) else len(dataset.data[dim]))
                   for dim in dimensions]
        if len({l for d, l in lengths if l > 1}) > 1:
            lengths = ', '.join(['%s: %d' % l for l in sorted(lengths)])
            raise DataError('Length of columns must be equal or scalar, '
                            'columns have lengths: %s' % lengths, cls)
[docs] @classmethod def unpack_scalar(cls, dataset, data): """ Given a dataset object and data in the appropriate format for the interface, return a simple scalar. """ if len(data) != 1: return data key = list(data.keys())[0] if len(data[key]) == 1 and key in dataset.vdims: return data[key][0]
@classmethod def isscalar(cls, dataset, dim): name = dataset.get_dimension(dim, strict=True).name values = dataset.data[name] return np.isscalar(values) or len(np.unique(values)) == 1 @classmethod def shape(cls, dataset): return cls.length(dataset), len(dataset.data), @classmethod def length(cls, dataset): lengths = [len(vals) for vals in dataset.data.values() if not np.isscalar(vals)] return max(lengths) if lengths else 1 @classmethod def array(cls, dataset, dimensions): if not dimensions: dimensions = dataset.dimensions(label='name') else: dimensions = [dataset.get_dimensions(d).name for d in dimensions] arrays = [dataset.data[dim.name] for dim in dimensions] return np.column_stack([np.full(len(dataset), arr) if np.isscalar(arr) else arr for arr in arrays]) @classmethod def add_dimension(cls, dataset, dimension, dim_pos, values, vdim): dim = dimension.name if isinstance(dimension, Dimension) else dimension data = list(dataset.data.items()) data.insert(dim_pos, (dim, values)) return OrderedDict(data) @classmethod def redim(cls, dataset, dimensions): all_dims = dataset.dimensions() renamed = [] for k, v in dataset.data.items(): if k in dimensions: k = dimensions[k].name elif k in all_dims: k = dataset.get_dimension(k).name renamed.append((k, v)) return OrderedDict(renamed) @classmethod def concat(cls, dataset_objs): cast_objs = cls.cast(dataset_objs) cols = set(tuple(c.data.keys()) for c in cast_objs) if len(cols) != 1: raise Exception("In order to concatenate, all Dataset objects " "should have matching set of columns.") concatenated = OrderedDict() for column in cols.pop(): concatenated[column] = np.concatenate([obj[column] for obj in cast_objs]) return concatenated @classmethod def sort(cls, dataset, by=[], reverse=False): by = [dataset.get_dimension(d).name for d in by] if len(by) == 1: sorting = cls.values(dataset, by[0]).argsort() else: arrays = [dataset.dimension_values(d) for d in by] sorting = util.arglexsort(arrays) return OrderedDict([(d, v if 
np.isscalar(v) else (v[sorting][::-1] if reverse else v[sorting])) for d, v in dataset.data.items()]) @classmethod def values(cls, dataset, dim, expanded=True, flat=True): dim = dataset.get_dimension(dim).name values = dataset.data.get(dim) if np.isscalar(values): if not expanded: return np.array([values]) values = np.full(len(dataset), values) else: if not expanded: return util.unique_array(values) values = np.array(values) return values @classmethod def reindex(cls, dataset, kdims, vdims): dimensions = [dataset.get_dimension(d).name for d in kdims+vdims] return OrderedDict([(d, dataset.dimension_values(d)) for d in dimensions]) @classmethod def groupby(cls, dataset, dimensions, container_type, group_type, **kwargs): # Get dimensions information dimensions = [dataset.get_dimension(d) for d in dimensions] kdims = [kdim for kdim in dataset.kdims if kdim not in dimensions] vdims = dataset.vdims # Update the kwargs appropriately for Element group types group_kwargs = {} group_type = dict if group_type == 'raw' else group_type if issubclass(group_type, Element): group_kwargs.update(util.get_param_values(dataset)) group_kwargs['kdims'] = kdims group_kwargs.update(kwargs) # Find all the keys along supplied dimensions keys = (tuple(dataset.data[d.name] if np.isscalar(dataset.data[d.name]) else dataset.data[d.name][i] for d in dimensions) for i in range(len(dataset))) # Iterate over the unique entries applying selection masks grouped_data = [] for unique_key in util.unique_iterator(keys): mask = cls.select_mask(dataset, dict(zip(dimensions, unique_key))) group_data = OrderedDict(((d.name, dataset.data[d.name] if np.isscalar(dataset.data[d.name]) else dataset.data[d.name][mask]) for d in kdims+vdims)) group_data = group_type(group_data, **group_kwargs) grouped_data.append((unique_key, group_data)) if issubclass(container_type, NdMapping): with item_check(False): return container_type(grouped_data, kdims=dimensions) else: return container_type(grouped_data) @classmethod def 
select(cls, dataset, selection_mask=None, **selection): if selection_mask is None: selection_mask = cls.select_mask(dataset, selection) indexed = cls.indexed(dataset, selection) data = OrderedDict((k, v if np.isscalar(v) else v[selection_mask]) for k, v in dataset.data.items()) if indexed and len(list(data.values())[0]) == 1 and len(dataset.vdims) == 1: value = data[dataset.vdims[0].name] return value if np.isscalar(value) else value[0] return data @classmethod def sample(cls, dataset, samples=[]): mask = False for sample in samples: sample_mask = True if np.isscalar(sample): sample = [sample] for i, v in enumerate(sample): name = dataset.get_dimension(i).name sample_mask &= (dataset.data[name]==v) mask |= sample_mask return {k: col if np.isscalar(col) else np.array(col)[mask] for k, col in dataset.data.items()} @classmethod def aggregate(cls, dataset, kdims, function, **kwargs): kdims = [dataset.get_dimension(d, strict=True).name for d in kdims] vdims = dataset.dimensions('value', label='name') groups = cls.groupby(dataset, kdims, list, OrderedDict) aggregated = OrderedDict([(k, []) for k in kdims+vdims]) for key, group in groups: key = key if isinstance(key, tuple) else (key,) for kdim, val in zip(kdims, key): aggregated[kdim].append(val) for vdim, arr in group.items(): if vdim in dataset.vdims: if np.isscalar(arr): reduced = arr elif isinstance(function, np.ufunc): reduced = function.reduce(arr, **kwargs) else: reduced = function(arr, **kwargs) aggregated[vdim].append(reduced) return aggregated @classmethod def iloc(cls, dataset, index): rows, cols = index scalar = False if np.isscalar(cols): scalar = np.isscalar(rows) cols = [dataset.get_dimension(cols, strict=True)] elif isinstance(cols, slice): cols = dataset.dimensions()[cols] else: cols = [dataset.get_dimension(d, strict=True) for d in cols] if np.isscalar(rows): rows = [rows] new_data = OrderedDict() for d, values in dataset.data.items(): if d in cols: if np.isscalar(values): new_data[d] = values else: 
new_data[d] = values[rows] if scalar: arr = new_data[cols[0].name] return arr if np.isscalar(arr) else arr[0] return new_data
# Register this interface so Dataset can use the 'dictionary' datatype
Interface.register(DictInterface)