Commit 48b029f8 authored by Dr.李's avatar Dr.李

update scripts to handle the data format

parent 964935b1
...@@ -43,6 +43,10 @@ def process_date(ds): ...@@ -43,6 +43,10 @@ def process_date(ds):
return ref_date, this_date return ref_date, this_date
def format_data(df):
df['Date'] = pd.to_datetime(df['Date'], format='%Y%m%d')
def data_info_log(df, table): def data_info_log(df, table):
data_len = len(df) data_len = len(df)
...@@ -65,6 +69,7 @@ def update_uqer_factors(ds, **kwargs): ...@@ -65,6 +69,7 @@ def update_uqer_factors(ds, **kwargs):
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date)) engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table) data_info_log(df, table)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
...@@ -80,6 +85,7 @@ def update_uqer_market(ds, **kwargs): ...@@ -80,6 +85,7 @@ def update_uqer_market(ds, **kwargs):
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date)) engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table) data_info_log(df, table)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
...@@ -97,6 +103,7 @@ def update_uqer_halt_list(ds, **kwargs): ...@@ -97,6 +103,7 @@ def update_uqer_halt_list(ds, **kwargs):
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date)) engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table) data_info_log(df, table)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
...@@ -110,6 +117,7 @@ def update_uqer_universe_hs300(ds, **kwargs): ...@@ -110,6 +117,7 @@ def update_uqer_universe_hs300(ds, **kwargs):
df['universe'] = 'hs300' df['universe'] = 'hs300'
data_info_log(df, table) data_info_log(df, table)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
...@@ -123,6 +131,7 @@ def update_uqer_universe_zz500(ds, **kwargs): ...@@ -123,6 +131,7 @@ def update_uqer_universe_zz500(ds, **kwargs):
df['universe'] = 'zz500' df['universe'] = 'zz500'
data_info_log(df, table) data_info_log(df, table)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
...@@ -153,6 +162,7 @@ def update_uqer_index_components(ds, **kwargs): ...@@ -153,6 +162,7 @@ def update_uqer_index_components(ds, **kwargs):
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date)) engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(total_data, table) data_info_log(total_data, table)
format_data(total_data)
total_data.to_sql(table, engine, index=False, if_exists='append') total_data.to_sql(table, engine, index=False, if_exists='append')
...@@ -167,6 +177,7 @@ def update_uqer_risk_model(ds, **kwargs): ...@@ -167,6 +177,7 @@ def update_uqer_risk_model(ds, **kwargs):
del df['secID'] del df['secID']
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date)) engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table) data_info_log(df, table)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
table = 'risk_return' table = 'risk_return'
...@@ -174,6 +185,7 @@ def update_uqer_risk_model(ds, **kwargs): ...@@ -174,6 +185,7 @@ def update_uqer_risk_model(ds, **kwargs):
df.rename(columns={'tradeDate': 'Date'}, inplace=True) df.rename(columns={'tradeDate': 'Date'}, inplace=True)
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date)) engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table) data_info_log(df, table)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
table = 'specific_return' table = 'specific_return'
...@@ -183,6 +195,7 @@ def update_uqer_risk_model(ds, **kwargs): ...@@ -183,6 +195,7 @@ def update_uqer_risk_model(ds, **kwargs):
del df['secID'] del df['secID']
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date)) engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table) data_info_log(df, table)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
table = 'risk_cov_day' table = 'risk_cov_day'
...@@ -190,6 +203,7 @@ def update_uqer_risk_model(ds, **kwargs): ...@@ -190,6 +203,7 @@ def update_uqer_risk_model(ds, **kwargs):
df.rename(columns={'tradeDate': 'Date'}, inplace=True) df.rename(columns={'tradeDate': 'Date'}, inplace=True)
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date)) engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table) data_info_log(df, table)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
table = 'risk_cov_short' table = 'risk_cov_short'
...@@ -197,6 +211,7 @@ def update_uqer_risk_model(ds, **kwargs): ...@@ -197,6 +211,7 @@ def update_uqer_risk_model(ds, **kwargs):
df.rename(columns={'tradeDate': 'Date'}, inplace=True) df.rename(columns={'tradeDate': 'Date'}, inplace=True)
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date)) engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table) data_info_log(df, table)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
table = 'risk_cov_long' table = 'risk_cov_long'
...@@ -204,6 +219,7 @@ def update_uqer_risk_model(ds, **kwargs): ...@@ -204,6 +219,7 @@ def update_uqer_risk_model(ds, **kwargs):
df.rename(columns={'tradeDate': 'Date'}, inplace=True) df.rename(columns={'tradeDate': 'Date'}, inplace=True)
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date)) engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table) data_info_log(df, table)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
table = 'specific_risk_day' table = 'specific_risk_day'
...@@ -213,6 +229,7 @@ def update_uqer_risk_model(ds, **kwargs): ...@@ -213,6 +229,7 @@ def update_uqer_risk_model(ds, **kwargs):
del df['secID'] del df['secID']
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date)) engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table) data_info_log(df, table)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
table = 'specific_risk_short' table = 'specific_risk_short'
...@@ -222,6 +239,7 @@ def update_uqer_risk_model(ds, **kwargs): ...@@ -222,6 +239,7 @@ def update_uqer_risk_model(ds, **kwargs):
del df['secID'] del df['secID']
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date)) engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table) data_info_log(df, table)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
table = 'specific_risk_long' table = 'specific_risk_long'
...@@ -231,6 +249,7 @@ def update_uqer_risk_model(ds, **kwargs): ...@@ -231,6 +249,7 @@ def update_uqer_risk_model(ds, **kwargs):
del df['secID'] del df['secID']
engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date)) engine.execute("delete from {0} where Date = '{1}';".format(table, ref_date))
data_info_log(df, table) data_info_log(df, table)
format_data(df)
df.to_sql(table, engine, index=False, if_exists='append') df.to_sql(table, engine, index=False, if_exists='append')
...@@ -314,4 +333,4 @@ _ = PythonOperator( ...@@ -314,4 +333,4 @@ _ = PythonOperator(
if __name__ == '__main__': if __name__ == '__main__':
update_uqer_risk_model(ds='2017-06-22') update_uqer_index_components(ds='2017-06-22')
\ No newline at end of file \ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment