Commit a7e93310 authored by Dr.李

update airflow script

parent 5b940f54
SELECT array_to_string(ARRAY(SELECT 'u' || '.' || '"' || c.column_name || '"'
FROM information_schema.columns As c
WHERE table_name = 'uqer'
AND c.column_name NOT IN('trade_date', 'code')
), ',') || ' FROM uqer AS u' As sqlstmt2;
SELECT array_to_string(ARRAY(SELECT 'l' || '.' || '"' || c.column_name || '"'
FROM information_schema.columns As c
WHERE table_name = 'legacy_factor'
AND c.column_name NOT IN('trade_date', 'code')
), ',') || ' FROM legacy_factor AS l' As sqlstmt3;
SELECT array_to_string(ARRAY(SELECT 't' || '.' || '"' || c.column_name || '"'
FROM information_schema.columns As c
WHERE table_name = 'tiny'
AND c.column_name NOT IN('trade_date', 'code')
), ',') || ' FROM tiny AS t' As sqlstmt4;
SELECT array_to_string(ARRAY(SELECT 'r' || '.' || '"' || c.column_name || '"'
FROM information_schema.columns As c
WHERE table_name = 'risk_exposure'
AND c.column_name NOT IN('trade_date', 'code')
), ',') || ' FROM risk_exposure AS r' As sqlstmt5;
SELECT array_to_string(ARRAY(SELECT 'g' || '.' || '"' || c.column_name || '"'
FROM information_schema.columns As c
WHERE table_name = 'gogoal'
AND c.column_name NOT IN('trade_date', 'code')
), ',') || ' FROM gogoal AS g' As sqlstmt6;
create table full_factor as SELECT m."trade_date",m."code",m."secShortName",m."exchangeCD",m."preClosePrice",m."actPreClosePrice",m."openPrice",m."highestPrice",m."lowestPrice",m."closePrice",m."turnoverVol",m."turnoverValue",m."dealAmount",m."turnoverRate",m."accumAdjFactor",m."negMarketValue",m."marketValue",m."chgPct",m."isOpen",m."vwap",
u."AccountsPayablesTDays",u."AccountsPayablesTRate",u."AdminiExpenseRate",u."ARTDays",u."ARTRate",u."ASSI",u."BLEV",u."BondsPayableToAsset",u."CashRateOfSales",u."CashToCurrentLiability",u."CMRA",u."CTOP",u."CTP5",u."CurrentAssetsRatio",u."CurrentAssetsTRate",u."CurrentRatio",u."DAVOL10",u."DAVOL20",u."DAVOL5",u."DDNBT",u."DDNCR",u."DDNSR",u."DebtEquityRatio",u."DebtsAssetRatio",u."DHILO",u."DilutedEPS",u."DVRAT",u."EBITToTOR",u."EGRO",u."EMA10",u."EMA120",u."EMA20",u."EMA5",u."EMA60",u."EPS",u."EquityFixedAssetRatio",u."EquityToAsset",u."EquityTRate",u."ETOP",u."ETP5",u."FinancialExpenseRate",u."FinancingCashGrowRate",u."FixAssetRatio",u."FixedAssetsTRate",u."GrossIncomeRatio",u."HBETA",u."HSIGMA",u."IntangibleAssetRatio",u."InventoryTDays",u."InventoryTRate",u."InvestCashGrowRate",u."LCAP",u."LFLO",u."LongDebtToAsset",u."LongDebtToWorkingCapital",u."LongTermDebtToAsset",u."MA10",u."MA120",u."MA20",u."MA5",u."MA60",u."MAWVAD",u."MFI",u."MLEV",u."NetAssetGrowRate",u."NetProfitGrowRate",u."NetProfitRatio",u."NOCFToOperatingNI",u."NonCurrentAssetsRatio",u."NPParentCompanyGrowRate",u."NPToTOR",u."OperatingExpenseRate",u."OperatingProfitGrowRate",u."OperatingProfitRatio",u."OperatingProfitToTOR",u."OperatingRevenueGrowRate",u."OperCashGrowRate",u."OperCashInToCurrentLiability",u."PB",u."PCF",u."PE",u."PS",u."PSY",u."QuickRatio",u."REVS10",u."REVS20",u."REVS5",u."ROA",u."ROA5",u."ROE",u."ROE5",u."RSI",u."RSTR12",u."RSTR24",u."SalesCostRatio",u."SaleServiceCashToOR",u."SUE",u."TaxRatio",u."TOBT",u."TotalAssetGrowRate",u."TotalAssetsTRate",u."TotalProfitCostRatio",u."TotalProfitGrowRate",u."VOL10",u."VOL120",u."VOL20",u."VOL240",u."VOL5",u."VOL60",u."WVAD",u."REC",u."DAREC",u."GREC",u."FY12P",u."DAREV",u."GREV",u."SFY12P",u."DASREV",u."GSREV",u."FEARNG",u."FSALESG",u."TA2EV",u."CFO2EV",u."ACCA",u."DEGM",u."SUOI",u."EARNMOM",u."FiftyTwoWeekHigh",u."Volatility",u."Skewness",u."ILLIQUIDITY",u."BackwardADJ",u."MACD",u."ADTM",u."ATR14",u."ATR6",u."BIAS10",u."BIAS20",u."BIAS5",u."BIAS60",u."BollDown",u."BollUp",u."CCI10",u."CCI20",u."CCI5",u."CCI88",u."KDJ_K",u."KDJ_D",u."KDJ_J",u."ROC6",u."ROC20",u."SBM",u."STM",u."UpRVI",u."DownRVI",u."RVI",u."SRMI",u."ChandeSD",u."ChandeSU",u."CMO",u."DBCD",u."ARC",u."OBV",u."OBV6",u."OBV20",u."TVMA20",u."TVMA6",u."TVSTD20",u."TVSTD6",u."VDEA",u."VDIFF",u."VEMA10",u."VEMA12",u."VEMA26",u."VEMA5",u."VMACD",u."VOSC",u."VR",u."VROC12",u."VROC6",u."VSTD10",u."VSTD20",u."KlingerOscillator",u."MoneyFlow20",u."AD",u."AD20",u."AD6",u."CoppockCurve",u."ASI",u."ChaikinOscillator",u."ChaikinVolatility",u."EMV14",u."EMV6",u."plusDI",u."minusDI",u."ADX",u."ADXR",u."Aroon",u."AroonDown",u."AroonUp",u."DEA",u."DIFF",u."DDI",u."DIZ",u."DIF",u."MTM",u."MTMMA",u."PVT",u."PVT6",u."PVT12",u."TRIX5",u."TRIX10",u."UOS",u."MA10RegressCoeff12",u."MA10RegressCoeff6",u."PLRC6",u."PLRC12",u."SwingIndex",u."Ulcer10",u."Ulcer5",u."Hurst",u."ACD6",u."ACD20",u."EMA12",u."EMA26",u."APBMA",u."BBI",u."BBIC",u."TEMA10",u."TEMA5",u."MA10Close",u."AR",u."BR",u."ARBR",u."CR20",u."MassIndex",u."BearPower",u."BullPower",u."Elder",u."NVI",u."PVI",u."RC12",u."RC24",u."JDQS20",u."Variance20",u."Variance60",u."Variance120",u."Kurtosis20",u."Kurtosis60",u."Kurtosis120",u."Alpha20",u."Alpha60",u."Alpha120",u."Beta20",u."Beta60",u."Beta120",u."SharpeRatio20",u."SharpeRatio60",u."SharpeRatio120",u."TreynorRatio20",u."TreynorRatio60",u."TreynorRatio120",u."InformationRatio20",u."InformationRatio60",u."InformationRatio120",u."GainVariance20",u."GainVariance60",u."GainVariance120",u."LossVariance20",u."LossVariance60
",u."LossVariance120",u."GainLossVarianceRatio20",u."GainLossVarianceRatio60",u."GainLossVarianceRatio120",u."RealizedVolatility",u."REVS60",u."REVS120",u."REVS250",u."REVS750",u."REVS5m20",u."REVS5m60",u."REVS5Indu1",u."REVS20Indu1",u."Volumn1M",u."Volumn3M",u."Price1M",u."Price3M",u."Price1Y",u."Rank1M",u."CashDividendCover",u."DividendCover",u."DividendPaidRatio",u."RetainedEarningRatio",u."CashEquivalentPS",u."DividendPS",u."EPSTTM",u."NetAssetPS",u."TORPS",u."TORPSLatest",u."OperatingRevenuePS",u."OperatingRevenuePSLatest",u."OperatingProfitPS",u."OperatingProfitPSLatest",u."CapitalSurplusFundPS",u."SurplusReserveFundPS",u."UndividedProfitPS",u."RetainedEarningsPS",u."OperCashFlowPS",u."CashFlowPS",u."NetNonOIToTP",u."NetNonOIToTPLatest",u."PeriodCostsRate",u."InterestCover",u."NetProfitGrowRate3Y",u."NetProfitGrowRate5Y",u."OperatingRevenueGrowRate3Y",u."OperatingRevenueGrowRate5Y",u."NetCashFlowGrowRate",u."NetProfitCashCover",u."OperCashInToAsset",u."CashConversionCycle",u."OperatingCycle",u."PEG3Y",u."PEG5Y",u."PEIndu",u."PBIndu",u."PSIndu",u."PCFIndu",u."PEHist20",u."PEHist60",u."PEHist120",u."PEHist250",u."StaticPE",u."ForwardPE",u."EnterpriseFCFPS",u."ShareholderFCFPS",u."ROEDiluted",u."ROEAvg",u."ROEWeighted",u."ROECut",u."ROECutWeighted",u."ROIC",u."ROAEBIT",u."ROAEBITTTM",u."OperatingNIToTP",u."OperatingNIToTPLatest",u."InvestRAssociatesToTP",u."InvestRAssociatesToTPLatest",u."NPCutToNP",u."SuperQuickRatio",u."TSEPToInterestBearDebt",u."DebtTangibleEquityRatio",u."TangibleAToInteBearDebt",u."TangibleAToNetDebt",u."NOCFToTLiability",u."NOCFToInterestBearDebt",u."NOCFToNetDebt",u."TSEPToTotalCapital",u."InteBearDebtToTotalCapital",u."NPParentCompanyCutYOY",u."SalesServiceCashToORLatest",u."CashRateOfSalesLatest",u."NOCFToOperatingNILatest",u."TotalAssets",u."MktValue",u."NegMktValue",u."TEAP",u."NIAP",u."TotalFixedAssets",u."IntFreeCL",u."IntFreeNCL",u."IntCL",u."IntDebt",u."NetDebt",u."NetTangibleAssets",u."WorkingCapital",u."NetWorkingCapital",u."TotalPaidinCapital",u."RetainedEarnings",u."OperateNetIncome",u."ValueChgProfit",u."NetIntExpense",u."EBIT",u."EBITDA",u."EBIAT",u."NRProfitLoss",u."NIAPCut",u."FCFF",u."FCFE",u."DA",u."TRevenueTTM",u."TCostTTM",u."RevenueTTM",u."CostTTM",u."GrossProfitTTM",u."SalesExpenseTTM",u."AdminExpenseTTM",u."FinanExpenseTTM",u."AssetImpairLossTTM",u."NPFromOperatingTTM",u."NPFromValueChgTTM",u."OperateProfitTTM",u."NonOperatingNPTTM",u."TProfitTTM",u."NetProfitTTM",u."NetProfitAPTTM",u."SaleServiceRenderCashTTM",u."NetOperateCFTTM",u."NetInvestCFTTM",u."NetFinanceCFTTM",u."GrossProfit",u."Beta252",u."RSTR504",u."EPIBS",u."CETOP",u."DASTD",u."CmraCNE5",u."HsigmaCNE5",u."SGRO",u."EgibsLong",u."STOM",u."STOQ",u."STOA",u."NLSIZE",
l."ROEAfterNonRecurring",l."EPSAfterNonRecurring",l."EODPrice",l."LogFloatCap",l."BPS",l."SPS",l."DebtToAsset",l."DROEAfterNonRecurring",l."LogTotalCap",l."BP",l."SP",l."EPAfterNonRecurring",l."DivToB",l."DivP",l."EBITToSales",l."EBITAToSales",l."EVToSales",l."EVToEBIT",l."EVToEBITDA",l."EVToNOPLAT",l."EVToIC",l."FCFFPS",l."FCFFToEarningAfterNonRecurring",l."FCFFP",l."ProfitToAsset",l."GrossProfitRatio",l."LATO",l."FATO",l."TATO",l."EquityTO",l."PayableTO",l."RecievableTO",l."RevenueGrowth",l."GrossProfitGrowth",l."NetProfitGrowth",l."GrossCFToRevenue",l."CFToRevenue",l."CFToProfit",l."CFToAsset",l."GrossCFGrowth",l."CFGrowth",l."ICFGrowth",l."AveAmount60",l."PeriodReturn60",l."AmountRatio60to250",l."CFPS",l."CFP",l."NetCFGrowth",l."NetCFGrowthP",l."NetCash",l."NetCashP",l."BVPSGrowth",l."EquityPSGrowth",l."WholeSales",l."WholeProfitAfterNonRecurring",l."ExpenseRatio",l."AcidTestRatio",l."TimeInterestEarnedRatio",l."DepositReceivedVsSale",l."DebtRatioExcemptDepRec",l."SNBARatio",
t."CFinc1",t."BDTO",t."RVOL",t."CHV",t."VAL",
r."BETA",r."MOMENTUM",r."SIZE",r."EARNYILD",r."RESVOL",r."GROWTH",r."BTOP",r."LEVERAGE",r."LIQUIDTY",r."SIZENL",r."Bank",r."RealEstate",r."Health",r."Transportation",r."Mining",r."NonFerMetal",r."HouseApp",r."LeiService",r."MachiEquip",r."BuildDeco",r."CommeTrade",r."CONMAT",r."Auto",r."Textile",r."FoodBever",r."Electronics",r."Computer",r."LightIndus",r."Utilities",r."Telecom",r."AgriForest",r."CHEM",r."Media",r."IronSteel",r."NonBankFinan",r."ELECEQP",r."AERODEF",r."Conglomerates",r."COUNTRY",
e."DROE",e."IVR",e."xueqiu_hotness",
g."con_eps",g."con_eps_rolling",g."con_na",g."con_na_rolling",g."con_np",g."con_npcgrate_1w",g."con_npcgrate_4w",g."con_npcgrate_13w",g."con_npcgrate_26w",g."con_npcgrate_52w",g."con_npcgrate_2y",g."con_np_rolling",g."con_np_yoy",g."con_pb",g."con_pb_order",g."con_pb_rolling",g."con_pb_rolling_order",g."con_or",g."con_pe",g."con_pe_order",g."con_pe_rolling",g."con_pe_rolling_order",g."con_peg",g."con_peg_order",g."con_peg_rolling",g."con_peg_rolling_order",g."con_roe",g."con_target_price",g."market_confidence_5d",g."market_confidence_10d",g."market_confidence_15d",g."market_confidence_25d",g."market_confidence_75d",g."mcap",g."optimism_confidence_5d",g."optimism_confidence_10d",g."optimism_confidence_15d",g."optimism_confidence_25d",g."optimism_confidence_75d",g."pessimism_confidence_5d",g."pessimism_confidence_10d",g."pessimism_confidence_15d",g."pessimism_confidence_25d",g."pessimism_confidence_75d",g."tcap",
s1."SRISK" as d_srisk, s2."SRISK" as s_srisk, s3."SRISK" as l_srisk
FROM market AS m left join uqer AS u on m.trade_date = u.trade_date and m.code = u.code
inner join risk_exposure AS r on m.trade_date = r.trade_date and m.code = r.code
inner join specific_risk_day as s1 on m.trade_date = s1.trade_date and m.code = s1.code
inner join specific_risk_short as s2 on m.trade_date = s2.trade_date and m.code = s2.code
inner join specific_risk_long as s3 on m.trade_date = s3.trade_date and m.code = s3.code
left join legacy_factor as l on m.trade_date = l.trade_date and m.code = l.code
left join tiny as t on m.trade_date = t.trade_date and m.code = t.code
left join experimental as e on m.trade_date = e.trade_date and m.code = e.code
left join gogoal as g on m.trade_date = g.trade_date and m.code = g.code;
create UNIQUE index on full_factor (trade_date, code);
\ No newline at end of file
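
Note: the five helper queries at the top of the SQL above only generate text. Each one builds a comma-separated, alias-prefixed, quoted column list for one source table from information_schema, and that output is what gets pasted into the long SELECT list of the "create table full_factor" statement. A minimal sketch of that workflow (illustrative only, not part of the commit; it assumes a reachable Postgres DSN in DB_URI and reuses the table and alias names from the script above):

import os
import sqlalchemy

engine = sqlalchemy.create_engine(os.environ['DB_URI'])

# Same pattern as the sqlstmt2 helper above: alias-prefixed, quoted uqer columns.
helper = """
SELECT array_to_string(ARRAY(SELECT 'u' || '.' || '"' || c.column_name || '"'
                             FROM information_schema.columns AS c
                             WHERE table_name = 'uqer'
                               AND c.column_name NOT IN ('trade_date', 'code')), ',')
"""
uqer_cols = engine.execute(helper).scalar()
# uqer_cols is e.g. 'u."AccountsPayablesTDays",u."AccountsPayablesTRate",...'
# and is spliced by hand into the CREATE TABLE full_factor statement above.
print(uqer_cols)
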
@@ -5,23 +5,32 @@ Created on 2017-5-20
 @author: cheng.li
 """
 
+import os
+import sys
+import arrow
 import datetime as dt
 import uqer
 import sqlalchemy
-from sqlalchemy import delete
+import numpy as np
 import pandas as pd
 from airflow.operators.python_operator import PythonOperator
+from airflow.operators.email_operator import EmailOperator
+from airflow.operators.sensors import ExternalTaskSensor
 from airflow.models import DAG
 from uqer import DataAPI as api
 from alphamind.utilities import alpha_logger
-from sqlalchemy import select, and_, or_
+from sqlalchemy import select, and_, or_, MetaData, delete
 from PyFin.api import advanceDateByCalendar
 from PyFin.api import isBizDay
+from alphamind.api import SqlEngine
 from alphamind.data.dbmodel.models import *
+from alphamind.api import Universe as UniversProxy
+from alphamind.api import industry_styles
+from alphamind.api import risk_styles
 
 uqer.DataAPI.api_base.timeout = 300
 
-start_date = dt.datetime(2010, 1, 1)
+start_date = dt.datetime(2018, 5, 4)
 dag_name = 'update_uqer_data_postgres'
 
 default_args = {
@@ -33,11 +42,12 @@ default_args = {
 
 dag = DAG(
     dag_id=dag_name,
     default_args=default_args,
-    schedule_interval='0 6 * * 1,2,3,4,5'
+    schedule_interval='0 1 * * 1,2,3,4,5'
 )
 
-_ = uqer.Client(token='')
-engine = sqlalchemy.create_engine('')
+_ = uqer.Client(token=os.environ['DATAYES_TOKEN'])
+engine = sqlalchemy.create_engine(os.environ['DB_URI'])
+alpha_engine = SqlEngine(os.environ['DB_URI'])
 
 
 def process_date(ds):
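
Note: the credentials that used to be hard-coded (an empty uqer token and an empty create_engine DSN) are now read from the environment. Illustrative only, not from the commit: the two variables the script expects, with placeholder values (the DSN is an ordinary SQLAlchemy Postgres URI):

import os

os.environ.setdefault('DATAYES_TOKEN', '<uqer/datayes API token>')
os.environ.setdefault('DB_URI', 'postgresql+psycopg2://user:password@host:5432/alpha')
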
@@ -71,41 +81,6 @@ def data_info_log(df, table):
         raise ValueError(msg)
 
 
-def update_uqer_index_market(ds, **kwargs):
-    ref_date, this_date = process_date(ds)
-    flag = check_holiday(this_date)
-    if not flag:
-        return
-
-    df = api.MktIdxdGet(tradeDate=ref_date)
-    df = df[df.exchangeCD.isin(['XSHE', 'XSHG', 'ZICN'])]
-    df = df[df.ticker <= '999999']
-    df.rename(columns={'tradeDate': 'trade_date',
-                       'ticker': 'indexCode',
-                       'CHGPct': 'chgPct',
-                       'secShortName': 'indexShortName'}, inplace=True)
-    df = df[['trade_date',
-             'indexCode',
-             'preCloseIndex',
-             'openIndex',
-             'highestIndex',
-             'lowestIndex',
-             'closeIndex',
-             'turnoverVol',
-             'turnoverValue',
-             'chgPct']]
-    df['indexCode'] = df.indexCode.astype(int)
-
-    query = delete(IndexMarket).where(IndexMarket.trade_date == this_date)
-    engine.execute(query)
-
-    data_info_log(df, Market)
-    format_data(df, format='%Y-%m-%d')
-    df.to_sql(IndexMarket.__table__.name, engine, index=False, if_exists='append')
-
-
 def update_uqer_factors(ds, **kwargs):
     ref_date, this_date = process_date(ds)
     flag = check_holiday(this_date)
@@ -146,161 +121,64 @@ def update_uqer_market(ds, **kwargs):
     df.to_sql(Market.__table__.name, engine, index=False, if_exists='append')
 
 
-def update_uqer_halt_list(ds, **kwargs):
-    ref_date, this_date = process_date(ds)
-    flag = check_holiday(this_date)
-    if not flag:
-        return
-
-    df = api.SecHaltGet(beginDate=ref_date, endDate=ref_date)
-    df = df[df.assetClass == 'E']
-    df['trade_date'] = ref_date
-    df.rename(columns={'ticker': 'code'}, inplace=True)
-    df.code = df.code.astype(int)
-    del df['secID']
-
-    query = delete(HaltList).where(HaltList.trade_date == this_date)
-    engine.execute(query)
-
-    data_info_log(df, HaltList)
-    format_data(df)
-    df.to_sql(HaltList.__table__.name, engine, index=False, if_exists='append')
-
-
-def update_uqer_universe_hs300(ds, **kwargs):
-    ref_date, this_date = process_date(ds)
-    flag = check_holiday(this_date)
-    if not flag:
-        return
-
-    query = delete(Universe).where(
-        and_(
-            Universe.trade_date == this_date,
-            Universe.universe == 'hs300'
-        )
-    )
-    engine.execute(query)
-
-    query = select([IndexComponent.trade_date, IndexComponent.code]).where(
-        and_(
-            IndexComponent.trade_date == this_date,
-            IndexComponent.indexCode == 300
-        )
-    )
-    df = pd.read_sql(query, engine)
-
-    if df.empty:
-        return
-
-    df['universe'] = 'hs300'
-
-    data_info_log(df, Universe)
-    format_data(df)
-    df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
-
-
-def update_uqer_universe_sh50(ds, **kwargs):
-    ref_date, this_date = process_date(ds)
-    flag = check_holiday(this_date)
-    if not flag:
-        return
-
-    query = delete(Universe).where(
-        and_(
-            Universe.trade_date == this_date,
-            Universe.universe == 'sh50'
-        )
-    )
-    engine.execute(query)
-
-    query = select([IndexComponent.trade_date, IndexComponent.code]).where(
-        and_(
-            IndexComponent.trade_date == this_date,
-            IndexComponent.indexCode == 16
-        )
-    )
-    df = pd.read_sql(query, engine)
-
-    if df.empty:
-        return
-
-    df['universe'] = 'sh50'
-
-    data_info_log(df, Universe)
-    format_data(df)
-    df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
-
-
-def update_uqer_universe_zz500(ds, **kwargs):
-    ref_date, this_date = process_date(ds)
-    flag = check_holiday(this_date)
-    if not flag:
-        return
-
-    query = delete(Universe).where(
-        and_(
-            Universe.trade_date == this_date,
-            Universe.universe == 'zz500'
-        )
-    )
-    engine.execute(query)
-
-    query = select([IndexComponent.trade_date, IndexComponent.code]).where(
-        and_(
-            IndexComponent.trade_date == this_date,
-            IndexComponent.indexCode == 905
-        )
-    )
-    df = pd.read_sql(query, engine)
-
-    if df.empty:
-        return
-
-    df['universe'] = 'zz500'
-
-    data_info_log(df, Universe)
-    format_data(df)
-    df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
-
-
-def update_uqer_universe_zz800(ds, **kwargs):
-    ref_date, this_date = process_date(ds)
-    flag = check_holiday(this_date)
-    if not flag:
-        return
-
-    query = delete(Universe).where(
-        and_(
-            Universe.trade_date == this_date,
-            Universe.universe == 'zz800'
-        )
-    )
-    engine.execute(query)
-
-    query = select([IndexComponent.trade_date, IndexComponent.code]).where(
-        and_(
-            IndexComponent.trade_date == this_date,
-            IndexComponent.indexCode == 906
-        )
-    )
-    df = pd.read_sql(query, engine)
-
-    if df.empty:
-        return
-
-    df['universe'] = 'zz800'
-
-    data_info_log(df, Universe)
-    format_data(df)
-    df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
-
-
+def update_uqer_index_market(ds, **kwargs):
+    ref_date, this_date = process_date(ds)
+    flag = check_holiday(this_date)
+    if not flag:
+        return
+
+    df = api.MktIdxdGet(tradeDate=ref_date)
+    df = df[df.exchangeCD.isin(['XSHE', 'XSHG', 'ZICN'])]
+    df = df[df.ticker <= '999999']
+    df.rename(columns={'tradeDate': 'trade_date',
+                       'ticker': 'indexCode',
+                       'CHGPct': 'chgPct',
+                       'secShortName': 'indexShortName'}, inplace=True)
+    df = df[['trade_date',
+             'indexCode',
+             'preCloseIndex',
+             'openIndex',
+             'highestIndex',
+             'lowestIndex',
+             'closeIndex',
+             'turnoverVol',
+             'turnoverValue',
+             'chgPct']]
+    df['indexCode'] = df.indexCode.astype(int)
+
+    query = delete(IndexMarket).where(IndexMarket.trade_date == this_date)
+    engine.execute(query)
+
+    data_info_log(df, Market)
+    format_data(df, format='%Y-%m-%d')
+    df.to_sql(IndexMarket.__table__.name, engine, index=False, if_exists='append')
+
+
+def update_uqer_halt_list(ds, **kwargs):
+    ref_date, this_date = process_date(ds)
+    flag = check_holiday(this_date)
+    if not flag:
+        return
+
+    df = api.SecHaltGet(beginDate=ref_date, endDate=ref_date)
+    df = df[df.assetClass == 'E']
+    df['trade_date'] = ref_date
+    df.rename(columns={'ticker': 'code'}, inplace=True)
+    df.code = df.code.astype(int)
+    del df['secID']
+
+    query = delete(HaltList).where(HaltList.trade_date == this_date)
+    engine.execute(query)
+
+    data_info_log(df, HaltList)
+    format_data(df)
+    df.to_sql(HaltList.__table__.name, engine, index=False, if_exists='append')
+
+
-def update_uqer_universe_zz1000(ds, **kwargs):
+def update_universe(ds, **kwargs):
     ref_date, this_date = process_date(ds)
     flag = check_holiday(this_date)
@@ -308,95 +186,77 @@ def update_uqer_universe_zz1000(ds, **kwargs):
         return
 
-    query = delete(Universe).where(
-        and_(
-            Universe.trade_date == this_date,
-            Universe.universe == 'zz1000'
-        )
-    )
-    engine.execute(query)
-
-    query = select([IndexComponent.trade_date, IndexComponent.code]).where(
-        and_(
-            IndexComponent.trade_date == this_date,
-            IndexComponent.indexCode == 852
-        )
-    )
-    df = pd.read_sql(query, engine)
-
-    if df.empty:
-        return
-
-    df['universe'] = 'zz1000'
-
-    data_info_log(df, Universe)
-    format_data(df)
-    df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
-
-
-def update_uqer_universe_zxb(ds, **kwargs):
-    ref_date, this_date = process_date(ds)
-    flag = check_holiday(this_date)
-    if not flag:
-        return
-
-    query = delete(Universe).where(
-        and_(
-            Universe.trade_date == this_date,
-            Universe.universe == 'zxb'
-        )
-    )
-    engine.execute(query)
-
-    query = select([IndexComponent.trade_date, IndexComponent.code]).where(
-        and_(
-            IndexComponent.trade_date == this_date,
-            IndexComponent.indexCode == 399005
-        )
-    )
-    df = pd.read_sql(query, engine)
-
-    if df.empty:
-        return
-
-    df['universe'] = 'zxb'
-
-    data_info_log(df, Universe)
-    format_data(df)
-    df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
-
-
-def update_uqer_universe_cyb(ds, **kwargs):
-    ref_date, this_date = process_date(ds)
-    flag = check_holiday(this_date)
-    if not flag:
-        return
-
-    query = delete(Universe).where(
-        and_(
-            Universe.trade_date == this_date,
-            Universe.universe == 'cyb'
-        )
-    )
-    engine.execute(query)
-
-    query = select([IndexComponent.trade_date, IndexComponent.code]).where(
-        and_(
-            IndexComponent.trade_date == this_date,
-            IndexComponent.indexCode == 399006
-        )
-    )
-    df = pd.read_sql(query, engine)
-
-    if df.empty:
-        return
-
-    df['universe'] = 'cyb'
-
-    data_info_log(df, Universe)
-    format_data(df)
-    df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
-
+    query = delete(Universe).where(
+        Universe.trade_date == this_date,
+    )
+    engine.execute(query)
+
+    # indexed universe
+    universe_map = {'hs300': 300,
+                    'sh50': 16,
+                    'zz500': 905,
+                    'zz800': 906,
+                    'zz1000': 852,
+                    'zxb': 399005,
+                    'cyb': 399006}
+    total_df = None
+    for u in universe_map:
+        query = select([IndexComponent.code]).where(
+            and_(
+                IndexComponent.trade_date == this_date,
+                IndexComponent.indexCode == universe_map[u]
+            )
+        )
+        df = pd.read_sql(query, engine)
+        df[u] = 1
+        if total_df is None:
+            total_df = df
+        else:
+            total_df = pd.merge(total_df, df, on=['code'], how='outer')
+
+    # ashare
+    query = select([SecurityMaster.code]).where(
+        and_(
+            SecurityMaster.listDate <= this_date,
+            or_(
+                SecurityMaster.listStatusCD == 'L',
+                SecurityMaster.delistDate > this_date
+            )
+        )
+    )
+    df = pd.read_sql(query, engine)
+    df['ashare'] = 1
+    total_df = pd.merge(total_df, df, on=['code'], how='outer')
+
+    # ashare_ex
+    ex_date = advanceDateByCalendar('china.sse', this_date, '-3m')
+
+    query = select([SecurityMaster.code]).where(
+        and_(
+            SecurityMaster.listDate <= ex_date,
+            or_(
+                SecurityMaster.listStatusCD == "L",
+                SecurityMaster.delistDate > this_date
+            )
+        )
+    )
+    df = pd.read_sql(query, engine)
+    df['ashare_ex'] = 1
+    total_df = pd.merge(total_df, df, on=['code'], how='outer')
+
+    # industry universe
+    codes = total_df.code.tolist()
+    risk_models = alpha_engine.fetch_risk_model(ref_date, codes)[1]
+    df = risk_models[['code'] + industry_styles]
+    df.columns = [i.lower() for i in df.columns]
+    total_df = pd.merge(total_df, df, on=['code'], how='outer')
+
+    total_df['trade_date'] = this_date
+    total_df.fillna(0, inplace=True)
+    total_df.to_sql('universe', engine, if_exists='append', index=False)
 
 
 def update_uqer_universe_security_master(ds, **kwargs):
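
Note: the new update_universe replaces the per-index universe tasks with a single wide table: one row per code per trade_date, with 0/1 membership flags for each mapped index (hs300, sh50, zz500, zz800, zz1000, zxb, cyb), for ashare and ashare_ex, and for the lower-cased industry dummies taken from the risk model. An illustrative read-side sketch (not part of the commit; column names other than those written above are assumptions):

import os
import pandas as pd
import sqlalchemy

engine = sqlalchemy.create_engine(os.environ['DB_URI'])
df = pd.read_sql("select * from universe where trade_date = '2018-05-09'", engine)

hs300_codes = df[df.hs300 == 1].code.tolist()                        # CSI 300 members on that day
listed_banks = df[(df.ashare == 1) & (df.bank == 1)].code.tolist()   # assumes a 'bank' industry dummy column
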
@@ -406,7 +266,7 @@ def update_uqer_universe_security_master(ds, **kwargs):
     if not flag:
         return
 
-    df = api.EquGet(equTypeCD='A')
+    df = api.EquGet(equTypeCD='A').drop_duplicates()
 
     if df.empty:
         return
@@ -427,80 +287,147 @@ def update_uqer_universe_security_master(ds, **kwargs):
     df.to_sql(SecurityMaster.__table__.name, engine, index=False, if_exists='append')
 
 
-def update_uqer_universe_ashare(ds, **kwargs):
-    ref_date, this_date = process_date(ds)
-    flag = check_holiday(this_date)
-    if not flag:
-        return
-
-    query = delete(Universe).where(
-        and_(
-            Universe.trade_date == this_date,
-            Universe.universe == 'ashare'
-        )
-    )
-    engine.execute(query)
-
-    query = select([SecurityMaster.code]).where(
-        and_(
-            SecurityMaster.listDate <= this_date,
-            or_(
-                SecurityMaster.listStatusCD == 'L',
-                SecurityMaster.delistDate > this_date
-            )
-        )
-    )
-    df = pd.read_sql(query, engine)
-
-    if df.empty:
-        return
-
-    df['universe'] = 'ashare'
-    df['trade_date'] = this_date
-
-    data_info_log(df, Universe)
-    df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
-
-
-def update_uqer_universe_ashare_ex(ds, **kwargs):
-    ref_date, this_date = process_date(ds)
-    flag = check_holiday(this_date)
-    if not flag:
-        return
-
-    query = delete(Universe).where(
-        and_(
-            Universe.trade_date == this_date,
-            Universe.universe == 'ashare_ex'
-        )
-    )
-    engine.execute(query)
-
-    ex_date = advanceDateByCalendar('china.sse', this_date, '-3m')
-
-    query = select([SecurityMaster.code]).where(
-        and_(
-            SecurityMaster.listDate <= ex_date,
-            or_(
-                SecurityMaster.listStatusCD == "L",
-                SecurityMaster.delistDate > this_date
-            )
-        )
-    )
-    df = pd.read_sql(query, engine)
-
-    if df.empty:
-        return
-
-    df['universe'] = 'ashare_ex'
-    df['trade_date'] = this_date
-
-    data_info_log(df, Universe)
-    df.to_sql(Universe.__table__.name, engine, index=False, if_exists='append')
-
-
+def update_sw1_adj_industry(ds, **kwargs):
+    ref_date, this_date = process_date(ds)
+    flag = check_holiday(this_date)
+    if not flag:
+        return
+
+    industry = '申万行业分类'
+    query = select([Industry]).where(
+        and_(
+            Industry.trade_date == ref_date,
+            Industry.industry == industry
+        )
+    )
+    df = pd.read_sql(query, engine)
+    df['industry'] = '申万行业分类修订'
+    df['industryID'] = 10303330102
+    df['industrySymbol'] = '440102'
+
+    ids = df[df.industryName2 == '证券'].index
+    df.loc[ids, 'industryName1'] = df.loc[ids, 'industryName2']
+    df.loc[ids, 'industryID1'] = df.loc[ids, 'industryID2']
+
+    ids = df[df.industryName2 == '银行'].index
+    df.loc[ids, 'industryName1'] = df.loc[ids, 'industryName2']
+    df.loc[ids, 'industryID1'] = df.loc[ids, 'industryID2']
+
+    ids = df[df.industryName2 == '保险'].index
+    df.loc[ids, 'industryName1'] = df.loc[ids, 'industryName2']
+    df.loc[ids, 'industryID1'] = df.loc[ids, 'industryID2']
+
+    ids = df[df.industryName2 == '多元金融'].index
+    df.loc[ids, 'industryName1'] = df.loc[ids, 'industryName2']
+    df.loc[ids, 'industryID1'] = df.loc[ids, 'industryID2']
+
+    query = delete(Industry).where(
+        and_(
+            Industry.trade_date == ref_date,
+            Industry.industry == industry + "修订"
+        )
+    )
+    engine.execute(query)
+    df.to_sql(Industry.__table__.name, engine, if_exists='append', index=False)
+
+
+def update_dx_industry(ds, **kwargs):
+    ref_date, this_date = process_date(ds)
+    flag = check_holiday(this_date)
+    if not flag:
+        return
+
+    barra_sector_dict = {
+        'Energy':
+            [],
+        'Materials':
+            ['建筑建材', '化工', '有色金属', '钢铁', '建筑材料'],
+        'Industrials':
+            ['采掘', '机械设备', '综合', '建筑装饰', '电子', '交通运输', '轻工制造', '商业贸易', '农林牧渔', '电气设备', '国防军工', '纺织服装', '交运设备'],
+        'ConsumerDiscretionary':
+            ['休闲服务', '汽车', '传媒'],
+        'ConsumerStaples':
+            ['食品饮料', '家用电器'],
+        'HealthCare':
+            ['医药生物'],
+        'Financials':
+            ['银行', '非银金融', '金融服务'],
+        'IT':
+            ['计算机', '通信', '信息设备', '信息服务'],
+        'Utilities':
+            ['公用事业'],
+        'RealEstate':
+            ['房地产'],
+    }
+
+    # ref: https://en.wikipedia.org/wiki/Global_Industry_Classification_Standard
+    barra_sector_id_dict = {
+        'Energy': 10,
+        'Materials': 15,
+        'Industrials': 20,
+        'ConsumerDiscretionary': 25,
+        'ConsumerStaples': 30,
+        'HealthCare': 35,
+        'Financials': 40,
+        'IT': 45,
+        'Utilities': 55,
+        'RealEstate': 60
+    }
+
+    # ref: Morningstar Global Equity Classification Structure
+    ms_supersector_dict = {
+        'Cyclical': ['Materials', 'Financials', 'RealEstate', 'ConsumerDiscretionary'],
+        'Defensive': ['ConsumerStaples', 'HealthCare', 'Utilities'],
+        'Sensitive': ['Energy', 'Industrials', 'IT']
+    }
+    ms_supersector_id_dict = {
+        'Cyclical': 1,
+        'Defensive': 2,
+        'Sensitive': 3
+    }
+
+    barra_sector_rev_dict = {}
+    for x in barra_sector_dict:
+        for y in barra_sector_dict[x]:
+            barra_sector_rev_dict[y] = x
+
+    ms_supersector_rev_dict = {}
+    for x in ms_supersector_dict:
+        for y in ms_supersector_dict[x]:
+            ms_supersector_rev_dict[y] = x
+
+    industry = '申万行业分类'
+    query = select([Industry]).where(
+        and_(
+            Industry.trade_date == ref_date,
+            Industry.industry == industry
+        )
+    )
+    df = pd.read_sql(query, engine)
+
+    df['industry'] = '东兴行业分类'
+    df['industryID'] = 0
+    df['industrySymbol'] = '0'
+    df['industryID3'] = df['industryID1']
+    df['industryName3'] = df['industryName1']
+    df['industryName2'] = [barra_sector_rev_dict[x] for x in df['industryName3']]
+    df['industryName1'] = [ms_supersector_rev_dict[x] for x in df['industryName2']]
+    df['industryID1'] = [ms_supersector_id_dict[x] for x in df['industryName1']]
+    df['industryID2'] = [barra_sector_id_dict[x] for x in df['industryName2']]
+
+    query = delete(Industry).where(
+        and_(
+            Industry.trade_date == ref_date,
+            Industry.industry == "东兴行业分类"
+        )
+    )
+    engine.execute(query)
+    df.to_sql(Industry.__table__.name, engine, if_exists='append', index=False)
 
 
 def update_uqer_index_components(ds, **kwargs):
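
Note: update_dx_industry above rolls every SW level-1 name up twice, first to a Barra/GICS-style sector via barra_sector_rev_dict, then to a Morningstar-style supersector via ms_supersector_rev_dict, keeping the original SW classification at level 3. A toy illustration of the lookup chain (values copied from the dictionaries above; not part of the commit):

# Reverse dictionaries as built in update_dx_industry (excerpt):
barra_sector_rev_dict = {'银行': 'Financials', '医药生物': 'HealthCare'}
ms_supersector_rev_dict = {'Financials': 'Cyclical', 'HealthCare': 'Defensive'}

sw_name = '银行'
sector = barra_sector_rev_dict[sw_name]        # 'Financials'  -> industryName2
supersector = ms_supersector_rev_dict[sector]  # 'Cyclical'    -> industryName1
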
@@ -618,6 +545,33 @@ def update_uqer_index_components(ds, **kwargs):
     total_data.to_sql(IndexComponent.__table__.name, engine, index=False, if_exists='append')
 
 
+def update_dummy_index_components(ds, **kwargs):
+    ref_date, this_date = process_date(ds)
+    flag = check_holiday(this_date)
+    if not flag:
+        return
+
+    query = select([IndexComponent]).where(
+        and_(
+            IndexComponent.trade_date == '2018-05-04',
+            IndexComponent.indexCode.in_([900300, 900905])
+        )
+    )
+    df = pd.read_sql(query, con=engine)
+    df['trade_date'] = ref_date
+
+    query = delete(IndexComponent).where(
+        and_(
+            IndexComponent.trade_date == ref_date,
+            IndexComponent.indexCode.in_([900300, 900905])
+        )
+    )
+    engine.execute(query)
+
+    df.to_sql(IndexComponent.__table__.name, engine, index=False, if_exists='append')
+
+
 def update_uqer_risk_model(ds, **kwargs):
     ref_date, this_date = process_date(ds)
     flag = check_holiday(this_date)
@@ -629,6 +583,9 @@ def update_uqer_risk_model(ds, **kwargs):
     df.rename(columns={'tradeDate': 'trade_date', 'ticker': 'code'}, inplace=True)
     df.code = df.code.astype(int)
     del df['secID']
+    del df['exchangeCD']
+    del df['secShortName']
+    del df['updateTime']
     engine.execute(delete(RiskExposure).where(RiskExposure.trade_date == this_date))
     data_info_log(df, RiskExposure)
     format_data(df)
@@ -738,6 +695,37 @@ def update_uqer_industry_info(ds, **kwargs):
     df.to_sql(Industry.__table__.name, engine, index=False, if_exists='append')
 
 
+def update_category(ds, **kwargs):
+    ref_date, this_date = process_date(ds)
+    flag = check_holiday(this_date)
+    if not flag:
+        return
+
+    codes = alpha_engine.fetch_codes(ref_date, UniversProxy('ashare'))
+    industry_matrix1 = alpha_engine.fetch_industry_matrix(ref_date, codes, 'sw', 1)
+    industry_matrix2 = alpha_engine.fetch_industry_matrix(ref_date, codes, 'sw_adj', 1)
+
+    cols1 = sorted(industry_matrix1.columns[2:].tolist())
+    vals1 = (industry_matrix1[cols1].values * np.array(range(1, len(cols1) + 1))).sum(axis=1)
+
+    cols2 = sorted(industry_matrix2.columns[2:].tolist())
+    vals2 = (industry_matrix2[cols2].values * np.array(range(1, len(cols2) + 1))).sum(axis=1)
+
+    df = pd.DataFrame()
+    df['code'] = industry_matrix1.code.tolist()
+    df['trade_date'] = ref_date
+    df['sw1'] = vals1
+    df['sw1_adj'] = vals2
+
+    query = delete(Categories).where(
+        Categories.trade_date == ref_date
+    )
+    engine.execute(query)
+
+    df.to_sql(Categories.__table__.name, con=engine, if_exists='append', index=False)
+
+
 def fetch_date(table, query_date, engine):
     query_date = query_date.replace('-', '')
     sql = "select * from {0} where Date = {1}".format(table, query_date)
@@ -754,12 +742,41 @@ def fetch_date(table, query_date, engine):
     return df
 
 
-index_market_task = PythonOperator(
-    task_id='update_uqer_index_market',
-    provide_context=True,
-    python_callable=update_uqer_index_market,
-    dag=dag
-)
+def update_factor_master(ds, **kwargs):
+    ref_date, this_date = process_date(ds)
+    flag = check_holiday(this_date)
+
+    if not flag:
+        return
+
+    tables = [Uqer, Gogoal, Experimental, RiskExposure]
+
+    meta = MetaData(bind=engine, reflect=True)
+    df = pd.DataFrame(columns=['factor', 'source', 'alias', 'updateTime', 'description'])
+
+    for t in tables:
+        source = t.__table__.name
+        table = meta.tables[source]
+        columns = table.columns.keys()
+        columns = list(set(columns).difference({'trade_date',
+                                                'code',
+                                                'secShortName',
+                                                'exchangeCD',
+                                                'updateTime',
+                                                'COUNTRY'}))
+        col_alias = [c + '_' + source for c in columns]
+
+        new_df = pd.DataFrame({'factor': columns,
+                               'source': [source] * len(columns),
+                               'alias': col_alias})
+        df = df.append(new_df)
+
+    query = delete(FactorMaster)
+    engine.execute(query)
+
+    df['updateTime'] = arrow.now().format('YYYY-MM-DD, HH:mm:ss')
+    df.to_sql(FactorMaster.__table__.name, engine, if_exists='append', index=False)
 
 
 uqer_task = PythonOperator(
@@ -776,79 +793,61 @@ market_task = PythonOperator(
     dag=dag
 )
 
-industry_task = PythonOperator(
-    task_id='update_uqer_industry_info',
-    provide_context=True,
-    python_callable=update_uqer_industry_info,
-    dag=dag
-)
-
-industry_task.set_upstream(market_task)
-
-index_task = PythonOperator(
-    task_id='update_uqer_index_components',
-    provide_context=True,
-    python_callable=update_uqer_index_components,
-    dag=dag
-)
-
-universe300_task = PythonOperator(
-    task_id='update_uqer_universe_hs300',
-    provide_context=True,
-    python_callable=update_uqer_universe_hs300,
-    dag=dag
-)
-
-universe500_task = PythonOperator(
-    task_id='update_uqer_universe_zz500',
-    provide_context=True,
-    python_callable=update_uqer_universe_zz500,
-    dag=dag
-)
-
-universe800_task = PythonOperator(
-    task_id='update_uqer_universe_zz800',
-    provide_context=True,
-    python_callable=update_uqer_universe_zz800,
-    dag=dag
-)
-
-universe1000_task = PythonOperator(
-    task_id='update_uqer_universe_zz1000',
-    provide_context=True,
-    python_callable=update_uqer_universe_zz1000,
-    dag=dag
-)
-
-universe50_task = PythonOperator(
-    task_id='update_uqer_universe_sh50',
-    provide_context=True,
-    python_callable=update_uqer_universe_sh50,
-    dag=dag
-)
-
-universe_zxb_task = PythonOperator(
-    task_id='update_uqer_universe_zxb',
-    provide_context=True,
-    python_callable=update_uqer_universe_zxb,
-    dag=dag
-)
-
-universe_cyb_task = PythonOperator(
-    task_id='update_uqer_universe_cyb',
-    provide_context=True,
-    python_callable=update_uqer_universe_cyb,
-    dag=dag
-)
-
-universe300_task.set_upstream(index_task)
-universe500_task.set_upstream(index_task)
-universe800_task.set_upstream(index_task)
-universe1000_task.set_upstream(index_task)
-universe50_task.set_upstream(index_task)
-universe_zxb_task.set_upstream(index_task)
-universe_cyb_task.set_upstream(index_task)
+universe_task = PythonOperator(
+    task_id='update_universe',
+    provide_context=True,
+    python_callable=update_universe,
+    dag=dag
+)
+
+index_market_task = PythonOperator(
+    task_id='update_uqer_index_market',
+    provide_context=True,
+    python_callable=update_uqer_index_market,
+    dag=dag
+)
+
+industry_task = PythonOperator(
+    task_id='update_uqer_industry_info',
+    provide_context=True,
+    python_callable=update_uqer_industry_info,
+    dag=dag
+)
+
+sw1_adj_industry_task = PythonOperator(
+    task_id='update_sw1_adj_industry',
+    provide_context=True,
+    python_callable=update_sw1_adj_industry,
+    dag=dag
+)
+
+dx_industry_task = PythonOperator(
+    task_id='update_dx_industry',
+    provide_context=True,
+    python_callable=update_dx_industry,
+    dag=dag
+)
+
+industry_task.set_upstream(market_task)
+sw1_adj_industry_task.set_upstream(industry_task)
+dx_industry_task.set_upstream(industry_task)
+
+categories_task = PythonOperator(
+    task_id='update_categories',
+    provide_context=True,
+    python_callable=update_category,
+    dag=dag
+)
+
+categories_task.set_upstream(sw1_adj_industry_task)
+
+index_task = PythonOperator(
+    task_id='update_uqer_index_components',
+    provide_context=True,
+    python_callable=update_uqer_index_components,
+    dag=dag
+)
 
 security_master_task = PythonOperator(
     task_id='update_uqer_universe_security_master',
     provide_context=True,
@@ -856,24 +855,8 @@ security_master_task = PythonOperator(
     dag=dag
 )
 
-universe_ashare_task = PythonOperator(
-    task_id='update_uqer_universe_ashare',
-    provide_context=True,
-    python_callable=update_uqer_universe_ashare,
-    dag=dag
-)
-
-universe_ashare_ex_task = PythonOperator(
-    task_id='update_uqer_universe_ashare_ex',
-    provide_context=True,
-    python_callable=update_uqer_universe_ashare_ex,
-    dag=dag
-)
-
-universe_ashare_task.set_upstream(security_master_task)
-universe_ashare_ex_task.set_upstream(security_master_task)
+universe_task.set_upstream(security_master_task)
+universe_task.set_upstream(index_task)
 
 risk_model_task = PythonOperator(
     task_id='update_uqer_risk_model',
@@ -882,6 +865,8 @@ risk_model_task = PythonOperator(
     dag=dag
 )
 
+universe_task.set_upstream(risk_model_task)
+
 _ = PythonOperator(
     task_id='update_uqer_halt_list',
     provide_context=True,
@@ -890,5 +875,16 @@ _ = PythonOperator(
 )
 
+
+factor_master_task = PythonOperator(
+    task_id='update_factor_master',
+    provide_context=True,
+    python_callable=update_factor_master,
+    dag=dag
+)
+
+factor_master_task.set_upstream(uqer_task)
+
 
 if __name__ == '__main__':
-    update_uqer_index_components(ds='2017-11-10')
+    update_universe(ds='2018-05-09')