Commit 2c94271e authored by Dr.李's avatar Dr.李

added linear build with turn over constraint

parent 884ccda2
...@@ -16,7 +16,7 @@ def linear_build(er: np.ndarray, ...@@ -16,7 +16,7 @@ def linear_build(er: np.ndarray,
ubound: Union[np.ndarray, float], ubound: Union[np.ndarray, float],
risk_constraints: np.ndarray, risk_constraints: np.ndarray,
risk_target: Tuple[np.ndarray, np.ndarray]) -> Tuple[str, np.ndarray, np.ndarray]: risk_target: Tuple[np.ndarray, np.ndarray]) -> Tuple[str, np.ndarray, np.ndarray]:
er = er.flatten()
n, m = risk_constraints.shape n, m = risk_constraints.shape
if not risk_target: if not risk_target:
...@@ -25,8 +25,9 @@ def linear_build(er: np.ndarray, ...@@ -25,8 +25,9 @@ def linear_build(er: np.ndarray,
cons_matrix = np.concatenate((risk_constraints.T, risk_lbound.reshape((-1, 1)), risk_ubound.reshape((-1, 1))), cons_matrix = np.concatenate((risk_constraints.T, risk_lbound.reshape((-1, 1)), risk_ubound.reshape((-1, 1))),
axis=1) axis=1)
else: else:
cons_matrix = np.concatenate((risk_constraints.T, risk_target[0].reshape((-1, 1)), risk_target[1].reshape((-1, 1))), cons_matrix = np.concatenate(
axis=1) (risk_constraints.T, risk_target[0].reshape((-1, 1)), risk_target[1].reshape((-1, 1))),
axis=1)
if isinstance(lbound, float): if isinstance(lbound, float):
lbound = np.ones(n) * lbound lbound = np.ones(n) * lbound
...@@ -44,22 +45,94 @@ def linear_build(er: np.ndarray, ...@@ -44,22 +45,94 @@ def linear_build(er: np.ndarray,
return status, opt.feval(), opt.x_value() return status, opt.feval(), opt.x_value()
if __name__ == '__main__': def linear_build_with_to_constraint(er: np.ndarray,
n = 200 lbound: Union[np.ndarray, float],
lb = np.zeros(n) ubound: Union[np.ndarray, float],
ub = 0.01 * np.ones(n) risk_constraints: np.ndarray,
er = np.random.randn(n) risk_target: Tuple[np.ndarray, np.ndarray],
turn_over_target: float,
current_position: np.ndarray):
er = er.flatten()
current_position = current_position.reshape((-1, 1))
n, m = risk_constraints.shape
if not risk_target:
risk_lbound = -np.inf * np.ones((m, 1))
risk_ubound = np.inf * np.ones((m, 1))
else:
risk_lbound = risk_target[0].reshape((-1, 1))
risk_ubound = risk_target[1].reshape((-1, 1))
if isinstance(lbound, float):
lbound = np.ones(n) * lbound
if isinstance(ubound, float):
ubound = np.ones(n) * ubound
# we need to expand bounded condition and constraint matrix to handle L1 bound
lbound = np.concatenate((lbound, np.zeros(n)), axis=0)
ubound = np.concatenate((ubound, np.inf * np.ones(n)), axis=0)
cons = np.zeros((2, n+2)) risk_lbound = np.concatenate((risk_lbound, [[0.]]), axis=0)
cons[0] = np.ones(n+2) risk_ubound = np.concatenate((risk_ubound, [[turn_over_target]]), axis=0)
cons[1][0] = 1.
cons[1][1] = 1.
cons[1][-2] = 0.015
cons[1][-1] = 0.015
opt = LPOptimizer(cons, lb, ub, er) risk_constraints = np.concatenate((risk_constraints.T, np.zeros((m, n))), axis=1)
print(opt.status()) er = np.concatenate((er, np.zeros(n)), axis=0)
x = opt.x_value() turn_over_row = np.zeros(2 * n)
print(x[0], x[1]) turn_over_row[n:] = 1.
risk_constraints = np.concatenate((risk_constraints, [turn_over_row]), axis=0)
turn_over_matrix = np.zeros((2 * n, 2 * n))
for i in range(n):
turn_over_matrix[i, i] = 1.
turn_over_matrix[i, i + n] = -1.
turn_over_matrix[i + n, i] = 1.
turn_over_matrix[i + n, i + n] = 1.
risk_constraints = np.concatenate((risk_constraints, turn_over_matrix), axis=0)
risk_lbound = np.concatenate((risk_lbound, -np.inf * np.ones((n, 1))), axis=0)
risk_lbound = np.concatenate((risk_lbound, current_position), axis=0)
risk_ubound = np.concatenate((risk_ubound, current_position), axis=0)
risk_ubound = np.concatenate((risk_ubound, np.inf * np.ones((n, 1))), axis=0)
cons_matrix = np.concatenate((risk_constraints, risk_lbound, risk_ubound), axis=1)
opt = LPOptimizer(cons_matrix, lbound, ubound, -er)
status = opt.status()
if status == 0:
status = 'optimal'
return status, opt.feval(), opt.x_value()[:n]
if __name__ == '__main__':
n = 5
lb = np.zeros(n)
ub = 4. / n * np.ones(n)
er = np.random.randn(n)
current_pos = np.random.randint(0, n, size=n)
current_pos = current_pos / current_pos.sum()
turn_over_target = 0.1
cons = np.ones((n, 1))
risk_lbound = np.ones(1)
risk_ubound = np.ones(1)
status, fvalue, x_values = linear_build_with_to_constraint(er,
lb,
ub,
cons,
(risk_lbound, risk_ubound),
turn_over_target,
current_pos)
print(status)
print(fvalue)
print(x_values)
print(current_pos)
print(np.abs(x_values - current_pos).sum())
...@@ -8,15 +8,17 @@ Created on 2017-5-5 ...@@ -8,15 +8,17 @@ Created on 2017-5-5
import unittest import unittest
import numpy as np import numpy as np
from alphamind.portfolio.linearbuilder import linear_build from alphamind.portfolio.linearbuilder import linear_build
from alphamind.portfolio.linearbuilder import linear_build_with_to_constraint
class TestLinearBuild(unittest.TestCase): class TestLinearBuild(unittest.TestCase):
def setUp(self): def setUp(self):
self.er = np.random.randn(3000) self.er = np.random.randn(3000)
self.risk_exp = np.random.randn(3000, 30) self.risk_exp = np.random.randn(3000, 30)
self.risk_exp = np.concatenate([self.risk_exp, np.ones((3000, 1))], axis=1) self.risk_exp = np.concatenate([self.risk_exp, np.ones((3000, 1))], axis=1)
self.bm = np.random.randint(100, size=3000).astype(float) self.bm = np.random.randint(100, size=3000).astype(float)
self.current_pos = np.random.randint(0, 100, size=3000)
self.current_pos = self.current_pos / self.current_pos.sum()
def test_linear_build(self): def test_linear_build(self):
bm = self.bm / self.bm.sum() bm = self.bm / self.bm.sum()
...@@ -32,7 +34,7 @@ class TestLinearBuild(unittest.TestCase): ...@@ -32,7 +34,7 @@ class TestLinearBuild(unittest.TestCase):
self.assertTrue(np.all(w <= 0.01 + eplson)) self.assertTrue(np.all(w <= 0.01 + eplson))
self.assertTrue(np.all(w >= -eplson)) self.assertTrue(np.all(w >= -eplson))
calc_risk = (w - bm) @ self. risk_exp calc_risk = (w - bm) @ self.risk_exp
expected_risk = np.zeros(self.risk_exp.shape[1]) expected_risk = np.zeros(self.risk_exp.shape[1])
np.testing.assert_array_almost_equal(calc_risk, expected_risk) np.testing.assert_array_almost_equal(calc_risk, expected_risk)
...@@ -61,6 +63,35 @@ class TestLinearBuild(unittest.TestCase): ...@@ -61,6 +63,35 @@ class TestLinearBuild(unittest.TestCase):
calc_risk = (w - bm) @ self.risk_exp / np.abs(bm @ self.risk_exp) calc_risk = (w - bm) @ self.risk_exp / np.abs(bm @ self.risk_exp)
self.assertTrue(np.all(np.abs(calc_risk) <= 1.0001e-2)) self.assertTrue(np.all(np.abs(calc_risk) <= 1.0001e-2))
def test_linear_build_with_to_constraint(self):
bm = self.bm / self.bm.sum()
eplson = 1e-6
turn_over_target = 0.1
risk_lbound = bm @ self.risk_exp
risk_ubound = bm @ self.risk_exp
risk_tolerance = 0.01 * np.abs(risk_lbound[:-1])
risk_lbound[:-1] = risk_lbound[:-1] - risk_tolerance
risk_ubound[:-1] = risk_ubound[:-1] + risk_tolerance
status, _, w = linear_build_with_to_constraint(self.er,
0.,
0.01,
self.risk_exp,
risk_target=(risk_lbound, risk_ubound),
turn_over_target=turn_over_target,
current_position=self.current_pos)
self.assertEqual(status, 'optimal')
self.assertAlmostEqual(np.sum(w), 1.)
self.assertTrue(np.all(w <= 0.01 + eplson))
self.assertTrue(np.all(w >= -eplson))
self.assertAlmostEqual(np.abs(w - self.current_pos).sum(), turn_over_target)
calc_risk = (w - bm) @ self.risk_exp / np.abs(bm @ self.risk_exp)
self.assertTrue(np.all(np.abs(calc_risk) <= 1.0001e-2))
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment