Commit 4d38f7b8 authored by Dr.李

refactor db engine

parent de2d6d88
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
Created on 2017-6-26 Created on 2017-7-7
@author: cheng.li @author: cheng.li
""" """
from typing import Iterable from typing import Iterable
from typing import Union from typing import List
import sqlalchemy as sa from typing import Dict
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import sqlalchemy as sa
db_settings = { from alphamind.data.engines.universe import Universe
'alpha':
{
'user': 'licheng',
'password': 'A12345678!',
'host': '10.63.6.220',
'db': 'alpha',
'charset': 'utf8'
}
}
risk_styles = ['BETA', risk_styles = ['BETA',
'MOMENTUM', 'MOMENTUM',
...@@ -66,113 +57,115 @@ industry_styles = [ ...@@ -66,113 +57,115 @@ industry_styles = [
] ]
def fetch_codes(codes: Union[str, Iterable[int]], start_date, end_date, engine): def append_industry_info(df):
code_table = None industry_arr = np.array(industry_styles)
code_str = None industry_codes = np.arange(len(industry_styles), dtype=int)
industry_dummies = df[industry_styles].values.astype(bool)
if isinstance(codes, str): df['industry'], df['industry_code'] = [industry_arr[row][0] for row in industry_dummies], \
# universe [industry_codes[row][0] for row in industry_dummies]
sql = "select Date, Code from universe where Date >= '{0}' and Date <= '{1}' and universe = '{2}'" \
.format(start_date, end_date, codes)
code_table = pd.read_sql(sql, engine)
elif hasattr(codes, '__iter__'): class SqlEngine(object):
code_str = ','.join(str(c) for c in codes) def __init__(self,
db_url: str,
universe: Universe):
self.engine = sa.create_engine(db_url)
self.unv = universe
return code_table, code_str def fetch_codes(self, ref_date: str) -> List[int]:
def get_universe(univ, ref_date):
univ_str = ','.join("'" + u + "'" for u in univ)
sql = "select distinct Code from universe where Date = '{ref_date}' and universe in ({univ_str})".format(
ref_date=ref_date, univ_str=univ_str)
cursor = self.engine.execute(sql)
codes_set = {c[0] for c in cursor.fetchall()}
return codes_set
def industry_mapping(industry_arr, industry_codes, industry_dummies): codes_set = None
return [industry_arr[row][0] for row in industry_dummies], \
[industry_codes[row][0] for row in industry_dummies],
if self.unv.include_universe:
include_codes_set = get_universe(self.unv.include_universe, ref_date)
codes_set = include_codes_set
def append_industry_info(df): if self.unv.exclude_universe:
industry_arr = np.array(industry_styles) exclude_codes_set = get_universe(self.unv.exclude_universe, ref_date)
industry_codes = np.arange(len(industry_styles), dtype=int) codes_set -= exclude_codes_set
industry_dummies = df[industry_styles].values.astype(bool)
df['industry'], df['industry_code'] = industry_mapping(industry_arr, industry_codes, industry_dummies) if self.unv.include_codes:
codes_set = codes_set.union(self.unv.include_codes)
if self.unv.exclude_codes:
codes_set -= set(self.unv.exclude_codes)
def fetch_data(factors: Iterable[str], return sorted(codes_set)
start_date: str,
end_date: str,
codes: Union[str, Iterable[int]] = None,
benchmark: int = None,
risk_model: str = 'day') -> dict:
engine = sa.create_engine('mssql+pymssql://{user}:{password}@{host}/{db}?charset={charset}'
.format(**db_settings['alpha']))
factor_str = ','.join('uqer.' + f for f in factors) def fetch_data(self, ref_date,
code_table, code_str = fetch_codes(codes, start_date, end_date, engine) factors: Iterable[str],
codes: Iterable[int],
benchmark: int = None,
risk_model: str = 'short') -> Dict[str, pd.DataFrame]:
total_risk_factors = risk_styles + industry_styles factor_str = ','.join('uqer.' + f for f in factors)
risk_str = ','.join('risk_exposure.' + f for f in total_risk_factors)
special_risk_table = 'specific_risk_' + risk_model total_risk_factors = risk_styles + industry_styles
risk_str = ','.join('risk_exposure.' + f for f in total_risk_factors)
if code_str: special_risk_table = 'specific_risk_' + risk_model
sql = "select uqer.Date, uqer.Code, {0}, {3}, market.isOpen, daily_return.d1, {5}.SRISK" \ codes_str = ','.join(str(c) for c in codes)
" from (uqer INNER JOIN" \
" risk_exposure on uqer.Date = risk_exposure.Date and uqer.Code = risk_exposure.Code)" \ sql = "select uqer.Code, {factors}, {risks}, market.isOpen, daily_return.d1, {risk_table}.SRISK" \
" INNER JOIN market on uqer.Date = market.Date and uqer.Code = market.Code" \
" INNER JOIN daily_return on uqer.Date = daily_return.Date and uqer.Code = daily_return.Code" \
" INNER JOIN {5} on uqer.Date = {5}.Date and uqer.Code = {5}.Code" \
" where uqer.Date >= '{1}' and uqer.Date <= '{2}' and uqer.Code in ({4})".format(factor_str,
start_date,
end_date,
risk_str,
code_str,
special_risk_table)
else:
sql = "select uqer.Date, uqer.Code, {0}, {3}, market.isOpen, daily_return.d1, {4}.SRISK" \
" from (uqer INNER JOIN" \ " from (uqer INNER JOIN" \
" risk_exposure on uqer.Date = risk_exposure.Date and uqer.Code = risk_exposure.Code)" \ " risk_exposure on uqer.Date = risk_exposure.Date and uqer.Code = risk_exposure.Code)" \
" INNER JOIN market on uqer.Date = market.Date and uqer.Code = market.Code" \ " INNER JOIN market on uqer.Date = market.Date and uqer.Code = market.Code" \
" INNER JOIN daily_return on uqer.Date = daily_return.Date and uqer.Code = daily_return.Code" \ " INNER JOIN daily_return on uqer.Date = daily_return.Date and uqer.Code = daily_return.Code" \
" INNER JOIN {4} on uqer.Date = {4}.Date and uqer.Code = {4}.Code" \ " INNER JOIN {risk_table} on uqer.Date = {risk_table}.Date and uqer.Code = {risk_table}.Code" \
" where uqer.Date >= '{1}' and uqer.Date <= '{2}'".format(factor_str, " where uqer.Date = '{ref_date}' and uqer.Code in ({codes})".format(factors=factor_str,
start_date, ref_date=ref_date,
end_date, codes=codes_str,
risk_str, risks=risk_str,
special_risk_table) risk_table=special_risk_table)
factor_data = pd.read_sql(sql, engine) factor_data = pd.read_sql(sql, self.engine)
if code_table is not None: risk_cov_table = 'risk_cov_' + risk_model
factor_data = pd.merge(factor_data, code_table, on=['Date', 'Code']) risk_str = ','.join(risk_cov_table + '.' + f for f in total_risk_factors)
risk_cov_table = 'risk_cov_' + risk_model sql = "select FactorID, Factor, {risks} from {risk_table} where Date = '{ref_date}'".format(ref_date=ref_date,
risk_str = ','.join(risk_cov_table + '.' + f for f in total_risk_factors) risks=risk_str,
risk_table=risk_cov_table)
sql = "select Date, FactorID, Factor, {0} from {1} where Date >= '{2}' and Date <= '{3}'".format(risk_str, risk_cov_data = pd.read_sql(sql, self.engine).sort_values('FactorID')
risk_cov_table,
start_date,
end_date)
risk_cov_data = pd.read_sql(sql, engine) total_data = {'factor': factor_data, 'risk_cov': risk_cov_data}
total_data = {'factor': factor_data, 'risk_cov': risk_cov_data} if benchmark:
sql = "select Code, weight / 100. as weight from index_components " \
"where Date = '{ref_date}' and indexCode = {benchmakr}".format(ref_date=ref_date,
benchmakr=benchmark)
if benchmark: benchmark_data = pd.read_sql(sql, self.engine)
sql = "select Date, Code, weight / 100. as weight from index_components " \ total_data['benchmark'] = benchmark_data
"where Date >= '{0}' and Date <= '{1}' and indexCode = {2}".format(start_date,
end_date,
benchmark)
benchmark_data = pd.read_sql(sql, engine) append_industry_info(factor_data)
total_data['benchmark'] = benchmark_data return total_data
append_industry_info(factor_data)
return total_data
if __name__ == '__main__': if __name__ == '__main__':
db_url = 'mysql+mysqldb://root:we083826@localhost/alpha?charset=utf8'
universe = Universe(['zz500'])
engine = SqlEngine(db_url, universe)
ref_date = '2017-07-04'
import datetime as dt import datetime as dt
start = dt.datetime.now() start = dt.datetime.now()
res = fetch_data(['EPS'], '2017-01-03', '2017-06-05', benchmark=905, codes='zz500') for i in range(500):
print(res) codes = engine.fetch_codes('2017-07-04')
total_data = engine.fetch_data(ref_date, ['EPS'], [1, 5], 905)
print(dt.datetime.now() - start) print(dt.datetime.now() - start)
print(total_data)
# -*- coding: utf-8 -*-
"""
Created on 2017-7-7
@author: cheng.li
"""
from typing import Iterable
class Universe(object):
    """Describe which securities belong to a universe.

    A universe is specified by four optional selections, each exposed as
    an attribute of the same name:

    * ``include_universe`` -- names of predefined universes to include
    * ``exclude_universe`` -- names of predefined universes to exclude
    * ``include_codes``    -- individual security codes to add
    * ``exclude_codes``    -- individual security codes to remove

    Every attribute is either ``None`` (selection not given) or a plain
    ``list``, so it can be tested for truthiness and iterated any number
    of times.
    """

    def __init__(self,
                 include_universe: Iterable[str] = None,
                 exclude_universe: Iterable[str] = None,
                 include_codes: Iterable[int] = None,
                 exclude_codes: Iterable[int] = None):
        """Store the four selections as reusable lists.

        :param include_universe: universe names to include, or ``None``
        :param exclude_universe: universe names to exclude, or ``None``
        :param include_codes: security codes to add, or ``None``
        :param exclude_codes: security codes to remove, or ``None``
        """
        self.include_universe = self._materialize(include_universe)
        self.exclude_universe = self._materialize(exclude_universe)
        self.include_codes = self._materialize(include_codes)
        self.exclude_codes = self._materialize(exclude_codes)

    @staticmethod
    def _materialize(items):
        """Return *items* as a list, keeping ``None`` as ``None``."""
        if items is None:
            return None
        if isinstance(items, str):
            # A lone name such as 'zz500' means one entry, not its
            # individual characters.
            return [items]
        # Copy into a list: generators/iterators can only be walked once,
        # and an exhausted generator is still truthy, which would defeat
        # ``if universe.include_universe:`` style checks.
        return list(items)

    def __repr__(self):
        return ('{0}(include_universe={1!r}, exclude_universe={2!r}, '
                'include_codes={3!r}, exclude_codes={4!r})').format(
            type(self).__name__,
            self.include_universe,
            self.exclude_universe,
            self.include_codes,
            self.exclude_codes)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment