Commit 7f602e97 authored by Dr.李's avatar Dr.李

modified sql script for fetching data

parent 65c88236
...@@ -1677,6 +1677,9 @@ class FullFactorView(Base): ...@@ -1677,6 +1677,9 @@ class FullFactorView(Base):
AERODEF = Column(BigInteger) AERODEF = Column(BigInteger)
Conglomerates = Column(BigInteger) Conglomerates = Column(BigInteger)
COUNTRY = Column(BigInteger) COUNTRY = Column(BigInteger)
d_srisk = Column(Float(53))
s_srisk = Column(Float(53))
l_srisk = Column(Float(53))
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -35,6 +35,7 @@ from alphamind.data.dbmodel.models import RiskCovLong ...@@ -35,6 +35,7 @@ from alphamind.data.dbmodel.models import RiskCovLong
from alphamind.data.dbmodel.models import RiskExposure from alphamind.data.dbmodel.models import RiskExposure
from alphamind.data.dbmodel.models import Market from alphamind.data.dbmodel.models import Market
from alphamind.data.dbmodel.models import FullFactorView from alphamind.data.dbmodel.models import FullFactorView
from alphamind.data.dbmodel.models import Universe as UniverseTable
from alphamind.data.transformer import Transformer from alphamind.data.transformer import Transformer
from PyFin.api import advanceDateByCalendar from PyFin.api import advanceDateByCalendar
from PyFin.Analysis.SecurityValueHolders import SecurityLatestValueHolder from PyFin.Analysis.SecurityValueHolders import SecurityLatestValueHolder
...@@ -99,11 +100,11 @@ def append_industry_info(df): ...@@ -99,11 +100,11 @@ def append_industry_info(df):
def _map_risk_model_table(risk_model: str) -> tuple: def _map_risk_model_table(risk_model: str) -> tuple:
if risk_model == 'day': if risk_model == 'day':
return RiskCovDay, SpecificRiskDay return RiskCovDay, FullFactorView.d_srisk
elif risk_model == 'short': elif risk_model == 'short':
return RiskCovShort, SpecificRiskShort return RiskCovShort, FullFactorView.s_srisk
elif risk_model == 'long': elif risk_model == 'long':
return RiskCovLong, SpecificRiskLong return RiskCovLong, FullFactorView.l_srisk
else: else:
raise ValueError("risk model name {0} is not recognized".format(risk_model)) raise ValueError("risk model name {0} is not recognized".format(risk_model))
...@@ -172,7 +173,8 @@ class SqlEngine(object): ...@@ -172,7 +173,8 @@ class SqlEngine(object):
return strategy_names return strategy_names
def fetch_codes(self, ref_date: str, universe: Universe) -> List[int]: def fetch_codes(self, ref_date: str, universe: Universe) -> List[int]:
query = universe.query(ref_date) cond = universe.query(ref_date)
query = select([UniverseTable.trade_date, UniverseTable.code]).distinct().where(cond)
cursor = self.engine.execute(query) cursor = self.engine.execute(query)
codes_set = {c[1] for c in cursor.fetchall()} codes_set = {c[1] for c in cursor.fetchall()}
return sorted(codes_set) return sorted(codes_set)
...@@ -182,7 +184,8 @@ class SqlEngine(object): ...@@ -182,7 +184,8 @@ class SqlEngine(object):
start_date: str = None, start_date: str = None,
end_date: str = None, end_date: str = None,
dates: Iterable[str] = None) -> pd.DataFrame: dates: Iterable[str] = None) -> pd.DataFrame:
query = universe.query_range(start_date, end_date, dates) cond = universe.query_range(start_date, end_date, dates)
query = select([UniverseTable.trade_date, UniverseTable.code]).distinct().where(cond)
return pd.read_sql(query, self.engine) return pd.read_sql(query, self.engine)
def fetch_dx_return(self, def fetch_dx_return(self,
...@@ -219,9 +222,11 @@ class SqlEngine(object): ...@@ -219,9 +222,11 @@ class SqlEngine(object):
end_date = advanceDateByCalendar('china.sse', end_date, str(horizon) + 'b').strftime('%Y-%m-%d') end_date = advanceDateByCalendar('china.sse', end_date, str(horizon) + 'b').strftime('%Y-%m-%d')
q2 = universe.query_range(start_date, end_date).alias('temp_universe') cond = universe.query_range(start_date, end_date)
big_table = join(DailyReturn, q2, big_table = join(DailyReturn, UniverseTable,
and_(DailyReturn.trade_date == q2.c.trade_date, DailyReturn.code == q2.c.code)) and_(DailyReturn.trade_date == UniverseTable.trade_date,
DailyReturn.code == UniverseTable.code,
cond))
stats = func.sum(self.ln_func(1. + DailyReturn.d1)).over( stats = func.sum(self.ln_func(1. + DailyReturn.d1)).over(
partition_by=DailyReturn.code, partition_by=DailyReturn.code,
...@@ -229,8 +234,7 @@ class SqlEngine(object): ...@@ -229,8 +234,7 @@ class SqlEngine(object):
rows=(0, horizon)).label('dx') rows=(0, horizon)).label('dx')
query = select([DailyReturn.trade_date, DailyReturn.code, stats]) \ query = select([DailyReturn.trade_date, DailyReturn.code, stats]) \
.select_from(big_table) \ .select_from(big_table)
.where(DailyReturn.trade_date.between(start_date, end_date))
df = pd.read_sql(query, self.session.bind) df = pd.read_sql(query, self.session.bind)
...@@ -310,10 +314,12 @@ class SqlEngine(object): ...@@ -310,10 +314,12 @@ class SqlEngine(object):
real_end_date = end_date real_end_date = end_date
real_dates = None real_dates = None
q2 = universe.query_range(real_start_date, real_end_date, real_dates).alias('temp_universe') cond = universe.query_range(real_start_date, real_end_date, real_dates)
big_table = join(FullFactorView, q2, big_table = join(FullFactorView, UniverseTable,
and_(FullFactorView.trade_date == q2.c.trade_date, FullFactorView.code == q2.c.code)) and_(FullFactorView.trade_date == UniverseTable.trade_date,
FullFactorView.code == UniverseTable.code,
cond))
query = select([FullFactorView.trade_date, FullFactorView.code, FullFactorView.isOpen] + factor_cols) \ query = select([FullFactorView.trade_date, FullFactorView.code, FullFactorView.isOpen] + factor_cols) \
.select_from(big_table) .select_from(big_table)
...@@ -365,7 +371,7 @@ class SqlEngine(object): ...@@ -365,7 +371,7 @@ class SqlEngine(object):
codes: Iterable[int], codes: Iterable[int],
risk_model: str = 'short', risk_model: str = 'short',
excluded: Iterable[str] = None) -> Tuple[pd.DataFrame, pd.DataFrame]: excluded: Iterable[str] = None) -> Tuple[pd.DataFrame, pd.DataFrame]:
risk_cov_table, special_risk_table = _map_risk_model_table(risk_model) risk_cov_table, special_risk_col = _map_risk_model_table(risk_model)
cov_risk_cols = [risk_cov_table.__table__.columns[f] for f in total_risk_factors] cov_risk_cols = [risk_cov_table.__table__.columns[f] for f in total_risk_factors]
query = select([risk_cov_table.FactorID, query = select([risk_cov_table.FactorID,
...@@ -375,14 +381,9 @@ class SqlEngine(object): ...@@ -375,14 +381,9 @@ class SqlEngine(object):
) )
risk_cov = pd.read_sql(query, self.engine).sort_values('FactorID') risk_cov = pd.read_sql(query, self.engine).sort_values('FactorID')
risk_exposure_cols = [RiskExposure.__table__.columns[f] for f in total_risk_factors if f not in set(excluded)] risk_exposure_cols = [FullFactorView.__table__.columns[f] for f in total_risk_factors if f not in set(excluded)]
big_table = outerjoin(special_risk_table, RiskExposure, query = select([FullFactorView.code, special_risk_col] + risk_exposure_cols) \
and_(special_risk_table.trade_date == RiskExposure.trade_date, .where(and_(FullFactorView.trade_date == ref_date, FullFactorView.code.in_(codes)))
special_risk_table.code == RiskExposure.code))
query = select(
[RiskExposure.code, special_risk_table.SRISK] + risk_exposure_cols) \
.select_from(big_table) \
.where(and_(RiskExposure.trade_date == ref_date, RiskExposure.code.in_(codes)))
risk_exp = pd.read_sql(query, self.engine) risk_exp = pd.read_sql(query, self.engine)
...@@ -396,7 +397,7 @@ class SqlEngine(object): ...@@ -396,7 +397,7 @@ class SqlEngine(object):
risk_model: str = 'short', risk_model: str = 'short',
excluded: Iterable[str] = None) -> Tuple[pd.DataFrame, pd.DataFrame]: excluded: Iterable[str] = None) -> Tuple[pd.DataFrame, pd.DataFrame]:
risk_cov_table, special_risk_table = _map_risk_model_table(risk_model) risk_cov_table, special_risk_col = _map_risk_model_table(risk_model)
cov_risk_cols = [risk_cov_table.__table__.columns[f] for f in total_risk_factors] cov_risk_cols = [risk_cov_table.__table__.columns[f] for f in total_risk_factors]
...@@ -414,17 +415,16 @@ class SqlEngine(object): ...@@ -414,17 +415,16 @@ class SqlEngine(object):
if not excluded: if not excluded:
excluded = [] excluded = []
risk_exposure_cols = [RiskExposure.__table__.columns[f] for f in total_risk_factors if f not in set(excluded)] risk_exposure_cols = [FullFactorView.__table__.columns[f] for f in total_risk_factors if f not in set(excluded)]
big_table = outerjoin(special_risk_table, RiskExposure,
and_(special_risk_table.trade_date == RiskExposure.trade_date,
special_risk_table.code == RiskExposure.code))
q2 = universe.query_range(start_date, end_date, dates).alias('temp_universe') cond = universe.query_range(start_date, end_date, dates)
big_table = join(big_table, q2, big_table = join(FullFactorView, UniverseTable,
and_(special_risk_table.trade_date == q2.c.trade_date, special_risk_table.code == q2.c.code)) and_(FullFactorView.trade_date == UniverseTable.trade_date,
FullFactorView.code == UniverseTable.code,
cond))
query = select( query = select(
[RiskExposure.trade_date, RiskExposure.code, special_risk_table.SRISK] + risk_exposure_cols) \ [FullFactorView.trade_date, FullFactorView.code, special_risk_col] + risk_exposure_cols) \
.select_from(big_table) .select_from(big_table)
risk_exp = pd.read_sql(query, self.engine) risk_exp = pd.read_sql(query, self.engine)
...@@ -457,27 +457,27 @@ class SqlEngine(object): ...@@ -457,27 +457,27 @@ class SqlEngine(object):
dates: Iterable[str] = None, dates: Iterable[str] = None,
category: str = 'sw'): category: str = 'sw'):
industry_category_name = _map_industry_category(category) industry_category_name = _map_industry_category(category)
cond = universe.query_range(start_date, end_date, dates)
if dates: if dates:
q1 = select([Industry.trade_date, big_table = join(Industry, UniverseTable,
Industry.code, and_(Industry.trade_date == UniverseTable.trade_date,
Industry.industryID1.label('industry_code'), Industry.code == UniverseTable.code,
Industry.industryName1.label('industry')]).where( Industry.industry == industry_category_name,
and_(Industry.industry == industry_category_name, Industry.trade_date.in_(dates),
Industry.trade_date.in_(dates))).alias('temp_industry') cond))
else: else:
q1 = select([Industry.trade_date, big_table = join(Industry, UniverseTable,
Industry.code, and_(Industry.trade_date == UniverseTable.trade_date,
Industry.industryID1.label('industry_code'), Industry.code == UniverseTable.code,
Industry.industryName1.label('industry')]).where( Industry.industry == industry_category_name,
and_(Industry.industry == industry_category_name, Industry.trade_date.between(start_date, end_date),
Industry.trade_date.between(start_date, end_date))).alias('temp_industry') cond))
q2 = universe.query_range(start_date, end_date, dates).alias('temp_universe') query = select([Industry.trade_date,
big_table = join(q1, q2, Industry.code,
and_(q1.c.trade_date == q2.c.trade_date, q1.c.code == q2.c.code)) Industry.industryID1.label('industry_code'),
Industry.industryName1.label('industry')]).select_from(big_table)
query = select([q1]).select_from(big_table)
return pd.read_sql(query, self.engine) return pd.read_sql(query, self.engine)
def fetch_data(self, ref_date: str, def fetch_data(self, ref_date: str,
...@@ -551,18 +551,19 @@ if __name__ == '__main__': ...@@ -551,18 +551,19 @@ if __name__ == '__main__':
from PyFin.api import * from PyFin.api import *
from alphamind.api import alpha_logger from alphamind.api import alpha_logger
db_url = 'postgresql+psycopg2://postgres:A12345678!@10.63.6.220/alpha' db_url = 'postgresql+psycopg2://postgres:we083826@localhost/alpha'
# db_url = 'mssql+pymssql://licheng:A12345678!@10.63.6.220/alpha' # db_url = 'mssql+pymssql://licheng:A12345678!@10.63.6.220/alpha'
universe = Universe('custom', ['zz500']) universe = Universe('custom', ['zz500'])
engine = SqlEngine(db_url) engine = SqlEngine(db_url)
start_date = '2017-08-01' ref_date = '2017-08-02'
start_date = '2012-01-01'
end_date = '2017-08-31' end_date = '2017-08-31'
dates = makeSchedule(start_date, end_date, '1w', 'china.sse') dates = makeSchedule(start_date, end_date, '1w', 'china.sse')
alpha_logger.info('start') alpha_logger.info('start')
# codes = engine.fetch_codes(universe=universe, ref_date=ref_date) codes = engine.fetch_codes_range(universe=universe, dates=dates)
data1 = engine.fetch_factor_range(universe=universe, data1 = engine.fetch_factor_range(universe=universe,
start_date=start_date, start_date=start_date,
...@@ -578,3 +579,4 @@ if __name__ == '__main__': ...@@ -578,3 +579,4 @@ if __name__ == '__main__':
alpha_logger.info('end') alpha_logger.info('end')
data2 = engine.fetch_codes_range(universe, start_date=start_date, end_date=end_date, dates=dates) data2 = engine.fetch_codes_range(universe, start_date=start_date, end_date=end_date, dates=dates)
alpha_logger.info('end') alpha_logger.info('end')
alpha_logger.info(len(codes))
...@@ -56,35 +56,37 @@ class Universe(object): ...@@ -56,35 +56,37 @@ class Universe(object):
return all_and_conditions, all_or_conditions return all_and_conditions, all_or_conditions
def query(self, ref_date): def query(self, ref_date):
query = select([UniverseTable.trade_date, UniverseTable.code]).distinct()
all_and_conditions, all_or_conditions = self._create_condition() all_and_conditions, all_or_conditions = self._create_condition()
query = query.where( if all_or_conditions:
and_( query = and_(
UniverseTable.trade_date == ref_date, UniverseTable.trade_date == ref_date,
or_( or_(
and_(*all_and_conditions), and_(*all_and_conditions),
*all_or_conditions *all_or_conditions
)
) )
else:
query = and_(
UniverseTable.trade_date == ref_date,
*all_and_conditions
) )
)
return query return query
def query_range(self, start_date=None, end_date=None, dates=None): def query_range(self, start_date=None, end_date=None, dates=None):
query = select([UniverseTable.trade_date, UniverseTable.code]).distinct()
all_and_conditions, all_or_conditions = self._create_condition() all_and_conditions, all_or_conditions = self._create_condition()
dates_cond = UniverseTable.trade_date.in_(dates) if dates else UniverseTable.trade_date.between(start_date, end_date) dates_cond = UniverseTable.trade_date.in_(dates) if dates else UniverseTable.trade_date.between(start_date, end_date)
query = query.where( if all_or_conditions:
and_( query = and_(
dates_cond, dates_cond,
or_( or_(
and_(*all_and_conditions), and_(*all_and_conditions),
*all_or_conditions *all_or_conditions
)
) )
) else:
) query = and_(dates_cond, *all_and_conditions)
return query return query
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment