Commit a47c93a0 authored by Dr.李's avatar Dr.李

update script

parent e0fe7c31
......@@ -13,12 +13,14 @@ from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from uqer import DataAPI as api
from alphamind.utilities import alpha_logger
from alphamind.data.dbmodel.models import SecurityMaster
from sqlalchemy import select, and_, or_
from PyFin.api import advanceDateByCalendar
from PyFin.api import isBizDay
uqer.DataAPI.api_base.timeout = 300
start_date = dt.datetime(2017, 2, 3)
start_date = dt.datetime(2017, 8, 22)
dag_name = 'update_uqer_data'
default_args = {
......@@ -30,11 +32,11 @@ default_args = {
dag = DAG(
dag_id=dag_name,
default_args=default_args,
schedule_interval='0 18 * * 1,2,3,4,5'
schedule_interval='0 6 * * 1,2,3,4,5'
)
_ = uqer.Client(token='')
engine = sqlalchemy.create_engine('')
_ = uqer.Client()
engine = sqlalchemy.create_engine()
def process_date(ds):
......@@ -154,6 +156,29 @@ def update_uqer_universe_hs300(ds, **kwargs):
df.to_sql(table, engine, index=False, if_exists='append')
def update_uqer_universe_sh50(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
table = 'universe'
engine.execute("delete from {0} where Date = '{1}' and universe = 'sh50';".format(table, ref_date))
df = pd.read_sql("select Date, Code from index_components where Date = '{0}' and indexCode = 16".format(ref_date),
engine)
if df.empty:
return
df['universe'] = 'sh50'
data_info_log(df, table)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append')
def update_uqer_universe_zz500(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
......@@ -200,6 +225,100 @@ def update_uqer_universe_zz800(ds, **kwargs):
df.to_sql(table, engine, index=False, if_exists='append')
def update_uqer_universe_security_master(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
table = 'security_master'
df = api.EquGet(equTypeCD='A')
if df.empty:
return
engine.execute("DELETE from {0}".format(table))
df = df[df.ticker.str.len() <= 6]
df['Code'] = df.ticker.astype(int)
df['listDate'] = pd.to_datetime(df['listDate'], format='%Y-%m-%d')
df['endDate'] = pd.to_datetime(df['endDate'], format='%Y-%m-%d')
df['delistDate'] = pd.to_datetime(df['delistDate'], format='%Y-%m-%d')
del df['ticker']
del df['secID']
data_info_log(df, table)
df.to_sql(table, engine, index=False, if_exists='append')
def update_uqer_universe_ashare(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
table = 'universe'
engine.execute("delete from {0} where Date = '{1}' and universe = 'ashare';".format(table, ref_date))
query = select([SecurityMaster.Code]).where(
and_(
SecurityMaster.listDate <= this_date,
or_(
SecurityMaster.listStatusCD == 'L',
SecurityMaster.delistDate > this_date
)
)
)
df = pd.read_sql(query, engine)
if df.empty:
return
df['universe'] = 'ashare'
df['Date'] = this_date
data_info_log(df, table)
df.to_sql(table, engine, index=False, if_exists='append')
def update_uqer_universe_ashare_ex(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
table = 'universe'
engine.execute("delete from {0} where Date = '{1}' and universe = 'ashare_ex';".format(table, ref_date))
ex_date = advanceDateByCalendar('china.sse', this_date, '-3m')
query = select([SecurityMaster.Code]).where(
and_(
SecurityMaster.listDate <= ex_date,
or_(
SecurityMaster.listStatusCD == "L",
SecurityMaster.delistDate > this_date
)
)
)
df = pd.read_sql(query, engine)
if df.empty:
return
df['universe'] = 'ashare_ex'
df['Date'] = this_date
data_info_log(df, table)
df.to_sql(table, engine, index=False, if_exists='append')
def update_uqer_index_components(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
......@@ -441,6 +560,21 @@ def update_uqer_industry_info(ds, **kwargs):
df.to_sql(table, engine, index=False, if_exists='append')
def fetch_date(table, query_date, engine):
query_date = query_date.replace('-', '')
sql = "select * from {0} where Date = {1}".format(table, query_date)
df = pd.read_sql_query(sql, engine)
cols = df.columns.tolist()
cols[2] = '申万一级行业'
cols[3] = '申万二级行业'
cols[4] = '申万三级行业'
df.columns = cols
df['Date'] = pd.to_datetime(df.Date.astype(str))
return df
_ = PythonOperator(
task_id='update_uqer_factors',
provide_context=True,
......@@ -501,9 +635,42 @@ sub_task3 = PythonOperator(
dag=dag
)
sub_task4 = PythonOperator(
task_id='update_uqer_universe_sh50',
provide_context=True,
python_callable=update_uqer_universe_sh50,
dag=dag
)
sub_task1.set_upstream(task)
sub_task2.set_upstream(task)
sub_task3.set_upstream(task)
sub_task4.set_upstream(task)
task = PythonOperator(
task_id='update_uqer_universe_security_master',
provide_context=True,
python_callable=update_uqer_universe_security_master,
dag=dag
)
sub_task1 = PythonOperator(
task_id='update_uqer_universe_ashare',
provide_context=True,
python_callable=update_uqer_universe_ashare,
dag=dag
)
sub_task2 = PythonOperator(
task_id='update_uqer_universe_ashare_ex',
provide_context=True,
python_callable=update_uqer_universe_ashare_ex,
dag=dag
)
sub_task1.set_upstream(task)
sub_task2.set_upstream(task)
_ = PythonOperator(
......@@ -520,5 +687,6 @@ _ = PythonOperator(
dag=dag
)
if __name__ == '__main__':
update_uqer_index_components(ds='2011-01-07')
update_uqer_risk_model(ds='2017-08-18')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment