Commit 1112f970 authored by Dr.李

update factor analysis

parent 5c4eef61
......@@ -7,8 +7,11 @@ Created on 2017-5-25
from typing import Optional
from typing import List
from typing import Tuple
import numpy as np
import pandas as pd
from alphamind.data.standardize import standardize
from alphamind.data.winsorize import winsorize_normal
from alphamind.data.neutralize import neutralize
from alphamind.portfolio.longshortbulder import long_short_build
from alphamind.portfolio.rankbuilder import rank_build
......@@ -56,6 +59,7 @@ class FDataPack(object):
def __init__(self,
raw_factor: np.ndarray,
d1returns,
factor_name: str=None,
codes: List=None,
groups: Optional[np.ndarray]=None,
......@@ -64,19 +68,52 @@ class FDataPack(object):
risk_names: List[str]=None):
self.raw_factor = raw_factor
self.d1returns = d1returns.flatten()
if factor_name:
self.factor_name = factor_name
else:
self.factor_name = 'factor'
self.codes = codes
self.groups = groups
self.benchmark = benchmark
self.groups = groups.flatten()
self.benchmark = benchmark.flatten()
self.risk_exp = risk_exp
self.risk_names = risk_names
def benchmark_risk_exp(self) -> np.ndarray:
    """Return the benchmark's exposure to each risk factor.

    ``self.benchmark`` is stored as a flat weight vector and ``self.risk_exp``
    is used elsewhere in this module with one row per asset and one column
    per risk factor, so the weight vector has to be the left operand of the
    matrix product. (The previous ``risk_exp @ benchmark`` ordering raises a
    shape-mismatch error for that layout.)

    Returns:
        1-D array holding the benchmark exposure for every risk factor.
    """
    return self.benchmark @ self.risk_exp
def settle(self, weights: np.ndarray) -> pd.DataFrame:
    """Summarise the realised performance of a set of portfolio weights.

    The weights are flattened and, when a benchmark is stored, turned into
    active positions by subtracting the benchmark weights. Two statistics
    are reported per group (when ``self.groups`` is set) plus a final
    ``'total'`` row:

    * ``er`` -- sum of position times ``self.d1returns``;
    * ``ic`` -- correlation (pandas ``corr``) between the positions and
      ``self.d1returns``.

    Args:
        weights: portfolio weights, any shape that flattens to one value
            per asset.

    Returns:
        DataFrame with columns ``er`` and ``ic`` whose index is named
        ``'industry'`` and ends with the label ``'total'``.
    """
    flat_weights = weights.flatten()
    active = flat_weights if self.benchmark is None else flat_weights - self.benchmark
    contributions = active * self.d1returns
    has_groups = self.groups is not None

    # Return contribution, per group and overall.
    if has_groups:
        er = pd.Series(contributions).groupby(self.groups).sum()
        er.loc['total'] = er.sum()
    else:
        er = pd.Series(contributions.sum(), index=['total'])
    er.index.name = 'industry'
    er.name = 'er'

    # Correlation between positions and returns, per group and overall.
    frame = pd.DataFrame(active, columns=[self.factor_name])
    frame['ret'] = self.d1returns
    overall_ic = frame.corr().iloc[0, 1]
    if has_groups:
        # Pick, for every group, the factor-vs-'ret' cell of its correlation matrix.
        per_group = frame.groupby(self.groups).corr()['ret'].xs(self.factor_name, level=1)
        ic_values = [*per_group.values, overall_ic]
    else:
        ic_values = [overall_ic]

    return pd.DataFrame({'er': er.values,
                         'ic': ic_values},
                        index=er.index)
def factor_processing(self, pre_process) -> np.ndarray:
if self.risk_exp is None:
......@@ -89,7 +126,7 @@ class FDataPack(object):
def to_df(self) -> pd.DataFrame:
cols = [self.factor_name]
to_concat = [self.raw_factor]
to_concat = [self.raw_factor.reshape((-1, 1))]
if self.groups is not None:
cols.append('groups')
......@@ -97,7 +134,7 @@ class FDataPack(object):
if self.benchmark is not None:
cols.append('benchmark')
to_concat.append(self.benchmark)
to_concat.append(self.benchmark.reshape(-1, 1))
if self.risk_exp is not None:
cols.extend(self.risk_names)
......@@ -108,14 +145,61 @@ class FDataPack(object):
index=self.codes)
def factor_analysis(factor_values,
                    industry,
                    d1returns,
                    detail_analysis=True,
                    benchmark: Optional[np.ndarray]=None,
                    risk_exp: Optional[np.ndarray]=None) -> Tuple[np.ndarray, Optional[pd.DataFrame]]:
    """Build a portfolio from a raw factor and optionally settle it.

    The factor is wrapped in an ``FDataPack`` and processed with
    ``winsorize_normal`` followed by ``standardize``. When both a benchmark
    and risk exposures are supplied, the linear builder is used with
    per-asset weights bounded by ``[0, benchmark + 0.01]`` and the risk
    target pinned to the benchmark's own exposure; otherwise the rank
    builder is used and its output is divided by 100.

    Args:
        factor_values: raw factor values.
        industry: group label of each asset, forwarded as ``groups``.
        d1returns: returns used when settling the portfolio.
        detail_analysis: when true, also return ``FDataPack.settle`` output.
        benchmark: optional benchmark weights.
        risk_exp: optional risk exposure matrix.

    Returns:
        ``(weights, analysis)`` where ``analysis`` is ``None`` unless
        ``detail_analysis`` is true.
    """
    pack = FDataPack(raw_factor=factor_values,
                     d1returns=d1returns,
                     groups=industry,
                     benchmark=benchmark,
                     risk_exp=risk_exp)
    signal = pack.factor_processing([winsorize_normal, standardize])

    if benchmark is None or risk_exp is None:
        # No benchmark/risk model available: fall back to the rank builder.
        weights = build_portfolio(signal,
                                  builder='rank',
                                  use_rank=100) / 100.
    else:
        # Linear programming builder, risk-matched to the benchmark.
        flat_benchmark = benchmark.flatten()
        weights = build_portfolio(signal,
                                  builder='linear',
                                  risk_exposure=risk_exp,
                                  lbound=0.,
                                  ubound=0.01 + flat_benchmark,
                                  risk_target=(flat_benchmark @ risk_exp,
                                               flat_benchmark @ risk_exp),
                                  solver='GLPK')

    analysis = pack.settle(weights) if detail_analysis else None
    return weights, analysis
if __name__ == '__main__':
raw_factor = np.random.randn(1000, 1)
d1returns = np.random.randn(1000, 1)
groups = np.random.randint(30, size=1000)
benchmark = np.random.randn(1000, 1)
risk_exp = np.random.randn(1000, 3)
codes = list(range(1, 1001))
data_pack = FDataPack(raw_factor,
d1returns,
'cfinc1',
codes=codes,
groups=groups,
......@@ -123,7 +207,8 @@ if __name__ == '__main__':
risk_exp=risk_exp,
risk_names=['market', 'size', 'growth'])
print(data_pack.to_df())
weights = np.random.randn(1000)
print(data_pack.settle(weights))
......@@ -12,6 +12,7 @@ from alphamind.data.winsorize import winsorize_normal
from alphamind.data.standardize import standardize
from alphamind.data.neutralize import neutralize
from alphamind.analysis.factoranalysis import factor_processing
from alphamind.analysis.factoranalysis import factor_analysis
class TestFactorAnalysis(unittest.TestCase):
......@@ -19,6 +20,7 @@ class TestFactorAnalysis(unittest.TestCase):
def setUp(self):
    """Create fresh random fixtures: one factor, three risk factors, one return column."""
    n_samples = 1000
    self.raw_factor = np.random.randn(n_samples, 1)
    self.risk_factor = np.random.randn(n_samples, 3)
    self.d1returns = np.random.randn(n_samples, 1)
def test_factor_processing(self):
new_factor = factor_processing(self.raw_factor)
......@@ -36,6 +38,20 @@ class TestFactorAnalysis(unittest.TestCase):
np.testing.assert_array_almost_equal(new_factor, neutralize(self.risk_factor,
winsorize_normal(standardize(self.raw_factor))))
def test_factor_analysis(self):
    """Check factor_analysis with a benchmark and a risk model.

    Verifies that the settlement table is internally consistent, that the
    built portfolio matches the benchmark's risk exposure, and that the
    portfolio return exceeds the benchmark return.
    """
    benchmark = np.random.randint(50, size=1000)
    benchmark = benchmark / benchmark.sum()
    industry = np.random.randint(30, size=1000)

    weight, analysis_table = factor_analysis(self.raw_factor,
                                             d1returns=self.d1returns,
                                             industry=industry,
                                             benchmark=benchmark,
                                             risk_exp=self.risk_factor)

    # The last row is the 'total' of all group rows, so the column sums to
    # twice that total. Compare approximately (floating-point sums) and pick
    # the last row by position explicitly rather than via label lookup.
    er = analysis_table['er']
    self.assertAlmostEqual(er.sum() / er.iloc[-1], 2.0)

    np.testing.assert_array_almost_equal(weight @ self.risk_factor, benchmark @ self.risk_factor)

    # NOTE(review): raw_factor and d1returns are drawn independently at
    # random in setUp, so this inequality does not look guaranteed and the
    # test may be flaky -- confirm the intended invariant.
    self.assertTrue((weight @ self.d1returns)[0] > (benchmark @ self.d1returns)[0])
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment