import param
import numpy as np
import pandas as pd
from ..core import Operation, Element
from ..core.data import PandasInterface
from ..element import Scatter
class RollingBase(param.Parameterized):
    """
    Mixin declaring the window parameters that the `rolling` and
    `rolling_outlier_std` operations have in common.
    """

    center = param.Boolean(
        default=True,
        doc=("\nWhether to set the x-coordinate at the center or right edge\n"
             "of the window."))

    min_periods = param.Integer(
        default=None,
        doc=("\nMinimum number of observations in window required to have a\n"
             "value (otherwise result is NaN)."))

    rolling_window = param.Integer(
        default=10,
        doc="\nThe window size over which to operate.")

    def _roll_kwargs(self):
        """
        Gather the window settings into a dict with the keys
        ``window``, ``center`` and ``min_periods``.
        """
        # NOTE(review): ``self.p`` is not assigned anywhere in this class;
        # presumably it is provided by the Operation machinery of the
        # concrete subclass -- confirm before using this mixin elsewhere.
        settings = self.p
        return dict(window=settings.rolling_window,
                    center=settings.center,
                    min_periods=settings.min_periods)
class rolling(Operation, RollingBase):
    """
    Operation that evaluates a function over a rolling window, using
    the first key dimension of each element as the index.
    """

    window_type = param.ObjectSelector(
        default=None,
        objects=['boxcar', 'triang', 'blackman', 'hamming', 'bartlett',
                 'parzen', 'bohman', 'blackmanharris', 'nuttall',
                 'barthann', 'kaiser', 'gaussian', 'general_gaussian',
                 'slepian'],
        doc="The shape of the window to apply")

    function = param.Callable(
        default=np.mean,
        doc="\nThe function to apply over the rolling window.")

    def _process_layer(self, element, key=None):
        """
        Roll over a single element and return a clone of it built from
        the rolled dataframe.

        Raises ValueError when a ``window_type`` is set and ``function``
        is neither ``np.mean`` nor ``np.sum``.
        """
        index_name = element.kdims[0].name
        frame = PandasInterface.as_dframe(element).set_index(index_name)
        roller = frame.rolling(win_type=self.p.window_type,
                               **self._roll_kwargs())

        func = self.p.function
        if self.p.window_type is None:
            # Without a window type the function is passed to apply().
            result = roller.apply(func)
        elif func is np.mean:
            result = roller.mean()
        elif func is np.sum:
            result = roller.sum()
        else:
            raise ValueError("Rolling window function only supports "
                             "mean and sum when custom window_type is supplied")

        # Turn the index back into a regular column before cloning.
        return element.clone(result.reset_index())

    def _process(self, element, key=None):
        """
        Run `_process_layer` over the input through ``element.map``,
        with ``Element`` as the spec.
        """
        layer_fn = self._process_layer
        return element.map(layer_fn, Element)
class resample(Operation):
    """
    Operation that resamples a timeseries of dates at a given frequency
    and combines the values with a function.
    """

    closed = param.ObjectSelector(
        default=None,
        objects=['left', 'right'],
        doc="Which side of bin interval is closed")

    function = param.Callable(
        default=np.mean,
        doc="\nFunction for computing new values out of existing ones.")

    label = param.ObjectSelector(
        default='right',
        doc="\nThe bin edge to label the bin with.")

    rule = param.String(
        default='D',
        doc=("\nA string representing the time interval over which to apply "
             "the resampling"))

    def _process_layer(self, element, key=None):
        """
        Resample a single element along its first key dimension and
        return a clone of it built from the resulting dataframe.
        """
        frame = PandasInterface.as_dframe(element)
        time_dim = element.kdims[0].name
        resampler = frame.set_index(time_dim).resample(
            rule=self.p.rule,
            label=self.p.label,
            closed=self.p.closed)
        aggregated = resampler.apply(self.p.function)
        # Turn the index back into a regular column before cloning.
        return element.clone(aggregated.reset_index())

    def _process(self, element, key=None):
        """
        Run `_process_layer` over the input through ``element.map``,
        with ``Element`` as the spec.
        """
        layer_fn = self._process_layer
        return element.map(layer_fn, Element)
class rolling_outlier_std(Operation, RollingBase):
    """
    Detect outliers using the standard deviation within a rolling window.

    A value counts as an outlier when its residual from the smoothed
    trend line (the rolling mean) exceeds `sigma` times the rolling
    standard deviation of those residuals.

    The window is configured by the parameters inherited from
    RollingBase, which are shared with the `rolling` operation so both
    can be given the same settings.
    """

    sigma = param.Number(
        default=2.0,
        doc="\nMinimum sigma before a value is considered an outlier.")

    def _process_layer(self, element, key=None):
        """
        Select the outlying samples of a single element and return them
        as a clone with ``new_type=Scatter``.
        """
        values = element.dimension_values(1)
        window_opts = self._roll_kwargs()

        # Smoothed trend line and how far each value departs from it.
        trend = pd.Series(values).rolling(**window_opts).mean()
        residual = values - trend

        # Rolling spread of the residuals.
        spread = pd.Series(residual).rolling(**window_opts).std()

        # Boolean mask of outliers; invalid-value floating point warnings
        # are silenced while it is computed.
        with np.errstate(invalid='ignore'):
            threshold = spread * self.p.sigma
            mask = (np.abs(residual) > threshold).values
        return element[mask].clone(new_type=Scatter)

    def _process(self, element, key=None):
        """
        Run `_process_layer` over the input through ``element.map``,
        with ``Element`` as the spec.
        """
        layer_fn = self._process_layer
        return element.map(layer_fn, Element)