Commit 38fd0ff5 authored by Dr.李's avatar Dr.李

added single phase data fetching

parent 8a405fdd
......@@ -29,7 +29,7 @@ re-balance - 1 week
training - every 4 week
'''
engine = SqlEngine('postgresql+psycopg2://postgres:we083826@192.168.0.101/alpha')
engine = SqlEngine('postgresql+psycopg2://postgres:A12345678!@10.63.6.220/alpha')
universe = Universe('zz500', ['zz500'])
neutralize_risk = ['SIZE'] + industry_styles
portfolio_risk_neutralize = ['SIZE']
......@@ -59,7 +59,7 @@ data_package = fetch_data_package(engine,
neutralized_risk=neutralize_risk,
pre_process=[winsorize_normal, standardize],
post_process=[standardize],
warm_start=20)
warm_start=8)
'''
training phase: using Linear - regression from scikit-learn
......
......@@ -5,12 +5,14 @@ Created on 2017-8-24
@author: cheng.li
"""
import datetime as dt
import numpy as np
import pandas as pd
from typing import Iterable
from typing import Union
from PyFin.api import makeSchedule
from PyFin.api import BizDayConventions
from PyFin.api import DateGeneration
from PyFin.api import advanceDateByCalendar
from PyFin.DateUtilities import Period
from PyFin.Enums import TimeUnits
......@@ -35,6 +37,31 @@ def _map_horizon(frequency: str) -> int:
raise ValueError('{0} is an unrecognized frequency rule'.format(frequency))
def _merge_df(engine, names, factor_df, return_df, universe, dates, risk_model, neutralized_risk):
if neutralized_risk:
risk_df = engine.fetch_risk_model_range(universe, dates=dates, risk_model=risk_model)[1]
used_neutralized_risk = list(set(neutralized_risk).difference(names))
risk_df = risk_df[['trade_date', 'code'] + used_neutralized_risk].dropna()
train_x = pd.merge(factor_df, risk_df, on=['trade_date', 'code'])
return_df = pd.merge(return_df, risk_df, on=['trade_date', 'code'])[['trade_date', 'code', 'dx']]
train_y = return_df.copy()
risk_exp = train_x[neutralized_risk].values.astype(float)
x_values = train_x[names].values.astype(float)
y_values = train_y[['dx']].values
else:
risk_exp = None
train_x = factor_df.copy()
train_y = return_df.copy()
x_values = train_x[names].values.astype(float)
y_values = train_y[['dx']].values
date_label = pd.DatetimeIndex(factor_df.trade_date).to_pydatetime()
dates = np.unique(date_label)
return return_df, dates, date_label, risk_exp, x_values, y_values, train_x, train_y
def prepare_data(engine: SqlEngine,
factors: Union[Transformer, Iterable[object]],
start_date: str,
......@@ -44,9 +71,16 @@ def prepare_data(engine: SqlEngine,
benchmark: int,
warm_start: int = 0):
if warm_start > 0:
start_date = advanceDateByCalendar('china.sse', start_date, str(-warm_start) + 'b').strftime('%Y-%m-%d')
p = Period(frequency)
p = Period(length=-warm_start * p.length(), units=p.units())
start_date = advanceDateByCalendar('china.sse', start_date, p).strftime('%Y-%m-%d')
dates = makeSchedule(start_date, end_date, frequency, calendar='china.sse', dateRule=BizDayConventions.Following)
dates = makeSchedule(start_date,
end_date,
frequency,
calendar='china.sse',
dateRule=BizDayConventions.Following,
dateGenerationRule=DateGeneration.Backward)
horizon = _map_horizon(frequency)
......@@ -155,27 +189,8 @@ def fetch_data_package(engine: SqlEngine,
benchmark,
warm_start)
if neutralized_risk:
risk_df = engine.fetch_risk_model_range(universe, dates=dates, risk_model=risk_model)[1]
used_neutralized_risk = list(set(neutralized_risk).difference(transformer.names))
risk_df = risk_df[['trade_date', 'code'] + used_neutralized_risk].dropna()
train_x = pd.merge(factor_df, risk_df, on=['trade_date', 'code'])
return_df = pd.merge(return_df, risk_df, on=['trade_date', 'code'])[['trade_date', 'code', 'dx']]
train_y = return_df.copy()
risk_exp = train_x[neutralized_risk].values.astype(float)
x_values = train_x[transformer.names].values.astype(float)
y_values = train_y[['dx']].values
else:
risk_exp = None
train_x = factor_df.copy()
train_y = return_df.copy()
x_values = train_x[transformer.names].values.astype(float)
y_values = train_y[['dx']].values
date_label = pd.DatetimeIndex(factor_df.trade_date).to_pydatetime()
dates = np.unique(date_label)
return_df, dates, date_label, risk_exp, x_values, y_values, train_x, train_y = \
_merge_df(engine, transformer.names, factor_df, return_df, universe, dates, risk_model, neutralized_risk)
return_df['weight'] = train_x['weight']
return_df['industry'] = train_x['industry']
......@@ -205,18 +220,171 @@ def fetch_data_package(engine: SqlEngine,
return ret
if __name__ == '__main__':
from PyFin.api import MA
def fetch_train_phase(engine,
                      alpha_factors: Iterable[object],
                      ref_date,
                      frequency,
                      universe,
                      batch,
                      neutralized_risk: Iterable[str] = None,
                      risk_model: str = 'short',
                      pre_process: Iterable[object] = None,
                      post_process: Iterable[object] = None,
                      warm_start: int = 0):
    """Fetch and pre-process one training batch ending at ``ref_date``.

    Builds a backward-generated schedule spanning ``warm_start + batch + 1``
    periods, pulls factor values and forward returns over it, optionally joins
    risk exposures via ``_merge_df``, then neutralizes/normalizes the most
    recent ``batch`` periods of x and y.

    Returns a dict: ``{'x_names': [...], 'train': {'x': ne_x, 'y': ne_y}}``.
    """
    transformer = Transformer(alpha_factors)
    # One extra period (+1) so the final period at ref_date can be dropped
    # below — presumably because its forward return is not yet realized
    # (NOTE(review): confirm this intent).
    p = Period(frequency)
    p = Period(length=-(warm_start + batch + 1) * p.length(), units=p.units())
    start_date = advanceDateByCalendar('china.sse', ref_date, p, BizDayConventions.Following)
    # Backward generation anchors the schedule on ref_date, not start_date.
    dates = makeSchedule(start_date,
                         ref_date,
                         frequency,
                         calendar='china.sse',
                         dateRule=BizDayConventions.Following,
                         dateGenerationRule=DateGeneration.Backward)
    horizon = _map_horizon(frequency)
    factor_df = engine.fetch_factor_range(universe, factors=transformer, dates=dates)
    return_df = engine.fetch_dx_return_range(universe, dates=dates, horizon=horizon)
    # Inner-join factors and returns so x and y rows line up one-to-one.
    df = pd.merge(factor_df, return_df, on=['trade_date', 'code']).dropna()
    return_df, factor_df = df[['trade_date', 'code', 'dx']], df[
        ['trade_date', 'code', 'isOpen'] + transformer.names]
    return_df, dates, date_label, risk_exp, x_values, y_values, _, _ = \
        _merge_df(engine, transformer.names, factor_df, return_df, universe, dates, risk_model, neutralized_risk)
    # If ref_date itself is the last realized trade date, exclude it from the
    # training window and shift the window back by one period.
    if dates[-1] == dt.datetime.strptime(ref_date, '%Y-%m-%d'):
        end = dates[-2]
        start = dates[-batch - 1]
    else:
        end = dates[-1]
        start = dates[-batch]
    # Boolean row mask selecting all observations inside [start, end].
    index = (date_label >= start) & (date_label <= end)
    this_raw_x = x_values[index]
    this_raw_y = y_values[index]
    if risk_exp is not None:
        this_risk_exp = risk_exp[index]
    else:
        this_risk_exp = None
    # Apply the same pre/neutralize/post pipeline to x and y so they remain
    # comparable after processing.
    ne_x = factor_processing(this_raw_x,
                             pre_process=pre_process,
                             risk_factors=this_risk_exp,
                             post_process=post_process)
    ne_y = factor_processing(this_raw_y,
                             pre_process=pre_process,
                             risk_factors=this_risk_exp,
                             post_process=post_process)
    ret = dict()
    ret['x_names'] = transformer.names
    ret['train'] = {'x': ne_x, 'y': ne_y}
    return ret
def fetch_predict_phase(engine,
                        alpha_factors: Iterable[object],
                        ref_date,
                        frequency,
                        universe,
                        batch,
                        neutralized_risk: Iterable[str] = None,
                        risk_model: str = 'short',
                        pre_process: Iterable[object] = None,
                        post_process: Iterable[object] = None,
                        warm_start: int = 0):
    """Fetch and pre-process the prediction cross-section at ``ref_date``.

    Builds a backward schedule of ``warm_start + batch`` periods ending at
    ``ref_date``, pulls factor values over it, optionally joins risk-model
    exposures, runs the same processing pipeline as the training phase, and
    returns the processed factor matrix for the ``ref_date`` cross-section.

    Returns a dict: ``{'x_names': [...], 'predict': {'x': ne_x, 'code': codes}}``
    where ``x``/``code`` are ``None`` if ``ref_date`` is not the last
    scheduled trade date.
    """
    transformer = Transformer(alpha_factors)
    p = Period(frequency)
    p = Period(length=-(warm_start + batch) * p.length(), units=p.units())
    start_date = advanceDateByCalendar('china.sse', ref_date, p, BizDayConventions.Following)
    dates = makeSchedule(start_date,
                         ref_date,
                         frequency,
                         calendar='china.sse',
                         dateRule=BizDayConventions.Following,
                         dateGenerationRule=DateGeneration.Backward)
    factor_df = engine.fetch_factor_range(universe, factors=transformer, dates=dates).dropna()
    names = transformer.names
    if neutralized_risk:
        # Join risk exposures, keeping only risk factors not already in the
        # alpha factor set (mirrors _merge_df in the training phase).
        risk_df = engine.fetch_risk_model_range(universe, dates=dates, risk_model=risk_model)[1]
        used_neutralized_risk = list(set(neutralized_risk).difference(names))
        risk_df = risk_df[['trade_date', 'code'] + used_neutralized_risk].dropna()
        train_x = pd.merge(factor_df, risk_df, on=['trade_date', 'code'])
        risk_exp = train_x[neutralized_risk].values.astype(float)
    else:
        train_x = factor_df.copy()
        risk_exp = None
    # FIX: x_values was previously assigned only in the neutralized branch,
    # raising NameError when neutralized_risk is None. Computing it from
    # train_x after the branch covers both paths.
    x_values = train_x[names].values.astype(float)
    # FIX: date labels must come from train_x (the frame x_values and
    # risk_exp are drawn from), not factor_df — the risk merge can drop rows,
    # which would misalign the boolean masks below.
    date_label = pd.DatetimeIndex(train_x.trade_date).to_pydatetime()
    dates = np.unique(date_label)
    if dates[-1] == dt.datetime.strptime(ref_date, '%Y-%m-%d'):
        end = dates[-1]
        start = dates[-batch]
        index = (date_label >= start) & (date_label <= end)
        this_raw_x = x_values[index]
        sub_dates = date_label[index]
        if risk_exp is not None:
            this_risk_exp = risk_exp[index]
        else:
            this_risk_exp = None
        # Process over the full window, then keep only the ref_date rows.
        ne_x = factor_processing(this_raw_x,
                                 pre_process=pre_process,
                                 risk_factors=this_risk_exp,
                                 post_process=post_process)
        ne_x = ne_x[sub_dates == end]
        codes = train_x.code.values[date_label == end]
    else:
        # ref_date is not a realized trade date in the schedule: nothing to predict.
        ne_x = None
        codes = None
    ret = dict()
    ret['x_names'] = transformer.names
    ret['predict'] = {'x': ne_x, 'code': codes}
    return ret
if __name__ == '__main__':
    # Smoke-test driver: exercises the batched package fetcher and the two
    # single-phase fetchers against a live database.
    # NOTE(review): hard-coded DB credentials in source — move to config/env.
    engine = SqlEngine('postgresql+psycopg2://postgres:A12345678!@10.63.6.220/alpha')
    universe = Universe('zz500', ['zz500'])
    # NOTE(review): MA is referenced here but its import is not visible in
    # this hunk — confirm `from PyFin.api import MA` exists at file scope.
    res = fetch_data_package(engine,
                             MA(10, 'EPS'),
                             '2012-01-01',
                             '2012-04-01',
                             '1m',
                             universe,
                             905,
                             0)
    universe = Universe('zz500', ['ashare_ex'])
    neutralized_risk = ['SIZE']
    # Single training batch: 2-week frequency, 4 periods, ending 2017-09-04.
    res = fetch_train_phase(engine,
                            ['EPS', 'CFinc1'],
                            '2017-09-04',
                            '2w',
                            universe,
                            4,
                            warm_start=1,
                            neutralized_risk=neutralized_risk)
    print(res)
    # Matching prediction cross-section for the same reference date.
    res = fetch_predict_phase(engine,
                              ['EPS', 'CFinc1'],
                              '2017-09-04',
                              '2w',
                              universe,
                              4,
                              warm_start=1,
                              neutralized_risk=neutralized_risk)
    print(res)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment