Commit 4faceefc authored by Dr.李's avatar Dr.李

reformat codes

parent 39930004
......@@ -52,8 +52,8 @@ if __name__ == '__main__':
benchmark_build_percent_with_group(30, 50000, 0.1, 3)
benchmark_build_percent(50000, 20, 0.1)
benchmark_build_percent_with_group(50000, 20, 0.1, 300)
benchmark_build_linear(100, 3, 3, 100)
benchmark_build_linear(1000, 30, 30, 10)
benchmark_build_linear(100, 3, 100)
benchmark_build_linear(1000, 30, 10)
benchmark_simple_settle(3000, 10, 1000)
benchmark_simple_settle_with_group(3000, 10, 1000, 30)
benchmark_simple_settle(30, 10, 50000)
......
......@@ -7,6 +7,7 @@ Created on 2017-5-5
import numpy as np
from typing import Tuple
from typing import Union
import cvxpy
from cvxopt import solvers
......@@ -15,8 +16,8 @@ solvers.options['glpk'] = {'msg_lev': 'GLP_MSG_OFF'}
def linear_build(er: np.ndarray,
lbound: np.ndarray,
ubound: np.ndarray,
lbound: Union[np.ndarray, float],
ubound: Union[np.ndarray, float],
risk_exposure: np.ndarray,
bm: np.ndarray,
risk_target: Tuple[np.ndarray, np.ndarray]=None,
......@@ -43,17 +44,3 @@ def linear_build(er: np.ndarray,
prob = cvxpy.Problem(objective, constraints)
prob.solve(solver=solver)
return prob.status, prob.value, np.array(w.value).flatten()
if __name__ == '__main__':
er = np.arange(300)
bm = np.ones(300) * 0.00333333333
risk_exposure = np.random.randn(300, 10)
s, v, x = linear_build(er, 0., 0.01, risk_exposure, bm, (-0.01*np.ones(10), 0.01*np.ones(10)))
print(s)
print(x.sum())
print(x.min(), ',', x.max())
print((x - bm) @ risk_exposure)
......@@ -15,7 +15,7 @@ from alphamind.utilities import set_value
def percent_build(er: np.ndarray, percent: float, groups: np.ndarray=None) -> np.ndarray:
if er.ndim == 1 or (er.shape[0] == 1 or er.shape[1] == 1):
""" fast path methods for single column er"""
# fast path methods for single column er
neg_er = -er.flatten()
length = len(neg_er)
weights = zeros((length, 1))
......
......@@ -15,7 +15,7 @@ from alphamind.utilities import set_value
def rank_build(er: np.ndarray, use_rank: int, groups: np.ndarray=None) -> np.ndarray:
if er.ndim == 1 or (er.shape[0] == 1 or er.shape[1] == 1):
""" fast path methods for single column er"""
# fast path methods for single column er
neg_er = -er.flatten()
length = len(neg_er)
weights = zeros((length, 1))
......
......@@ -13,7 +13,8 @@ from alphamind.analysis.riskanalysis import risk_analysis
class TestRiskAnalysis(unittest.TestCase):
def test_risk_analysis(self):
@classmethod
def test_risk_analysis(cls):
n_samples = 36000
n_dates = 20
n_risk_factors = 35
......@@ -26,7 +27,7 @@ class TestRiskAnalysis(unittest.TestCase):
columns=list(range(n_risk_factors)),
index=dates)
explained_table, exposure_table = risk_analysis(weights_series - bm_series,
explained_table, _ = risk_analysis(weights_series - bm_series,
next_bar_return_series,
risk_table)
......
......@@ -13,98 +13,87 @@ from alphamind.data.neutralize import neutralize
class TestNeutralize(unittest.TestCase):
def test_neutralize(self):
y = np.random.randn(3000, 4)
x = np.random.randn(3000, 10)
def setUp(self):
self.y = np.random.randn(3000, 4)
self.x = np.random.randn(3000, 10)
self.groups = np.random.randint(30, size=3000)
calc_res = neutralize(x, y)
def test_neutralize(self):
calc_res = neutralize(self.x, self.y)
model = LinearRegression(fit_intercept=False)
model.fit(x, y)
model.fit(self.x, self.y)
exp_res = y - x @ model.coef_.T
exp_res = self.y - self.x @ model.coef_.T
np.testing.assert_array_almost_equal(calc_res, exp_res)
def test_neutralize_with_group(self):
y = np.random.randn(3000, 4)
x = np.random.randn(3000, 10)
groups = np.random.randint(30, size=3000)
calc_res = neutralize(x, y, groups)
calc_res = neutralize(self.x, self.y, self.groups)
model = LinearRegression(fit_intercept=False)
for i in range(30):
curr_x = x[groups == i]
curr_y = y[groups == i]
curr_x = self.x[self.groups == i]
curr_y = self.y[self.groups == i]
model.fit(curr_x, curr_y)
exp_res = curr_y - curr_x @ model.coef_.T
np.testing.assert_array_almost_equal(calc_res[groups == i], exp_res)
np.testing.assert_array_almost_equal(calc_res[self.groups == i], exp_res)
def test_neutralize_explain_output(self):
y = np.random.randn(3000)
x = np.random.randn(3000, 10)
y = self.y[:, 0].flatten()
calc_res, other_stats = neutralize(x, y, output_explained=True)
calc_res, other_stats = neutralize(self.x, y, output_explained=True)
model = LinearRegression(fit_intercept=False)
model.fit(x, y)
model.fit(self.x, y)
exp_res = y - x @ model.coef_.T
exp_explained = x * model.coef_.T
exp_res = y - self.x @ model.coef_.T
exp_explained = self.x * model.coef_.T
np.testing.assert_array_almost_equal(calc_res, exp_res.reshape(-1, 1))
np.testing.assert_array_almost_equal(other_stats['explained'][:, :, 0], exp_explained)
y = np.random.randn(3000, 4)
x = np.random.randn(3000, 10)
calc_res, other_stats = neutralize(x, y, output_explained=True)
calc_res, other_stats = neutralize(self.x, self.y, output_explained=True)
model = LinearRegression(fit_intercept=False)
model.fit(x, y)
model.fit(self.x, self.y)
exp_res = y - x @ model.coef_.T
exp_res = self.y - self.x @ model.coef_.T
np.testing.assert_array_almost_equal(calc_res, exp_res)
for i in range(y.shape[1]):
exp_explained = x * model.coef_.T[:, i]
for i in range(self.y.shape[1]):
exp_explained = self.x * model.coef_.T[:, i]
np.testing.assert_array_almost_equal(other_stats['explained'][:, :, i], exp_explained)
def test_neutralize_explain_output_with_group(self):
y = np.random.randn(3000)
x = np.random.randn(3000, 10)
groups = np.random.randint(30, size=3000)
y = self.y[:, 0].flatten()
calc_res, other_stats = neutralize(x, y, groups, output_explained=True)
calc_res, other_stats = neutralize(self.x, y, self.groups, output_explained=True)
model = LinearRegression(fit_intercept=False)
for i in range(30):
curr_x = x[groups == i]
curr_y = y[groups == i]
curr_x = self.x[self.groups == i]
curr_y = y[self.groups == i]
model.fit(curr_x, curr_y)
exp_res = curr_y - curr_x @ model.coef_.T
exp_explained = curr_x * model.coef_.T
np.testing.assert_array_almost_equal(calc_res[groups == i], exp_res.reshape(-1, 1))
np.testing.assert_array_almost_equal(other_stats['explained'][groups == i, :, 0], exp_explained)
y = np.random.randn(3000, 4)
x = np.random.randn(3000, 10)
np.testing.assert_array_almost_equal(calc_res[self.groups == i], exp_res.reshape(-1, 1))
np.testing.assert_array_almost_equal(other_stats['explained'][self.groups == i, :, 0], exp_explained)
calc_res, other_stats = neutralize(x, y, groups, output_explained=True)
calc_res, other_stats = neutralize(self.x, self.y, self.groups, output_explained=True)
model = LinearRegression(fit_intercept=False)
for i in range(30):
curr_x = x[groups == i]
curr_y = y[groups == i]
curr_x = self.x[self.groups == i]
curr_y = self.y[self.groups == i]
model.fit(curr_x, curr_y)
exp_res = curr_y - curr_x @ model.coef_.T
np.testing.assert_array_almost_equal(calc_res[groups == i], exp_res)
np.testing.assert_array_almost_equal(calc_res[self.groups == i], exp_res)
for j in range(y.shape[1]):
for j in range(self.y.shape[1]):
exp_explained = curr_x * model.coef_.T[:, j]
np.testing.assert_array_almost_equal(other_stats['explained'][groups == i, :, j], exp_explained)
np.testing.assert_array_almost_equal(other_stats['explained'][self.groups == i, :, j], exp_explained)
if __name__ == '__main__':
......
......@@ -14,21 +14,21 @@ from alphamind.data.standardize import standardize
class TestStandardize(unittest.TestCase):
def test_standardize(self):
x = np.random.randn(3000, 10)
def setUp(self):
self.x = np.random.randn(3000, 10)
self.groups = np.random.randint(10, 30, size=3000)
calc_zscore = standardize(x)
exp_zscore = zscore(x, ddof=1)
def test_standardize(self):
calc_zscore = standardize(self.x)
exp_zscore = zscore(self.x, ddof=1)
np.testing.assert_array_almost_equal(calc_zscore, exp_zscore)
def test_standardize_with_group(self):
x = np.random.randn(3000, 10)
groups = np.random.randint(10, 30, size=3000)
calc_zscore = standardize(x, groups)
exp_zscore = pd.DataFrame(x).groupby(groups).transform(lambda s: (s - s.mean(axis=0)) / s.std(axis=0, ddof=1))
calc_zscore = standardize(self.x, self.groups)
exp_zscore = pd.DataFrame(self.x).\
groupby(self.groups).\
transform(lambda s: (s - s.mean(axis=0)) / s.std(axis=0, ddof=1))
np.testing.assert_array_almost_equal(calc_zscore, exp_zscore)
......
......@@ -13,21 +13,22 @@ from alphamind.data.winsorize import winsorize_normal
class TestWinsorize(unittest.TestCase):
def test_winsorize_normal(self):
num_stds = 2
x = np.random.randn(3000, 10)
def setUp(self):
self.x = np.random.randn(3000, 10)
self.groups = np.random.randint(10, 30, size=3000)
self.num_stds = 2
calc_winsorized = winsorize_normal(x, num_stds)
def test_winsorize_normal(self):
calc_winsorized = winsorize_normal(self.x, self.num_stds)
std_values = x.std(axis=0, ddof=1)
mean_value = x.mean(axis=0)
std_values = self.x.std(axis=0, ddof=1)
mean_value = self.x.mean(axis=0)
lower_bound = mean_value - num_stds * std_values
upper_bound = mean_value + num_stds * std_values
lower_bound = mean_value - self.num_stds * std_values
upper_bound = mean_value + self.num_stds * std_values
for i in range(np.size(calc_winsorized, 1)):
col_data = x[:, i]
col_data = self.x[:, i]
col_data[col_data > upper_bound[i]] = upper_bound[i]
col_data[col_data < lower_bound[i]] = lower_bound[i]
......@@ -35,24 +36,20 @@ class TestWinsorize(unittest.TestCase):
np.testing.assert_array_almost_equal(col_data, calculated_col)
def test_winsorize_normal_with_group(self):
num_stds = 2
x = np.random.randn(3000, 10)
groups = np.random.randint(10, 30, size=3000)
cal_winsorized = winsorize_normal(x, num_stds, groups)
cal_winsorized = winsorize_normal(self.x, self.num_stds, self.groups)
def impl(x):
std_values = x.std(axis=0, ddof=1)
mean_value = x.mean(axis=0)
lower_bound = mean_value - num_stds * std_values
upper_bound = mean_value + num_stds * std_values
lower_bound = mean_value - self.num_stds * std_values
upper_bound = mean_value + self.num_stds * std_values
res = np.where(x > upper_bound, upper_bound, x)
res = np.where(res < lower_bound, lower_bound, res)
return res
exp_winsorized = pd.DataFrame(x).groupby(groups).transform(impl).values
exp_winsorized = pd.DataFrame(self.x).groupby(self.groups).transform(impl).values
np.testing.assert_array_almost_equal(cal_winsorized, exp_winsorized)
......
......@@ -12,22 +12,23 @@ from alphamind.portfolio.linearbuilder import linear_build
class TestLinearBuild(unittest.TestCase):
def test_linear_build(self):
def setUp(self):
self.er = np.random.randn(3000)
self.risk_exp = np.random.randn(3000, 30)
self.bm = np.random.randint(100, size=3000).astype(float)
er = np.random.randn(3000)
risk_exp = np.random.randn(3000, 30)
bm = np.random.randint(100, size=3000).astype(float)
bm /= bm.sum()
def test_linear_build(self):
bm = self.bm / self.bm.sum()
eplson = 1e-6
status, value, w = linear_build(er, 0., 0.01, risk_exp, bm)
status, value, w = linear_build(self.er, 0., 0.01, self.risk_exp, bm)
self.assertEqual(status, 'optimal')
self.assertAlmostEqual(np.sum(w), 1.)
self.assertTrue(np.all(w <= 0.01 + eplson))
self.assertTrue(np.all(w >= -eplson))
calc_risk = (w - bm) @ risk_exp
expected_risk = np.zeros(risk_exp.shape[1])
calc_risk = (w - bm) @self. risk_exp
expected_risk = np.zeros(self.risk_exp.shape[1])
np.testing.assert_array_almost_equal(calc_risk, expected_risk)
......
......@@ -13,34 +13,28 @@ from alphamind.portfolio.longshortbulder import long_short_build
class TestLongShortBuild(unittest.TestCase):
def test_long_short_build(self):
def setUp(self):
self.x = np.random.randn(3000, 10)
self.groups = np.random.randint(10, 40, size=3000)
x = np.random.randn(3000)
def test_long_short_build(self):
x = self.x[:, 0].flatten()
calc_weights = long_short_build(x).flatten()
expected_weights = x / np.abs(x).sum()
np.testing.assert_array_almost_equal(calc_weights, expected_weights)
x = np.random.randn(3000, 10)
calc_weights = long_short_build(x, leverage=2)
expected_weights = x / np.abs(x).sum(axis=0) * 2
calc_weights = long_short_build(self.x, leverage=2)
expected_weights = self.x / np.abs(self.x).sum(axis=0) * 2
np.testing.assert_array_almost_equal(calc_weights, expected_weights)
def test_long_short_build_with_group(self):
x = np.random.randn(3000)
groups = np.random.randint(10, 40, size=3000)
calc_weights = long_short_build(x, groups=groups).flatten()
expected_weights = pd.Series(x).groupby(groups).apply(lambda s: s / np.abs(s).sum())
x = self.x[:, 0].flatten()
calc_weights = long_short_build(x, groups=self.groups).flatten()
expected_weights = pd.Series(x).groupby(self.groups).apply(lambda s: s / np.abs(s).sum())
np.testing.assert_array_almost_equal(calc_weights, expected_weights)
x = np.random.randn(3000, 10)
groups = np.random.randint(10, 40, size=3000)
calc_weights = long_short_build(x, groups=groups)
expected_weights = pd.DataFrame(x).groupby(groups).apply(lambda s: s / np.abs(s).sum(axis=0))
calc_weights = long_short_build(self.x, groups=self.groups)
expected_weights = pd.DataFrame(self.x).groupby(self.groups).apply(lambda s: s / np.abs(s).sum(axis=0))
np.testing.assert_array_almost_equal(calc_weights, expected_weights)
......
......@@ -13,17 +13,19 @@ from alphamind.portfolio.percentbuilder import percent_build
class TestPercentBuild(unittest.TestCase):
def test_percent_build(self):
n_samples = 3000
p_included = 0.1
def setUp(self):
self.n_samples = 3000
self.p_included = 0.1
self.n_groups = 30
self.n_portfolios = range(1, 10)
n_portfolios = range(1, 10)
n_include = int(n_samples * p_included)
def test_percent_build(self):
n_include = int(self.n_samples * self.p_included)
for n_portfolio in n_portfolios:
x = np.random.randn(n_samples, n_portfolio)
for n_portfolio in self.n_portfolios:
x = np.random.randn(self.n_samples, n_portfolio)
calc_weights = percent_build(x, p_included)
calc_weights = percent_build(x, self.p_included)
expected_weights = np.zeros((len(x), n_portfolio))
......@@ -35,24 +37,18 @@ class TestPercentBuild(unittest.TestCase):
np.testing.assert_array_almost_equal(calc_weights, expected_weights)
def test_percent_build_with_group(self):
n_samples = 3000
p_included = 0.1
n_groups = 30
n_portfolios = range(1, 10)
for n_portfolio in n_portfolios:
for n_portfolio in self.n_portfolios:
x = np.random.randn(n_samples, n_portfolio)
groups = np.random.randint(n_groups, size=n_samples)
x = np.random.randn(self.n_samples, n_portfolio)
groups = np.random.randint(self.n_groups, size=self.n_samples)
calc_weights = percent_build(x, p_included, groups)
calc_weights = percent_build(x, self.p_included, groups)
grouped_ordering = pd.DataFrame(-x).groupby(groups).rank()
grouped_count = pd.DataFrame(-x).groupby(groups).transform(lambda x: x.count())
expected_weights = np.zeros((len(x), n_portfolio))
n_include = (grouped_count * p_included).astype(int)
n_include = (grouped_count * self.p_included).astype(int)
masks = (grouped_ordering <= n_include).values
for j in range(x.shape[1]):
expected_weights[masks[:, j], j] = 1.
......
......@@ -13,20 +13,20 @@ from alphamind.portfolio.rankbuilder import rank_build
class TestRankBuild(unittest.TestCase):
def test_rank_build(self):
n_samples = 3000
n_included = 300
n_portfolios = range(1, 10)
def setUp(self):
self.n_samples = 3000
self.n_included = 300
self.n_groups = 30
self.n_portfolio = range(1, 10)
for n_portfolio in n_portfolios:
x = np.random.randn(n_samples, n_portfolio)
def test_rank_build(self):
for n_portfolio in self.n_portfolio:
x = np.random.randn(self.n_samples, n_portfolio)
calc_weights = rank_build(x, n_included)
calc_weights = rank_build(x, self.n_included)
expected_weights = np.zeros((len(x), n_portfolio))
masks = (-x).argsort(axis=0).argsort(axis=0) < n_included
masks = (-x).argsort(axis=0).argsort(axis=0) < self.n_included
for j in range(x.shape[1]):
expected_weights[masks[:, j], j] = 1.
......@@ -34,17 +34,12 @@ class TestRankBuild(unittest.TestCase):
np.testing.assert_array_almost_equal(calc_weights, expected_weights)
def test_rank_build_with_group(self):
n_include = int(self.n_included / self.n_groups)
n_samples = 3000
n_include = 10
n_groups = 30
n_portfolios = range(1, 10)
for n_portfolio in n_portfolios:
for n_portfolio in self.n_portfolio:
x = np.random.randn(n_samples, n_portfolio)
groups = np.random.randint(n_groups, size=n_samples)
x = np.random.randn(self.n_samples, n_portfolio)
groups = np.random.randint(self.n_groups, size=self.n_samples)
calc_weights = rank_build(x, n_include, groups)
......
......@@ -14,53 +14,48 @@ from alphamind.settlement.simplesettle import simple_settle
class TestSimpleSettle(unittest.TestCase):
def test_simples_settle(self):
n_samples = 3000
n_portfolio = 3
weights = np.random.randn(n_samples, n_portfolio)
ret_series = np.random.randn(n_samples)
def setUp(self):
self.n_samples = 3000
self.n_portfolio = 3
self.n_groups = 30
self.weights = np.random.randn(self.n_samples,
self.n_portfolio)
self.ret_series = np.random.randn(self.n_samples)
self.groups = np.random.randint(self.n_groups, size=self.n_samples)
calc_ret = simple_settle(weights, ret_series)
def test_simples_settle(self):
calc_ret = simple_settle(self.weights, self.ret_series)
ret_series.shape = -1, 1
expected_ret = (weights * ret_series).sum(axis=0)
ret_series = self.ret_series.reshape((-1, 1))
expected_ret = (self.weights * ret_series).sum(axis=0)
np.testing.assert_array_almost_equal(calc_ret, expected_ret)
ret_series = np.random.randn(n_samples, 1)
ret_series = np.random.randn(self.n_samples, 1)
calc_ret = simple_settle(weights, ret_series)
calc_ret = simple_settle(self.weights, ret_series)
expected_ret = (weights * ret_series).sum(axis=0)
expected_ret = (self.weights * ret_series).sum(axis=0)
np.testing.assert_array_almost_equal(calc_ret, expected_ret)
def test_simple_settle_with_group(self):
n_samples = 3000
n_portfolio = 3
n_groups = 30
weights = np.random.randn(n_samples, n_portfolio)
ret_series = np.random.randn(n_samples)
groups = np.random.randint(n_groups, size=n_samples)
calc_ret = simple_settle(weights, ret_series, groups)
calc_ret = simple_settle(self.weights, self.ret_series, self.groups)
ret_series.shape = -1, 1
ret_mat = weights * ret_series
expected_ret = pd.DataFrame(ret_mat).groupby(groups).sum().values
ret_series = self.ret_series.reshape((-1, 1))
ret_mat = self.weights * ret_series
expected_ret = pd.DataFrame(ret_mat).groupby(self.groups).sum().values
np.testing.assert_array_almost_equal(calc_ret, expected_ret)
ret_series = np.random.randn(n_samples, 1)
ret_series = np.random.randn(self.n_samples, 1)
calc_ret = simple_settle(weights, ret_series, groups)
calc_ret = simple_settle(self.weights, ret_series, self.groups)
ret_mat = weights * ret_series
expected_ret = pd.DataFrame(ret_mat).groupby(groups).sum().values
ret_mat = self.weights * ret_series
expected_ret = pd.DataFrame(ret_mat).groupby(self.groups).sum().values
np.testing.assert_array_almost_equal(calc_ret, expected_ret)
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment