Commit 5d4b11f8 authored by Dr.李

added tests for least square explain

parent d4f35fab
......@@ -13,12 +13,15 @@ from typing import Union
from alphamind.aggregate import groupby
def neutralize(x: np.ndarray, y: np.ndarray, groups: np.ndarray=None, output_explained=False) \
        -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
    """Remove from ``y`` the part explained by a linear fit on ``x``.

    Coefficients ``b`` come from ``ls_fit`` (defined elsewhere; presumably an
    ordinary least-squares fit -- confirm against its definition).  The
    residual is ``ls_res(x, y, b)`` and the per-factor explained part is
    ``ls_explain(x, b)``.

    :param x: factor matrix, indexed as ``(n_samples, n_factors)``.
    :param y: dependent values, 1-D or 2-D with ``n_samples`` rows.
    :param groups: optional group label per sample; when given, a separate
        fit is run on every group produced by ``groupby(groups)``.
    :param output_explained: when true, also return the explained part.
    :return: the residual array shaped like ``y``; if ``output_explained`` is
        set, a ``(residual, explained)`` tuple where ``explained`` has shape
        ``x.shape`` for 1-D ``y`` and ``x.shape + (y.shape[1],)`` for 2-D ``y``.
    """
    if groups is None:
        b = ls_fit(x, y)
        res = ls_res(x, y, b)
        if output_explained:
            return res, ls_explain(x, b)
        return res

    res = zeros(y.shape)

    # Only pay for the (potentially large) explained buffer when requested.
    explained = None
    if output_explained:
        if y.ndim == 2:
            explained = zeros(x.shape + (y.shape[1],))
        else:
            explained = zeros(x.shape)

    for curr_idx in groupby(groups):
        # Index once per group and reuse the selections for fit, residual
        # and explained computations.
        curr_x = x[curr_idx]
        curr_y = y[curr_idx]
        b = ls_fit(curr_x, curr_y)
        res[curr_idx] = ls_res(curr_x, curr_y, b)
        if output_explained:
            explained[curr_idx] = ls_explain(curr_x, b)

    if output_explained:
        return res, explained
    return res
......@@ -50,18 +53,17 @@ def ls_res(x: np.ndarray, y: np.ndarray, b: np.ndarray) -> np.ndarray:
return y - x @ b
def ls_explain(x: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Split a linear fit into the contribution of every factor.

    :param x: factor matrix, indexed as ``(n_samples, n_factors)``.
    :param b: coefficients; 1-D ``(n_factors,)`` for a single dependent
        variable or 2-D ``(n_factors, n_dependents)`` for several.
    :return: for 1-D ``b`` the element-wise product ``b * x`` with the shape
        of ``x``; for 2-D ``b`` a float array of shape
        ``(n_samples, n_factors, n_dependents)`` whose slice ``[:, :, i]``
        equals ``b[:, i] * x``.
    """
    if b.ndim == 1:
        return b * x

    # Broadcast (n_samples, n_factors, 1) against (1, n_factors, n_dependents)
    # so every dependent variable is handled in one vectorised multiply
    # instead of a Python-level loop over columns.
    explained = x[:, :, np.newaxis] * b[np.newaxis, :, :]

    # The multi-dependent result is always handed back as float64,
    # irrespective of the input dtypes; no copy is made when it already is.
    return explained.astype(float, copy=False)
......
......@@ -40,9 +40,71 @@ class TestNeutralize(unittest.TestCase):
curr_y = y[groups == i]
model.fit(curr_x, curr_y)
exp_res = curr_y - curr_x @ model.coef_.T
np.testing.assert_array_almost_equal(calc_res[groups ==i
np.testing.assert_array_almost_equal(calc_res[groups == i], exp_res)
], exp_res)
def test_neutralize_explain_output(self):
    """Compare ungrouped ``neutralize(..., output_explained=True)`` with an
    intercept-free ``LinearRegression`` for 1-D and 2-D dependents."""
    check = np.testing.assert_array_almost_equal

    # --- single dependent variable -------------------------------------
    y = np.random.randn(3000)
    x = np.random.randn(3000, 10)
    residual, explained = neutralize(x, y, output_explained=True)

    reference = LinearRegression(fit_intercept=False)
    reference.fit(x, y)
    coef = reference.coef_.T

    check(residual, y - x @ coef)
    check(explained, x * coef)

    # --- several dependent variables -----------------------------------
    y = np.random.randn(3000, 4)
    x = np.random.randn(3000, 10)
    residual, explained = neutralize(x, y, output_explained=True)

    reference = LinearRegression(fit_intercept=False)
    reference.fit(x, y)
    coef = reference.coef_.T

    check(residual, y - x @ coef)
    # The third axis of ``explained`` indexes the dependent variable.
    for col in range(y.shape[1]):
        check(explained[:, :, col], x * coef[:, col])
def test_neutralize_explain_output_with_group(self):
    """Compare grouped ``neutralize(..., output_explained=True)`` with an
    intercept-free ``LinearRegression`` fitted separately on every group."""
    check = np.testing.assert_array_almost_equal
    n_groups = 30

    # --- single dependent variable -------------------------------------
    y = np.random.randn(3000)
    x = np.random.randn(3000, 10)
    groups = np.random.randint(n_groups, size=3000)
    residual, explained = neutralize(x, y, groups, output_explained=True)

    reference = LinearRegression(fit_intercept=False)
    for label in range(n_groups):
        members = groups == label
        group_x = x[members]
        group_y = y[members]
        reference.fit(group_x, group_y)
        coef = reference.coef_.T

        check(residual[members], group_y - group_x @ coef)
        check(explained[members], group_x * coef)

    # --- several dependent variables (same group labels) ---------------
    y = np.random.randn(3000, 4)
    x = np.random.randn(3000, 10)
    residual, explained = neutralize(x, y, groups, output_explained=True)

    reference = LinearRegression(fit_intercept=False)
    for label in range(n_groups):
        members = groups == label
        group_x = x[members]
        group_y = y[members]
        reference.fit(group_x, group_y)
        coef = reference.coef_.T

        check(residual[members], group_y - group_x @ coef)
        # The third axis of ``explained`` indexes the dependent variable.
        for col in range(y.shape[1]):
            check(explained[members, :, col], group_x * coef[:, col])
if __name__ == '__main__':
......
......@@ -19,7 +19,7 @@ alpha_logger.addHandler(ch)
# Let records of severity INFO and above through this logger.
alpha_logger.setLevel(logging.INFO)
def add_parent_path(name: str, level: int) -> None:
    """Append an ancestor of ``name`` to ``sys.path``.

    ``name`` is made absolute first, then its last ``level`` path components
    are dropped and the remainder is appended to ``sys.path``.

    :param name: file or directory path (relative paths are resolved against
        the current working directory by ``os.path.abspath``).
    :param level: number of trailing path components to strip; ``0`` appends
        the absolute path of ``name`` itself.
    """
    current_path = os.path.abspath(name)
    if level == 0:
        # ``parts[:-0]`` is an empty slice, which would append '' (i.e. the
        # current working directory) instead of the path itself.
        parent_path = current_path
    else:
        parent_path = os.path.sep.join(current_path.split(os.path.sep)[:-level])
    sys.path.append(parent_path)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment