Commit 1acf5b7a authored by Dr.李's avatar Dr.李

restructure fetch data

parent c0cdfcde
...@@ -8,6 +8,7 @@ Created on 2017-7-7 ...@@ -8,6 +8,7 @@ Created on 2017-7-7
from typing import Iterable from typing import Iterable
from typing import List from typing import List
from typing import Dict from typing import Dict
from typing import Tuple
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import sqlalchemy as sa import sqlalchemy as sa
...@@ -90,7 +91,7 @@ def append_industry_info(df): ...@@ -90,7 +91,7 @@ def append_industry_info(df):
[industry_codes[row][0] for row in industry_dummies] [industry_codes[row][0] for row in industry_dummies]
def _map_risk_model_table(risk_model): def _map_risk_model_table(risk_model: str) -> tuple:
if risk_model == 'day': if risk_model == 'day':
return RiskCovDay, SpecificRiskDay return RiskCovDay, SpecificRiskDay
elif risk_model == 'short': elif risk_model == 'short':
...@@ -101,12 +102,12 @@ def _map_risk_model_table(risk_model): ...@@ -101,12 +102,12 @@ def _map_risk_model_table(risk_model):
raise ValueError("risk model name {0} is not recognized".format(risk_model)) raise ValueError("risk model name {0} is not recognized".format(risk_model))
def _map_factors(factors): def _map_factors(factors: Iterable[str]) -> dict:
factor_cols = [] factor_cols = {}
for f in factors: for f in factors:
for t in factor_tables: for t in factor_tables:
if f in t.__table__.columns: if f in t.__table__.columns:
factor_cols.append(t.__table__.columns[f]) factor_cols[t.__table__.columns[f]] = t
break break
return factor_cols return factor_cols
...@@ -166,52 +167,82 @@ class SqlEngine(object): ...@@ -166,52 +167,82 @@ class SqlEngine(object):
return pd.read_sql(query, self.session.bind) return pd.read_sql(query, self.session.bind)
def fetch_data(self, ref_date, def fetch_factor(self,
factors: Iterable[str], ref_date: str,
codes: Iterable[int], factors: Iterable[str],
benchmark: int = None, codes: Iterable[int]) -> pd.DataFrame:
risk_model: str = 'short') -> Dict[str, pd.DataFrame]:
risk_cov_table, special_risk_table = _map_risk_model_table(risk_model)
factor_cols = _map_factors(factors) factor_cols = _map_factors(factors)
cov_risk_cols = [risk_cov_table.__table__.columns[f] for f in total_risk_factors]
risk_exposure_cols = [RiskExposure.__table__.columns[f] for f in total_risk_factors]
big_table = outerjoin(Uqer, RiskExposure, and_(RiskExposure.Date == Uqer.Date, RiskExposure.Code == Uqer.Code)) big_table = Market
big_table = outerjoin(big_table, Market, and_(Market.Date == Uqer.Date, Market.Code == Uqer.Code)) for t in set(factor_cols.values()):
big_table = outerjoin(big_table, Tiny, and_(Tiny.Date == Uqer.Date, Tiny.Code == Uqer.Code)) big_table = outerjoin(big_table, t, and_(Market.Date == t.Date, Market.Code == t.Code))
big_table = outerjoin(big_table, LegacyFactor, and_(LegacyFactor.Date == Uqer.Date, LegacyFactor.Code == Uqer.Code))
big_table = outerjoin(big_table, special_risk_table, and_(special_risk_table.Date == Uqer.Date, special_risk_table.Code == Uqer.Code))
query = select([Uqer.Code, Market.isOpen, special_risk_table.SRISK] + factor_cols + risk_exposure_cols) \ query = select([Market.Code, Market.isOpen] + list(factor_cols.keys())) \
.select_from(big_table) \ .select_from(big_table) \
.where(and_(Uqer.Date == ref_date, Uqer.Code.in_(codes))) .where(and_(Market.Date == ref_date, Market.Code.in_(codes)))
return pd.read_sql(query, self.engine)
def fetch_benchmark(self,
ref_date: str,
benchmark: int) -> pd.DataFrame:
query = select([IndexComponent.Code, (IndexComponent.weight / 100.).label('weight')]).where(
and_(
IndexComponent.Date == ref_date,
IndexComponent.indexCode == benchmark
)
)
factor_data = pd.read_sql(query, self.engine) return pd.read_sql(query, self.engine)
def fetch_risk_model(self,
ref_date: str,
codes: Iterable[int],
risk_model: str = 'short') -> Tuple[pd.DataFrame, pd.DataFrame]:
risk_cov_table, special_risk_table = _map_risk_model_table(risk_model)
cov_risk_cols = [risk_cov_table.__table__.columns[f] for f in total_risk_factors]
query = select([risk_cov_table.FactorID, query = select([risk_cov_table.FactorID,
risk_cov_table.Factor] risk_cov_table.Factor]
+ cov_risk_cols).where( + cov_risk_cols).where(
risk_cov_table.Date == ref_date risk_cov_table.Date == ref_date
) )
risk_cov_data = pd.read_sql(query, self.engine).sort_values('FactorID') risk_cov = pd.read_sql(query, self.engine).sort_values('FactorID')
total_data = {'risk_cov': risk_cov_data} risk_exposure_cols = [RiskExposure.__table__.columns[f] for f in total_risk_factors]
big_table = outerjoin(special_risk_table, RiskExposure,
and_(special_risk_table.Date == RiskExposure.Date,
special_risk_table.Code == RiskExposure.Code))
query = select(
[RiskExposure.Code, special_risk_table.SRISK] + risk_exposure_cols) \
.select_from(big_table) \
.where(and_(RiskExposure.Date == ref_date, RiskExposure.Code.in_(codes)))
if benchmark: risk_exp = pd.read_sql(query, self.engine)
query = select([IndexComponent.Code, (IndexComponent.weight / 100.).label('weight')]).where(
and_( return risk_cov, risk_exp
IndexComponent.Date == ref_date,
IndexComponent.indexCode == benchmark def fetch_data(self, ref_date,
) factors: Iterable[str],
) codes: Iterable[int],
benchmark: int = None,
risk_model: str = 'short') -> Dict[str, pd.DataFrame]:
benchmark_data = pd.read_sql(query, self.engine) total_data = {}
factor_data = self.fetch_factor(ref_date, factors, codes)
if benchmark:
benchmark_data = self.fetch_benchmark(ref_date, benchmark)
total_data['benchmark'] = benchmark_data total_data['benchmark'] = benchmark_data
factor_data = pd.merge(factor_data, benchmark_data, how='left', on=['Code']) factor_data = pd.merge(factor_data, benchmark_data, how='left', on=['Code'])
factor_data['weight'] = factor_data['weight'].fillna(0.) factor_data['weight'] = factor_data['weight'].fillna(0.)
if risk_model:
risk_cov, risk_exp = self.fetch_risk_model(ref_date, codes, risk_model)
factor_data = pd.merge(factor_data, risk_exp, how='left', on=['Code'])
total_data['risk_cov'] = risk_cov
total_data['factor'] = factor_data total_data['factor'] = factor_data
append_industry_info(factor_data) append_industry_info(factor_data)
...@@ -219,14 +250,14 @@ class SqlEngine(object): ...@@ -219,14 +250,14 @@ class SqlEngine(object):
if __name__ == '__main__': if __name__ == '__main__':
db_url = 'postgresql+psycopg2://postgres:A12345678!@10.63.6.220/alpha' db_url = 'postgresql+psycopg2://postgres:we083826@localhost/alpha'
universe = Universe('custom', ['zz500']) universe = Universe('custom', ['zz500'])
engine = SqlEngine(db_url) engine = SqlEngine(db_url)
ref_date = '2017-08-10' ref_date = '2017-08-10'
codes = engine.fetch_codes(ref_date, universe) codes = engine.fetch_codes(ref_date, universe)
data = engine.fetch_data(ref_date, ['EPS'], codes, 905) data = engine.fetch_data(ref_date, ['EPS'], codes, 905, 'short')
d1ret = engine.fetch_dx_return(ref_date, codes, horizon=0) d1ret = engine.fetch_dx_return(ref_date, codes, horizon=0)
missing_codes = [c for c in data['factor'].Code if c not in set(d1ret.Code)] missing_codes = [c for c in data['factor'].Code if c not in set(d1ret.Code)]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment