Unverified commit 233c92ad authored by lion-sing, committed by GitHub

Merge pull request #6 from alpha-miner/master

update
parents 9b36cc04 79792651
......@@ -4,8 +4,8 @@
<tr>
<td>Build Status</td>
<td>
-   <a href="https://travis-ci.org/wegamekinglc/alpha-mind">
-   <img src="https://travis-ci.org/wegamekinglc/alpha-mind.svg?branch=master" alt="travis build status" />
+   <a href="https://travis-ci.org/alpha-miner/alpha-mind">
+   <img src="https://travis-ci.org/alpha-miner/alpha-mind.svg?branch=master" alt="travis build status" />
</a>
</td>
</tr>
......
# -*- coding: utf-8 -*-
"""
Created on 2018-3-5
@author: cheng.li
"""
import numpy as np
import pandas as pd
import statsmodels.api as sm
from alphamind.utilities import alpha_logger
from alphamind.data.processing import factor_processing


def cs_impl(ref_date,
            factor_data,
            factor_name,
            risk_exposure,
            constraint_risk,
            industry_matrix,
            dx_returns):
    total_data = pd.merge(factor_data, risk_exposure, on='code')
    total_data = pd.merge(total_data, industry_matrix, on='code').dropna()

    if len(total_data) < 0.33 * len(factor_data):
        alpha_logger.warning(f"valid data point({len(total_data)}) "
                             f"is less than 33% of the total sample ({len(factor_data)}). Omit this run")
        return np.nan, np.nan, np.nan

    total_risk_exp = total_data[constraint_risk]

    er = total_data[factor_name].values.astype(float)
    er = factor_processing(er, [], total_risk_exp.values, []).flatten()
    industry = total_data.industry_name.values

    codes = total_data.code.tolist()
    target_pos = pd.DataFrame({'code': codes,
                               'weight': er,
                               'industry': industry})
    target_pos['weight'] = target_pos['weight'] / target_pos['weight'].abs().sum()
    target_pos = pd.merge(target_pos, dx_returns, on=['code'])
    target_pos = pd.merge(target_pos, total_data[['code'] + constraint_risk], on=['code'])

    activate_weight = target_pos.weight.values
    excess_return = np.exp(target_pos.dx.values) - 1.
    port_ret = np.log(activate_weight @ excess_return + 1.)
    ic = np.corrcoef(excess_return, activate_weight)[0, 1]
    x = sm.add_constant(activate_weight)
    results = sm.OLS(excess_return, x).fit()
    t_stats = results.tvalues[1]

    alpha_logger.info(f"{ref_date} is finished with {len(target_pos)} stocks for {factor_name}")
    alpha_logger.info(f"{ref_date} risk_exposure: "
                      f"{np.sum(np.square(target_pos.weight.values @ target_pos[constraint_risk].values))}")
    return port_ret, ic, t_stats


def cross_section_analysis(ref_date,
                           factor_name,
                           universe,
                           horizon,
                           constraint_risk,
                           engine):
    codes = engine.fetch_codes(ref_date, universe)

    risk_exposure = engine.fetch_risk_model(ref_date, codes)[1][['code'] + constraint_risk]
    factor_data = engine.fetch_factor(ref_date, factor_name, codes)
    industry_matrix = engine.fetch_industry_matrix(ref_date, codes, 'sw_adj', 1)
    dx_returns = engine.fetch_dx_return(ref_date, codes, horizon=horizon, offset=1)

    return cs_impl(ref_date, factor_data, factor_name, risk_exposure, constraint_risk, industry_matrix, dx_returns)


if __name__ == '__main__':
    import numpy as np
    import pandas as pd
    import statsmodels.api as sm
    from alphamind.api import *

    factor_name = 'SIZE'
    data_source = 'postgres+psycopg2://postgres:A12345678!@10.63.6.220/alpha'
    engine = SqlEngine(data_source)
    risk_names = list(set(risk_styles).difference({factor_name}))
    industry_names = list(set(industry_styles).difference({factor_name}))
    constraint_risk = risk_names + industry_names
    universe = Universe('custom', ['ashare_ex'])
    horizon = 9

    x = cross_section_analysis('2018-02-08',
                               factor_name,
                               universe,
                               horizon,
                               constraint_risk,
                               engine=engine)
    print(x)
......@@ -90,7 +90,7 @@ def er_portfolio_analysis(er: np.ndarray,
        cons_exp = constraints.risk_exp
        return lbound, ubound, cons_exp, risk_lbound, risk_ubound

-    if benchmark is not None and method == 'risk_neutral':
+    if method == 'risk_neutral':
        lbound, ubound, cons_exp, risk_lbound, risk_ubound = create_constraints(benchmark, **kwargs)

        turn_over_target = kwargs.get('turn_over_target')
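        # (With the benchmark requirement dropped, 'risk_neutral' presumably
        # relies on constraints whose centers default to zero when no
        # benchmark/backbone is supplied; see the LinearConstraints change below.)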
......
......@@ -1921,18 +1921,6 @@ class Gogoal(Base):
    tcap = Column(Float(53))


-class OutrightTmp(Base):
-    __tablename__ = 'outright_tmp'
-    __table_args__ = (
-        Index('outright_trade_date_code_portfolio_name_uindex', 'trade_date', 'code', 'portfolio_name', unique=True),
-    )
-
-    trade_date = Column(DateTime, primary_key=True, nullable=False)
-    code = Column(Integer, primary_key=True, nullable=False)
-    portfolio_name = Column(String(50), primary_key=True, nullable=False)
-    volume = Column(Integer, nullable=False)


class Outright(Base):
    __tablename__ = 'outright'
    __table_args__ = (
......@@ -1956,5 +1944,5 @@ class Outright(Base):

if __name__ == '__main__':
    from sqlalchemy import create_engine
-    engine = create_engine('postgres+psycopg2://postgres:we083826@101.132.104.118/alpha')
+    engine = create_engine('postgres+psycopg2://postgres:we083826@192.168.0.102/alpha')
    Base.metadata.create_all(engine)
......@@ -16,16 +16,12 @@ import alphamind.utilities as utils
def neutralize(x: np.ndarray,
               y: np.ndarray,
               groups: np.ndarray=None,
-              detail: bool=False,
-              weights: np.ndarray=None) \
+              detail: bool=False) \
        -> Union[np.ndarray, Tuple[np.ndarray, Dict]]:
    if y.ndim == 1:
        y = y.reshape((-1, 1))

-    if weights is None:
-        weights = np.ones(len(y), dtype=float)
-
    output_dict = {}

    if detail:
......@@ -41,17 +37,17 @@ def neutralize(x: np.ndarray,
        if detail:
            for diff_loc in index_diff:
                curr_idx = order[start:diff_loc + 1]
-                curr_x, b = _sub_step(x, y, weights, curr_idx, res)
+                curr_x, b = _sub_step(x, y, curr_idx, res)
                exposure[curr_idx, :, :] = b
                explained[curr_idx] = ls_explain(curr_x, b)
                start = diff_loc + 1
        else:
            for diff_loc in index_diff:
                curr_idx = order[start:diff_loc + 1]
-                _sub_step(x, y, weights, curr_idx, res)
+                _sub_step(x, y, curr_idx, res)
                start = diff_loc + 1
    else:
-        b = ls_fit(x, y, weights)
+        b = ls_fit(x, y)
        res = ls_res(x, y, b)

    if detail:
......@@ -65,17 +61,16 @@ def neutralize(x: np.ndarray,
@nb.njit(nogil=True, cache=True)
-def _sub_step(x, y, w, curr_idx, res) -> Tuple[np.ndarray, np.ndarray]:
-    curr_x, curr_y, curr_w = x[curr_idx], y[curr_idx], w[curr_idx]
-    b = ls_fit(curr_x, curr_y, curr_w)
+def _sub_step(x, y, curr_idx, res) -> Tuple[np.ndarray, np.ndarray]:
+    curr_x, curr_y = x[curr_idx], y[curr_idx]
+    b = ls_fit(curr_x, curr_y)
    res[curr_idx] = ls_res(curr_x, curr_y, b)
    return curr_x, b


@nb.njit(nogil=True, cache=True)
-def ls_fit(x: np.ndarray, y: np.ndarray, w: np.ndarray) -> np.ndarray:
-    x_bar = x.T * w
-    b = np.linalg.solve(x_bar @ x, x_bar @ y)
+def ls_fit(x: np.ndarray, y: np.ndarray) -> np.ndarray:
+    b = np.linalg.lstsq(x, y, rcond=-1)[0]
    return b
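
For unit weights the removed normal-equation solver and the new lstsq call coincide; a quick standalone check (shapes are illustrative only):

import numpy as np

x = np.random.randn(100, 3)
y = np.random.randn(100, 2)

b_normal = np.linalg.solve(x.T @ x, x.T @ y)   # old path with w = 1
b_lstsq = np.linalg.lstsq(x, y, rcond=-1)[0]   # new path
np.testing.assert_allclose(b_normal, b_lstsq)  # equal when x has full column rank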
......
......@@ -24,6 +24,6 @@ def rank(x: np.ndarray, groups: Optional[np.ndarray]=None) -> np.ndarray:
            curr_idx = order[start:diff_loc + 1]
            res[curr_idx] = x[curr_idx].argsort(axis=0)
            start = diff_loc + 1
        return res
    else:
-        return x.argsort(axis=0)
\ No newline at end of file
+        return x.argsort(axis=0).argsort(axis=0)
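
The fix above addresses a classic argsort pitfall: a single argsort returns the permutation that would sort the array, while argsort applied twice returns each element's rank. A quick illustration:

import numpy as np

x = np.array([0.3, 2.5, -1.2])
print(x.argsort())            # [2 0 1]: indices that would sort x, not ranks
print(x.argsort().argsort())  # [1 2 0]: the rank of each element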
......@@ -94,7 +94,7 @@ def factor_analysis(f_name):
if __name__ == '__main__':
    from dask.distributed import Client

-    client = Client('10.63.6.176:8786')
+    client = Client('10.63.6.13:8786')

    engine = SqlEngine()
    df = engine.fetch_factor_coverage()
......
......@@ -16,12 +16,14 @@ from alphamind.model.data_preparing import fetch_predict_phase
from alphamind.data.engines.universe import Universe
from alphamind.data.engines.sqlengine import SqlEngine
from alphamind.data.winsorize import winsorize_normal
+from alphamind.data.rank import rank
from alphamind.data.standardize import standardize
from alphamind.model.loader import load_model

PROCESS_MAPPING = {
    'winsorize_normal': winsorize_normal,
-    'standardize': standardize
+    'standardize': standardize,
+    'rank': rank,
}
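
Presumably the string keys let stored model descriptions refer to their processing steps by name; a minimal sketch of the assumed lookup (the name list below is hypothetical):

pre_process_names = ['winsorize_normal', 'rank']  # e.g. loaded from saved metadata
pre_process_funcs = [PROCESS_MAPPING[name] for name in pre_process_names]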
......
......@@ -105,21 +105,28 @@ class LinearConstraints(object):
    def __init__(self,
                 bounds: Dict[str, BoxBoundary],
                 cons_mat: pd.DataFrame,
-                backbone: np.ndarray):
+                backbone: np.ndarray=None):

        pyFinAssert(len(bounds) == cons_mat.shape[1], "Number of bounds should be same as number of col of cons_mat")
-        pyFinAssert(cons_mat.shape[0] == len(backbone),
-                    "length of backbone should be same as number of rows of cons_mat")

        self.names = list(bounds.keys())
        self.bounds = bounds
        self.cons_mat = cons_mat
        self.backbone = backbone

+        pyFinAssert(cons_mat.shape[0] == len(backbone) if backbone is not None else True,
+                    "length of backbone should be same as number of rows of cons_mat")

    def risk_targets(self) -> Tuple[np.ndarray, np.ndarray]:
        lower_bounds = []
        upper_bounds = []

+        if self.backbone is None:
+            backbone = np.zeros(len(self.cons_mat))
+        else:
+            backbone = self.backbone
+
        for name in self.names:
-            center = self.backbone @ self.cons_mat[name].values
+            center = backbone @ self.cons_mat[name].values
            l, u = self.bounds[name].bounds(center)
            lower_bounds.append(l)
            upper_bounds.append(u)
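
A minimal sketch of the new default, assuming BoxBoundary derives its bounds from the computed center: with backbone omitted, every center is zero, so the configured bounds act as absolute exposure targets.

import numpy as np
import pandas as pd

cons_mat = pd.DataFrame({'SIZE': [0.5, -0.2, 0.1]})
backbone = np.zeros(len(cons_mat))           # what a None backbone now falls back to
center = backbone @ cons_mat['SIZE'].values  # 0.0, so bounds are centered at zero exposure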
......
......@@ -6,9 +6,33 @@ Created on 2017-8-8
"""
import unittest
import numpy as np
import pandas as pd
from alphamind.data.rank import rank
class TestRank(unittest.TestCase):
def setUp(self):
self.x = np.random.randn(1000, 1)
self.groups = np.random.randint(0, 10, 1000)
def test_rank(self):
pass
\ No newline at end of file
data_rank = rank(self.x)
sorted_array = np.zeros_like(self.x)
for i in range(self.x.shape[0]):
for j in range(self.x.shape[1]):
sorted_array[data_rank[i, j], j] = self.x[i, j]
arr_diff = np.diff(sorted_array, axis=0)
np.testing.assert_array_less(0, arr_diff)
def test_rank_with_groups(self):
data_rank = rank(self.x, groups=self.groups)
df = pd.DataFrame(self.x, index=self.groups)
expected_rank = df.groupby(level=0).apply(lambda x: x.values.argsort(axis=0).argsort(axis=0))
print(expected_rank)
......@@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
......@@ -19,7 +19,7 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
......@@ -34,7 +34,7 @@
"industry_lower = 1.0\n",
"industry_upper = 1.0\n",
"method = 'risk_neutral'\n",
"neutralize_risk = ['SIZE'] + industry_styles\n",
"neutralize_risk = industry_styles\n",
"industry_name = 'sw_adj'\n",
"industry_level = 1\n",
"benchmark_total_lower = 0.8\n",
......@@ -53,7 +53,7 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
......@@ -62,7 +62,7 @@
"\"\"\"\n",
"\n",
"industry_names = industry_list(industry_name, industry_level)\n",
"constraint_risk = ['SIZE', 'SIZENL', 'BETA']\n",
"constraint_risk = ['SIZE', 'SIZENL', 'BETA'] + industry_names\n",
"total_risk_names = constraint_risk + ['benchmark', 'total']\n",
"\n",
"b_type = []\n",
......@@ -88,7 +88,7 @@
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
......@@ -160,7 +160,7 @@
" factor_values = factor_processing(total_data[alpha_name].values,\n",
" pre_process=[winsorize_normal, standardize],\n",
" risk_factors=risk_exp,\n",
" post_process=[winsorize_normal, standardize, rank])\n",
" post_process=[winsorize_normal, standardize])\n",
"\n",
" # const linear model\n",
" er = const_model.predict(pd.DataFrame(data={alpha_name[0]: factor_values.flatten()}))\n",
......@@ -222,10 +222,12 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"df = engine.fetch_factor_coverage(start_date='2011-01-01',\n",
" end_date='2018-02-12',\n",
" universe=universe_name[0])\n",
......@@ -236,17 +238,9 @@
},
{
"cell_type": "code",
"execution_count": 6,
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Wall time: 1h 25min 5s\n"
]
}
],
"outputs": [],
"source": [
"%%time\n",
"\n",
......@@ -274,7 +268,7 @@
},
{
"cell_type": "code",
"execution_count": 7,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
......@@ -287,26 +281,33 @@
},
{
"cell_type": "code",
"execution_count": 8,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"with pd.ExcelWriter(f'{universe_name[0]}_{benchmark_code}.xlsx', engine='xlsxwriter') as writer:\n",
" factor_df.to_excel(writer, sheet_name='ret')\n",
" factor_res.to_excel(writer, sheet_name='ic')\n",
" factor_df.to_excel(writer, sheet_name='ret_stat')\n",
" ic_df.to_excel(writer, sheet_name='ic')\n",
" factor_res.to_excel(writer, sheet_name='ret_stat')\n",
" ic_res.to_excel(writer, sheet_name='ic_stat')"
]
},
{
"cell_type": "code",
"execution_count": 9,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client.close()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
......@@ -332,6 +333,35 @@
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
},
"varInspector": {
"cols": {
"lenName": 16,
"lenType": 16,
"lenVar": 40
},
"kernels_config": {
"python": {
"delete_cmd_postfix": "",
"delete_cmd_prefix": "del ",
"library": "var_list.py",
"varRefreshCmd": "print(var_dic_list())"
},
"r": {
"delete_cmd_postfix": ") ",
"delete_cmd_prefix": "rm(",
"library": "var_list.r",
"varRefreshCmd": "cat(var_dic_list()) "
}
},
"types_to_exclude": [
"module",
"function",
"builtin_function_or_method",
"instance",
"_Feature"
],
"window_display": false
}
},
"nbformat": 4,
......
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> The methodolegy is similar to The Barra China Equity Model (CNE5)'s documentation"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%matplotlib inline\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
"from matplotlib import pyplot as plt\n",
"import statsmodels.api as sm\n",
"from alphamind.api import *\n",
"from PyFin.api import *\n",
"from alphamind.analysis.crosssetctions import cross_section_analysis\n",
"\n",
"plt.style.use('ggplot')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\"\"\"\n",
"Back test parameter settings\n",
"\"\"\"\n",
"\n",
"start_date = '2010-01-01'\n",
"end_date = '2018-02-28'\n",
"category = 'sw_adj'\n",
"level = 1\n",
"freq = '20b'\n",
"universe = Universe('custom', ['ashare_ex'])\n",
"\n",
"\n",
"horizon = map_freq(freq)\n",
"ref_dates = makeSchedule(start_date, end_date, freq, 'china.sse')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def risk_factor_analysis(factor_name):\n",
" data_source = 'postgres+psycopg2://postgres:A12345678!@10.63.6.220/alpha'\n",
" engine = SqlEngine(data_source)\n",
" risk_names = list(set(risk_styles).difference({factor_name}))\n",
" industry_names = list(set(industry_styles).difference({factor_name}))\n",
" constraint_risk = risk_names + industry_names\n",
" \n",
" df = pd.DataFrame(columns=['ret', 'ic', 't.'], dtype=float)\n",
"\n",
" for ref_date in ref_dates:\n",
" df.loc[ref_date, :] = cross_section_analysis(ref_date,\n",
" factor_name,\n",
" universe,\n",
" horizon,\n",
" constraint_risk,\n",
" engine=engine)\n",
" df.index = pd.to_datetime(df.index)\n",
" return df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"candidates_factors = risk_styles + industry_styles"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client\n",
"\n",
"with Client('10.63.6.176:8786') as client:\n",
" tasks = client.map(risk_factor_analysis, candidates_factors, pure=False)\n",
" res = client.gather(tasks)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df = pd.DataFrame()\n",
"\n",
"for f_name, data in zip(candidates_factors, res):\n",
" data['factor'] = f_name\n",
" df = df.append(data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df['abs t.'] = np.abs(df['t.'])\n",
"df[['factor', 'abs t.']].groupby('factor').mean().sort_values('abs t.', ascending=False).head()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
},
"varInspector": {
"cols": {
"lenName": 16,
"lenType": 16,
"lenVar": 40
},
"kernels_config": {
"python": {
"delete_cmd_postfix": "",
"delete_cmd_prefix": "del ",
"library": "var_list.py",
"varRefreshCmd": "print(var_dic_list())"
},
"r": {
"delete_cmd_postfix": ") ",
"delete_cmd_prefix": "rm(",
"library": "var_list.r",
"varRefreshCmd": "cat(var_dic_list()) "
}
},
"position": {
"height": "607px",
"left": "1093px",
"right": "20px",
"top": "94px",
"width": "756px"
},
"types_to_exclude": [
"module",
"function",
"builtin_function_or_method",
"instance",
"_Feature"
],
"window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 2
}
# -*- coding: utf-8 -*-
"""
Created on 2018-3-5
@author: cheng.li
"""
import numpy as np
import pandas as pd
import statsmodels.api as sm
from alphamind.api import (
    SqlEngine, LinearConstraints, er_portfolio_analysis, alpha_logger
)


def cross_section_analysis(ref_date,
                           factor_name,
                           universe,
                           horizon,
                           constraint_risk,
                           linear_bounds,
                           lbound,
                           ubound,
                           engine):
    codes = engine.fetch_codes(ref_date, universe)

    risk_exposure = engine.fetch_risk_model(ref_date, codes)[1][['code'] + constraint_risk]
    factor_data = engine.fetch_factor(ref_date, factor_name, codes)
    industry_matrix = engine.fetch_industry_matrix(ref_date, codes, 'sw_adj', 1)

    total_data = pd.merge(factor_data, risk_exposure, on='code')
    total_data = pd.merge(total_data, industry_matrix, on='code').dropna()
    total_risk_exp = total_data[constraint_risk]

    constraints = LinearConstraints(linear_bounds, total_risk_exp)

    er = total_data[factor_name].values
    industry = total_data.industry_name.values

    target_pos, _ = er_portfolio_analysis(er,
                                          industry,
                                          None,
                                          constraints,
                                          False,
                                          None,
                                          method='risk_neutral',
                                          lbound=lbound * np.ones(len(er)),
                                          ubound=ubound * np.ones(len(er)))

    codes = total_data.code.tolist()
    target_pos['code'] = codes

    dx_returns = engine.fetch_dx_return(ref_date, codes, horizon=horizon, offset=1)
    target_pos = pd.merge(target_pos, dx_returns, on=['code'])

    activate_weight = target_pos.weight.values
    excess_return = np.exp(target_pos.dx.values) - 1.
    port_ret = np.log(activate_weight @ excess_return + 1.)
    ic = np.corrcoef(excess_return, activate_weight)[0, 1]
    x = sm.add_constant(activate_weight)
    results = sm.OLS(excess_return, x).fit()
    t_stats = results.tvalues[1]

    alpha_logger.info(f"{ref_date} is finished with {len(target_pos)} stocks for {factor_name}")
    return port_ret, ic, t_stats


if __name__ == '__main__':
    from alphamind.api import (
        Universe, map_freq, risk_styles, industry_styles, macro_styles, BoundaryType, create_box_bounds
    )

    """
    Back test parameter settings
    """
    start_date = '2010-01-01'
    end_date = '2018-02-28'
    category = 'sw_adj'
    level = 1
    freq = '20b'
    universe = Universe('custom', ['zz800'])
    data_source = 'postgres+psycopg2://postgres:A12345678!@10.63.6.220/alpha'
    engine = SqlEngine(data_source)
    horizon = map_freq(freq)

    """
    Factor Model
    """
    factor_name = 'SIZE'

    """
    Constraints
    """
    risk_names = list(set(risk_styles).difference({factor_name}))
    industry_names = list(set(industry_styles).difference({factor_name}))
    constraint_risk = risk_names + industry_names + macro_styles

    b_type = []
    l_val = []
    u_val = []

    # Style factors get absolute 0/0 bounds; industry and macro factors get
    # relative 1/1 bounds.
    for name in constraint_risk:
        if name in set(risk_styles):
            b_type.append(BoundaryType.ABSOLUTE)
            l_val.append(0.0)
            u_val.append(0.0)
        else:
            b_type.append(BoundaryType.RELATIVE)
            l_val.append(1.0)
            u_val.append(1.0)

    linear_bounds = create_box_bounds(constraint_risk, b_type, l_val, u_val)

    ref_date = '2018-02-08'
    df = pd.DataFrame(columns=['ret', 'ic', 't.'])

    print(cross_section_analysis(ref_date,
                                 factor_name,
                                 universe,
                                 horizon,
                                 constraint_risk,
                                 linear_bounds,
                                 lbound=-0.01,
                                 ubound=0.01,
                                 engine=engine))
\ No newline at end of file
......@@ -21,7 +21,7 @@ from alphamind.data.dbmodel.models import *
uqer.DataAPI.api_base.timeout = 300
-start_date = dt.datetime(2017, 8, 22)
+start_date = dt.datetime(2010, 1, 1)
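# (Airflow note: with catchup enabled, a start_date this far back makes the
# scheduler backfill every missed run since 2010-01-01 unless backfilling is
# explicitly disabled.)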
dag_name = 'update_uqer_data_postgres'
default_args = {
......@@ -71,6 +71,41 @@ def data_info_log(df, table):
        raise ValueError(msg)


def update_uqer_index_market(ds, **kwargs):
    ref_date, this_date = process_date(ds)
    flag = check_holiday(this_date)
    if not flag:
        return

    df = api.MktIdxdGet(tradeDate=ref_date)
    df = df[df.exchangeCD.isin(['XSHE', 'XSHG', 'ZICN'])]
    df = df[df.ticker <= '999999']
    df.rename(columns={'tradeDate': 'trade_date',
                       'ticker': 'indexCode',
                       'CHGPct': 'chgPct',
                       'secShortName': 'indexShortName'}, inplace=True)
    df = df[['trade_date',
             'indexCode',
             'preCloseIndex',
             'openIndex',
             'highestIndex',
             'lowestIndex',
             'closeIndex',
             'turnoverVol',
             'turnoverValue',
             'chgPct']]
    df['indexCode'] = df.indexCode.astype(int)

    query = delete(IndexMarket).where(IndexMarket.trade_date == this_date)
    engine.execute(query)

    data_info_log(df, IndexMarket)
    format_data(df, format='%Y-%m-%d')
    df.to_sql(IndexMarket.__table__.name, engine, index=False, if_exists='append')
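
# The delete-then-append pair above makes re-running a trade date idempotent.
# The same pattern, sketched generically (function and parameter names below
# are illustrative, not part of this module):
def idempotent_daily_write(df, model, trade_date, engine):
    # Clear any rows previously written for this date, then bulk-insert.
    engine.execute(delete(model).where(model.trade_date == trade_date))
    df.to_sql(model.__table__.name, engine, index=False, if_exists='append')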
def update_uqer_factors(ds, **kwargs):
    ref_date, this_date = process_date(ds)
    flag = check_holiday(this_date)
......@@ -265,6 +300,105 @@ def update_uqer_universe_zz800(ds, **kwargs):
    df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')


def update_uqer_universe_zz1000(ds, **kwargs):
    ref_date, this_date = process_date(ds)
    flag = check_holiday(this_date)
    if not flag:
        return

    query = delete(Universe).where(
        and_(
            Universe.trade_date == this_date,
            Universe.universe == 'zz1000'
        )
    )
    engine.execute(query)

    query = select([IndexComponent.trade_date, IndexComponent.code]).where(
        and_(
            IndexComponent.trade_date == this_date,
            IndexComponent.indexCode == 852
        )
    )
    df = pd.read_sql(query, engine)

    if df.empty:
        return

    df['universe'] = 'zz1000'
    data_info_log(df, Universe)
    format_data(df)
    df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')


def update_uqer_universe_zxb(ds, **kwargs):
    ref_date, this_date = process_date(ds)
    flag = check_holiday(this_date)
    if not flag:
        return

    query = delete(Universe).where(
        and_(
            Universe.trade_date == this_date,
            Universe.universe == 'zxb'
        )
    )
    engine.execute(query)

    query = select([IndexComponent.trade_date, IndexComponent.code]).where(
        and_(
            IndexComponent.trade_date == this_date,
            IndexComponent.indexCode == 399005
        )
    )
    df = pd.read_sql(query, engine)

    if df.empty:
        return

    df['universe'] = 'zxb'
    data_info_log(df, Universe)
    format_data(df)
    df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')


def update_uqer_universe_cyb(ds, **kwargs):
    ref_date, this_date = process_date(ds)
    flag = check_holiday(this_date)
    if not flag:
        return

    query = delete(Universe).where(
        and_(
            Universe.trade_date == this_date,
            Universe.universe == 'cyb'
        )
    )
    engine.execute(query)

    query = select([IndexComponent.trade_date, IndexComponent.code]).where(
        and_(
            IndexComponent.trade_date == this_date,
            IndexComponent.indexCode == 399006
        )
    )
    df = pd.read_sql(query, engine)

    if df.empty:
        return

    df['universe'] = 'cyb'
    data_info_log(df, Universe)
    format_data(df)
    df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
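
# The three updaters above differ only in the universe label and index code;
# a hypothetical consolidation reusing this module's helpers (not part of the
# original commit):
UNIVERSE_INDEX_CODES = {'zz1000': 852, 'zxb': 399005, 'cyb': 399006}


def update_uqer_universe_from_index(ds, universe_name, **kwargs):
    ref_date, this_date = process_date(ds)
    if not check_holiday(this_date):
        return

    engine.execute(delete(Universe).where(
        and_(Universe.trade_date == this_date,
             Universe.universe == universe_name)))

    query = select([IndexComponent.trade_date, IndexComponent.code]).where(
        and_(IndexComponent.trade_date == this_date,
             IndexComponent.indexCode == UNIVERSE_INDEX_CODES[universe_name]))
    df = pd.read_sql(query, engine)
    if df.empty:
        return

    df['universe'] = universe_name
    data_info_log(df, Universe)
    format_data(df)
    df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')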
def update_uqer_universe_security_master(ds, **kwargs):
    ref_date, this_date = process_date(ds)
    flag = check_holiday(this_date)
......@@ -442,7 +576,20 @@ def update_uqer_index_components(ds, **kwargs):
            if df.empty:
                continue
            alpha_logger.info('{0} is finished with previous data {1}'.format(index, len(df)))
        else:
            ################################
            # 2017-10-09, patch for uqer bug
            def filter_out_eqy(code: str):
                # Keeps only listed equities, e.g. '000001.XSHE' -> True,
                # '600000.XSHG' -> True, '150001.XSHE' (a fund) -> False.
                if code[0] in ['0', '3'] and code[-4:] in ['XSHE']:
                    return True
                elif code[0] in ['6'] and code[-4:] in ['XSHG']:
                    return True
                else:
                    return False

            df = df[df.consID.apply(lambda x: filter_out_eqy(x))]
            ################################

            df.rename(columns={'ticker': 'indexCode',
                               'secShortName': 'indexShortName',
                               'consTickerSymbol': 'code',
......@@ -453,6 +600,7 @@ def update_uqer_index_components(ds, **kwargs):
        df['trade_date'] = this_date

        del df['secID']
        del df['consID']
        alpha_logger.info('{0} is finished with new data {1}'.format(index, len(df)))

        total_data = total_data.append(df)

    index_codes = total_data.indexCode.unique()
......@@ -551,23 +699,6 @@ def update_uqer_risk_model(ds, **kwargs):
    df.to_sql(SpecificRiskLong.__table__.name, engine, index=False, if_exists='append')


-def update_uqer_daily_return(ds, **kwargs):
-    ref_date, this_date = process_date(ds)
-    flag = check_holiday(this_date)
-
-    if not flag:
-        return
-
-    previous_date = advanceDateByCalendar('china.sse', this_date, '-1b').strftime('%Y-%m-%d')
-
-    query = select([Market.code, Market.chgPct.label('d1')]).where(Market.trade_date == this_date)
-    df = pd.read_sql(query, engine)
-    df['trade_date'] = previous_date
-
-    engine.execute(delete(DailyReturn).where(DailyReturn.trade_date == previous_date))
-
-    data_info_log(df, DailyReturn)
-    df.to_sql(DailyReturn.__table__.name, engine, index=False, if_exists='append')


def update_uqer_industry_info(ds, **kwargs):
    ref_date, this_date = process_date(ds)
    flag = check_holiday(this_date)
......@@ -623,11 +754,12 @@ def fetch_date(table, query_date, engine):
    return df


-def update_materialized_views(ds, **kwargs):
-    alpha_logger.info("starting refresh full_factor_view ...")
-    engine.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY full_factor_view;")
-    alpha_logger.info("starting cluster full_factor_view ...")
-    engine.execute("CLUSTER full_factor_view;")
index_market_task = PythonOperator(
    task_id='update_uqer_index_market',
    provide_context=True,
    python_callable=update_uqer_index_market,
    dag=dag
)

uqer_task = PythonOperator(
......@@ -644,13 +776,6 @@ market_task = PythonOperator(
    dag=dag
)

-return_task = PythonOperator(
-    task_id='update_uqer_daily_return',
-    provide_context=True,
-    python_callable=update_uqer_daily_return,
-    dag=dag
-)

industry_task = PythonOperator(
    task_id='update_uqer_industry_info',
    provide_context=True,
......@@ -658,7 +783,6 @@ industry_task = PythonOperator(
    dag=dag
)

-return_task.set_upstream(market_task)
industry_task.set_upstream(market_task)

index_task = PythonOperator(
......@@ -689,6 +813,13 @@ universe800_task = PythonOperator(
    dag=dag
)

universe1000_task = PythonOperator(
    task_id='update_uqer_universe_zz1000',
    provide_context=True,
    python_callable=update_uqer_universe_zz1000,
    dag=dag
)

universe50_task = PythonOperator(
    task_id='update_uqer_universe_sh50',
    provide_context=True,
......@@ -696,16 +827,32 @@ universe50_task = PythonOperator(
    dag=dag
)

universe_zxb_task = PythonOperator(
    task_id='update_uqer_universe_zxb',
    provide_context=True,
    python_callable=update_uqer_universe_zxb,
    dag=dag
)

universe_cyb_task = PythonOperator(
    task_id='update_uqer_universe_cyb',
    provide_context=True,
    python_callable=update_uqer_universe_cyb,
    dag=dag
)

universe300_task.set_upstream(index_task)
universe500_task.set_upstream(index_task)
universe800_task.set_upstream(index_task)
universe1000_task.set_upstream(index_task)
universe50_task.set_upstream(index_task)
universe_zxb_task.set_upstream(index_task)
universe_cyb_task.set_upstream(index_task)

security_master_task = PythonOperator(
    task_id='update_uqer_universe_security_master',
    provide_context=True,
    python_callable=update_uqer_universe_security_master,
    depends_on_past=True,
    dag=dag
)
......@@ -742,18 +889,6 @@ _ = PythonOperator(
    dag=dag
)

-refresh_materialized_views_task = PythonOperator(
-    task_id='update_materialized_views',
-    provide_context=True,
-    python_callable=update_materialized_views,
-    dag=dag
-)
-
-refresh_materialized_views_task.set_upstream(market_task)
-refresh_materialized_views_task.set_upstream(uqer_task)
-refresh_materialized_views_task.set_upstream(risk_model_task)

if __name__ == '__main__':
-    update_uqer_index_components(ds='2010-02-01')
+    update_uqer_index_components(ds='2017-11-10')