Commit 6fce5070 authored by Dr.李's avatar Dr.李

Merge remote-tracking branch 'origin/master'

parents d57dfaa7 f91fd4e7
arrow >= 0.10.0
cython >= 0.25.2
finance-python >= 0.5.7
mysqlclient >= 1.3.10
numpy >= 1.12.1
numba >= 0.33.0
scikit-learn >= 0.18.1
......@@ -9,4 +8,4 @@ scipy >= 0.19.0
simpleutils >= 0.1.0
sqlalchemy >= 1.1.9
pandas >= 0.19.2
Finance-Python >= 0.5.5
\ No newline at end of file
Finance-Python >= 0.5.5
This diff is collapsed.
......@@ -369,39 +369,6 @@ def update_uqer_universe_ashare_ex(ds, **kwargs):
df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
def update_uqer_universe_pm500_mirror(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
query = delete(Universe).where(
and_(
Universe.trade_date == this_date,
Universe.universe == 'pm500_mirror'
)
)
engine.execute(query)
ms_user = 'sa'
ms_pwd = 'A12345678!'
db = 'PortfolioManagements500'
old_engine = sqlalchemy.create_engine(
'mssql+pymssql://{0}:{1}@10.63.6.219/{2}?charset=utf8'.format(ms_user, ms_pwd, db))
query = "select applyDate trade_date, Code code, 'pm500_mirror' universe from PortfolioManagements500.dbo.StockUniverse " \
"where applyDate = {date}".format(date=ref_date)
df = pd.read_sql(query, old_engine)
if df.empty:
return
data_info_log(df, Universe)
format_data(df)
df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
def update_uqer_index_components(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
......@@ -656,66 +623,13 @@ def fetch_date(table, query_date, engine):
return df
def update_legacy_factor(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
ms_user = 'sa'
ms_pwd = 'A12345678!'
db = 'MultiFactor'
old_engine = sqlalchemy.create_engine(
'mssql+pymssql://{0}:{1}@10.63.6.219/{2}?charset=cp936'.format(ms_user, ms_pwd, db))
df = fetch_date('FactorData', ref_date, old_engine)
del df['申万一级行业']
del df['申万二级行业']
del df['申万三级行业']
engine.execute(delete(LegacyFactor).where(LegacyFactor.trade_date == this_date))
df.to_sql(LegacyFactor.__table__.name, engine, if_exists='append', index=False)
return 0
def update_materialized_views(ds, **kwargs):
alpha_logger.info("starting refresh full_factor_view ...")
engine.execute("REFRESH MATERIALIZED VIEW full_factor_view;")
engine.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY full_factor_view;")
alpha_logger.info("starting cluster full_factor_view ...")
engine.execute("CLUSTER full_factor_view;")
def update_tiny_factor(ds, **kwargs):
ref_date, this_date = process_date(ds)
flag = check_holiday(this_date)
if not flag:
return
ms_user = 'sa'
ms_pwd = 'A12345678!'
db = 'PortfolioManagements500'
old_engine = sqlalchemy.create_engine(
'mssql+pymssql://{0}:{1}@10.63.6.219/{2}?charset=utf8'.format(ms_user, ms_pwd, db))
sql = "select * from AlphaFactors_Difeiyue where Date={date}".format(date=ref_date)
df = pd.read_sql(sql, old_engine)
df = df.rename(columns={'date': 'trade_date'})
df['trade_date'] = pd.to_datetime(df.trade_date.astype(str))
cols = list(df.columns)[:2] + ['CFinc1', 'BDTO', 'RVOL', 'CHV', 'VAL']
df = df[cols]
engine.execute(delete(Tiny).where(Tiny.trade_date == this_date))
df.to_sql(Tiny.__table__.name, engine, if_exists='append', index=False)
return 0
uqer_task = PythonOperator(
task_id='update_uqer_factors',
provide_context=True,
......@@ -782,14 +696,6 @@ universe50_task = PythonOperator(
dag=dag
)
_ = PythonOperator(
task_id='update_uqer_universe_pm500_mirror',
provide_context=True,
python_callable=update_uqer_universe_pm500_mirror,
dag=dag
)
universe300_task.set_upstream(index_task)
universe500_task.set_upstream(index_task)
universe800_task.set_upstream(index_task)
......@@ -836,23 +742,6 @@ _ = PythonOperator(
dag=dag
)
legacy_factor_task = PythonOperator(
task_id='update_legacy_factor',
provide_context=True,
python_callable=update_legacy_factor,
dag=dag
)
tiny_task = PythonOperator(
task_id='update_tiny_factor',
provide_context=True,
python_callable=update_tiny_factor,
dag=dag
)
refresh_materialized_views_task = PythonOperator(
task_id='update_materialized_views',
provide_context=True,
......@@ -863,8 +752,6 @@ refresh_materialized_views_task = PythonOperator(
refresh_materialized_views_task.set_upstream(market_task)
refresh_materialized_views_task.set_upstream(uqer_task)
refresh_materialized_views_task.set_upstream(legacy_factor_task)
refresh_materialized_views_task.set_upstream(tiny_task)
refresh_materialized_views_task.set_upstream(risk_model_task)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment