Commit 0a6d9643 authored by Dr.李's avatar Dr.李

FEATURE: added winsorize with interpolation

parent 23dbdadd
......@@ -35,6 +35,46 @@ def mask_values_2d(x: np.ndarray,
return res
@nb.njit(nogil=True, cache=True)
def interp_values_2d(x: np.ndarray,
                     groups: np.ndarray,
                     mean_values: np.ndarray,
                     std_values: np.ndarray,
                     num_stds: int = 3,
                     interval: float = 0.5) -> np.ndarray:
    """Winsorize ``x`` per group, spreading outliers just beyond the bounds.

    For every category ``k`` in ``0 .. max(groups)`` and every column ``j``,
    values above ``mean + num_stds * std`` are replaced by
    ``ubound + rank / n * interval * std`` and values below
    ``mean - num_stds * std`` by ``lbound - rank / n * interval * std``.
    ``n`` is the number of outliers on that side and ``rank`` is the 0-based
    rank of the outlier among them (the outlier closest to the bound gets
    rank 0), so the original ordering of the outliers is kept.

    :param x: 2-d data array (rows are observations, columns are features).
    :param groups: integer category of each row; categories are enumerated
        as ``range(max(groups) + 1)``.
    :param mean_values: per-row means, indexed as ``[row, column]``; the
        first row of each group supplies the group's value.
    :param std_values: per-row standard deviations, indexed like
        ``mean_values``.
    :param num_stds: bound width in standard deviations.
    :param interval: width, in standard deviations, of the band beyond each
        bound into which the outliers are mapped.
    :return: a modified copy of ``x``; ``x`` itself is not changed.
    """
    res = x.copy()
    width = x.shape[1]
    max_cat = np.max(groups)

    for k in range(max_cat + 1):
        target_idx = np.where(groups == k)[0].flatten()
        # A category code with no rows has no first row to read the
        # statistics from; indexing it would be an out-of-bounds access.
        if target_idx.size == 0:
            continue
        first_row = target_idx[0]
        for j in range(width):
            target_x = x[target_idx, j]
            target_res = target_x.copy()
            mean = mean_values[first_row, j]
            std = std_values[first_row, j]
            ubound = mean + num_stds * std
            lbound = mean - num_stds * std

            # upper bound abnormal values
            idx = target_x > ubound
            n = np.sum(idx)
            if n > 0:
                u_values = target_res[idx]
                # double argsort turns values into their 0-based ranks
                q_values = u_values.argsort().argsort()
                target_res[idx] = ubound + q_values / n * interval * std

            # lower bound abnormal values
            idx = target_x < lbound
            n = np.sum(idx)
            if n > 0:
                l_values = target_res[idx]
                # rank on the negated values so the outlier nearest the
                # bound gets rank 0
                q_values = (-l_values).argsort().argsort()
                target_res[idx] = lbound - q_values / n * interval * std

            res[target_idx, j] = target_res
    return res
@nb.njit(nogil=True, cache=True)
def mask_values_1d(x: np.ndarray,
mean_values: np.ndarray,
......@@ -46,38 +86,76 @@ def mask_values_1d(x: np.ndarray,
for j in range(width):
ubound = mean_values[j] + num_stds * std_values[j]
lbound = mean_values[j] - num_stds * std_values[j]
for i in range(length):
if x[i, j] > ubound:
res[i, j] = ubound
elif x[i, j] < lbound:
res[i, j] = lbound
res[x[:, j] > ubound, j] = ubound
res[x[:, j] < lbound, j] = lbound
return res
@nb.njit(nogil=True, cache=True)
def interp_values_1d(x: np.ndarray,
                     mean_values: np.ndarray,
                     std_values: np.ndarray,
                     num_stds: int = 3,
                     interval: float = 0.5) -> np.ndarray:
    """Winsorize each column of ``x``, spreading outliers beyond the bounds.

    For column ``j`` the bounds are
    ``mean_values[j] +/- num_stds * std_values[j]``. Values outside a bound
    are replaced by ``bound +/- rank / n * interval * std_values[j]``, where
    ``n`` is the number of outliers on that side and ``rank`` is their
    0-based rank (the outlier nearest the bound gets rank 0).

    :return: a modified copy of ``x``.
    """
    res = x.copy()
    _, n_cols = x.shape

    for col in range(n_cols):
        center = mean_values[col]
        spread = std_values[col]
        upper = center + num_stds * spread
        lower = center - num_stds * spread

        # values above the upper bound
        above = x[:, col] > upper
        n_above = np.sum(above)
        if n_above > 0:
            ranks = res[above, col].argsort().argsort()
            res[above, col] = upper + ranks / n_above * interval * spread

        # values below the lower bound
        below = x[:, col] < lower
        n_below = np.sum(below)
        if n_below > 0:
            ranks = (-res[below, col]).argsort().argsort()
            res[below, col] = lower - ranks / n_below * interval * spread

    return res
def winsorize_normal(x: np.ndarray, num_stds: int = 3, ddof=1,
                     groups: np.ndarray = None,
                     method: str = 'flat',
                     interval: float = 0.5) -> np.ndarray:
    """Winsorize ``x`` at ``mean +/- num_stds * std``.

    :param x: data array to winsorize.
    :param num_stds: bound width in standard deviations.
    :param ddof: delta degrees of freedom passed to the std computation.
    :param groups: optional group labels; when given, mean and std are
        computed per group, otherwise per column over all rows.
    :param method: ``'flat'`` clips outliers to the bound; any other value
        selects the interpolating variant.
    :param interval: passed to the interpolating variant only; ignored for
        ``'flat'``.
    :return: the winsorized array.
    """
    if groups is not None:
        groups = group_mapping(groups)
        mean_values = transform(groups, x, 'mean')
        std_values = transform(groups, x, 'std', ddof)
        if method == 'flat':
            res = mask_values_2d(x, mean_values, std_values, num_stds)
        else:
            res = interp_values_2d(x, groups, mean_values, std_values, num_stds, interval)
    else:
        std_values = simple_std(x, axis=0, ddof=ddof)
        mean_values = simple_mean(x, axis=0)
        if method == 'flat':
            res = mask_values_1d(x, mean_values, std_values, num_stds)
        else:
            res = interp_values_1d(x, mean_values, std_values, num_stds, interval)
    return res
class NormalWinsorizer(object):
def __init__(self, num_stds: int = 3, ddof=1):
def __init__(self, num_stds: int = 3,
ddof: int =1,
method: str = 'flat',
interval: float = 0.5):
self.num_stds = num_stds
self.ddof = ddof
self.mean = None
self.std = None
self.labels = None
self.method = method
self.interval = interval
def fit(self, x: np.ndarray, groups: np.ndarray = None):
if groups is not None:
......@@ -92,9 +170,35 @@ class NormalWinsorizer(object):
def transform(self, x: np.ndarray, groups: np.ndarray = None) -> np.ndarray:
    """Winsorize ``x`` with the stored ``self.mean`` / ``self.std``.

    :param x: data array to winsorize.
    :param groups: optional group label per row; when given, each row uses
        the statistics looked up for its label through ``self.labels``.
    :return: the winsorized array, clipped when ``self.method == 'flat'``
        and interpolated otherwise.
    """
    if groups is not None:
        index = array_index(self.labels, groups)
        if self.method == 'flat':
            res = mask_values_2d(x, self.mean[index], self.std[index], self.num_stds)
        else:
            # interp_values_2d enumerates categories as 0..max(code), so it
            # is given the label positions rather than the raw labels.
            # NOTE(review): assumes array_index returns one integer position
            # per row (it is already used that way to gather self.mean).
            res = interp_values_2d(x, index,
                                   self.mean[index],
                                   self.std[index],
                                   self.num_stds,
                                   self.interval)
    else:
        if self.method == 'flat':
            res = mask_values_1d(x, self.mean, self.std, self.num_stds)
        else:
            res = interp_values_1d(x, self.mean, self.std, self.num_stds, self.interval)
    return res
def __call__(self, x: np.ndarray, groups: np.ndarray = None) -> np.ndarray:
    """Winsorize ``x`` from its own statistics using the stored settings.

    Delegates to ``winsorize_normal`` with this instance's ``num_stds``,
    ``ddof``, ``method`` and ``interval``.
    """
    return winsorize_normal(x, self.num_stds, self.ddof, groups, self.method, self.interval)
if __name__ == '__main__':
    # Rough timing of the two winsorizing methods on one random column.
    import datetime as dt

    x = np.random.randn(10000, 1)
    # generated but not passed to the calls below
    groups = np.random.randint(0, 3, 10000)

    for method in ('flat', 'interp'):
        start = dt.datetime.now()
        for i in range(1000):
            winsorize_normal(x, method=method)
        print(dt.datetime.now() - start)
......@@ -17,6 +17,7 @@ from alphamind.data.winsorize import winsorize_normal
class TestWinsorize(unittest.TestCase):
def setUp(self):
np.random.seed(10)
self.x = np.random.randn(3000, 10)
self.groups = np.random.randint(10, 30, size=3000)
self.num_stds = 2
......@@ -38,6 +39,33 @@ class TestWinsorize(unittest.TestCase):
calculated_col = calc_winsorized[:, i]
np.testing.assert_array_almost_equal(col_data, calculated_col)
def test_winsorize_normal_with_interp(self):
    """Compare the 'interp' method with a column-wise numpy reference."""
    calc_winsorized = winsorize_normal(self.x, self.num_stds, method='interp')

    std_values = self.x.std(axis=0, ddof=1)
    mean_value = self.x.mean(axis=0)
    lower_bound = mean_value - self.num_stds * std_values
    upper_bound = mean_value + self.num_stds * std_values

    for i in range(np.size(calc_winsorized, 1)):
        expected = self.x[:, i].copy()

        # outliers above the upper bound, ranked ascending
        above = expected > upper_bound[i]
        ranks = expected[above].argsort().argsort()
        if len(ranks) > 0:
            expected[above] = upper_bound[i] + ranks / len(ranks) * 0.5 * std_values[i]

        # outliers below the lower bound, ranked on the negated values
        below = expected < lower_bound[i]
        ranks = (-expected[below]).argsort().argsort()
        if len(ranks) > 0:
            expected[below] = lower_bound[i] - ranks / len(ranks) * 0.5 * std_values[i]

        np.testing.assert_array_almost_equal(expected, calc_winsorized[:, i])
def test_winsorize_normal_with_group(self):
cal_winsorized = winsorize_normal(self.x, self.num_stds, groups=self.groups)
......@@ -55,6 +83,36 @@ class TestWinsorize(unittest.TestCase):
exp_winsorized = pd.DataFrame(self.x).groupby(self.groups).transform(impl).values
np.testing.assert_array_almost_equal(cal_winsorized, exp_winsorized)
def test_winsorize_normal_with_group_and_interp(self):
    """Compare grouped 'interp' winsorizing with a pandas groupby reference."""
    cal_winsorized = winsorize_normal(self.x, self.num_stds, groups=self.groups,
                                      method='interp')

    def impl(x):
        # reference implementation applied by groupby().transform
        x = x.values
        std_values = x.std(axis=0, ddof=1)
        mean_value = x.mean(axis=0)
        lower_bound = mean_value - self.num_stds * std_values
        upper_bound = mean_value + self.num_stds * std_values

        expected = x.copy()

        above = expected > upper_bound
        ranks = expected[above].argsort().argsort()
        if len(ranks) > 0:
            expected[above] = upper_bound + ranks / len(ranks) * 0.5 * std_values

        below = expected < lower_bound
        ranks = (-expected[below]).argsort().argsort()
        if len(ranks) > 0:
            expected[below] = lower_bound - ranks / len(ranks) * 0.5 * std_values

        return expected

    exp_winsorized = pd.DataFrame(self.x).groupby(self.groups).transform(impl).values
    np.testing.assert_array_almost_equal(cal_winsorized, exp_winsorized)
def test_normal_winsorizer(self):
s = NormalWinsorizer(num_stds=self.num_stds)
s.fit(self.x)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment