Commit 1bf94321 authored by Dr.李's avatar Dr.李

enhance sql engine fetching performance

parent 861f09a6
...@@ -203,20 +203,28 @@ class SqlEngine(object): ...@@ -203,20 +203,28 @@ class SqlEngine(object):
end_date = advanceDateByCalendar('china.sse', end_date, end_date = advanceDateByCalendar('china.sse', end_date,
str(1 + horizon + offset + DAILY_RETURN_OFFSET) + 'b').strftime('%Y-%m-%d') str(1 + horizon + offset + DAILY_RETURN_OFFSET) + 'b').strftime('%Y-%m-%d')
codes = universe.query(self, start_date, end_date, dates)
stats = func.sum(self.ln_func(1. + Market.chgPct)).over( stats = func.sum(self.ln_func(1. + Market.chgPct)).over(
partition_by=Market.code, partition_by=Market.code,
order_by=Market.trade_date, order_by=Market.trade_date,
rows=(1 + offset + DAILY_RETURN_OFFSET, 1 + horizon + offset + DAILY_RETURN_OFFSET)).label('dx') rows=(1 + offset + DAILY_RETURN_OFFSET, 1 + horizon + offset + DAILY_RETURN_OFFSET)).label('dx')
query = select([Market.trade_date, Market.code, stats]) \ cond = universe._query_statements(start_date, end_date, None)
.where(
and_(Market.trade_date.in_(codes.trade_date.dt.strftime('%Y-%m-%d').unique()), big_table = join(Market, UniverseTable,
Market.code.in_(codes.code.tolist())) and_(
Market.trade_date == UniverseTable.trade_date,
Market.code == UniverseTable.code,
cond
)
) )
query = select([Market.trade_date, Market.code, stats]) \
.select_from(big_table)
df = pd.read_sql(query, self.session.bind).dropna() df = pd.read_sql(query, self.session.bind).dropna()
if universe.is_filtered:
codes = universe.query(self, start_date, end_date, dates)
df = pd.merge(df, codes, how='inner', on=['trade_date', 'code']) df = pd.merge(df, codes, how='inner', on=['trade_date', 'code'])
if dates: if dates:
...@@ -354,8 +362,6 @@ class SqlEngine(object): ...@@ -354,8 +362,6 @@ class SqlEngine(object):
else: else:
factor_cols = _map_factors(dependency, factor_tables) factor_cols = _map_factors(dependency, factor_tables)
codes = universe.query(self, start_date, end_date, dates)
big_table = FullFactor big_table = FullFactor
for t in set(factor_cols.values()): for t in set(factor_cols.values()):
...@@ -369,14 +375,23 @@ class SqlEngine(object): ...@@ -369,14 +375,23 @@ class SqlEngine(object):
FullFactor.code == t.code, FullFactor.code == t.code,
FullFactor.trade_date.between(start_date, end_date))) FullFactor.trade_date.between(start_date, end_date)))
cond = universe._query_statements(start_date, end_date, dates)
big_table = join(big_table, UniverseTable,
and_(
FullFactor.trade_date == UniverseTable.trade_date,
FullFactor.code == UniverseTable.code,
cond
)
)
query = select( query = select(
[FullFactor.trade_date, FullFactor.code, FullFactor.isOpen] + list(factor_cols.keys())) \ [FullFactor.trade_date, FullFactor.code, FullFactor.isOpen] + list(factor_cols.keys())) \
.select_from(big_table).where( .select_from(big_table).distinct()
and_(FullFactor.trade_date.in_(codes.trade_date.dt.strftime('%Y-%m-%d').unique()),
FullFactor.code.in_(codes.code.tolist()))
).distinct()
df = pd.read_sql(query, self.engine) df = pd.read_sql(query, self.engine)
if universe.is_filtered:
codes = universe.query(self, start_date, end_date, dates)
df = pd.merge(df, codes, how='inner', on=['trade_date', 'code']).sort_values(['trade_date', 'code']) df = pd.merge(df, codes, how='inner', on=['trade_date', 'code']).sort_values(['trade_date', 'code'])
if external_data is not None: if external_data is not None:
...@@ -477,15 +492,24 @@ class SqlEngine(object): ...@@ -477,15 +492,24 @@ class SqlEngine(object):
risk_exposure_cols = [FullFactor.__table__.columns[f] for f in total_risk_factors if f not in set(excluded)] risk_exposure_cols = [FullFactor.__table__.columns[f] for f in total_risk_factors if f not in set(excluded)]
codes = universe.query(self, start_date, end_date, dates) cond = universe._query_statements(start_date, end_date, dates)
big_table = join(FullFactor, UniverseTable,
and_(
FullFactor.trade_date == UniverseTable.trade_date,
FullFactor.code == UniverseTable.code,
cond
)
)
query = select( query = select(
[FullFactor.trade_date, FullFactor.code, special_risk_col] + risk_exposure_cols).where( [FullFactor.trade_date, FullFactor.code, special_risk_col] + risk_exposure_cols).select_from(big_table) \
and_(FullFactor.trade_date.in_(codes.trade_date.dt.strftime('%Y-%m-%d').unique()), .distinct()
FullFactor.code.in_(codes.code.tolist()))
).distinct()
risk_exp = pd.read_sql(query, self.engine) risk_exp = pd.read_sql(query, self.engine)
if universe.is_filtered:
codes = universe.query(self, start_date, end_date, dates)
risk_exp = pd.merge(risk_exp, codes, how='inner', on=['trade_date', 'code']).sort_values(['trade_date', 'code']) risk_exp = pd.merge(risk_exp, codes, how='inner', on=['trade_date', 'code']).sort_values(['trade_date', 'code'])
return risk_cov, risk_exp return risk_cov, risk_exp
...@@ -516,18 +540,26 @@ class SqlEngine(object): ...@@ -516,18 +540,26 @@ class SqlEngine(object):
dates: Iterable[str] = None, dates: Iterable[str] = None,
category: str = 'sw'): category: str = 'sw'):
industry_category_name = _map_industry_category(category) industry_category_name = _map_industry_category(category)
codes = universe.query(self, start_date, end_date, dates)
cond = universe._query_statements(start_date, end_date, dates)
big_table = join(Industry, UniverseTable,
and_(
Industry.trade_date == UniverseTable.trade_date,
Industry.code == UniverseTable.code,
Industry.industry == industry_category_name,
cond
)
)
query = select([Industry.trade_date, query = select([Industry.trade_date,
Industry.code, Industry.code,
Industry.industryID1.label('industry_code'), Industry.industryID1.label('industry_code'),
Industry.industryName1.label('industry')]).where( Industry.industryName1.label('industry')]).select_from(big_table).distinct()
and_(Industry.trade_date.in_(codes.trade_date.dt.strftime('%Y-%m-%d').unique()),
Industry.code.in_(codes.code.tolist()),
Industry.industry == industry_category_name)
).distinct()
df = pd.read_sql(query, self.engine) df = pd.read_sql(query, self.engine)
if universe.is_filtered:
codes = universe.query(self, start_date, end_date, dates)
df = pd.merge(df, codes, how='inner', on=['trade_date', 'code']).sort_values(['trade_date', 'code']) df = pd.merge(df, codes, how='inner', on=['trade_date', 'code']).sort_values(['trade_date', 'code'])
return df return df
...@@ -774,7 +806,7 @@ if __name__ == '__main__': ...@@ -774,7 +806,7 @@ if __name__ == '__main__':
df = engine.fetch_factor_range(universe, df = engine.fetch_factor_range(universe,
['closePrice'], ['closePrice'],
start_date='2012-12-21', start_date='2015-12-21',
end_date='2017-12-25') end_date='2017-12-25')
print(df) print(df)
...@@ -27,13 +27,20 @@ class Universe(object): ...@@ -27,13 +27,20 @@ class Universe(object):
self.base_universe = base_universe self.base_universe = base_universe
self.filter_cond = filter_cond self.filter_cond = filter_cond
def query(self, engine, start_date: str=None, end_date: str=None, dates=None) -> pd.DataFrame: @property
def is_filtered(self):
return True if self.filter_cond is not None else False
universe_cond = and_( def _query_statements(self, start_date, end_date, dates):
return and_(
UniverseTable.trade_date.in_(dates) if dates else UniverseTable.trade_date.between(start_date, end_date), UniverseTable.trade_date.in_(dates) if dates else UniverseTable.trade_date.between(start_date, end_date),
UniverseTable.universe.in_(self.base_universe) UniverseTable.universe.in_(self.base_universe)
) )
def query(self, engine, start_date: str=None, end_date: str=None, dates=None) -> pd.DataFrame:
universe_cond = self._query_statements(start_date, end_date, dates)
if self.filter_cond is None: if self.filter_cond is None:
# simple case # simple case
query = select([UniverseTable.trade_date, UniverseTable.code]).where( query = select([UniverseTable.trade_date, UniverseTable.code]).where(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment