Commit 1dd35635 authored by Dr.李

initial commit

.ipynb_checkpoints
__pycache__
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> 在这个notebook中,我们测试UQER与TF的风险模型方差解释度"
]
},
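{
"cell_type": "markdown",
"metadata": {},
"source": [
"> For each stock, the explained ratio computed in section 1 below is $\\mathrm{diag}(XFX^T)\\,/\\,\\mathrm{diag}(XFX^T + \\Delta)$, where $X$ is the factor exposure matrix, $F$ the factor covariance matrix and $\\Delta$ the diagonal matrix of squared specific risks."
]
},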
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%matplotlib inline\n",
"import numpy as np\n",
"import pandas as pd\n",
"from tf.CovCal import RiskMatrics\n",
"from tf.CovCal import BarraCov\n",
"from PyFin.api import advanceDateByCalendar"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 0. Data Preparation\n",
"------------------"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"risk_cov = pd.read_parquet('data/risk_cov.parqut').sort_values(['trade_date', 'FactorID']).set_index('trade_date')\n",
"risk_exp = pd.read_parquet('data/risk_exposure.parqut').sort_values(['trade_date', 'code']).set_index('trade_date')\n",
"specific_risk = pd.read_parquet('data/special_risk.parqut').sort_values(['trade_date', 'code']).set_index('trade_date')\n",
"risk_return = pd.read_parquet('data/risk_return.parqut').sort_values('trade_date').set_index('trade_date')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"risk_factors = risk_cov.columns[2:-1].tolist()\n",
"dates = risk_cov.index.unique().tolist()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. UQER v.s. TF Risk Model\n",
"--------------------"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"res = pd.DataFrame(columns=['UQER', 'TF', 'avg. vol', '# of Stk.'])\n",
"\n",
"window = '250b'\n",
"\n",
"for sample_date in dates[250:]:\n",
" \n",
" ## Uqer Risk Model\n",
" sample_risk_cov = risk_cov.loc[sample_date:sample_date, risk_factors]\n",
" sample_risk_exp = risk_exp.loc[sample_date:sample_date, ['code'] + risk_factors]\n",
" sample_specific_risk = specific_risk.loc[sample_date:sample_date, ['code', 'SRISK']]\n",
"\n",
" sample_codes = set(sample_risk_exp.code).intersection(sample_specific_risk.code)\n",
" sample_risk_exp = sample_risk_exp.loc[sample_risk_exp.code.isin(sample_codes)]\n",
" sample_specific_risk = sample_specific_risk.loc[sample_specific_risk.code.isin(sample_codes)]\n",
"\n",
" sample_risk_exp = sample_risk_exp[risk_factors].values\n",
" sample_risk_cov = sample_risk_cov.values\n",
" sample_specific_risk = sample_specific_risk['SRISK'].values\n",
" \n",
" explained_cov = sample_risk_exp @ sample_risk_cov @ sample_risk_exp.T / 10000.\n",
" sec_cov = explained_cov + np.diag(sample_specific_risk ** 2) / 10000\n",
" \n",
" explained_var = np.diag(explained_cov)\n",
" sec_var = np.diag(sec_cov)\n",
" explained_per = np.mean(explained_var / sec_var)\n",
" avg_vol = np.mean(np.sqrt(sec_var))\n",
" num_stks = len(sample_codes)\n",
" \n",
" ## TF Risk Model\n",
" start_date = advanceDateByCalendar('china.sse', sample_date, period= '-' + window)\n",
" sample_risk_return = risk_return[start_date:sample_date]\n",
" ret_values = sample_risk_return[risk_factors].T\n",
" tf_risk_cov = BarraCov(ret_values, 90, 20) * 10000 * 250\n",
" tf_explained_cov = sample_risk_exp @ tf_risk_cov @ sample_risk_exp.T / 10000.\n",
" tf_explained_var = np.diag(tf_explained_cov)\n",
" tf_explained_per = np.mean(tf_explained_var / sec_var)\n",
" \n",
" ## Output\n",
" res.loc[sample_date, 'UQER'] = explained_per\n",
" res.loc[sample_date, 'UQER (std.)'] = np.std(explained_var / sec_var)\n",
" res.loc[sample_date, 'UQER (# > 100%)'] = np.sum((explained_var / sec_var >= 1.))\n",
" res.loc[sample_date, 'TF'] = tf_explained_per\n",
" res.loc[sample_date, 'TF (std.)'] = np.std(tf_explained_var / sec_var)\n",
" res.loc[sample_date, 'TF (# > 100%)'] = np.sum((tf_explained_var / sec_var >= 1.))\n",
" res.loc[sample_date, 'avg. vol'] = avg_vol\n",
" res.loc[sample_date, '# of Stk.'] = num_stks\n",
" print(f\"{sample_date}: UQER: {explained_per:.2f}, {np.std(explained_var / sec_var):.2f}, {np.sum((explained_var / sec_var >= 1.)):0d} \\\n",
" TF: {tf_explained_per:.2f}, {np.std(tf_explained_var / sec_var):.2f}, {np.sum((tf_explained_var / sec_var >= 1.)):0d}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"res.to_excel('uqer_vs_tf.xlsx')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"res[['UQER', 'TF']].rolling(window=30).mean().plot(secondary_y='avg. vol', figsize=(16, 8))"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
#!/usr/bin/env python
'''
Created on Mon Oct 9 10:24:02 2017
Covariance matrix estimators
@author: hanjinyang
'''
import pandas as pd
import numpy as np
from .PMBaseFunc import expweight, newey_west_adj
import statsmodels.api as sm
def RiskMatrics(X, decay = 0.94):
    '''RiskMetrics Estimator
    Parameters
    ----------
    X : array-like, shape (n_features, n_samples)
    decay : decay factor
    Returns
    -------
    cov_matrix : array-like, shape (n_features, n_features)
    '''
(n_features, n_samples) = X.shape
exp_weight = [decay**(n_samples-i) for i in range(1,n_samples+1)]
exp_weight = np.array(exp_weight) / np.sum(exp_weight)
    weights_mat = pd.DataFrame(exp_weight * np.ones([n_features, n_samples]))
weights_mat.index = X.index
weights_mat.columns = X.columns
weights_mat[np.isnan(X)] = np.nan
weights_mat = (weights_mat.T / weights_mat.sum(axis = 1)).T
x_demean = (X.T - np.sum(X * weights_mat, axis = 1)).T
riskmatrics = np.dot((x_demean * weights_mat).fillna(0), x_demean.fillna(0).T)
return riskmatrics
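# Minimal usage sketch (editor's addition, not part of the original module);
# run it with `python -m tf.CovCal` since this module uses relative imports.
if __name__ == '__main__':
    _X = pd.DataFrame(np.random.randn(5, 500))
    _cov = RiskMatrics(_X, decay=0.94)
    # the estimate is (n_features, n_features) and symmetric up to rounding
    assert _cov.shape == (5, 5)
    assert np.allclose(_cov, _cov.T)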
def BarraCov(X, halflife_corr, halflife_vol, Lnw_c = None, Lnw_v = None):
    '''Barra Covariance
    Estimate the covariance matrix via separate volatility and correlation estimates
    Parameters
    ----------
    X : array-like, shape (n_features, n_samples)
    halflife_corr : half-life for correlations
    Lnw_c : Newey-West correlation lags
    halflife_vol : half-life for volatility
    Lnw_v : Newey-West volatility lags
    Returns
    -------
    cov_matrix : array-like, shape (n_features, n_features)
    Notes
    -----
    cov_matrix = std * corr * std
    '''
weight_adj = expweight(halflife_corr, X.shape[1])
nw_cov = newey_west_adj(X.T, Lnw_c, aweights = weight_adj)
std = np.sqrt(np.diag(nw_cov))
corrcoef = np.mat(np.diag(1 / std)) * np.mat(nw_cov) * np.mat(np.diag(1 / std)).T
weight_adj = expweight(halflife_vol, X.shape[1])
nw_vol = newey_west_adj(X.T, Lnw_v, aweights = weight_adj)
std_vol = np.sqrt(np.diag(nw_vol))
cov_matrix = np.array(np.mat(np.diag(std_vol)) * np.mat(corrcoef) * np.mat(np.diag(std_vol)).T)
return cov_matrix
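# Minimal usage sketch (editor's addition): BarraCov blends a long-half-life
# correlation estimate with a short-half-life volatility estimate; the
# half-lives below mirror the RiskModel defaults in PMBaseFunc (504 / 84).
if __name__ == '__main__':
    _f = pd.DataFrame(np.random.randn(10, 750))
    _barra = BarraCov(_f, halflife_corr=504, halflife_vol=84)
    assert _barra.shape == (10, 10)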
def weighted_vol(X, halflife_vol, halflife_nw, Lnw_v = None):
    '''Weighted specific volatility
    Estimate an exponentially weighted, Newey-West adjusted specific variance for each row of X
    Parameters
    ----------
    X : array-like, shape (n_features, n_samples)
    halflife_vol : half-life for volatility
    Lnw_v : Newey-West volatility lags
    halflife_nw : Newey-West auto-correlation half-life
    Returns
    -------
    nw_sp_vol : array-like, shape (n_features, )
    '''
(n_features, n_samples) = X.shape
lambda_vol = 0.5 ** (1 / halflife_vol)
    weight_ = [lambda_vol**(n_samples-i) for i in range(1,n_samples+1)]
    # one row of weights per feature; np.tile avoids the shape mismatch a single-row list would raise
    weight_m = pd.DataFrame(data = np.tile(weight_, (n_features, 1)), index = X.index, columns = X.columns)
weight_m[X.applymap(np.isnan)] = np.nan
weight_adj = weight_m.T / weight_m.sum(axis = 1)
nw_sp_vol = pd.Series(index = range(X.shape[0]))
for i in range(X.shape[0]):
temp_res = X.iloc[i, :].dropna()
if len(temp_res) < 50:
nw_sp_vol_ = np.nan
else:
nw_sp_vol_ = newey_west_adj(temp_res, Lnw_v, halflife_rho = halflife_nw, aweights = weight_adj.iloc[:, i].dropna())
nw_sp_vol[i] = nw_sp_vol_
return nw_sp_vol
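# Minimal usage sketch (editor's addition): exponentially weighted, Newey-West
# adjusted specific variances, one per row of the residual matrix.
if __name__ == '__main__':
    _res = pd.DataFrame(np.random.randn(3, 300))
    _vols = weighted_vol(_res, halflife_vol=84, halflife_nw=252, Lnw_v=2)
    assert len(_vols) == 3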
def betareg(x, y):
    '''OLS of y on x with an intercept; returns (coef, stderr, residual variance, residuals)'''
    y.name = 'y'
    xy = pd.concat([x, y], axis = 1).dropna()
x = sm.add_constant(xy.drop('y', axis = 1))
y = xy['y']
model = sm.OLS(y, x).fit()
coef = model.params
stderr = model.bse
idiosyn = model.resid.var()
resid = model.resid
return (coef, stderr, idiosyn, resid)
def betaest(Y, X):
    '''Single index model
    Estimate beta for each row of Y using OLS regression
    Parameters
    ----------
    Y : array-like, shape (n_features, n_samples)
    X : market return series
    Returns
    -------
    beta : array-like, shape (n_features,)
    idiosyn : idiosyncratic variance, array-like, shape (n_features,)
    stderr : array-like, shape (n_features,)
    resid : residual returns
    '''
    regresults = Y.apply((lambda j: betareg(X, j)), axis = 1)
beta = regresults.apply((lambda x: x[0]))
stderr = regresults.apply((lambda x: x[1]))
idiosyn = regresults.apply((lambda x: x[2]))
resid = regresults.apply((lambda x: x[3]))
return (beta, idiosyn, stderr, resid)
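# Minimal usage sketch (editor's addition): single-index betas for a few
# synthetic return series against a market return series.
if __name__ == '__main__':
    _mkt = pd.Series(np.random.randn(300), name='Mkt')
    _Y = pd.DataFrame(np.random.randn(4, 300))
    _beta, _idio, _stderr, _resid = betaest(_Y, _mkt)
    assert len(_beta) == 4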
def betaest2(Y, X):
    '''Two-factor model (market plus own industry)
    Estimate beta for each row of Y using OLS regression
    Parameters
    ----------
    Y : array-like, shape (n_features, n_samples)
    X : DataFrame with the market return column 'Mkt' and one industry return column per row of Y
    Returns
    -------
    beta : array-like, shape (n_features,)
    idiosyn : idiosyncratic variance, array-like, shape (n_features,)
    stderr : array-like, shape (n_features,)
    resid : residual returns
    '''
    regresults = Y.apply((lambda j: betareg(X[['Mkt', j.name]].rename(columns = {j.name: 'Indu'}), j)), axis = 1)
beta = regresults.apply((lambda x: x[0]))
stderr = regresults.apply((lambda x: x[1]))
idiosyn = regresults.apply((lambda x: x[2]))
resid = regresults.apply((lambda x: x[3]))
return (beta, idiosyn, stderr, resid)
def LedoitWolfCOV(Y, F = 'Constant'):
"""LedoitWolf Estimator
Ledoit-Wolf is a particular form of shrinkage, where the shrinkage
coefficient is computed using O. Ledoit and M. Wolf's formula as
described in Ledoit & Wolf (2003) and Ledoit & Wolf (2004).
Parameters
----------
    Y : array-like, shape (n_features, n_samples)
    F : shrinkage target: 'Identity', 'Constant' (constant correlation, default) or 'Diag', or a custom array-like, shape (n_features, n_features)
Notes
-----
The regularised covariance is:
shrinkage*F+(1-shrinkage)*S
    where S is the sample covariance matrix and F is a highly structured estimator.
shrinkage = max{0,min{k/T,1}}
where k = (pi-rho)/gamma
References
----------
Ledoit O, Wolf M. Improved estimation of the covariance matrix of stock
returns with an application to portfolio selection[J].
Journal of Empirical Finance, 2003, 10(5):603-621.
    Ledoit O, Wolf M. Honey, I Shrunk the Sample Covariance Matrix[J].
    The Journal of Portfolio Management, 2004, 30(4):110-119.
"""
(n_features, n_samples) = Y.shape
y_i = Y.mean(axis = 1)
Y_ = (Y.T - y_i).T
trd = 1 * ~np.isnan(Y)
T_mat = np.dot(trd, trd.T)
S = np.dot(Y_.fillna(0), Y_.fillna(0).T) / T_mat
vol_mean = np.nanmean(S[np.identity(n_features) == 1])
S[(np.isnan(S) | (np.abs(S) < 1e-08)) & (np.identity(n_features) == 1)] = vol_mean
S[np.isnan(S)] = 0
X = Y.fillna(0)
S = X.T.cov() * (n_samples - 1) / n_samples
S_diag = np.diag(S)
    S_diag_m = (np.ones([n_features, n_features]) * S_diag).T
r = (1 / np.sqrt(S_diag_m)) * S * (1 / np.sqrt(S_diag_m)).T
r_avg = (r.sum().sum() - n_features) / n_features / (n_features - 1)
Y2 = Y_ ** 2
Y3 = Y_ ** 3
S2 = S ** 2
Y2_cov = np.dot(Y2.fillna(0), Y2.fillna(0).T)
Y_cov = np.dot(Y_.fillna(0), Y_.fillna(0).T)
pi_m = ((1 / T_mat) * Y2_cov - (2 / T_mat) * Y_cov * S) + S2
pi = np.nansum(pi_m)
S_ij = S_diag_m / S_diag_m.T
S_ji = S_diag_m.T / S_diag_m
theta_ii = ((1 / T_mat) * np.dot(Y3.fillna(0), Y_.fillna(0).T) - np.repeat(np.reshape(np.array(Y2.mean(axis = 1)),
[n_features, 1]), n_features, axis = 1) * S - (1 / T_mat) * np.dot(Y_.fillna(0), Y_.fillna(0).T) * S_diag_m) + S_diag_m * S
theta_jj = ((1 / T_mat) * np.dot(Y_.fillna(0), Y3.fillna(0).T) - np.repeat(np.reshape(np.array(Y2.mean(axis = 1)),
[ n_features,1]), n_features, axis = 1).T * S - (1 / T_mat) * np.dot(Y_.fillna(0), Y_.fillna(0).T) * S_diag_m.T) + S_diag_m.T * S
rho2 = np.sqrt(S_ji) * theta_ii + np.sqrt(S_ij) * theta_jj
rho = np.nansum(np.diag(pi_m)) + (r_avg / 2) * (np.nansum(rho2) - np.nansum(np.diag(rho2)))
    if F == 'Identity':
        mu = np.diag(S).mean()
        F = mu * np.identity(n_features)
    elif F == 'Constant':
        r_new = np.ones([n_features, n_features]) * r_avg
        r_new[np.identity(n_features) == 1] = 1
        std = np.diag(np.sqrt(np.diag(S)))
        F = np.dot(np.dot(std, r_new), std)
    elif F == 'Diag':
        # keep the full matrix shape so that F - S below is well defined
        F = np.diag(np.diag(S))
gamma = np.linalg.norm(F - S, 'fro') ** 2
k = (pi - rho) / gamma
shrinkage = max(0, min(k / n_samples, 1))
lw_cov = shrinkage * F + (1 - shrinkage) * S
return (shrinkage, lw_cov)
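# Minimal usage sketch (editor's addition): shrink a noisy sample covariance
# toward the constant-correlation target; shrinkage is clipped to [0, 1].
if __name__ == '__main__':
    _y = pd.DataFrame(np.random.randn(20, 100))
    _shrinkage, _lw = LedoitWolfCOV(_y, F='Constant')
    assert 0.0 <= _shrinkage <= 1.0
    assert _lw.shape == (20, 20)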
def RandomMatrixCOV(Y):
    '''Random matrix filtering estimator
    Parameters
    ----------
    Y : array-like, shape (n_features, n_samples)
    References
    ----------
    Laloux L, Cizeau P, Potters M, et al. Random matrix theory and financial correlations[J].
    International Journal of Theoretical & Applied Finance, 2000, 3(3):391-397.
    '''
(n_features, n_samples) = Y.shape
y_i = Y.mean(axis = 1)
Y_ = (Y.T - y_i).T
trd = 1 * ~np.isnan(Y)
T_mat = np.dot(trd, trd.T)
S = np.dot(Y_.fillna(0), Y_.fillna(0).T) / T_mat
vol_mean = np.nanmean(S[np.identity(n_features) == 1])
S[(np.isnan(S) | (np.abs(S) < 1e-08)) & (np.identity(n_features) == 1)] = vol_mean
S[np.isnan(S)] = 0
X = Y.fillna(0)
S = X.T.cov() * (n_samples - 1) / n_samples
S_diag = np.diag(S)
    S_diag_m = (np.ones([n_features, n_features]) * S_diag).T
r = (1 / np.sqrt(S_diag_m)) * S * (1 / np.sqrt(S_diag_m)).T
(eigenvalue, eigenvector) = np.linalg.eigh(r)
Q = n_samples / n_features
eig_max = 1 + 1 / Q + 2 * np.sqrt(1 / Q)
k_list = list(np.nonzero(eigenvalue > eig_max)[0])
alpha = np.sum(eigenvalue[eigenvalue <= eig_max]) / n_features
r_rmt = np.dot(np.dot(eigenvector[:, k_list], np.diag(eigenvalue[k_list])), eigenvector[:, k_list].T) + alpha * np.identity(n_features)
rmt_cov = np.sqrt(S_diag_m) * r_rmt * np.sqrt(S_diag_m.T)
return rmt_cov
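# Minimal usage sketch (editor's addition): eigenvalues below the
# Marchenko-Pastur edge 1 + 1/Q + 2*sqrt(1/Q), with Q = n_samples/n_features,
# are treated as noise and replaced by their average.
if __name__ == '__main__':
    _y = pd.DataFrame(np.random.randn(20, 200))
    _rmt = RandomMatrixCOV(_y)
    assert _rmt.shape == (20, 20)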
def fillnan_corr(V):
    '''
    Fill NaN entries of a correlation matrix with the average off-diagonal correlation
    V : array-like, shape (n_features, n_features), correlation matrix
    '''
    n_features = V.shape[0]
    V_nodiag = V.copy()
    V_nodiag[np.identity(n_features) == 1] = np.nan
    # average off-diagonal correlation; the parentheses matter, otherwise ~ would negate the count instead of the mask
    r_avg = np.nansum(V_nodiag) / (~np.isnan(V_nodiag)).sum().sum()
    V[np.isnan(V)] = r_avg
    V[np.identity(n_features) == 1] = 1
    V_transf = V.copy()
    return V_transf
def eigenDist(C1, C2, flag = 'COV'):
    '''Eigenvalue distance between two covariance (or correlation) matrices
    Parameters
    ----------
    C1, C2 : array-like, shape (n_features, n_features), covariance matrices
    flag : 'COV' to compare the covariances directly, 'CORR' to compare the implied correlations
    References
    ----------
    Liu, Lan. Portfolio risk measurement: the estimation of the covariance of stock returns.
    Diss. University of Warwick, 2007.
    '''
if flag == 'CORR':
s1 = np.diag(1 / np.sqrt(np.diag(C1)))
s2 = np.diag(1 / np.sqrt(np.diag(C2)))
V1 = np.dot(np.dot(s1, C1), s1)
V2 = np.dot(np.dot(s2, C2), s2)
if np.isnan(V1).sum().sum() > 0:
V1 = fillnan_corr(V1)
if np.isnan(V2).sum().sum() > 0:
V2 = fillnan_corr(V2)
elif flag == 'COV':
V1 = C1.copy()
V2 = C2.copy()
(w1, v1) = np.linalg.eigh(V1)
(w2, v2) = np.linalg.eigh(V2)
w1[np.abs(w1) < 1e-08] = 0
if np.linalg.matrix_rank(V1) == V1.shape[0]:
V = (np.mat(v1) * np.mat(np.diag(w1 ** -0.5))).T * np.mat(V2) * np.mat(v1) * np.mat(np.diag(w1 ** -0.5))
else:
eig_05 = np.linalg.pinv(np.diag(w1 ** 0.5))
V = (np.mat(v1) * eig_05).T * np.mat(V2) * np.mat(v1) * eig_05
(w, v) = np.linalg.eigh(V)
lambda_max = np.max(w[w > 0])
lambda_min = np.min(w[w > 1e-07])
dist = np.log(lambda_max / lambda_min)
return dist
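# Minimal usage sketch (editor's addition): the distance is log(lambda_max /
# lambda_min) of C2 whitened by C1, so it is zero when the matrices agree.
if __name__ == '__main__':
    _c1 = np.cov(np.random.randn(5, 300))
    _c2 = np.cov(np.random.randn(5, 300))
    assert eigenDist(_c1, _c2, flag='COV') >= 0.0
    assert np.isclose(eigenDist(_c1, _c1, flag='COV'), 0.0)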
# -*- coding: utf-8 -*-
"""
Created on Fri Aug 4 15:53:44 2017
Basic utility functions for portfolio optimization
@author: hanjinyang
"""
import numpy as np
import statsmodels.api as sm
import pymysql
import pandas as pd
import datetime
from dateutil.parser import parse
def isOutlier(x):
    # flag upper-tail outliers using the median absolute deviation (MAD)
    me = x.median()
    mad = (np.abs(x - me)).median()
    up_outlier = me + 3*1.4826*mad
    #low_outlier = me - 3*1.4826*mad
    outlier = x > up_outlier
    return outlier
def MADOutlier(x):
    # MAD-based upper and lower outlier bounds
me = x.median()
mad = (np.abs(x-me)).median()
up_outlier = me+3*1.4826*mad
low_outlier = me-3*1.4826*mad
return up_outlier,low_outlier
def replaceNone(x):
    # replace None with a far-future date string
    if x is None:
        return '21000101'
    else:
        return x
def get_dictvalue(key, thedict):
    try:
        return thedict[key]
    except KeyError:
        return np.nan
def find_fill(t, trddt, nan_list):
    # step back through the trading calendar until a date outside nan_list is found
    t_ = t
lag = 1
while t_ in nan_list:
t_ = trddt[trddt.index(t_)-lag]
return t_
def code_fill(x):
    # convert a numeric stock code to a Wind code (.SZ / .SH suffix)
x_str = str(x).zfill(6)
if x_str.startswith('0') or x_str.startswith('3'):
return '.'.join([x_str,'SZ'])
elif x_str.startswith('6'):
return '.'.join([x_str,'SH'])
else:
return np.nan
def weighted_cov(m1,m2,aweights,bias=False, ddof=None,mean=True):
"""
Compute weighted covariance matrix cov(x,y)
Parameters
----------
m1,m2 : (K x N)
aweights: (Nx1)
bias:
ddof:
mean:
Returns
-------
ndarray (K x K)
Reference
---------
np.cov
"""
if ddof is None:
if bias == 0:
ddof = 1
else:
ddof = 0
x = m1.copy()
y = m2.copy()
w = aweights
v1 = np.sum(w)
v2 = np.sum(w * aweights)
if mean:
#print(mean)
if len(x.shape)==2:
x = (x.T - np.sum(x * w, axis=1) / v1).T
#np.average(x, axis=1, weights=w)
y = (y.T - np.sum(y * w, axis=1) / v1).T
else:
x = (x.T - np.sum(x * w) / v1).T
#np.average(x, axis=1, weights=w)
y = (y.T - np.sum(y * w) / v1).T
cov = np.dot(x * w, y.T) * v1 / (v1**2 - ddof * v2)
return cov
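# Minimal usage sketch (editor's addition): with equal weights, weighted_cov
# reduces to the ordinary ddof=1 sample covariance.
if __name__ == '__main__':
    _m = np.random.randn(3, 200)
    _w = np.ones(200)
    assert np.allclose(weighted_cov(_m, _m, aweights=_w), np.cov(_m))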
def newey_west_adj(m, nw_lags_beta=None, halflife_rho=None, aweights=None):
"""
Compute Newey-West adjusted covariance matrix, taking into account
specified number of leads / lags
Parameters
----------
m : (N x K)
nw_lags_beta : int
halflife_rho : Newey-West Auto-Correlation Half-life, int
aweights: (Nx1)
Returns
-------
ndarray (K x K)
Reference
---------
Newey, W. K. & West, K. D. (1987) A Simple, Positive
Semi-definite, Heteroskedasticity and Autocorrelation Consistent
Covariance Matrix, Econometrica, vol. 55(3), 703-708
"""
N = len(m)
if aweights is None:
#print('aweights is NONE')
B = m - m.mean(0)
C = np.dot(B.T, B) / (N-1)
else:
w = aweights
v1 = np.sum(w)
if len(m.shape) == 2:
B = m - np.sum(m.T * w, axis=1) / v1
else:
B = m - np.sum(m.T * w) / v1
C = weighted_cov(B.T,B.T,aweights,mean=False)
if (halflife_rho is not None) and (nw_lags_beta is not None) and (len(m.shape)==1):
weights_ = expweight(halflife_rho,len(m)-1)
#weights = weights_/weights_.sum()*nw_lags_beta/2
rho = weighted_cov(m[:-1],m[1:],weights_)
if nw_lags_beta is not None:
for i in range(1,nw_lags_beta + 1):
if aweights is None:
                # lagged covariance term
cov = np.dot(B[:-i].T, B[i:]) / (N-i)
else:
#print('Y')
cov = weighted_cov(B[:-i].T, B[i:].T, np.array(aweights[i:])/sum(aweights[i:]),mean=False)
if halflife_rho is None:
weight = i / (nw_lags_beta + 1)
C += 2 * (1 - weight) * cov
else:
#print('Y')
weight = rho**i
C += 2 * weight * cov
return C
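# Minimal usage sketch (editor's addition): on i.i.d. data the Bartlett-weighted
# lag terms are small, so the adjusted estimate stays close to the plain one.
if __name__ == '__main__':
    _r = np.random.randn(500, 4)
    _plain = newey_west_adj(_r)
    _adj = newey_west_adj(_r, nw_lags_beta=2)
    assert _plain.shape == _adj.shape == (4, 4)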
def expweight(tau,nobs):
"""
    Return a list of exponential weights characterized by half-life parameter tau.
Weights are standardized so that the sum of weights equals one.
Parameters
----------
tau : int
nobs : int
Returns
-------
list (nobs)
"""
    lambda_ = 0.5**(1/tau)
    # exponential weights
    expweight = [lambda_**(nobs-i) for i in range(1,nobs+1)]
    # normalize so the weights sum to one
    weight_adj = np.array(expweight)/sum(expweight)
return weight_adj
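# Minimal usage sketch (editor's addition): the weights sum to one and halve
# every `tau` observations going back in time.
if __name__ == '__main__':
    _w = expweight(90, 250)
    assert np.isclose(_w.sum(), 1.0)
    assert np.isclose(_w[-1] / _w[-1 - 90], 2.0)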
def deweight(nobs):
    """
    Return a list of linearly increasing weights.
    Weights are standardized so that the sum of weights equals one.
    Parameters
    ----------
    nobs : int
    Returns
    -------
    list (nobs)
    """
    # linearly increasing weights
    deweight = [i for i in range(1,nobs+1)]
    # normalize so the weights sum to one
    weight_adj = np.array(deweight)/sum(deweight)
    return weight_adj
"""
def wtdays(tstart,tend,period='Day'):
#
if type(tstart)== str:
tstart = parse(tstart).date()
if type(tend)== str:
tend = parse(tend).date()
host = '192.168.41.56'
port = 3306
user = 'inforesdep01'
passwd = 'tfyfInfo@1602'
db = 'wind'
    # extract industry categories for the given sub-industries
conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd,db=db,charset='utf8')
    # listing dates
sqlstr = ("select distinct TRADE_DAYS from AShareCalendar")
calendar = pd.read_sql(sqlstr,conn)
conn.close()
alltdays = [datetime.datetime.strptime(x,'%Y%m%d').date() for x in list(calendar['TRADE_DAYS'])]
if period in ['Day','D']:
tdays = [x for x in alltdays if x>=tstart and x<=tend]
tdays.sort()
elif period in [ 'Month','M']:
alltdays_d = pd.DataFrame(np.array([alltdays]).T,columns=['Date'])
alltdays_d ['Month'] = alltdays_d['Date'].apply(lambda x:x.year*100+x.month)
monthend = alltdays_d.sort_values('Date').groupby('Month').tail(1)
tdays = [x for x in list(monthend['Date']) if x>=tstart and x<=tend]
tdays.sort()
elif period in ['Quarter','Q']:
alltdays_d = pd.DataFrame(np.array([alltdays]).T,columns=['Date'])
alltdays_d ['Quarter'] = alltdays_d['Date'].apply(lambda x:x.year*100+int(np.ceil(x.month/3)))
quarterend = alltdays_d.sort_values('Date').groupby('Quarter').tail(1)
tdays = [x for x in list(quarterend['Date']) if x>=tstart and x<=tend]
tdays.sort()
return tdays
"""
class SolutionError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class FactorDomainError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
def cvxopt_solve_qp(P, Q, G=None, h=None, A=None, b=None,init=None):
from cvxopt import solvers,matrix
P = .5 * (P + P.T) # make sure P is symmetric
P = matrix(P)
Q = matrix(Q)
if G is not None:
G = matrix(G)
h = matrix(h)
if A is not None:
A = matrix(A)
b = matrix(b)
solvers.options['show_progress']=False
solvers.options['maxiters']=100
if init is not None:
init = matrix(init)
    sol = solvers.qp(P, Q, G, h, A, b, initvals=init)
    # report success only for an optimal status; sol['x'] can be None when the problem is infeasible
    success = 'optimal' in sol['status']
    x = np.array(sol['x']).reshape((P.size[1],)) if sol['x'] is not None else None
    return success, x
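# Minimal usage sketch (editor's addition, assumes cvxopt is installed):
# a tiny long-only, fully-invested problem, min 0.5*x'Px + Q'x
# s.t. Gx <= h (x >= 0) and Ax == b (sum(x) == 1).
if __name__ == '__main__':
    _P = np.eye(3)
    _Q = np.array([-0.1, -0.2, -0.3])
    _ok, _x = cvxopt_solve_qp(_P, _Q, G=-np.eye(3), h=np.zeros(3),
                              A=np.ones((1, 3)), b=np.array([1.0]))
    assert _ok and np.isclose(_x.sum(), 1.0)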
def mul_reg(Y, X):
    """
    Y : T x N
    X : T x K
    Regress each column of Y on X (with an intercept) and return the fitted coefficients.
    """
    x_ = sm.add_constant(X)
    beta = Y.apply(lambda j: sm.OLS(j, x_).fit().params)
    return beta
def RiskModel(beta_all, resid_all, factor_all, stk_pool, dq_mv, all_var, halflife_corr=504, halflife_vol=84, halflife_nw=252, Lnw_v=None, Lnw_c=None, q=0.25):
    """
    beta_all : factor returns
    resid_all : residual returns
    factor_all : factor exposures
    stk_pool : stock universe
    dq_mv : market capitalisation
    all_var : factor names
    halflife_vol : half-life for volatility, default 84
    Lnw_v : Newey-West volatility lags, default None
    halflife_corr : half-life for correlations, default 504
    Lnw_c : Newey-West correlation lags, default None
    halflife_nw : Newey-West auto-correlation half-life, default 252
    q : Bayesian shrinkage parameter, default 0.25
    """
    # factor exposures
    factors = factor_all.loc[stk_pool, all_var]
    factors = factors.fillna(factors.mean())
    factors.loc[:, 'const'] = 1
    factors = factors.reindex(index=stk_pool, columns=['const'] + all_var).fillna(0)
    # factor exposure matrix (N x K)
    X = np.mat(factors)
    # factor returns
    sub_daily = beta_all.loc[['const']+all_var,:]
    sub_resid = resid_all.loc[stk_pool,:]
    # --------------------------------- risk model ---------------------------------
    # factor return covariance matrix and specific risk matrix
    # ---- estimate the factor return covariance matrix
    # factor covariance estimate: factor return correlations and volatilities
    # (1) estimate correlations
    # weights
    weight_adj = expweight(halflife_corr,sub_daily.shape[1])
    # weighted covariance
    nw_cov = newey_west_adj(sub_daily.T, Lnw_c, aweights=weight_adj)
    # correlation matrix
    std = np.sqrt(np.diag(nw_cov))
    corrcoef = np.mat(np.diag(1/std)) *np.mat(nw_cov)*np.mat(np.diag(1/std)).T
    # (2) estimate volatilities
    # weights
    weight_adj = expweight(halflife_vol,sub_daily.shape[1])
    # weighted covariance
    nw_vol = newey_west_adj(sub_daily.T, Lnw_v, aweights=weight_adj)
    # volatilities
    std_vol = np.sqrt(np.diag(nw_vol))
    # (3) combine correlations and volatilities into a covariance matrix
    cov_matrix = np.mat(np.diag(std_vol))*np.mat(corrcoef)*np.mat(np.diag(std_vol)).T
    # factor return covariance matrix (K x K)
    F = np.mat(cov_matrix)
    # ---- estimate the specific risk matrix
    # (1) estimate specific volatility
    lambda_vol = 0.5**(1/halflife_vol)
    weight_ = [lambda_vol**(sub_resid.shape[1]-i) for i in range(1,sub_resid.shape[1]+1)]
    # one row of weights per stock; np.tile avoids the shape mismatch a single-row list would raise
    weight_m = pd.DataFrame(data=np.tile(weight_, (sub_resid.shape[0], 1)),index=sub_resid.index,columns=sub_resid.columns)
    weight_m[sub_resid.applymap(np.isnan)] = np.nan
    # normalize weights
    weight_adj = weight_m.T/(weight_m.sum(axis=1))
    # weighted specific volatility
    nw_sp_vol = pd.Series(index=stk_pool)
for i in nw_sp_vol.index:
if i in sub_resid.index:
temp_res = sub_resid.loc[i,:].dropna()
if len(temp_res)<50:
nw_sp_vol_=np.nan
else:
nw_sp_vol_ = newey_west_adj(temp_res,Lnw_v ,halflife_rho=halflife_nw, \
aweights=weight_adj.loc[:,i].dropna())
else:
nw_sp_vol_=np.nan
nw_sp_vol[i] = nw_sp_vol_
nw_sp_vol.name = 'sp_vol'
    # (2) Bayesian shrinkage
    # split stocks into 10 size buckets
sub_dq_mv = dq_mv
sub_dq_mv.name = 'dq_mv'
nw_mv = pd.concat([nw_sp_vol,sub_dq_mv],axis=1)
    try:
        nw_mv['logmarket'] = pd.qcut(nw_mv['dq_mv'], 10, labels=False)
    except ValueError:
        # fall back to coarser buckets when the bin edges are not unique
        try:
            nw_mv['logmarket'] = pd.qcut(nw_mv['dq_mv'], 3, labels=False)
        except ValueError:
            nw_mv['logmarket'] = pd.qcut(nw_mv['dq_mv'], 1, labels=False)
nw_mv = nw_mv.reindex(stk_pool)
nw_mv['sp_std'] = nw_mv['sp_vol']**0.5
nw_mv['Cap_sp'] = nw_mv['sp_std']*nw_mv['dq_mv']
nw_mv = nw_mv.reset_index()
nw_mv_grouped = nw_mv.groupby('logmarket')
nw_mv_mean = nw_mv_grouped.apply(lambda x:x['Cap_sp'].sum()/x['dq_mv'].sum())
nw_mv_mean.name='mean'
nw_mv_mean = nw_mv_mean.reset_index()
nw_mv = pd.merge(left=nw_mv,right=nw_mv_mean,on='logmarket')
    nw_mv['div'] = np.abs(nw_mv['sp_std'] - nw_mv['mean'])
    nw_mv['vn'] = nw_mv.groupby('logmarket')['div'].apply(
        lambda x: q*x/(x.std() + q*x))
    nw_mv['sig_sh'] = nw_mv['vn']*nw_mv['mean'] + (1 - nw_mv['vn'])*nw_mv['sp_std']
    nw_mv['sig_sh'] = nw_mv['sig_sh'].fillna(nw_mv['mean'])
    nw_mv.set_index('index', inplace=True)
    # specific variance
    sp_vol = nw_mv['sig_sh']**2
    sp_vol = sp_vol.reindex(stk_pool)
    sp_vol = sp_vol.fillna(sp_vol.mean()).reindex(stk_pool)
    # (3) specific risk matrix (N x N)
    delta = np.mat(np.diag(sp_vol))
    # ---- stock return covariance matrix (N x N)
    P = X*F*X.T + delta
    return X, F, P, delta