"""
AttributeTree, Collector and related classes offer optional functionality
for holding and collecting DataView objects.
"""
import uuid
import numpy as np
import param
from ..core import Dimension, ViewableElement, UniformNdMapping,\
GridSpace, AttrTree, Layout, HoloMap
from ..core.util import ProgressIndicator
from ..core.io import Reference
Time = Dimension("Time", type=param.Dynamic.time_fn.time_type)
[docs]class AttrDict(dict):
"""
A dictionary type object that supports attribute access (e.g. for IPython
tab completion).
"""
def __init__(self, *args, **kwargs):
super(AttrDict, self).__init__(*args, **kwargs)
self.__dict__ = self
[docs]class ViewRef(Reference):
"""
A ViewRef object is a Reference to a dataview object in an
Attrtree that may not exist when initialized. This makes it
possible to schedule tasks for processing data not yet present.
ViewRefs compose with the * operator to specify Overlays and also
support slicing of the referenced elements:
>>> from ..element.raster import Image
>>> ref = ViewRef('Example.Path1 * Example.Path2')
>>> tree = Layout()
>>> tree.Example.Path1 = Image(np.random.rand(5,5))
>>> tree.Example.Path2 = Image(np.random.rand(5,5))
>>> overlay = ref.resolve(tree)
>>> len(overlay)
2
Note that the operands of * must be distinct ViewRef objects.
"""
def __init__(self, spec=''):
"""
The specification is a string that follows attribute access on
an Layout. The '*' operator is supported, as well as slicing
syntax - an example specification is 'A.B.C[2:4] *D.E'
"""
self.specification, self.slices = self._parse_spec(spec)
if not len(self.slices): self.slices = [None]
if not all(p[0].isupper() for path in self.specification for p in path):
raise Exception("All path components must be capitalized.")
@property
def spec(self):
paths = ['.'.join(s for s in spec) for spec in self.specification]
indexed_paths = [p + self._pprint_index(s) for (p,s) in zip(paths, self.slices)]
ref = ' * '.join(indexed_paths)
if len(self.slices) > len(self):
ref = '(' + ref + ')' + self._pprint_index(self.slices[-1])
return ref
def _parse_spec(self, spec):
class Index(object):
def __getitem__(self, val):
return val
specs, slices = [], []
if spec.strip() == '':
return specs, slices
components = [el.strip() for el in spec.split('*')]
for component in components:
if component.count('[') != component.count(']'):
raise Exception("Mismatched parentheses in %r" % component)
elif component.count('[') in [0,1]:
if component.count('['):
pstart, pstop = component.index('['), component.index(']')
subcomponent = component[:pstart] + component[pstop+1:]
else:
subcomponent = component
path_spec = tuple(subcomponent.split('.'))
specs.append(path_spec)
else:
raise Exception("Invalid syntax %r" % component)
if component.count('[') == 1:
opening = component.find('[')
closing = component.find(']')
if opening > closing:
raise Exception("Invalid syntax %r" % component)
slices.append(eval('Index()[%s]' % component[opening+1:closing]))
else:
slices.append(None)
return specs, slices
@property
def resolved_type(self):
return (ViewableElement, UniformNdMapping, GridSpace)
def _resolve_ref(self, ref, attrtree):
"""
Get the ViewableElement referred to by a single reference tuple if the
data exists, otherwise raise AttributeError.
"""
obj = attrtree
for label in ref:
if label in obj:
obj= obj[label]
else:
info = ('.'.join(ref), label)
raise AttributeError("Could not resolve %r at level %r" % info)
return obj
[docs] def resolve(self, attrtree):
"""
Resolve the current ViewRef object into the appropriate ViewableElement
object (if available).
"""
overlaid_view = None
for idx, ref in enumerate(self.specification):
view = self._resolve_ref(ref, attrtree)
# Access specified slices for the view
slc = self.slices[idx]
view = view if slc is None else view[slc]
if overlaid_view is None:
overlaid_view = view
else:
overlaid_view = overlaid_view * view
return overlaid_view if self.specification else attrtree
def __getitem__(self, index):
"""
Slice the referenced Chart.
"""
if len(self.slices) == 1:
self.slices[0] = index
else:
self.slices.append(index)
return self
def __getattr__(self, label):
"""
Multi-level attribute access on a ViewRef() object creates a
reference with the same specified attribute access path.
"""
try:
return super(ViewRef, self).__getattr__(label)
except AttributeError:
if not label[0].isupper():
raise AttributeError("Reference path element %r must capitalized" % label)
elif len(self.specification) > 1:
raise AttributeError("Cannot use attribute specification for overlays.")
if len(self.specification) == 0:
self.specification = [(label,)]
elif len(self.specification) == 1:
self.specification = [self.specification[0] + (label,)]
return self
def __mul__(self, other):
"""
ViewRef object can be composed in to overlays.
"""
if id(self) == id(other):
raise Exception("Please ensure that each operand are distinct ViewRef objects.")
return ViewRef(self.spec + ' * ' + other.spec)
def _pprint_index(self, inds):
if inds is None: return ''
elif not isinstance(inds, tuple): inds = (inds,)
index_strings = []
for ind in inds:
if isinstance(ind, slice):
parts = [str(el) for el in [ind.start, ind.stop, ind.step]]
parts = parts[:2] if parts[2]=='None' else parts
index_strings.append('%s' % ':'.join(el if el!='None' else '' for el in parts))
else:
index_strings.append('%r' % ind)
return '[' + ', '.join(index_strings) + ']'
def __repr__(self):
return 'ViewRef(%r)' % self.spec
def __len__(self):
return len(self.specification)
[docs]class Collect(object):
"""
An Collect takes an object and corresponding hook and when
called with an Layout, updates it with the output of the hook
(given the object). The output of the hook should be a ViewableElement or an
Layout.
The input object may be a picklable object (e.g. a
ParameterizedFunction) or a Reference to the target object. The
supplied *args and **kwargs are passed to the hook together with
the resolved object.
When mode is 'merge' the return value of the hook needs to be an
Layout to be merged with the attrtree when called.
"""
[docs] @classmethod
def select_hook(cls, obj, hooks):
"""
Select the most appropriate hook by the most specific type.
"""
matches = []
obj_class = obj if isinstance(obj, type) else type(obj)
if obj_class == param.parameterized.ParameterizedMetaclass:
obj_class = obj
for tp in hooks.keys():
if issubclass(obj_class, tp):
matches.append(tp)
if len(matches) == 0:
raise Exception("No hook found for object of type %s"
% obj.__class__.__name__)
for obj_cls in obj_class.mro():
if obj_cls in matches:
return hooks[obj_cls]
raise Exception("Match not in object classes mro()")
def __init__(self, obj, *args, **kwargs):
self.args=list(args)
if 'times' in kwargs:
self.times = kwargs.pop('times')
else:
self.times = []
self.kwargs=kwargs
self.path = None
resolveable = None
if hasattr(obj, 'resolve'):
resolveable = obj
obj = obj.resolved_type
self.hook, self.mode, resolver = self.select_hook(obj, Collector.type_hooks)
if resolveable is None:
resolveable = obj if resolver is None else resolver(obj)
self.obj = resolveable
def _get_result(self, attrtree, time, times):
"""
Method returning a ViewableElement or Layout to be merged into the
attrtree (via the specified hook) in the call.
"""
resolvable = hasattr(self.obj, 'resolve')
obj = self.obj.resolve() if resolvable else self.obj
return self.hook(obj, *self.args, **self.kwargs)
def __call__(self, attrtree, time=None, times=None):
"""
Update and return the supplied Layout with the output of
the hook at the given time out of the given list of times.
"""
if self.path is None:
raise Exception("Aggregation path not set.")
if self.times and time not in self.times:
return attrtree
val = self._get_result(attrtree, time, times)
if val is None: return attrtree
if self.mode == 'merge':
if isinstance(val, Layout):
attrtree.update(val)
return attrtree
else:
raise Exception("Return value is not a Layout and mode is 'merge'.")
if self.path not in attrtree:
if not isinstance(val, (UniformNdMapping, Layout)):
val = HoloMap([((time,), val)], kdims=[Time])
else:
current_val = attrtree.data[self.path]
val = self._merge_views(current_val, val, time)
attrtree.set_path(self.path, val)
return attrtree
def _merge_views(self, current_val, val, time):
"""
Helper for merging views together. For instance, this method
will add a Image to a HoloMap or merge two ViewMaps.
"""
if isinstance(val, ViewableElement):
current_val[time] = val
elif (isinstance(current_val, UniformNdMapping) and 'Time' not in
[d.name for d in current_val.kdims]):
raise Exception("Time dimension is missing.")
else:
current_val.update(val)
return current_val
def __repr__(self):
args = ', '.join(str(el) for el in self.args) if self.args else ''
kwargs = ', '.join('%s=%r' % (k,v) for (k,v) in self.kwargs.items()) if self.kwargs else ''
name = self.obj.name if hasattr(self.obj, 'name') else repr(self.obj)
return 'Collect(%s%s%s)' % (name,
(', %s' % args) if args else '',
(', %s' % kwargs) if kwargs else '')
def __str__(self):
if hasattr(self.obj, 'name'):
obj_name = self.obj.name
else:
obj_name = self.obj
args = [str(el) for el in self.args]+['%s=%r' % (k,v) for (k,v) in self.kwargs.items()]
arguments = '' if len(args)==0 else ' [%s]' % ','.join(args)
return "%s%s" % (obj_name, arguments)
[docs]class Analyze(Collect):
"""
An Analyze is a type of Collect that updates an Attrtree with
the results of a Operation. Analyze takes a ViewRef object as
input which is resolved to generate input for the Operation.
"""
def __init__(self, reference, analysis, *args, **kwargs):
self.reference = reference
self.analysis = analysis
self.args = list(args)
if 'times' in kwargs:
self.times = kwargs.pop('times')
else:
self.times = []
self.kwargs = kwargs
self.mapwise = kwargs.pop('mapwise', False)
self.mode = kwargs.pop('mode', 'set')
self.path = None
def _get_result(self, attrtree, time, times):
if self.mapwise and time != times[-1]:
return None
else:
try:
view = self.reference.resolve(attrtree)
except:
info = (self.reference, time, self)
param.main.warning('Reference %r could not be resolved at time '
'%s, skipping analysis %r.' % info)
return None
return self.analysis(view, *self.args, **self.kwargs)
def __repr__(self):
args = ', '.join(str(el) for el in self.args) if self.args else ''
kwargs = ', '.join('%s=%r' % (k,v) for (k,v) in self.kwargs.items()) if self.kwargs else ''
return "Analyze(%r, %s%s%s)" % (self.reference, self.analysis.name,
(', %s' % args) if args else '',
(', %s' % kwargs) if kwargs else '')
def __str__(self):
return "%s(%s)" % (self.analysis.name, self.reference)
[docs]class Collector(AttrTree):
"""
A Collector specifies a template for how to populate a Layout
with data over time. Two methods are used to schedule data
collection: 'collect' and 'analyze'.
The collect method takes an object (or reference) and collects
views from it (as configured by setting an appropriate hook set
with the for_type classmethod).
The analysis method takes a reference to data on the attrtree (a
ViewRef) and passes the resolved output to the given analysisfn
Operation.
>>> Collector.for_type(str, lambda x: ViewableElement(x, name=x))
>>> Collector.interval_hook = param.Dynamic.time_fn.advance
>>> c = Collector()
>>> c.Target.Path = c.collect('example string')
# Start collection...
>>> data = c(times=[1,2,3,4,5])
>>> isinstance(data, Layout)
True
>>> isinstance(data.Target.Path, UniformNdMapping)
True
>>> times = data.Target.Path.keys()
>>> print("Collected the data for %d time values" % len(times))
Collected the data for 5 time values
>>> results = data.Target.Path.last
>>> results.name
'example string'
"""
# A callable that advances by the specified time before the next
# batch of collection tasks is executed. If set to a subclass of
# RunProgress, the class will be instantiated and precent_range
# updated to allow a progress bar to be displayed
interval_hook = param.Dynamic.time_fn.advance
# A callable that returns the time where the time may be the
# simulation time or wall-clock time. The time values are
# recorded by the UniformNdMapping keys
time_fn = param.Dynamic.time_fn
type_hooks = {}
[docs] @classmethod
def for_type(cls, tp, hookfn, referencer=None, mode='set'):
"""
For an object of a given type, apply the hookfn and use the
specified mode to aggregate the data.
To allow pickling (or any other defered access) of the target
object, a referencer (a Reference subclass) may be specified
to wrap the object as required.
If mode is 'merge', merge the Layout output by the hook,
otherwise if 'set', add the output to the path specified by
the ViewRef.
"""
cls.type_hooks[tp] = (hookfn, mode, referencer)
def __init__(self, specs=[], **kwargs):
super(Collector,self).__init__(**kwargs)
for (path_spec, obj) in specs:
if path_spec is None:
self.__dict__['data'][uuid.uuid4().hex] = obj
else:
path = path_spec.rsplit('.')
self.set_path(path, obj)
self._scheduled_tasks = []
fixed_error = 'Cannot set %r as Collector specification disabled after first call.'
self.__dict__['_fixed_error'] = fixed_error
self.__dict__['progress_label'] = 'Completion'
@property
def ref(self):
"""
A convenient property to easily generate ViewRef object (via
attribute access). Used to define ViewableElement references for analysis
or for setting a path for an Collect on the Collector.
"""
return ViewRef()
[docs] def collect(self, obj, *args, **kwargs):
"""
Aggregate views from the object at each step by passing the
arguments to the corresponding hook. The object may represent
itself, or it may be a Reference. If a referencer class was
specified when the hook was defined, the object will
automatically be wrapped into a reference.
"""
task = Collect(obj, *args, **kwargs)
if task.mode == 'merge':
self.data[uuid.uuid4().hex] = task
return None
return task
[docs] def analyze(self, reference, analysisfn, *args, **kwargs):
"""
Given a ViewRef and the Operation analysisfn, process the
data resolved by the reference with analysisfn at each step.
"""
task = Analyze(reference, analysisfn, *args, **kwargs)
if task.mode == 'merge':
self.data[uuid.uuid4().hex] = task
return task
def __call__(self, attrtree=Layout(), times=[], strict=False):
current_time = self.time_fn()
if times != sorted(times):
raise Exception("Please supply the list of times in ascending order")
if times[0] < current_time:
raise Exception("The first time value is prior to the current time.")
times = np.array([current_time] + times)
if len(set(times)) == 1:
completion = [0,100]
else:
completion = 100 * (times - times.min()) / (times.max() - times.min())
update_progress = (isinstance(self.interval_hook, type)
and issubclass(self.interval_hook, ProgressIndicator))
# If an instance of RunProgress, instantiate the progress bar
interval_hook = (self.interval_hook(label=self.progress_label)
if update_progress else self.interval_hook)
self._schedule_tasks(times, strict)
(self.fixed, attrtree.fixed) = (False, False)
try:
for i, t in enumerate(np.diff(times)):
interval_hook(float(t))
# An empty attrtree buffer stops analysis repeatedly
# computing results over the entire accumulated map
attrtree_buffer = Layout()
for task in self._scheduled_tasks:
try:
if isinstance(task, Analyze) and task.mapwise:
task(attrtree, self.time_fn(), times)
else:
task(attrtree_buffer, self.time_fn(), times)
attrtree.update(attrtree_buffer)
except Exception as e:
param.main.warning("Task %s at time %s failed with following "
"exception and was skipped:\n%s",
task, self.time_fn(), e)
if update_progress:
interval_hook.percent_range = (completion[i],
completion[i+1])
interval_hook(0)
(self.fixed, attrtree.fixed) = (True, True)
return attrtree
except KeyboardInterrupt:
(self.fixed, attrtree.fixed) = (True, True)
return attrtree
[docs] def verify_times(self, times, strict=False):
"""
Given a set of times this method checks that all
scheduled measurements will actually be carried out.
"""
for _, task in self.items():
if task.times:
self._verify_task_times(task, times, strict)
def _verify_task_times(self, task, times, strict=False):
"""
Checks that a given task that is scheduled to be run at
certain times will actually be executed. The strict flag
determines whether to simply warn or raise an Exception.
"""
if task.times:
unsatisfied = set(task.times) - set(list(times))
if unsatisfied:
msg = "Task %r has been requested for times %s, " \
"not scheduled for collection." % (task, list(unsatisfied))
if unsatisfied:
if strict: raise Exception(msg)
else: param.main.warning(msg)
def _schedule_tasks(self, times, strict=False):
"""
Inspect the data to find all the Collects that have
been specified and add them to the scheduled tasks list.
"""
self._scheduled_tasks = []
for path, task in self.items():
if task is None:
raise Exception("Incorrect task definition for %r" % '.'.join(path))
if not isinstance(task, Collect):
self._scheduled_tasks = []
raise Exception("Only Collects or Analyze objects allowed, not %s" % task)
if isinstance(path, tuple) and task.mode == 'merge':
self._scheduled_tasks = []
raise Exception("Setting path for Task that is in 'merge' mode.")
task.path = path
self._verify_task_times(task, times, strict)
self._scheduled_tasks.append(task)
def __repr__(self):
spec_strs = []
for path, val in self.items():
key = repr('.'.join(path)) if isinstance(path, tuple) else 'None'
spec_strs.append('\n(%s, %r)' % (key, val))
return 'Collector([%s])' % ', '.join(spec_strs)
def __str__(self):
indent = ' '
padding = len(str(len(self)))
num_fmt = '%%0%dd.' % padding
lines = ["%d tasks scheduled:\n" % len(self)]
dotted_line = indent + num_fmt +" %s"
merge_line = indent + num_fmt + " [...] "
value_line = indent*3 + ' '*padding + " %s %s"
for i, (path, val) in enumerate(self.items()):
if isinstance(path, tuple):
lines.append(dotted_line % (i+1, '.'.join(p for p in path)))
lines.append(value_line % (' ', val))
else:
lines.append(merge_line % (i+1))
lines.append(value_line % ('', val))
return '\n'.join(lines)