Commit dc469751 authored by Dr.李's avatar Dr.李

added sql engine tests

parent 69b2d0d7
......@@ -10,6 +10,7 @@ from typing import List
from typing import Dict
from typing import Tuple
from typing import Union
import numpy as np
import pandas as pd
import sqlalchemy as sa
import sqlalchemy.orm as orm
......@@ -43,8 +44,10 @@ from alphamind.data.engines.utilities import _map_industry_category
from alphamind.data.engines.utilities import _map_risk_model_table
from alphamind.data.engines.utilities import factor_tables
from alphamind.data.engines.utilities import industry_list
from alphamind.data.processing import factor_processing
from PyFin.api import advanceDateByCalendar
risk_styles = ['BETA',
'MOMENTUM',
'SIZE',
......@@ -196,7 +199,10 @@ class SqlEngine(object):
codes: Iterable[int],
expiry_date: str = None,
horizon: int = 0,
offset: int = 0) -> pd.DataFrame:
offset: int = 0,
neutralized_risks: list = None,
pre_process=None,
post_process=None) -> pd.DataFrame:
start_date = ref_date
if not expiry_date:
......@@ -216,6 +222,15 @@ class SqlEngine(object):
df = pd.read_sql(query, self.session.bind).dropna()
df = df[df.trade_date == ref_date]
if neutralized_risks:
_, risk_exp = self.fetch_risk_model(ref_date, codes)
df = pd.merge(df, risk_exp, on='code').dropna()
df[['dx']] = factor_processing(df[['dx']].values,
pre_process=pre_process,
risk_factors=df[neutralized_risks].values,
post_process=post_process)
return df[['code', 'dx']]
def fetch_dx_return_range(self,
......@@ -257,7 +272,7 @@ class SqlEngine(object):
if dates:
df = df[df.trade_date.isin(dates)]
return df
return df.sort_values(['trade_date', 'code'])
def fetch_dx_return_index(self,
ref_date: str,
......@@ -273,8 +288,7 @@ class SqlEngine(object):
else:
end_date = expiry_date
stats = self._create_stats(IndexMarket, horizon, offset, code_attr='indexCode')
stats = self._create_stats(IndexMarket, horizon, offset, code_attr='indexCode')
query = select([IndexMarket.trade_date, IndexMarket.indexCode.label('code'), stats]).where(
and_(
IndexMarket.trade_date.between(start_date, end_date),
......@@ -302,7 +316,6 @@ class SqlEngine(object):
str(1 + horizon + offset + DAILY_RETURN_OFFSET) + 'b').strftime('%Y-%m-%d')
stats = self._create_stats(IndexMarket, horizon, offset, code_attr='indexCode')
query = select([IndexMarket.trade_date, IndexMarket.indexCode.label('code'), stats]) \
.where(
and_(
......@@ -355,13 +368,17 @@ class SqlEngine(object):
.select_from(big_table).where(and_(Market.trade_date.between(start_date, end_date),
Market.code.in_(codes)))
df = pd.read_sql(query, self.engine).sort_values(['trade_date', 'code']).set_index('trade_date')
res = transformer.transform('code', df)
df = pd.read_sql(query, self.engine) \
.replace([-np.inf, np.inf], np.nan) \
.sort_values(['trade_date', 'code']) \
.set_index('trade_date')
res = transformer.transform('code', df).replace([-np.inf, np.inf], np.nan)
for col in res.columns:
if col not in set(['code', 'isOpen']) and col not in df.columns:
df[col] = res[col].values
df.dropna(inplace=True)
df['isOpen'] = df.isOpen.astype(bool)
df = df.loc[ref_date]
df.index = list(range(len(df)))
......@@ -415,7 +432,7 @@ class SqlEngine(object):
)
).distinct()
df = pd.read_sql(query, self.engine)
df = pd.read_sql(query, self.engine).replace([-np.inf, np.inf], np.nan)
if universe.is_filtered:
df = pd.merge(df, universe_df, how='inner', on=['trade_date', 'code'])
......@@ -424,12 +441,13 @@ class SqlEngine(object):
df.sort_values(['trade_date', 'code'], inplace=True)
df.set_index('trade_date', inplace=True)
res = transformer.transform('code', df)
res = transformer.transform('code', df).replace([-np.inf, np.inf], np.nan)
for col in res.columns:
if col not in set(['code', 'isOpen']) and col not in df.columns:
df[col] = res[col].values
df.dropna(inplace=True)
df['isOpen'] = df.isOpen.astype(bool)
df = df.reset_index()
return pd.merge(df, universe_df[['trade_date', 'code']], how='inner')
......@@ -440,7 +458,6 @@ class SqlEngine(object):
start_date: str = None,
end_date: str = None,
dates: Iterable[str] = None):
if isinstance(factors, Transformer):
transformer = factors
else:
......@@ -480,7 +497,10 @@ class SqlEngine(object):
)
)
df = pd.read_sql(query, self.engine).sort_values(['trade_date', 'code'])
df = pd.read_sql(query, self.engine) \
.replace([-np.inf, np.inf], np.nan) \
.dropna() \
.sort_values(['trade_date', 'code'])
return pd.merge(df, codes[['trade_date', 'code']], how='inner')
def fetch_benchmark(self,
......@@ -553,7 +573,7 @@ class SqlEngine(object):
RiskExposure.code.in_(codes)
)).distinct()
risk_exp = pd.read_sql(query, self.engine)
risk_exp = pd.read_sql(query, self.engine).dropna()
return risk_cov, risk_exp
......@@ -608,7 +628,7 @@ class SqlEngine(object):
special_risk_table.SRISK.label('srisk')] + risk_exposure_cols).select_from(big_table) \
.distinct()
risk_exp = pd.read_sql(query, self.engine).sort_values(['trade_date', 'code'])
risk_exp = pd.read_sql(query, self.engine).sort_values(['trade_date', 'code']).dropna()
if universe.is_filtered:
codes = universe.query(self, start_date, end_date, dates)
......@@ -637,7 +657,7 @@ class SqlEngine(object):
)
).distinct()
return pd.read_sql(query, self.engine)
return pd.read_sql(query, self.engine).dropna()
def fetch_industry_matrix(self,
ref_date: str,
......@@ -687,7 +707,7 @@ class SqlEngine(object):
getattr(Industry, code_name).label('industry_code'),
getattr(Industry, category_name).label('industry')]).select_from(big_table).distinct()
df = pd.read_sql(query, self.engine)
df = pd.read_sql(query, self.engine).dropna()
if universe.is_filtered:
codes = universe.query(self, start_date, end_date, dates)
df = pd.merge(df, codes, how='inner', on=['trade_date', 'code']).sort_values(['trade_date', 'code'])
......@@ -1037,5 +1057,5 @@ if __name__ == '__main__':
codes = engine.fetch_codes(ref_date, universe)
dates = makeSchedule('2018-01-01', '2018-02-01', '10b', 'china.sse')
factor_data = engine.fetch_factor_range_forward(universe, ['roe_q'], dates=dates)
factor_data = engine.fetch_dx_return('2018-01-30', codes, neutralized_risks=risk_styles+industry_styles)
print(factor_data)
# -*- coding: utf-8 -*-
"""
Created on 2018-4-17
@author: cheng.li
"""
import unittest
import numpy as np
import pandas as pd
from sqlalchemy import select, and_
from PyFin.api import adjustDateByCalendar
from PyFin.api import makeSchedule
from PyFin.api import advanceDateByCalendar
from alphamind.data.dbmodel.models import Universe as UniverseTable
from alphamind.data.dbmodel.models import Market
from alphamind.data.dbmodel.models import IndexMarket
from alphamind.data.dbmodel.models import IndexComponent
from alphamind.data.dbmodel.models import Uqer
from alphamind.data.engines.sqlengine import SqlEngine
from alphamind.data.engines.universe import Universe
@unittest.skip("Omit sql engine tests")
class TestSqlEngine(unittest.TestCase):
def setUp(self):
self.engine = SqlEngine()
def test_sql_engine_fetch_codes(self):
ref_date = adjustDateByCalendar('china.sse', '2017-01-31')
universe = Universe('custom', ['zz500', 'zz1000'])
codes = self.engine.fetch_codes(ref_date, universe)
query = select([UniverseTable.code]).where(
and_(
UniverseTable.trade_date == ref_date,
UniverseTable.universe.in_(['zz500', 'zz1000'])
)
).distinct()
df = pd.read_sql(query, con=self.engine.engine).sort_values('code')
self.assertListEqual(codes, list(df.code.values))
def test_sql_engine_fetch_codes_range(self):
ref_dates = makeSchedule('2017-01-01', '2017-06-30', '60b', 'china.sse')
universe = Universe('custom', ['zz500', 'zz1000'])
codes = self.engine.fetch_codes_range(universe, dates=ref_dates)
query = select([UniverseTable.trade_date, UniverseTable.code]).where(
and_(
UniverseTable.trade_date.in_(ref_dates),
UniverseTable.universe.in_(['zz500', 'zz1000'])
)
).distinct()
df = pd.read_sql(query, con=self.engine.engine).sort_values('code')
for ref_date in ref_dates:
calculated_codes = list(sorted(codes[codes.trade_date == ref_date].code.values))
expected_codes = list(sorted(df[df.trade_date == ref_date].code.values))
self.assertListEqual(calculated_codes, expected_codes)
def test_sql_engine_fetch_dx_return(self):
horizon = 4
offset = 1
ref_date = adjustDateByCalendar('china.sse', '2017-01-31')
universe = Universe('custom', ['zz500', 'zz1000'])
codes = self.engine.fetch_codes(ref_date, universe)
dx_return = self.engine.fetch_dx_return(ref_date, codes, horizon=horizon, offset=offset)
start_date = advanceDateByCalendar('china.sse', ref_date, '2b')
end_date = advanceDateByCalendar('china.sse', ref_date, '6b')
query = select([Market.code, Market.chgPct]).where(
and_(
Market.trade_date.between(start_date, end_date),
Market.code.in_(dx_return.code.unique().tolist())
)
)
df = pd.read_sql(query, con=self.engine.engine)
res = df.groupby('code').apply(lambda x: np.log(1. + x).sum())
np.testing.assert_array_almost_equal(dx_return.dx.values, res.chgPct.values)
horizon = 4
offset = 0
ref_date = adjustDateByCalendar('china.sse', '2017-01-31')
universe = Universe('custom', ['zz500', 'zz1000'])
codes = self.engine.fetch_codes(ref_date, universe)
dx_return = self.engine.fetch_dx_return(ref_date, codes, horizon=horizon, offset=offset)
start_date = advanceDateByCalendar('china.sse', ref_date, '1b')
end_date = advanceDateByCalendar('china.sse', ref_date, '5b')
query = select([Market.code, Market.chgPct]).where(
and_(
Market.trade_date.between(start_date, end_date),
Market.code.in_(dx_return.code.unique().tolist())
)
)
df = pd.read_sql(query, con=self.engine.engine)
res = df.groupby('code').apply(lambda x: np.log(1. + x).sum())
np.testing.assert_array_almost_equal(dx_return.dx.values, res.chgPct.values)
def test_sql_engine_fetch_dx_return_range(self):
ref_dates = makeSchedule('2017-01-01', '2017-06-30', '60b', 'china.sse')
universe = Universe('custom', ['zz500', 'zz1000'])
dx_return = self.engine.fetch_dx_return_range(universe,
dates=ref_dates,
horizon=4,
offset=1)
codes = self.engine.fetch_codes_range(universe, dates=ref_dates)
groups = codes.groupby('trade_date')
for ref_date, g in groups:
start_date = advanceDateByCalendar('china.sse', ref_date, '2b')
end_date = advanceDateByCalendar('china.sse', ref_date, '6b')
query = select([Market.code, Market.chgPct]).where(
and_(
Market.trade_date.between(start_date, end_date),
Market.code.in_(g.code.unique().tolist())
)
)
df = pd.read_sql(query, con=self.engine.engine)
res = df.groupby('code').apply(lambda x: np.log(1. + x).sum())
calculated_return = dx_return[dx_return.trade_date == ref_date]
np.testing.assert_array_almost_equal(calculated_return.dx.values, res.chgPct.values)
def test_sql_engine_fetch_dx_return_index(self):
horizon = 4
offset = 1
ref_date = adjustDateByCalendar('china.sse', '2017-01-31')
dx_return = self.engine.fetch_dx_return_index(ref_date,
905,
horizon=horizon,
offset=offset)
start_date = advanceDateByCalendar('china.sse', ref_date, '2b')
end_date = advanceDateByCalendar('china.sse', ref_date, '6b')
query = select([IndexMarket.indexCode, IndexMarket.chgPct]).where(
and_(
IndexMarket.trade_date.between(start_date, end_date),
IndexMarket.indexCode == 905
)
)
df = pd.read_sql(query, con=self.engine.engine)
res = df.groupby('indexCode').apply(lambda x: np.log(1. + x).sum())
np.testing.assert_array_almost_equal(dx_return.dx.values, res.chgPct.values)
def test_sql_engine_fetch_dx_return_index_range(self):
ref_dates = makeSchedule('2017-01-01', '2017-06-30', '60b', 'china.sse')
index_code = 906
dx_return = self.engine.fetch_dx_return_index_range(index_code,
dates=ref_dates,
horizon=4,
offset=1)
for ref_date in ref_dates:
start_date = advanceDateByCalendar('china.sse', ref_date, '2b')
end_date = advanceDateByCalendar('china.sse', ref_date, '6b')
query = select([IndexMarket.indexCode, IndexMarket.chgPct]).where(
and_(
IndexMarket.trade_date.between(start_date, end_date),
IndexMarket.indexCode == index_code
)
)
df = pd.read_sql(query, con=self.engine.engine)
res = df.groupby('indexCode').apply(lambda x: np.log(1. + x).sum())
calculated_return = dx_return[dx_return.trade_date == ref_date]
np.testing.assert_array_almost_equal(calculated_return.dx.values, res.chgPct.values)
def test_sql_engine_fetch_factor(self):
ref_date = adjustDateByCalendar('china.sse', '2017-01-31')
universe = Universe('custom', ['zz500', 'zz1000'])
codes = self.engine.fetch_codes(ref_date, universe)
factor = 'ROE'
factor_data = self.engine.fetch_factor(ref_date, factor, codes)
query = select([Uqer.code, Uqer.ROE]).where(
and_(
Uqer.trade_date == ref_date,
Uqer.code.in_(codes)
)
)
df = pd.read_sql(query, con=self.engine.engine).sort_values('code').dropna()
np.testing.assert_array_almost_equal(factor_data.ROE.values, df.ROE.values)
def test_sql_engine_fetch_factor_range(self):
ref_dates = makeSchedule('2017-01-01', '2017-06-30', '60b', 'china.sse')
universe = Universe('custom', ['zz500', 'zz1000'])
factor = 'ROE'
factor_data = self.engine.fetch_factor_range(universe, factor, dates=ref_dates)
codes = self.engine.fetch_codes_range(universe, dates=ref_dates)
groups = codes.groupby('trade_date')
for ref_date, g in groups:
query = select([Uqer.code, Uqer.ROE]).where(
and_(
Uqer.trade_date == ref_date,
Uqer.code.in_(g.code.unique().tolist())
)
)
df = pd.read_sql(query, con=self.engine.engine).dropna()
calculated_factor = factor_data[factor_data.trade_date == ref_date]
np.testing.assert_array_almost_equal(calculated_factor.ROE.values, df.ROE.values)
def test_sql_engine_fetch_factor_range_forward(self):
ref_dates = makeSchedule('2017-01-01', '2017-09-30', '60b', 'china.sse')
ref_dates = ref_dates + [advanceDateByCalendar('china.sse', ref_dates[-1], '60b').strftime('%Y-%m-%d')]
universe = Universe('custom', ['zz500', 'zz1000'])
factor = 'ROE'
factor_data = self.engine.fetch_factor_range_forward(universe, factor, dates=ref_dates)
codes = self.engine.fetch_codes_range(universe, dates=ref_dates[:-1])
groups = codes.groupby('trade_date')
for ref_date, g in groups:
forward_ref_date = advanceDateByCalendar('china.sse', ref_date, '60b').strftime('%Y-%m-%d')
query = select([Uqer.code, Uqer.ROE]).where(
and_(
Uqer.trade_date == forward_ref_date,
Uqer.code.in_(g.code.unique().tolist())
)
)
df = pd.read_sql(query, con=self.engine.engine).dropna()
calculated_factor = factor_data[factor_data.trade_date == ref_date]
np.testing.assert_array_almost_equal(calculated_factor.dx.values, df.ROE.values)
def test_sql_engine_fetch_benchmark(self):
ref_date = adjustDateByCalendar('china.sse', '2017-01-31')
benchmark = 906
index_data = self.engine.fetch_benchmark(ref_date, benchmark)
query = select([IndexComponent.code, (IndexComponent.weight / 100.).label('weight')]).where(
and_(
IndexComponent.trade_date == ref_date,
IndexComponent.indexCode == benchmark
)
)
df = pd.read_sql(query, con=self.engine.engine)
np.testing.assert_array_almost_equal(df.weight.values, index_data.weight.values)
def test_sql_engine_fetch_benchmark_range(self):
ref_dates = makeSchedule('2017-01-01', '2017-09-30', '60b', 'china.sse')
benchmark = 906
index_data = self.engine.fetch_benchmark_range(benchmark, dates=ref_dates)
query = select([IndexComponent.trade_date, IndexComponent.code, (IndexComponent.weight / 100.).label('weight')]).where(
and_(
IndexComponent.trade_date.in_(ref_dates),
IndexComponent.indexCode == benchmark
)
)
df = pd.read_sql(query, con=self.engine.engine)
for ref_date in ref_dates:
calculated_data = index_data[index_data.trade_date == ref_date]
expected_data = df[df.trade_date == ref_date]
np.testing.assert_array_almost_equal(calculated_data.weight.values, expected_data.weight.values)
......@@ -15,6 +15,7 @@ from alphamind.tests.data.test_neutralize import TestNeutralize
from alphamind.tests.data.test_standardize import TestStandardize
from alphamind.tests.data.test_winsorize import TestWinsorize
from alphamind.tests.data.test_quantile import TestQuantile
from alphamind.tests.data.engines.test_sql_engine import TestSqlEngine
from alphamind.tests.data.engines.test_universe import TestUniverse
from alphamind.tests.portfolio.test_constraints import TestConstraints
from alphamind.tests.portfolio.test_evolver import TestEvolver
......@@ -45,6 +46,7 @@ if __name__ == '__main__':
TestStandardize,
TestWinsorize,
TestQuantile,
TestSqlEngine,
TestUniverse,
TestConstraints,
TestEvolver,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment