Commit 6c3e1b28 authored by Dr.李's avatar Dr.李

update auto scrips

parent 4d38f7b8
...@@ -92,3 +92,12 @@ python setup.py install ...@@ -92,3 +92,12 @@ python setup.py install
可以达到一样的效果。 可以达到一样的效果。
* 数据库更新
在目录``scrips`` 下有[airflow]()脚本文件``update_uqer_data.py``可以用来做每天的数据更新。使用之前除了要配置好airflow服务器之外,需要更新脚本中以下两行:
```
_ = uqer.Client(token='')
engine = sqlalchemy.create_engine('')
```
...@@ -8,3 +8,4 @@ scipy >= 0.19.0 ...@@ -8,3 +8,4 @@ scipy >= 0.19.0
simpleutils >= 0.1.0 simpleutils >= 0.1.0
sqlalchemy >= 1.1.9 sqlalchemy >= 1.1.9
pandas >= 0.19.2 pandas >= 0.19.2
Finance-Python >= 0.5.5
\ No newline at end of file
...@@ -14,13 +14,15 @@ from airflow.models import DAG ...@@ -14,13 +14,15 @@ from airflow.models import DAG
from uqer import DataAPI as api from uqer import DataAPI as api
from alphamind.utilities import alpha_logger from alphamind.utilities import alpha_logger
from PyFin.api import advanceDateByCalendar from PyFin.api import advanceDateByCalendar
from PyFin.api import isBizDay
uqer.DataAPI.api_base.timeout = 300
start_date = dt.datetime(2016, 12, 31) start_date = dt.datetime(2017, 2, 3)
dag_name = 'update_uqer_data' dag_name = 'update_uqer_data'
default_args = { default_args = {
'owner': 'user', 'owner': 'wegamekinglc',
'depends_on_past': False, 'depends_on_past': False,
'start_date': start_date 'start_date': start_date
} }
...@@ -28,12 +30,11 @@ default_args = { ...@@ -28,12 +30,11 @@ default_args = {
dag = DAG( dag = DAG(
dag_id=dag_name, dag_id=dag_name,
default_args=default_args, default_args=default_args,
schedule_interval='0 6,18 * * 1,2,3,4,5' schedule_interval='0 18 * * 1,2,3,4,5'
) )
_ = uqer.Client(token='') _ = uqer.Client(token='')
engine = sqlalchemy.create_engine('mssql+pymssql://licheng:A12345678!@10.63.6.220/alpha') engine = sqlalchemy.create_engine('')
def process_date(ds): def process_date(ds):
...@@ -47,17 +48,32 @@ def format_data(df, format='%Y%m%d'): ...@@ -47,17 +48,32 @@ def format_data(df, format='%Y%m%d'):
df['Date'] = pd.to_datetime(df['Date'], format=format) df['Date'] = pd.to_datetime(df['Date'], format=format)
def check_holiday(this_date):
flag = isBizDay('china.sse', this_date)
if not flag:
alpha_logger.info('Job will be omitted as {0} is a holiday'.format(this_date))
return flag
def data_info_log(df, table): def data_info_log(df, table):
data_len = len(df) data_len = len(df)
if data_len > 0: if data_len > 0:
alpha_logger.info("{0} records will be inserted in {1}".format(data_len, table)) alpha_logger.info("{0} records will be inserted in {1}".format(data_len, table))
else: else:
alpha_logger.warning("No records will be inserted in {1}".format(data_len, table)) msg = "No records will be inserted in {0}".format(table)
alpha_logger.warning(msg)
raise ValueError(msg)
def update_uqer_factors(ds, **kwargs): def update_uqer_factors(ds, **kwargs):
ref_date, _ = process_date(ds) ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
df = api.MktStockFactorsOneDayProGet(tradeDate=ref_date) df = api.MktStockFactorsOneDayProGet(tradeDate=ref_date)
df.rename(columns={'tradeDate': 'Date', 'ticker': 'Code'}, inplace=True) df.rename(columns={'tradeDate': 'Date', 'ticker': 'Code'}, inplace=True)
...@@ -68,13 +84,17 @@ def update_uqer_factors(ds, **kwargs): ...@@ -68,13 +84,17 @@ def update_uqer_factors(ds, **kwargs):
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date)) engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table, format='%Y-%m-%d') data_info_log(df, table)
format_data(df) format_data(df, format='%Y-%m-%d')
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
def update_uqer_market(ds, **kwargs): def update_uqer_market(ds, **kwargs):
ref_date, _ = process_date(ds) ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
table = 'market' table = 'market'
...@@ -90,7 +110,11 @@ def update_uqer_market(ds, **kwargs): ...@@ -90,7 +110,11 @@ def update_uqer_market(ds, **kwargs):
def update_uqer_halt_list(ds, **kwargs): def update_uqer_halt_list(ds, **kwargs):
ref_date, _ = process_date(ds) ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
table = 'halt_list' table = 'halt_list'
...@@ -109,11 +133,20 @@ def update_uqer_halt_list(ds, **kwargs): ...@@ -109,11 +133,20 @@ def update_uqer_halt_list(ds, **kwargs):
def update_uqer_universe_hs300(ds, **kwargs): def update_uqer_universe_hs300(ds, **kwargs):
ref_date, this_date = process_date(ds) ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
table = 'universe' table = 'universe'
engine.execute("delete from {0} where Date = '{1}' and universe = 'hs300';".format(table, ref_date)) engine.execute("delete from {0} where Date = '{1}' and universe = 'hs300';".format(table, ref_date))
df = pd.read_sql("select Date, Code from index_components where Date = '{0}' and indexCode = 300".format(ref_date), engine) df = pd.read_sql("select Date, Code from index_components where Date = '{0}' and indexCode = 300".format(ref_date),
engine)
if df.empty:
return
df['universe'] = 'hs300' df['universe'] = 'hs300'
data_info_log(df, table) data_info_log(df, table)
...@@ -123,11 +156,20 @@ def update_uqer_universe_hs300(ds, **kwargs): ...@@ -123,11 +156,20 @@ def update_uqer_universe_hs300(ds, **kwargs):
def update_uqer_universe_zz500(ds, **kwargs): def update_uqer_universe_zz500(ds, **kwargs):
ref_date, this_date = process_date(ds) ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
table = 'universe' table = 'universe'
engine.execute("delete from {0} where Date = '{1}' and universe = 'zz500';".format(table, ref_date)) engine.execute("delete from {0} where Date = '{1}' and universe = 'zz500';".format(table, ref_date))
df = pd.read_sql("select Date, Code from index_components where Date = '{0}' and indexCode = 905".format(ref_date), engine) df = pd.read_sql("select Date, Code from index_components where Date = '{0}' and indexCode = 905".format(ref_date),
engine)
if df.empty:
return
df['universe'] = 'zz500' df['universe'] = 'zz500'
data_info_log(df, table) data_info_log(df, table)
...@@ -135,19 +177,100 @@ def update_uqer_universe_zz500(ds, **kwargs): ...@@ -135,19 +177,100 @@ def update_uqer_universe_zz500(ds, **kwargs):
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
def update_uqer_universe_zz800(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
table = 'universe'
engine.execute("delete from {0} where Date = '{1}' and universe = 'zz800';".format(table, ref_date))
df = pd.read_sql("select Date, Code from index_components where Date = '{0}' and indexCode = 906".format(ref_date),
engine)
if df.empty:
return
df['universe'] = 'zz800'
data_info_log(df, table)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append')
def update_uqer_index_components(ds, **kwargs): def update_uqer_index_components(ds, **kwargs):
ref_date, this_date = process_date(ds) ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
table = 'index_components' table = 'index_components'
index_codes = ['000001', '000300', '000905', '000016', '399005', '399006'] index_codes = ['000001',
'000002',
'000003',
'000004',
'000005',
'000006',
'000007',
'000008',
'000009',
'000010',
'000015',
'000016',
'000020',
'000090',
'000132',
'000133',
'000300',
'000852',
'000902',
'000903',
'000904',
'000905',
'000906',
'000907',
'000922',
'399001',
'399002',
'399004',
'399005',
'399006',
'399007',
'399008',
'399009',
'399010',
'399011',
'399012',
'399013',
'399107',
'399324',
'399330',
'399333',
'399400',
'399401',
'399649']
total_data = pd.DataFrame() total_data = pd.DataFrame()
for index in index_codes: for index in index_codes:
df = api.IdxCloseWeightGet(ticker=index, df = api.IdxCloseWeightGet(ticker=index,
beginDate=dt.datetime(this_date.year - 1, this_date.month, this_date.day).strftime( beginDate=ref_date,
'%Y%m%d'), endDate=ref_date) endDate=ref_date)
df = df[df.effDate == df.effDate.unique()[-1]]
if df.empty:
ref_previous_date = advanceDateByCalendar('china.sse', this_date, '-1b')
df = pd.read_sql("select * from {0} where Date = '{1}' and indexCode = {2}".format(table,
ref_previous_date,
int(index)),
engine)
df['Date'] = this_date
if df.empty:
continue
else:
df.rename(columns={'ticker': 'indexCode', df.rename(columns={'ticker': 'indexCode',
'secShortName': 'indexShortName', 'secShortName': 'indexShortName',
'consTickerSymbol': 'Code', 'consTickerSymbol': 'Code',
...@@ -161,6 +284,10 @@ def update_uqer_index_components(ds, **kwargs): ...@@ -161,6 +284,10 @@ def update_uqer_index_components(ds, **kwargs):
total_data = total_data.append(df) total_data = total_data.append(df)
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date)) engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
if total_data.empty:
return
data_info_log(total_data, table) data_info_log(total_data, table)
format_data(total_data) format_data(total_data)
total_data.to_sql(table, engine, index=False, if_exists='append') total_data.to_sql(table, engine, index=False, if_exists='append')
...@@ -168,6 +295,10 @@ def update_uqer_index_components(ds, **kwargs): ...@@ -168,6 +295,10 @@ def update_uqer_index_components(ds, **kwargs):
def update_uqer_risk_model(ds, **kwargs): def update_uqer_risk_model(ds, **kwargs):
ref_date, this_date = process_date(ds) ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
table = 'risk_exposure' table = 'risk_exposure'
...@@ -255,10 +386,14 @@ def update_uqer_risk_model(ds, **kwargs): ...@@ -255,10 +386,14 @@ def update_uqer_risk_model(ds, **kwargs):
def update_uqer_daily_return(ds, **kwargs): def update_uqer_daily_return(ds, **kwargs):
ref_date, this_date = process_date(ds) ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
previous_date = advanceDateByCalendar('china.sse', this_date, '-1b').strftime('%Y-%m-%d') previous_date = advanceDateByCalendar('china.sse', this_date, '-1b').strftime('%Y-%m-%d')
table = 'daily_return' table = 'daily_return'
df = pd.read_sql("select Code, chgPct as d1 from market where Date = '{0}'".format(this_date), engine) df = pd.read_sql("select Code, chgPct as d1 from market where Date = '{0}'".format(this_date), engine)
df['Date'] = previous_date df['Date'] = previous_date
engine.execute("delete from {0} where Date = '{1}'".format(table, previous_date)) engine.execute("delete from {0} where Date = '{1}'".format(table, previous_date))
...@@ -266,19 +401,41 @@ def update_uqer_daily_return(ds, **kwargs): ...@@ -266,19 +401,41 @@ def update_uqer_daily_return(ds, **kwargs):
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
def update_stratgy_table(ds, **kwargs): def update_uqer_industry_info(ds, **kwargs):
strategy_date = kwargs['next_execution_date'] ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
df = pd.DataFrame({'strategyName': ['mutual_fund', 'mutual_fund', 'mutual_fund', 'mutual_fund', 'mutual_fund', 'prod', 'prod'], table = 'market'
'factor': ['BDTO', 'CFinc1', 'DivP', 'EPSAfterNonRecurring', 'RVOL', 'CoppockCurve', 'EPS'], df = pd.read_sql("select Code from {0} where Date = '{1}'".format(table, this_date), engine)
'weight': [0.1002, 0.2314, 0.1764, 0.3739, 0.1181, -0.3333, 0.6666], codes = df.Code.astype(str).str.zfill(6)
'source': ['tiny', 'tiny', 'tiny', 'tiny', 'tiny', 'uqer', 'uqer']})
table = 'strategy' table = 'industry'
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
df['Date'] = strategy_date df = api.EquIndustryGet(intoDate=ref_date)
df = df[df.ticker.isin(codes)]
df['Code'] = df.ticker.astype(int)
df['Date'] = this_date
df.rename(columns={'ticker': 'Code'}, inplace=True)
df = df[['Date',
'Code',
'industry',
'industryID',
'industrySymbol',
'industryID1',
'industryName1',
'industryID2',
'industryName2',
'industryID3',
'industryName3',
'IndustryID4',
'IndustryName4']]
engine.execute("delete from {0} where Date = '{1}'".format(table, strategy_date.strftime('%Y-%m-%d')))
data_info_log(df, table) data_info_log(df, table)
format_data(df) format_data(df)
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
...@@ -291,7 +448,6 @@ _ = PythonOperator( ...@@ -291,7 +448,6 @@ _ = PythonOperator(
dag=dag dag=dag
) )
task = PythonOperator( task = PythonOperator(
task_id='update_uqer_market', task_id='update_uqer_market',
provide_context=True, provide_context=True,
...@@ -306,13 +462,21 @@ sub_task1 = PythonOperator( ...@@ -306,13 +462,21 @@ sub_task1 = PythonOperator(
dag=dag dag=dag
) )
sub_task1.set_upstream(task) sub_task2 = PythonOperator(
task_id='update_uqer_industry_info',
provide_context=True,
python_callable=update_uqer_industry_info,
dag=dag
)
sub_task1.set_upstream(task)
sub_task2.set_upstream(task)
task = PythonOperator( task = PythonOperator(
task_id='update_uqer_index_components', task_id='update_uqer_index_components',
provide_context=True, provide_context=True,
python_callable=update_uqer_index_components, python_callable=update_uqer_index_components,
depends_on_past=True,
dag=dag dag=dag
) )
...@@ -330,9 +494,16 @@ sub_task2 = PythonOperator( ...@@ -330,9 +494,16 @@ sub_task2 = PythonOperator(
dag=dag dag=dag
) )
sub_task3 = PythonOperator(
task_id='update_uqer_universe_zz800',
provide_context=True,
python_callable=update_uqer_universe_zz800,
dag=dag
)
sub_task1.set_upstream(task) sub_task1.set_upstream(task)
sub_task2.set_upstream(task) sub_task2.set_upstream(task)
sub_task3.set_upstream(task)
_ = PythonOperator( _ = PythonOperator(
...@@ -342,7 +513,6 @@ _ = PythonOperator( ...@@ -342,7 +513,6 @@ _ = PythonOperator(
dag=dag dag=dag
) )
_ = PythonOperator( _ = PythonOperator(
task_id='update_uqer_halt_list', task_id='update_uqer_halt_list',
provide_context=True, provide_context=True,
...@@ -350,14 +520,5 @@ _ = PythonOperator( ...@@ -350,14 +520,5 @@ _ = PythonOperator(
dag=dag dag=dag
) )
task = PythonOperator(
task_id='update_stratgy_table',
provide_context=True,
python_callable=update_stratgy_table,
dag=dag
)
if __name__ == '__main__': if __name__ == '__main__':
update_uqer_index_components(ds='2017-06-22') update_uqer_index_components(ds='2011-01-07')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment