Commit 6cc78e2c authored by Dr.李's avatar Dr.李

update script

parent a0912c91
......@@ -13,15 +13,15 @@ from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from uqer import DataAPI as api
from alphamind.utilities import alpha_logger
from alphamind.data.dbmodel.models import SecurityMaster
from sqlalchemy import select, and_, or_
from sqlalchemy import select, and_, or_, delete
from PyFin.api import advanceDateByCalendar
from PyFin.api import isBizDay
from alphamind.data.dbmodel.models import *
uqer.DataAPI.api_base.timeout = 300
start_date = dt.datetime(2017, 8, 22)
dag_name = 'update_uqer_data'
dag_name = 'update_uqer_data_postgres'
default_args = {
'owner': 'wegamekinglc',
......@@ -35,8 +35,8 @@ dag = DAG(
schedule_interval='0 6 * * 1,2,3,4,5'
)
_ = uqer.Client()
engine = sqlalchemy.create_engine()
_ = uqer.Client(token='')
engine = sqlalchemy.create_engine('')
def process_date(ds):
......@@ -82,13 +82,12 @@ def update_uqer_factors(ds, **kwargs):
df.Code = df.Code.astype(int)
del df['secID']
table = 'uqer'
query = delete(Uqer).where(Uqer.Date == this_date)
engine.execute(query)
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table)
data_info_log(df, Uqer)
format_data(df, format='%Y-%m-%d')
df.to_sql(table, engine, index=False, if_exists='append')
df.to_sql(Uqer.__table__.name, engine, index=False, if_exists='append')
def update_uqer_market(ds, **kwargs):
......@@ -98,17 +97,17 @@ def update_uqer_market(ds, **kwargs):
if not flag:
return
table = 'market'
df = api.MktEqudGet(tradeDate=ref_date)
df.rename(columns={'tradeDate': 'Date', 'ticker': 'Code'}, inplace=True)
df.Code = df.Code.astype(int)
del df['secID']
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table)
query = delete(Market).where(Market.Date == this_date)
engine.execute(query)
data_info_log(df, Market)
format_data(df, format='%Y-%m-%d')
df.to_sql(table, engine, index=False, if_exists='append')
df.to_sql(Market.__table__.name, engine, index=False, if_exists='append')
def update_uqer_halt_list(ds, **kwargs):
......@@ -118,19 +117,19 @@ def update_uqer_halt_list(ds, **kwargs):
if not flag:
return
table = 'halt_list'
df = api.SecHaltGet(beginDate=ref_date, endDate=ref_date)
df = df[df.assetClass == 'E']
df['Date'] = ref_date
df.rename(columns={'ticker': 'Code'}, inplace=True)
df.Code = df.Code.astype(int)
del df['secID']
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table)
query = delete(HaltList).where(HaltList.Date == this_date)
engine.execute(query)
data_info_log(df, HaltList)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append')
df.to_sql(HaltList.__table__.name, engine, index=False, if_exists='append')
def update_uqer_universe_hs300(ds, **kwargs):
......@@ -140,20 +139,30 @@ def update_uqer_universe_hs300(ds, **kwargs):
if not flag:
return
table = 'universe'
engine.execute("delete from {0} where Date = '{1}' and universe = 'hs300';".format(table, ref_date))
query = delete(Universe).where(
and_(
Universe.Date == this_date,
Universe.universe == 'hs300'
)
)
engine.execute(query)
df = pd.read_sql("select Date, Code from index_components where Date = '{0}' and indexCode = 300".format(ref_date),
engine)
query = select([IndexComponent.Date, IndexComponent.Code]).where(
and_(
IndexComponent.Date == this_date,
IndexComponent.indexCode == 300
)
)
df = pd.read_sql(query, engine)
if df.empty:
return
df['universe'] = 'hs300'
data_info_log(df, table)
data_info_log(df, Universe)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append')
df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
def update_uqer_universe_sh50(ds, **kwargs):
......@@ -163,20 +172,30 @@ def update_uqer_universe_sh50(ds, **kwargs):
if not flag:
return
table = 'universe'
engine.execute("delete from {0} where Date = '{1}' and universe = 'sh50';".format(table, ref_date))
query = delete(Universe).where(
and_(
Universe.Date == this_date,
Universe.universe == 'sh50'
)
)
engine.execute(query)
df = pd.read_sql("select Date, Code from index_components where Date = '{0}' and indexCode = 16".format(ref_date),
engine)
query = select([IndexComponent.Date, IndexComponent.Code]).where(
and_(
IndexComponent.Date == this_date,
IndexComponent.indexCode == 16
)
)
df = pd.read_sql(query, engine)
if df.empty:
return
df['universe'] = 'sh50'
data_info_log(df, table)
data_info_log(df, Universe)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append')
df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
def update_uqer_universe_zz500(ds, **kwargs):
......@@ -186,20 +205,30 @@ def update_uqer_universe_zz500(ds, **kwargs):
if not flag:
return
table = 'universe'
engine.execute("delete from {0} where Date = '{1}' and universe = 'zz500';".format(table, ref_date))
query = delete(Universe).where(
and_(
Universe.Date == this_date,
Universe.universe == 'zz500'
)
)
engine.execute(query)
df = pd.read_sql("select Date, Code from index_components where Date = '{0}' and indexCode = 905".format(ref_date),
engine)
query = select([IndexComponent.Date, IndexComponent.Code]).where(
and_(
IndexComponent.Date == this_date,
IndexComponent.indexCode == 905
)
)
df = pd.read_sql(query, engine)
if df.empty:
return
df['universe'] = 'zz500'
data_info_log(df, table)
data_info_log(df, Universe)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append')
df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
def update_uqer_universe_zz800(ds, **kwargs):
......@@ -209,20 +238,30 @@ def update_uqer_universe_zz800(ds, **kwargs):
if not flag:
return
table = 'universe'
engine.execute("delete from {0} where Date = '{1}' and universe = 'zz800';".format(table, ref_date))
query = delete(Universe).where(
and_(
Universe.Date == this_date,
Universe.universe == 'zz800'
)
)
engine.execute(query)
df = pd.read_sql("select Date, Code from index_components where Date = '{0}' and indexCode = 906".format(ref_date),
engine)
query = select([IndexComponent.Date, IndexComponent.Code]).where(
and_(
IndexComponent.Date == this_date,
IndexComponent.indexCode == 906
)
)
df = pd.read_sql(query, engine)
if df.empty:
return
df['universe'] = 'zz800'
data_info_log(df, table)
data_info_log(df, Universe)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append')
df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
def update_uqer_universe_security_master(ds, **kwargs):
......@@ -232,13 +271,13 @@ def update_uqer_universe_security_master(ds, **kwargs):
if not flag:
return
table = 'security_master'
df = api.EquGet(equTypeCD='A')
if df.empty:
return
engine.execute("DELETE from {0}".format(table))
query = delete(SecurityMaster)
engine.execute(query)
df = df[df.ticker.str.len() <= 6]
df['Code'] = df.ticker.astype(int)
......@@ -249,8 +288,8 @@ def update_uqer_universe_security_master(ds, **kwargs):
del df['ticker']
del df['secID']
data_info_log(df, table)
df.to_sql(table, engine, index=False, if_exists='append')
data_info_log(df, SecurityMaster)
df.to_sql(SecurityMaster.__table__.name, engine, index=False, if_exists='append')
def update_uqer_universe_ashare(ds, **kwargs):
......@@ -260,8 +299,13 @@ def update_uqer_universe_ashare(ds, **kwargs):
if not flag:
return
table = 'universe'
engine.execute("delete from {0} where Date = '{1}' and universe = 'ashare';".format(table, ref_date))
query = delete(Universe).where(
and_(
Universe.Date == this_date,
Universe.universe == 'ashare'
)
)
engine.execute(query)
query = select([SecurityMaster.Code]).where(
and_(
......@@ -281,8 +325,8 @@ def update_uqer_universe_ashare(ds, **kwargs):
df['universe'] = 'ashare'
df['Date'] = this_date
data_info_log(df, table)
df.to_sql(table, engine, index=False, if_exists='append')
data_info_log(df, Universe)
df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
def update_uqer_universe_ashare_ex(ds, **kwargs):
......@@ -292,8 +336,13 @@ def update_uqer_universe_ashare_ex(ds, **kwargs):
if not flag:
return
table = 'universe'
engine.execute("delete from {0} where Date = '{1}' and universe = 'ashare_ex';".format(table, ref_date))
query = delete(Universe).where(
and_(
Universe.Date == this_date,
Universe.universe == 'ashare_ex'
)
)
engine.execute(query)
ex_date = advanceDateByCalendar('china.sse', this_date, '-3m')
......@@ -315,8 +364,8 @@ def update_uqer_universe_ashare_ex(ds, **kwargs):
df['universe'] = 'ashare_ex'
df['Date'] = this_date
data_info_log(df, table)
df.to_sql(table, engine, index=False, if_exists='append')
data_info_log(df, Universe)
df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
def update_uqer_index_components(ds, **kwargs):
......@@ -326,7 +375,6 @@ def update_uqer_index_components(ds, **kwargs):
if not flag:
return
table = 'index_components'
index_codes = ['000001',
'000002',
'000003',
......@@ -381,10 +429,14 @@ def update_uqer_index_components(ds, **kwargs):
if df.empty:
ref_previous_date = advanceDateByCalendar('china.sse', this_date, '-1b')
df = pd.read_sql("select * from {0} where Date = '{1}' and indexCode = {2}".format(table,
ref_previous_date,
int(index)),
engine)
query = select([IndexComponent]).where(
and_(
IndexComponent.Date == ref_previous_date,
IndexComponent.indexCode == int(index)
)
)
df = pd.read_sql(query, engine)
df['Date'] = this_date
if df.empty:
......@@ -402,14 +454,15 @@ def update_uqer_index_components(ds, **kwargs):
del df['consID']
total_data = total_data.append(df)
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
query = delete(IndexComponent).where(IndexComponent.Date == this_date)
engine.execute(query)
if total_data.empty:
return
data_info_log(total_data, table)
data_info_log(total_data, IndexComponent)
format_data(total_data)
total_data.to_sql(table, engine, index=False, if_exists='append')
total_data.to_sql(IndexComponent.__table__.name, engine, index=False, if_exists='append')
def update_uqer_risk_model(ds, **kwargs):
......@@ -419,88 +472,78 @@ def update_uqer_risk_model(ds, **kwargs):
if not flag:
return
table = 'risk_exposure'
df = api.RMExposureDayGet(tradeDate=ref_date)
df.rename(columns={'tradeDate': 'Date', 'ticker': 'Code'}, inplace=True)
df.Code = df.Code.astype(int)
del df['secID']
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table)
engine.execute(delete(RiskExposure).where(RiskExposure.Date == this_date))
data_info_log(df, RiskExposure)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append')
df.to_sql(RiskExposure.__table__.name, engine, index=False, if_exists='append')
table = 'risk_return'
df = api.RMFactorRetDayGet(tradeDate=ref_date)
df.rename(columns={'tradeDate': 'Date'}, inplace=True)
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table)
engine.execute(delete(RiskReturn).where(RiskReturn.Date == this_date))
data_info_log(df, RiskReturn)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append')
df.to_sql(RiskReturn.__table__.name, engine, index=False, if_exists='append')
table = 'specific_return'
df = api.RMSpecificRetDayGet(tradeDate=ref_date)
df.rename(columns={'tradeDate': 'Date', 'ticker': 'Code'}, inplace=True)
df.Code = df.Code.astype(int)
del df['secID']
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table)
engine.execute(delete(SpecificReturn).where(SpecificReturn.Date == this_date))
data_info_log(df, SpecificReturn)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append')
df.to_sql(SpecificReturn.__table__.name, engine, index=False, if_exists='append')
table = 'risk_cov_day'
df = api.RMCovarianceDayGet(tradeDate=ref_date)
df.rename(columns={'tradeDate': 'Date'}, inplace=True)
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table)
engine.execute(delete(RiskCovDay).where(RiskCovDay.Date == this_date))
data_info_log(df, RiskCovDay)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append')
df.to_sql(RiskCovDay.__table__.name, engine, index=False, if_exists='append')
table = 'risk_cov_short'
df = api.RMCovarianceShortGet(tradeDate=ref_date)
df.rename(columns={'tradeDate': 'Date'}, inplace=True)
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table)
engine.execute(delete(RiskCovShort).where(RiskCovShort.Date == this_date))
data_info_log(df, RiskCovShort)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append')
df.to_sql(RiskCovShort.__table__.name, engine, index=False, if_exists='append')
table = 'risk_cov_long'
df = api.RMCovarianceLongGet(tradeDate=ref_date)
df.rename(columns={'tradeDate': 'Date'}, inplace=True)
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table)
engine.execute(delete(RiskCovLong).where(RiskCovLong.Date == this_date))
data_info_log(df, RiskCovLong)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append')
df.to_sql(RiskCovLong.__table__.name, engine, index=False, if_exists='append')
table = 'specific_risk_day'
df = api.RMSriskDayGet(tradeDate=ref_date)
df.rename(columns={'tradeDate': 'Date', 'ticker': 'Code'}, inplace=True)
df.Code = df.Code.astype(int)
del df['secID']
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table)
engine.execute(delete(SpecificRiskDay).where(SpecificRiskDay.Date == this_date))
data_info_log(df, SpecificRiskDay)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append')
df.to_sql(SpecificRiskDay.__table__.name, engine, index=False, if_exists='append')
table = 'specific_risk_short'
df = api.RMSriskShortGet(tradeDate=ref_date)
df.rename(columns={'tradeDate': 'Date', 'ticker': 'Code'}, inplace=True)
df.Code = df.Code.astype(int)
del df['secID']
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table)
engine.execute(delete(SpecificRiskShort).where(SpecificRiskShort.Date == this_date))
data_info_log(df, SpecificRiskShort)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append')
df.to_sql(SpecificRiskShort.__table__.name, engine, index=False, if_exists='append')
table = 'specific_risk_long'
df = api.RMSriskLongGet(tradeDate=ref_date)
df.rename(columns={'tradeDate': 'Date', 'ticker': 'Code'}, inplace=True)
df.Code = df.Code.astype(int)
del df['secID']
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table)
engine.execute(delete(SpecificRiskLong).where(SpecificRiskLong.Date == this_date))
data_info_log(df, SpecificRiskLong)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append')
df.to_sql(SpecificRiskLong.__table__.name, engine, index=False, if_exists='append')
def update_uqer_daily_return(ds, **kwargs):
......@@ -512,12 +555,12 @@ def update_uqer_daily_return(ds, **kwargs):
previous_date = advanceDateByCalendar('china.sse', this_date, '-1b').strftime('%Y-%m-%d')
table = 'daily_return'
df = pd.read_sql("select Code, chgPct as d1 from market where Date = '{0}'".format(this_date), engine)
query = select([Market.Code, Market.chgPct.label('d1')]).where(Market.Date == this_date)
df = pd.read_sql(query, engine)
df['Date'] = previous_date
engine.execute("delete from {0} where Date = '{1}'".format(table, previous_date))
data_info_log(df, table)
df.to_sql(table, engine, index=False, if_exists='append')
engine.execute(delete(DailyReturn).where(DailyReturn.Date == this_date))
data_info_log(df, DailyReturn)
df.to_sql(DailyReturn.__table__.name, engine, index=False, if_exists='append')
def update_uqer_industry_info(ds, **kwargs):
......@@ -527,12 +570,11 @@ def update_uqer_industry_info(ds, **kwargs):
if not flag:
return
table = 'market'
df = pd.read_sql("select Code from {0} where Date = '{1}'".format(table, this_date), engine)
query = select([Market.Code]).where(Market.Date == this_date)
df = pd.read_sql(query, engine)
codes = df.Code.astype(str).str.zfill(6)
table = 'industry'
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
engine.execute(delete(Industry).where(Industry.Date == this_date))
df = api.EquIndustryGet(intoDate=ref_date)
df = df[df.ticker.isin(codes)]
......@@ -555,9 +597,9 @@ def update_uqer_industry_info(ds, **kwargs):
'IndustryID4',
'IndustryName4']]
data_info_log(df, table)
data_info_log(df, Industry)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append')
df.to_sql(Industry.__table__.name, engine, index=False, if_exists='append')
def fetch_date(table, query_date, engine):
......@@ -575,6 +617,31 @@ def fetch_date(table, query_date, engine):
return df
def update_legacy_factor(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
ms_user = 'sa'
ms_pwd = 'A12345678!'
db = 'MultiFactor'
old_engine = sqlalchemy.create_engine(
'mssql+pymssql://{0}:{1}@10.63.6.219/{2}?charset=cp936'.format(ms_user, ms_pwd, db))
df = fetch_date('FactorData', ref_date, old_engine)
del df['申万一级行业']
del df['申万二级行业']
del df['申万三级行业']
engine.execute(delete(LegacyFactor).where(LegacyFactor.Date == this_date))
df.to_sql(LegacyFactor.__table__.name, engine, if_exists='append', index=False)
return 0
_ = PythonOperator(
task_id='update_uqer_factors',
provide_context=True,
......@@ -688,5 +755,13 @@ _ = PythonOperator(
)
_ = PythonOperator(
task_id='update_legacy_factor',
provide_context=True,
python_callable=update_legacy_factor,
dag=dag
)
if __name__ == '__main__':
update_uqer_risk_model(ds='2017-08-18')
update_uqer_index_components(ds='2017-08-17')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment