Commit 91ea8d99 authored by Dr.李's avatar Dr.李

update scripts

parent 9e9d23d1
......@@ -21,7 +21,7 @@ from alphamind.data.dbmodel.models import *
uqer.DataAPI.api_base.timeout = 300
start_date = dt.datetime(2017, 8, 22)
start_date = dt.datetime(2010, 1, 1)
dag_name = 'update_uqer_data_postgres'
default_args = {
......@@ -71,6 +71,41 @@ def data_info_log(df, table):
raise ValueError(msg)
def update_uqer_index_market(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
df = api.MktIdxdGet(tradeDate=ref_date)
df = df[df.exchangeCD.isin(['XSHE', 'XSHG', 'ZICN'])]
df = df[df.ticker <= '999999']
df.rename(columns={'tradeDate': 'trade_date',
'ticker': 'indexCode',
'CHGPct': 'chgPct',
'secShortName': 'indexShortName'}, inplace=True)
df = df[['trade_date',
'indexCode',
'preCloseIndex',
'openIndex',
'highestIndex',
'lowestIndex',
'closeIndex',
'turnoverVol',
'turnoverValue',
'chgPct']]
df['indexCode'] = df.indexCode.astype(int)
query = delete(IndexMarket).where(IndexMarket.trade_date == this_date)
engine.execute(query)
data_info_log(df, Market)
format_data(df, format='%Y-%m-%d')
df.to_sql(IndexMarket.__table__.name, engine, index=False, if_exists='append')
def update_uqer_factors(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
......@@ -265,6 +300,105 @@ def update_uqer_universe_zz800(ds, **kwargs):
df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
def update_uqer_universe_zz1000(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
query = delete(Universe).where(
and_(
Universe.trade_date == this_date,
Universe.universe == 'zz1000'
)
)
engine.execute(query)
query = select([IndexComponent.trade_date, IndexComponent.code]).where(
and_(
IndexComponent.trade_date == this_date,
IndexComponent.indexCode == 852
)
)
df = pd.read_sql(query, engine)
if df.empty:
return
df['universe'] = 'zz1000'
data_info_log(df, Universe)
format_data(df)
df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
def update_uqer_universe_zxb(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
query = delete(Universe).where(
and_(
Universe.trade_date == this_date,
Universe.universe == 'zxb'
)
)
engine.execute(query)
query = select([IndexComponent.trade_date, IndexComponent.code]).where(
and_(
IndexComponent.trade_date == this_date,
IndexComponent.indexCode == 399005
)
)
df = pd.read_sql(query, engine)
if df.empty:
return
df['universe'] = 'zxb'
data_info_log(df, Universe)
format_data(df)
df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
def update_uqer_universe_cyb(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
query = delete(Universe).where(
and_(
Universe.trade_date == this_date,
Universe.universe == 'cyb'
)
)
engine.execute(query)
query = select([IndexComponent.trade_date, IndexComponent.code]).where(
and_(
IndexComponent.trade_date == this_date,
IndexComponent.indexCode == 399006
)
)
df = pd.read_sql(query, engine)
if df.empty:
return
df['universe'] = 'cyb'
data_info_log(df, Universe)
format_data(df)
df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
def update_uqer_universe_security_master(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
......@@ -429,19 +563,21 @@ def update_uqer_index_components(ds, **kwargs):
endDate=ref_date)
if df.empty:
ref_previous_date = advanceDateByCalendar('china.sse', this_date, '-1b')
ref_previous_date = advanceDateByCalendar('china.sse', this_date, '-9m')
query = select([IndexComponent]).where(
and_(
IndexComponent.trade_date == ref_previous_date,
IndexComponent.trade_date.between(ref_previous_date, this_date),
IndexComponent.indexCode == int(index)
)
)
df = pd.read_sql(query, engine)
df = df[df.trade_date == df.trade_date.iloc[-1]]
df['trade_date'] = this_date
if df.empty:
continue
alpha_logger.info('{0} is finished with previous data'.format(index))
else:
df.rename(columns={'ticker': 'indexCode',
'secShortName': 'indexShortName',
......@@ -453,8 +589,9 @@ def update_uqer_index_components(ds, **kwargs):
df['trade_date'] = this_date
del df['secID']
del df['consID']
alpha_logger.info('{0} is finished with new data'.format(index))
total_data = total_data.append(df)
index_codes = total_data.indexCode.unique()
index_codes = [int(index) for index in index_codes]
......@@ -551,23 +688,6 @@ def update_uqer_risk_model(ds, **kwargs):
df.to_sql(SpecificRiskLong.__table__.name, engine, index=False, if_exists='append')
def update_uqer_daily_return(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
previous_date = advanceDateByCalendar('china.sse', this_date, '-1b').strftime('%Y-%m-%d')
query = select([Market.code, Market.chgPct.label('d1')]).where(Market.trade_date == this_date)
df = pd.read_sql(query, engine)
df['trade_date'] = previous_date
engine.execute(delete(DailyReturn).where(DailyReturn.trade_date == previous_date))
data_info_log(df, DailyReturn)
df.to_sql(DailyReturn.__table__.name, engine, index=False, if_exists='append')
def update_uqer_industry_info(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
......@@ -623,11 +743,12 @@ def fetch_date(table, query_date, engine):
return df
def update_materialized_views(ds, **kwargs):
alpha_logger.info("starting refresh full_factor_view ...")
engine.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY full_factor_view;")
alpha_logger.info("starting cluster full_factor_view ...")
engine.execute("CLUSTER full_factor_view;")
index_market_task = PythonOperator(
task_id='update_uqer_index_market',
provide_context=True,
python_callable=update_uqer_index_market,
dag=dag
)
uqer_task = PythonOperator(
......@@ -644,13 +765,6 @@ market_task = PythonOperator(
dag=dag
)
return_task = PythonOperator(
task_id='update_uqer_daily_return',
provide_context=True,
python_callable=update_uqer_daily_return,
dag=dag
)
industry_task = PythonOperator(
task_id='update_uqer_industry_info',
provide_context=True,
......@@ -658,7 +772,6 @@ industry_task = PythonOperator(
dag=dag
)
return_task.set_upstream(market_task)
industry_task.set_upstream(market_task)
index_task = PythonOperator(
......@@ -689,6 +802,13 @@ universe800_task = PythonOperator(
dag=dag
)
universe1000_task = PythonOperator(
task_id='update_uqer_universe_zz1000',
provide_context=True,
python_callable=update_uqer_universe_zz1000,
dag=dag
)
universe50_task = PythonOperator(
task_id='update_uqer_universe_sh50',
provide_context=True,
......@@ -696,16 +816,32 @@ universe50_task = PythonOperator(
dag=dag
)
universe_zxb_task = PythonOperator(
task_id='update_uqer_universe_zxb',
provide_context=True,
python_callable=update_uqer_universe_zxb,
dag=dag
)
universe_cyb_task = PythonOperator(
task_id='update_uqer_universe_cyb',
provide_context=True,
python_callable=update_uqer_universe_cyb,
dag=dag
)
universe300_task.set_upstream(index_task)
universe500_task.set_upstream(index_task)
universe800_task.set_upstream(index_task)
universe1000_task.set_upstream(index_task)
universe50_task.set_upstream(index_task)
universe_zxb_task.set_upstream(index_task)
universe_cyb_task.set_upstream(index_task)
security_master_task = PythonOperator(
task_id='update_uqer_universe_security_master',
provide_context=True,
python_callable=update_uqer_universe_security_master,
dependes_on_past=True,
dag=dag
)
......@@ -742,18 +878,6 @@ _ = PythonOperator(
dag=dag
)
refresh_materialized_views_task = PythonOperator(
task_id='update_materialized_views',
provide_context=True,
python_callable=update_materialized_views,
dag=dag
)
refresh_materialized_views_task.set_upstream(market_task)
refresh_materialized_views_task.set_upstream(uqer_task)
refresh_materialized_views_task.set_upstream(risk_model_task)
if __name__ == '__main__':
update_uqer_index_components(ds='2010-02-01')
update_uqer_index_components(ds='2017-11-10')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment