Commit 076d95d3 authored by Dr.李's avatar Dr.李

made factor analysis workable with several factors

parent 4363faf1
......@@ -17,22 +17,23 @@ from alphamind.portfolio.longshortbulder import long_short_build
from alphamind.portfolio.rankbuilder import rank_build
from alphamind.portfolio.percentbuilder import percent_build
from alphamind.portfolio.linearbuilder import linear_build
from alphamind.portfolio.meanvariancebuilder import mean_variance_builder
def factor_processing(raw_factor: np.ndarray,
def factor_processing(raw_factors: np.ndarray,
pre_process: Optional[List]=None,
risk_factors: Optional[np.ndarray]=None) -> np.ndarray:
new_factor = raw_factor
new_factors = raw_factors
if pre_process:
for p in pre_process:
new_factor = p(new_factor)
new_factors = p(new_factors)
if risk_factors is not None:
new_factor = neutralize(risk_factors, new_factor)
new_factors = neutralize(risk_factors, new_factors)
return new_factor
return new_factors
def build_portfolio(er: np.ndarray,
......@@ -53,26 +54,32 @@ def build_portfolio(er: np.ndarray,
raise ValueError('linear programming optimizer in status: {0}'.format(status))
else:
return weight
elif builder == 'mean_variance' or builder == 'mv':
status, _, weight = mean_variance_builder(er, **kwargs)
if status != 'optimal':
raise ValueError('mean variance optimizer in status: {0}'.format(status))
else:
return weight
class FDataPack(object):
def __init__(self,
raw_factor: np.ndarray,
raw_factors: np.ndarray,
d1returns,
factor_name: str=None,
factor_names: List[str]=None,
codes: List=None,
groups: Optional[np.ndarray]=None,
benchmark: Optional[np.ndarray]=None,
risk_exp: Optional[np.ndarray]=None,
risk_names: List[str]=None):
self.raw_factor = raw_factor.reshape((-1, 1))
self.raw_factors = raw_factors
self.d1returns = d1returns.flatten()
if factor_name:
self.factor_name = factor_name
if factor_names:
self.factor_names = factor_names
else:
self.factor_name = 'factor'
self.factor_names = ['factor' + str(i) for i in range(raw_factors.shape[1])]
self.codes = codes
self.groups = groups.flatten()
if benchmark is not None:
......@@ -104,11 +111,11 @@ class FDataPack(object):
ret_agg.index.name = 'industry'
ret_agg.name = 'er'
pos_table = pd.DataFrame(net_pos, columns=[self.factor_name])
pos_table = pd.DataFrame(net_pos, columns=['weight'])
pos_table['ret'] = self.d1returns
if self.groups is not None:
ic_table = pos_table.groupby(self.groups).corr()['ret'].loc[(slice(None), self.factor_name)]
ic_table = pos_table.groupby(self.groups).corr()['ret'].loc[(slice(None), 'weight')]
ic_table.loc['total'] = pos_table.corr().iloc[0, 1]
else:
ic_table = pd.Series(pos_table.corr().iloc[0, 1], index=['total'])
......@@ -120,16 +127,16 @@ class FDataPack(object):
def factor_processing(self, pre_process) -> np.ndarray:
if self.risk_exp is None:
return factor_processing(self.raw_factor,
return factor_processing(self.raw_factors,
pre_process)
else:
return factor_processing(self.raw_factor,
return factor_processing(self.raw_factors,
pre_process,
self.risk_exp)
def to_df(self) -> pd.DataFrame:
cols = [self.factor_name]
to_concat = [self.raw_factor.reshape((-1, 1))]
cols = self.factor_names
to_concat = [self.raw_factors.copy()]
if self.groups is not None:
cols.append('groups')
......@@ -148,7 +155,8 @@ class FDataPack(object):
index=self.codes)
def factor_analysis(factors: pd.Series,
def factor_analysis(factors: pd.DataFrame,
factor_weights: np.ndarray,
industry: np.ndarray,
d1returns: np.ndarray,
detail_analysis=True,
......@@ -156,13 +164,13 @@ def factor_analysis(factors: pd.Series,
risk_exp: Optional[np.ndarray]=None,
is_tradable: Optional[np.ndarray]=None) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]:
data_pack = FDataPack(raw_factor=factors.values,
data_pack = FDataPack(raw_factors=factors.values,
d1returns=d1returns,
groups=industry,
benchmark=benchmark,
risk_exp=risk_exp)
processed_data = data_pack.factor_processing([winsorize_normal, standardize])
er = data_pack.factor_processing([winsorize_normal, standardize]) @ factor_weights
if benchmark is not None and risk_exp is not None:
# using linear programming portfolio builder
......@@ -176,7 +184,7 @@ def factor_analysis(factors: pd.Series,
risk_lbound = benchmark @ risk_exp
risk_ubound = benchmark @ risk_exp
weights = build_portfolio(processed_data,
weights = build_portfolio(er,
builder='linear',
risk_exposure=risk_exp,
lbound=lbound,
......@@ -186,7 +194,7 @@ def factor_analysis(factors: pd.Series,
else:
# using rank builder
weights = build_portfolio(processed_data,
weights = build_portfolio(er,
builder='rank',
use_rank=100) / 100.
......
......@@ -42,9 +42,11 @@ class TestFactorAnalysis(unittest.TestCase):
benchmark = benchmark / benchmark.sum()
industry = np.random.randint(30, size=1000)
factor_series = pd.Series(self.raw_factor.flatten(), index=range(len(self.raw_factor)))
factor_df = pd.DataFrame(self.raw_factor.flatten(), index=range(len(self.raw_factor)))
factor_weights = np.array([1.])
weight_table, analysis_table = factor_analysis(factor_series,
weight_table, analysis_table = factor_analysis(factor_df,
factor_weights,
d1returns=self.d1returns,
industry=industry,
benchmark=benchmark,
......@@ -54,7 +56,25 @@ class TestFactorAnalysis(unittest.TestCase):
self.assertEqual(analysis_table['er'].sum() / analysis_table['er'][-1], 2.0)
np.testing.assert_array_almost_equal(weight @ self.risk_factor, benchmark @ self.risk_factor)
self.assertTrue(weight @ factor_series.values > benchmark @ factor_series.values)
self.assertTrue(weight @ factor_df.values > benchmark @ factor_df.values)
def test_factor_analysis_with_several_factors(self):
benchmark = np.random.randint(50, size=1000)
benchmark = benchmark / benchmark.sum()
industry = np.random.randint(30, size=1000)
factor_df = pd.DataFrame(np.random.randn(1000, 2), index=range(len(self.raw_factor)))
factor_weights = np.array([0.2, 0.8])
weight_table, analysis_table = factor_analysis(factor_df,
factor_weights,
d1returns=self.d1returns,
industry=industry,
benchmark=benchmark,
risk_exp=self.risk_factor)
weight = weight_table.weight
self.assertEqual(analysis_table['er'].sum() / analysis_table['er'][-1], 2.0)
np.testing.assert_array_almost_equal(weight @ self.risk_factor, benchmark @ self.risk_factor)
if __name__ == '__main__':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment