Commit d50e2048 authored by 李煜's avatar 李煜

code update

parent 74ec7081
......@@ -13,8 +13,8 @@
- factor_earning # 收益质量
- factor_per_share_indicators # 收益质量
### client
程序执行入口
### /client
程序执行入口, 文件目录下包含单类因子计算, 以及合并计算
### submit
分布式计算,提交入口
......@@ -22,7 +22,8 @@
### init
初始化分布式计算redis数据库
### q5_cluster_work
分布式引擎启动
### cluster_work
分布式引擎节点启动程序
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: ??
@author: li
@file: __init__.py.py
@time: 2019-07-16 20:00
"""
\ No newline at end of file
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: ??
@author: li
@file: all_factor_cal.py
@time: 2019-07-16 20:00
"""
import sys
sys.path.append('../')
sys.path.append('../../')
sys.path.append('../../../')
import time
import argparse
import collections
from datetime import datetime, timedelta
from factor import factor_growth, historical_value, factor_per_share_indicators, factor_cash_flow, factor_constrain, factor_earning
from factor.ttm_fundamental import *
from vision.file_unit.balance import Balance
from vision.file_unit.cash_flow import CashFlow
from vision.file_unit.income import Income
from vision.file_unit.valuation import Valuation
from vision.file_unit.industry import Industry
from vision.file_unit.indicator import Indicator
from factor.utillities.trade_date import TradeDate
from ultron.cluster.invoke.cache_data import cache_data
def get_trade_date(trade_date, n):
    """
    Return the trading day nearest to (but not after) ``trade_date`` minus n years.

    If the computed calendar day is not a trading day, step backwards one day at
    a time until a known trading day is found.
    :param trade_date: current trade date as an int like 20190716
    :param n: number of years to look back
    :return: date as an int (YYYYMMDD)
    """
    calendar = TradeDate()
    trade_date_sets = collections.OrderedDict(
        sorted(calendar._trade_date_sets.items(), key=lambda t: t[0], reverse=False))
    target = datetime.strptime(str(trade_date), "%Y%m%d") - n * timedelta(days=365)
    date_time = int(datetime.strftime(target, "%Y%m%d"))
    if date_time < min(trade_date_sets.keys()):
        # Earlier than the first known trading day: return the raw date as-is.
        return date_time
    # Walk backwards until we hit a trading day (int comparison only, so
    # stepping through non-existent calendar days like 20190100 is harmless).
    while date_time not in trade_date_sets:
        date_time -= 1
    return date_time
def get_basic_growth_data(trade_date):
    """
    Fetch the raw inputs required by the growth factors.

    Pulls balance-sheet data and TTM income/cash-flow data for ``trade_date``
    and for the trading days 1-5 years earlier, and merges everything
    (inner joins on ``symbol``) into two symbol-keyed frames.
    :param trade_date: trade date (int, YYYYMMDD)
    :return: (ttm_factor_sets, balance_sets)
    """
    # Nearest trading days 1..5 years before trade_date.
    trade_date_pre_year = get_trade_date(trade_date, 1)
    trade_date_pre_year_2 = get_trade_date(trade_date, 2)
    trade_date_pre_year_3 = get_trade_date(trade_date, 3)
    trade_date_pre_year_4 = get_trade_date(trade_date, 4)
    trade_date_pre_year_5 = get_trade_date(trade_date, 5)
    # print('trade_date %s' % trade_date)
    # print('trade_date_pre_year %s' % trade_date_pre_year)
    # print('trade_date_pre_year_2 %s' % trade_date_pre_year_2)
    # print('trade_date_pre_year_3 %s' % trade_date_pre_year_3)
    # print('trade_date_pre_year_4 %s' % trade_date_pre_year_4)
    # print('trade_date_pre_year_5 %s' % trade_date_pre_year_5)
    balance_sets = get_fundamentals(add_filter_trade(query(Balance._name_,
                                                           [Balance.symbol,
                                                            Balance.total_assets,  # total assets
                                                            Balance.total_owner_equities]),  # total owner equities
                                                     [trade_date]))
    balance_sets_pre_year = get_fundamentals(add_filter_trade(query(Balance._name_,
                                                                    [Balance.symbol,
                                                                     Balance.total_assets,
                                                                     Balance.total_owner_equities]),
                                                              [trade_date_pre_year]))
    balance_sets_pre_year = balance_sets_pre_year.rename(columns={"total_assets": "total_assets_pre_year",
                                                                  "total_owner_equities": "total_owner_equities_pre_year"})
    balance_sets = pd.merge(balance_sets, balance_sets_pre_year, on='symbol')
    # TTM (trailing-twelve-month) factor definitions.
    ttm_factors = {Income._name_: [Income.symbol,
                                   Income.operating_revenue,  # operating revenue
                                   Income.operating_profit,  # operating profit
                                   Income.total_profit,  # total profit
                                   Income.net_profit,  # net profit
                                   Income.operating_cost,  # operating cost
                                   Income.np_parent_company_owners  # net profit attributable to parent-company owners
                                   ],
                   CashFlow._name_: [CashFlow.symbol,
                                     CashFlow.net_finance_cash_flow,  # net cash flow from financing activities
                                     CashFlow.net_operate_cash_flow,  # net cash flow from operating activities
                                     CashFlow.net_invest_cash_flow,  # net cash flow from investing activities
                                     ]
                   }
    # Reduced TTM column set used for the year-by-year (continuous) history.
    ttm_factor_continue = {Income._name_: [Income.symbol,
                                           Income.net_profit,  # net profit
                                           Income.operating_revenue,  # operating revenue
                                           Income.operating_cost,  # operating cost
                                           Income.np_parent_company_owners,  # net profit attributable to parent-company owners
                                           ]
                           }
    # TTM frames for the current date and each of the five prior years; the
    # trade_date column is constant per frame and dropped before merging.
    ttm_factor_sets = get_ttm_fundamental([], ttm_factors, trade_date).reset_index()
    ttm_factor_sets = ttm_factor_sets.drop(columns={"trade_date"})
    ttm_factor_sets_pre_year = get_ttm_fundamental([], ttm_factors, trade_date_pre_year).reset_index()
    ttm_factor_sets_pre_year = ttm_factor_sets_pre_year.drop(columns={"trade_date"})
    ttm_factor_sets_pre_year_1 = get_ttm_fundamental([], ttm_factor_continue, trade_date_pre_year).reset_index()
    ttm_factor_sets_pre_year_1 = ttm_factor_sets_pre_year_1.drop(columns={"trade_date"})
    ttm_factor_sets_pre_year_2 = get_ttm_fundamental([], ttm_factor_continue, trade_date_pre_year_2).reset_index()
    ttm_factor_sets_pre_year_2 = ttm_factor_sets_pre_year_2.drop(columns={"trade_date"})
    ttm_factor_sets_pre_year_3 = get_ttm_fundamental([], ttm_factor_continue, trade_date_pre_year_3).reset_index()
    ttm_factor_sets_pre_year_3 = ttm_factor_sets_pre_year_3.drop(columns={"trade_date"})
    ttm_factor_sets_pre_year_4 = get_ttm_fundamental([], ttm_factor_continue, trade_date_pre_year_4).reset_index()
    ttm_factor_sets_pre_year_4 = ttm_factor_sets_pre_year_4.drop(columns={"trade_date"})
    ttm_factor_sets_pre_year_5 = get_ttm_fundamental([], ttm_factor_continue, trade_date_pre_year_5).reset_index()
    ttm_factor_sets_pre_year_5 = ttm_factor_sets_pre_year_5.drop(columns={"trade_date"})
    # Suffix each historical frame's columns so they survive the merges.
    ttm_factor_sets_pre_year = ttm_factor_sets_pre_year.rename(
        columns={"operating_revenue": "operating_revenue_pre_year",
                 "operating_profit": "operating_profit_pre_year",
                 "total_profit": "total_profit_pre_year",
                 "net_profit": "net_profit_pre_year",
                 "operating_cost": "operating_cost_pre_year",
                 "np_parent_company_owners": "np_parent_company_owners_pre_year",
                 "net_finance_cash_flow": "net_finance_cash_flow_pre_year",
                 "net_operate_cash_flow": "net_operate_cash_flow_pre_year",
                 "net_invest_cash_flow": "net_invest_cash_flow_pre_year"
                 })
    ttm_factor_sets = pd.merge(ttm_factor_sets, ttm_factor_sets_pre_year, on="symbol")
    ttm_factor_sets_pre_year_1 = ttm_factor_sets_pre_year_1.rename(
        columns={"operating_revenue": "operating_revenue_pre_year_1",
                 "operating_cost": "operating_cost_pre_year_1",
                 "net_profit": "net_profit_pre_year_1",
                 "np_parent_company_owners": "np_parent_company_owners_pre_year_1",
                 })
    ttm_factor_sets = pd.merge(ttm_factor_sets, ttm_factor_sets_pre_year_1, on="symbol")
    ttm_factor_sets_pre_year_2 = ttm_factor_sets_pre_year_2.rename(
        columns={"operating_revenue": "operating_revenue_pre_year_2",
                 "operating_cost": "operating_cost_pre_year_2",
                 "net_profit": "net_profit_pre_year_2",
                 "np_parent_company_owners": "np_parent_company_owners_pre_year_2",
                 })
    ttm_factor_sets = pd.merge(ttm_factor_sets, ttm_factor_sets_pre_year_2, on="symbol")
    ttm_factor_sets_pre_year_3 = ttm_factor_sets_pre_year_3.rename(
        columns={"operating_revenue": "operating_revenue_pre_year_3",
                 "operating_cost": "operating_cost_pre_year_3",
                 "net_profit": "net_profit_pre_year_3",
                 "np_parent_company_owners": "np_parent_company_owners_pre_year_3",
                 })
    ttm_factor_sets = pd.merge(ttm_factor_sets, ttm_factor_sets_pre_year_3, on="symbol")
    ttm_factor_sets_pre_year_4 = ttm_factor_sets_pre_year_4.rename(
        columns={"operating_revenue": "operating_revenue_pre_year_4",
                 "operating_cost": "operating_cost_pre_year_4",
                 "net_profit": "net_profit_pre_year_4",
                 "np_parent_company_owners": "np_parent_company_owners_pre_year_4",
                 })
    ttm_factor_sets = pd.merge(ttm_factor_sets, ttm_factor_sets_pre_year_4, on="symbol")
    ttm_factor_sets_pre_year_5 = ttm_factor_sets_pre_year_5.rename(
        columns={"operating_revenue": "operating_revenue_pre_year_5",
                 "operating_cost": "operating_cost_pre_year_5",
                 "net_profit": "net_profit_pre_year_5",
                 "np_parent_company_owners": "np_parent_company_owners_pre_year_5",
                 })
    ttm_factor_sets = pd.merge(ttm_factor_sets, ttm_factor_sets_pre_year_5, on="symbol")
    return ttm_factor_sets, balance_sets
def get_basic_history_value_data(trade_date):
    """
    Fetch the raw inputs required by the historical-value factors.

    Pulls valuation, cash-flow, income and Shenwan (SW) level-1 industry data
    for ``trade_date``, plus TTM data 3 and 5 years back and summed market
    caps over the 2-5 year lookbacks, merged on ``symbol``.
    :param trade_date: trade date (int, YYYYMMDD)
    :return: (valuation_sets, ttm_factor_sets, cash_flow_sets, income_sets)
    """
    # PS, PE, PB, PCF
    valuation_sets = get_fundamentals(add_filter_trade(query(Valuation._name_,
                                                             [Valuation.symbol,
                                                              Valuation.pe,
                                                              Valuation.ps,
                                                              Valuation.pb,
                                                              Valuation.pcf,
                                                              Valuation.market_cap,
                                                              Valuation.circulating_market_cap]), [trade_date]))
    cash_flow_sets = get_fundamentals(add_filter_trade(query(CashFlow._name_,
                                                             [CashFlow.symbol,
                                                              CashFlow.goods_sale_and_service_render_cash]), [trade_date]))
    income_sets = get_fundamentals(add_filter_trade(query(Income._name_,
                                                          [Income.symbol,
                                                           Income.net_profit]), [trade_date]))
    # SW level-1 industry codes used to restrict the universe.
    industry_set = ['801010', '801020', '801030', '801040', '801050', '801080', '801110', '801120', '801130',
                    '801140', '801150', '801160', '801170', '801180', '801200', '801210', '801230', '801710',
                    '801720', '801730', '801740', '801750', '801760', '801770', '801780', '801790', '801880',
                    '801890']
    sw_industry = get_fundamentals(add_filter_trade(query(Industry._name_,
                                                          [Industry.symbol,
                                                           Industry.isymbol]), [trade_date]))
    # TTM (trailing-twelve-month) factor definitions.
    ttm_factors = {Income._name_: [Income.symbol,
                                   Income.np_parent_company_owners],
                   CashFlow._name_:[CashFlow.symbol,
                                    CashFlow.net_operate_cash_flow]
                   }
    ttm_factors_sum_list = {Income._name_:[Income.symbol,
                                           Income.net_profit,  # net profit
                                           ],}
    trade_date_2y = get_trade_date(trade_date, 2)
    trade_date_3y = get_trade_date(trade_date, 3)
    trade_date_4y = get_trade_date(trade_date, 4)
    trade_date_5y = get_trade_date(trade_date, 5)
    # print(trade_date_2y, trade_date_3y, trade_date_4y, trade_date_5y)
    ttm_factor_sets = get_ttm_fundamental([], ttm_factors, trade_date).reset_index()
    ttm_factor_sets_3 = get_ttm_fundamental([], ttm_factors, trade_date_3y).reset_index()
    ttm_factor_sets_5 = get_ttm_fundamental([], ttm_factors, trade_date_5y).reset_index()
    # TODO: TTM aggregation over a multi-year window needs optimisation.
    # ttm_factor_sets_sum = get_ttm_fundamental([], ttm_factors_sum_list, trade_date, 5).reset_index()
    factor_sets_sum = get_fundamentals(add_filter_trade(query(Valuation._name_,
                                                              [Valuation.symbol,
                                                               Valuation.market_cap,
                                                               Valuation.circulating_market_cap,
                                                               Valuation.trade_date]),
                                                        [trade_date_2y, trade_date_3y, trade_date_4y, trade_date_5y]))
    # Per-symbol sums of (circulating) market cap across the four lookback dates.
    factor_sets_sum_1 = factor_sets_sum.groupby('symbol')['market_cap'].sum().reset_index().rename(columns={"market_cap": "market_cap_sum",})
    factor_sets_sum_2 = factor_sets_sum.groupby('symbol')['circulating_market_cap'].sum().reset_index().rename(columns={"circulating_market_cap": "circulating_market_cap_sum",})
    # print(factor_sets_sum_1)
    # Keep only stocks whose industry is in the SW level-1 set.
    sw_industry = sw_industry[sw_industry['isymbol'].isin(industry_set)]
    # Merge valuation data with the SW level-1 industry labels.
    valuation_sets = pd.merge(valuation_sets, sw_industry, on='symbol')
    # valuation_sets = pd.merge(valuation_sets, sw_industry, on='symbol', how="outer")
    ttm_factor_sets = ttm_factor_sets.drop(columns={"trade_date"})
    ttm_factor_sets_3 = ttm_factor_sets_3.rename(columns={"np_parent_company_owners": "np_parent_company_owners_3"})
    ttm_factor_sets_3 = ttm_factor_sets_3.drop(columns={"trade_date"})
    ttm_factor_sets_5 = ttm_factor_sets_5.rename(columns={"np_parent_company_owners": "np_parent_company_owners_5"})
    ttm_factor_sets_5 = ttm_factor_sets_5.drop(columns={"trade_date"})
    # ttm_factor_sets_sum = ttm_factor_sets_sum.rename(columns={"net_profit": "net_profit_5"})
    ttm_factor_sets = pd.merge(ttm_factor_sets, ttm_factor_sets_3, on='symbol')
    ttm_factor_sets = pd.merge(ttm_factor_sets, ttm_factor_sets_5, on='symbol')
    # ttm_factor_sets = pd.merge(ttm_factor_sets, ttm_factor_sets_sum, on='symbol')
    ttm_factor_sets = pd.merge(ttm_factor_sets, factor_sets_sum_1, on='symbol')
    ttm_factor_sets = pd.merge(ttm_factor_sets, factor_sets_sum_2, on='symbol')
    # ttm_factor_sets = pd.merge(ttm_factor_sets, ttm_factor_sets_3, on='symbol', how='outer')
    # ttm_factor_sets = pd.merge(ttm_factor_sets, ttm_factor_sets_5, on='symbol', how='outer')
    return valuation_sets, ttm_factor_sets, cash_flow_sets, income_sets
def get_basic_scale_data(trade_date):
    """
    Fetch the raw inputs required by the per-share indicator factors.

    :param trade_date: trade date (int, YYYYMMDD)
    :return: (valuation_sets, ttm_factor_sets, cash_flow_sets, income_sets,
              balance_sets)
    """
    valuation_sets = get_fundamentals(add_filter_trade(query(Valuation._name_,
                                                             [Valuation.symbol,
                                                              Valuation.market_cap,
                                                              Valuation.capitalization,  # total share capital
                                                              Valuation.circulating_market_cap]),
                                                       [trade_date]))
    cash_flow_sets = get_fundamentals(add_filter_trade(query(CashFlow._name_,
                                                             [CashFlow.symbol,
                                                              CashFlow.cash_and_equivalents_at_end,  # cash and equivalents at period end
                                                              CashFlow.cash_equivalent_increase]),  # net increase in cash and equivalents
                                                       [trade_date]))
    income_sets = get_fundamentals(add_filter_trade(query(Income._name_,
                                                          [Income.symbol,
                                                           Income.basic_eps,  # basic earnings per share
                                                           Income.diluted_eps,  # diluted earnings per share
                                                           Income.net_profit,
                                                           Income.operating_revenue,  # operating revenue
                                                           Income.operating_profit,  # operating profit
                                                           Income.total_operating_revenue]),  # total operating revenue
                                                    [trade_date]))
    balance_sets = get_fundamentals(add_filter_trade(query(Balance._name_,
                                                           [Balance.symbol,
                                                            Balance.capital_reserve_fund,  # capital reserve
                                                            Balance.surplus_reserve_fund,  # surplus reserve
                                                            Balance.total_assets,  # total assets
                                                            Balance.dividend_receivable,  # dividends receivable
                                                            Balance.retained_profit,  # retained earnings
                                                            Balance.total_owner_equities]),  # total owner equities
                                                     [trade_date]))
    # TTM (trailing-twelve-month) factor definitions.
    ttm_factors = {Income._name_: [Income.symbol,
                                   Income.operating_revenue,  # operating revenue
                                   Income.operating_profit,  # operating profit
                                   Income.np_parent_company_owners,  # net profit attributable to parent-company owners
                                   Income.total_operating_revenue],  # total operating revenue
                   CashFlow._name_: [CashFlow.symbol,
                                     CashFlow.net_operate_cash_flow]  # net cash flow from operating activities
                   }
    ttm_factor_sets = get_ttm_fundamental([], ttm_factors, trade_date).reset_index()
    # Suffix all TTM columns in one pass, then drop the constant trade_date.
    ttm_factor_sets = ttm_factor_sets.rename(
        columns={"np_parent_company_owners": "np_parent_company_owners_ttm",
                 "net_operate_cash_flow": "net_operate_cash_flow_ttm",
                 "operating_revenue": "operating_revenue_ttm",
                 "operating_profit": "operating_profit_ttm",
                 "total_operating_revenue": "total_operating_revenue_ttm"})
    ttm_factor_sets = ttm_factor_sets.drop(columns={"trade_date"})
    return valuation_sets, ttm_factor_sets, cash_flow_sets, income_sets, balance_sets
def get_basic_cash_flow(trade_date):
    """
    Fetch the raw inputs required by the cash-flow factors for one trade date.

    :param trade_date: trade date (int, YYYYMMDD)
    :return: (tp_cash_flow, ttm_cash_flow_sets) — de-duplicated current-period
             data and de-duplicated TTM data merged with valuation.
    """
    cash_flow_sets = get_fundamentals(add_filter_trade(query(CashFlow.__name__,
                                                             [CashFlow.symbol,
                                                              CashFlow.net_operate_cash_flow,
                                                              CashFlow.goods_sale_and_service_render_cash])
                                                       , [trade_date]))
    income_sets = get_fundamentals(add_filter_trade(query(Income.__name__,
                                                          [Income.symbol,
                                                           Income.operating_revenue,
                                                           Income.total_operating_cost,
                                                           Income.total_operating_revenue]), [trade_date]))
    valuation_sets = get_fundamentals(add_filter_trade(query(Valuation.__name__,
                                                             [Valuation.symbol,
                                                              Valuation.market_cap,
                                                              Valuation.circulating_market_cap]), [trade_date]))
    # Merge the current-period frames on symbol.
    tp_cash_flow = pd.merge(cash_flow_sets, income_sets, on="symbol")
    # FIX: invert the boolean mask with ``~``; unary ``-`` on a boolean
    # Series is rejected by modern pandas/numpy.
    tp_cash_flow = tp_cash_flow[~tp_cash_flow.duplicated()]
    # TTM (trailing-twelve-month) factor definitions.
    ttm_factors = {Balance.__name__: [Balance.symbol,
                                      Balance.total_liability,
                                      Balance.shortterm_loan,
                                      Balance.longterm_loan,
                                      Balance.total_current_liability,
                                      Balance.net_liability,
                                      Balance.total_current_assets,
                                      Balance.interest_bearing_liability,
                                      Balance.total_assets],
                   CashFlow.__name__: [CashFlow.symbol,
                                       CashFlow.net_operate_cash_flow,
                                       CashFlow.goods_sale_and_service_render_cash,
                                       CashFlow.cash_and_equivalents_at_end],
                   Income.__name__: [Income.symbol,
                                     Income.operating_revenue,
                                     Income.total_operating_revenue,
                                     Income.total_operating_cost,
                                     Income.net_profit,
                                     Income.np_parent_company_owners]
                   }
    ttm_cash_flow_sets = get_ttm_fundamental([], ttm_factors, trade_date).reset_index()
    ttm_cash_flow_sets = ttm_cash_flow_sets[~ttm_cash_flow_sets.duplicated()]
    # Merge the TTM data with valuation.
    ttm_cash_flow_sets = pd.merge(ttm_cash_flow_sets, valuation_sets, on="symbol")
    return tp_cash_flow, ttm_cash_flow_sets
def get_basic_constrain(trade_date):
    """
    Fetch the raw inputs required by the constrain factors.

    :param trade_date: trade date (int, YYYYMMDD)
    :return: (balance_sets, ttm_constrain_sets), both de-duplicated.
    """
    # Current-period balance-sheet data.
    # NOTE(review): this helper uses ``Balance._name_`` while siblings in this
    # file use ``Balance.__name__`` — confirm both attributes exist on the model.
    balance_sets = get_fundamentals(add_filter_trade(query(Balance._name_,
                                                           [Balance.symbol,
                                                            Balance.total_current_liability,
                                                            Balance.total_liability,
                                                            Balance.total_assets,
                                                            Balance.total_current_assets,
                                                            Balance.fixed_assets,
                                                            Balance.interest_bearing_liability
                                                            ]), [trade_date]))
    # FIX: invert the boolean mask with ``~``; unary ``-`` on a boolean
    # Series is rejected by modern pandas/numpy.
    balance_sets = balance_sets[~balance_sets.duplicated()]
    # TTM (trailing-twelve-month) factor definitions.
    ttm_factors = {Income._name_: [Income.symbol,
                                   Income.operating_cost,
                                   Income.operating_revenue,
                                   Income.operating_tax_surcharges,
                                   Income.total_operating_revenue,
                                   Income.total_operating_cost,
                                   Income.financial_expense,
                                   Income.sale_expense,
                                   Income.administration_expense
                                   ]}
    ttm_constrain_sets = get_ttm_fundamental([], ttm_factors, trade_date).reset_index()
    ttm_constrain_sets = ttm_constrain_sets[~ttm_constrain_sets.duplicated()]
    return balance_sets, ttm_constrain_sets
def get_basic_earning(trade_date):
    """
    Fetch the raw inputs required by the earning-quality factors.

    :param trade_date: trade date (int, YYYYMMDD)
    :return: (tp_earning, ttm_earning_5y, ttm_earning) — de-duplicated
             current-period, 5-year-TTM and 1-year-TTM frames.
    """
    # Current-period data.
    # pdb.set_trace()
    balance_sets = get_fundamentals(add_filter_trade(query(Balance.__name__,
                                                           [Balance.symbol,
                                                            Balance.equities_parent_company_owners])
                                                     , [trade_date]))
    cash_flow_sets = get_fundamentals(add_filter_trade(query(CashFlow.__name__,
                                                             [CashFlow.symbol,
                                                              CashFlow.goods_sale_and_service_render_cash])
                                                       , [trade_date]))
    income_sets = get_fundamentals(add_filter_trade(query(Income.__name__,
                                                          [Income.symbol,
                                                           Income.total_operating_revenue,
                                                           Income.total_operating_cost,
                                                           Income.invest_income_associates,
                                                           Income.non_operating_revenue,
                                                           Income.non_operating_expense,
                                                           Income.total_profit,
                                                           Income.net_profit,
                                                           Income.np_parent_company_owners
                                                           ])
                                                    , [trade_date]))
    valuation_sets = get_fundamentals(add_filter_trade(query(Valuation.__name__,
                                                             [Valuation.symbol,
                                                              Valuation.circulating_market_cap])
                                                       , [trade_date]))
    indicator_sets = get_fundamentals(add_filter_trade(query(Indicator.__name__,
                                                             [Indicator.symbol,
                                                              Indicator.adjusted_profit])
                                                       , [trade_date]))
    # Merge the current-period frames on symbol (inner joins).
    tp_earning = pd.merge(cash_flow_sets, balance_sets, on="symbol")
    tp_earning = pd.merge(tp_earning, income_sets, on="symbol")
    tp_earning = pd.merge(tp_earning, valuation_sets, on="symbol")
    tp_earning = pd.merge(tp_earning, indicator_sets, on="symbol")
    # FIX: invert the boolean mask with ``~``; unary ``-`` on a boolean
    # Series is rejected by modern pandas/numpy.
    tp_earning = tp_earning[~tp_earning.duplicated()]
    # tp_earning.set_index('symbol', inplace=True)
    # TTM (trailing-twelve-month) data.
    ttm_factors = {Balance.__name__: [Balance.symbol,
                                      Balance.total_assets,
                                      Balance.equities_parent_company_owners,
                                      Balance.total_owner_equities
                                      ],
                   CashFlow.__name__: [CashFlow.symbol,
                                       CashFlow.cash_and_equivalents_at_end],
                   Income.__name__: [Income.symbol,
                                     Income.total_operating_revenue,
                                     Income.operating_revenue,
                                     Income.interest_income,
                                     Income.total_operating_cost,
                                     Income.operating_cost,
                                     Income.financial_expense,
                                     Income.invest_income_associates,
                                     Income.operating_profit,
                                     Income.non_operating_revenue,
                                     Income.non_operating_expense,
                                     Income.total_profit,
                                     Income.net_profit,
                                     Income.np_parent_company_owners
                                     ]
                   }
    ttm_earning = get_ttm_fundamental([], ttm_factors, trade_date).reset_index()
    ttm_earning = ttm_earning[~ttm_earning.duplicated()]
    # 5-year TTM data.
    ttm_factors = {Balance.__name__: [Balance.symbol,
                                      Balance.total_assets,
                                      Balance.total_owner_equities],
                   CashFlow.__name__: [CashFlow.symbol,
                                       CashFlow.cash_and_equivalents_at_end],
                   Income.__name__: [Income.symbol,
                                     Income.net_profit,]
                   }
    # Round-tripping through cache_data.set_cache loses the index name.
    ttm_earning_5y = get_ttm_fundamental([], ttm_factors, trade_date, year=5).reset_index()
    ttm_earning_5y = ttm_earning_5y[~ttm_earning_5y.duplicated()]
    return tp_earning, ttm_earning_5y, ttm_earning
def cash_flow_calculate(trade_date):
    """Prepare cash-flow inputs, cache them, and dispatch the factor task."""
    tp_cash_flow, ttm_cash_flow_sets = get_basic_cash_flow(trade_date)
    if len(tp_cash_flow) <= 0 or len(ttm_cash_flow_sets) <= 0:
        print("%s has no data" % trade_date)
        return
    start = time.time()
    # Session id: microsecond timestamp used as a unique cache-key prefix.
    session = str(int(time.time() * 1000000 + datetime.now().microsecond))
    cache_data.set_cache(session + str(trade_date) + "1", trade_date, tp_cash_flow.to_json(orient='records'))
    cache_data.set_cache(session + str(trade_date) + "2", trade_date, ttm_cash_flow_sets.to_json(orient='records'))
    factor_cash_flow.factor_calculate.delay(date_index=trade_date, session=session)
    print('cash_flow_cal_time:{}'.format(time.time() - start))
def constrain_calculate(trade_date):
    """Prepare constrain-factor inputs, cache them, and dispatch the task."""
    balance_sets, ttm_factors_sets = get_basic_constrain(trade_date)
    if len(balance_sets) <= 0 or len(ttm_factors_sets) <= 0:
        print("%s has no data" % trade_date)
        return
    start = time.time()
    # Session id: microsecond timestamp used as a unique cache-key prefix.
    session = str(int(time.time() * 1000000 + datetime.now().microsecond))
    cache_data.set_cache(session + str(trade_date) + '1', trade_date, balance_sets.to_json(orient='records'))
    cache_data.set_cache(session + str(trade_date) + '2', trade_date, ttm_factors_sets.to_json(orient='records'))
    factor_constrain.factor_calculate.delay(date_index=trade_date, session=session)
    print('constrain_cal_time:{}'.format(time.time() - start))
def growth_calculate(trade_date):
    """Prepare growth-factor inputs, cache them, and dispatch the task."""
    ttm_factor_sets, balance_sets = get_basic_growth_data(trade_date)
    growth_sets = pd.merge(ttm_factor_sets, balance_sets, on='symbol')
    if len(growth_sets) <= 0:
        print("%s has no data" % trade_date)
        return
    start = time.time()
    # Session id: microsecond timestamp used as a unique cache-key prefix.
    session = str(int(time.time() * 1000000 + datetime.now().microsecond))
    cache_data.set_cache(session + str(trade_date), trade_date, growth_sets.to_json(orient='records'))
    factor_growth.factor_calculate.delay(date_index=trade_date, session=session)
    print('growth_cal_time:{}'.format(time.time() - start))
def earning_calculate(trade_date):
    """Prepare earning-factor inputs, cache them, and dispatch the task."""
    tp_earning, ttm_earning_5y, ttm_earning = get_basic_earning(trade_date)
    if len(tp_earning) <= 0 or len(ttm_earning_5y) <= 0 or len(ttm_earning) <= 0:
        print("%s has no data" % trade_date)
        return
    start = time.time()
    # Session id: microsecond timestamp used as a unique cache-key prefix.
    session = str(int(time.time() * 1000000 + datetime.now().microsecond))
    cache_data.set_cache(session + str(trade_date) + "1", trade_date, tp_earning.to_json(orient='records'))
    cache_data.set_cache(session + str(trade_date) + "2", trade_date, ttm_earning_5y.to_json(orient='records'))
    cache_data.set_cache(session + str(trade_date) + "3", trade_date, ttm_earning.to_json(orient='records'))
    factor_earning.factor_calculate.delay(date_index=trade_date, session=session)
    print('earning_cal_time:{}'.format(time.time() - start))
def historical_value_calculate(trade_date):
    """Prepare historical-value inputs, cache them, and dispatch the task."""
    valuation_sets, ttm_factor_sets, cash_flow_sets, income_sets = get_basic_history_value_data(trade_date)
    # Fold all inputs into a single symbol-keyed frame (inner joins).
    for frame in (income_sets, ttm_factor_sets, cash_flow_sets):
        valuation_sets = pd.merge(valuation_sets, frame, on='symbol')
    if len(valuation_sets) <= 0:
        print("%s has no data" % trade_date)
        return
    start = time.time()
    # Session id: microsecond timestamp used as a unique cache-key prefix.
    session = str(int(time.time() * 1000000 + datetime.now().microsecond))
    cache_data.set_cache(session + str(trade_date), trade_date, valuation_sets.to_json(orient='records'))
    historical_value.factor_calculate.delay(date_index=trade_date, session=session)
    print('history_cal_time:{}'.format(time.time() - start))
def per_share_calculate(trade_date):
    """Prepare per-share indicator inputs, cache them, and dispatch the task."""
    valuation_sets, ttm_factor_sets, cash_flow_sets, income_sets, balance_sets = get_basic_scale_data(trade_date)
    # Fold all inputs into a single symbol-keyed frame (inner joins).
    for frame in (income_sets, ttm_factor_sets, cash_flow_sets, balance_sets):
        valuation_sets = pd.merge(valuation_sets, frame, on='symbol')
    if len(valuation_sets) <= 0:
        print("%s has no data" % trade_date)
        return
    start = time.time()
    # Session id: microsecond timestamp used as a unique cache-key prefix.
    session = str(int(time.time() * 1000000 + datetime.now().microsecond))
    cache_data.set_cache(session + str(trade_date), trade_date, valuation_sets.to_json(orient='records'))
    factor_per_share_indicators.factor_calculate.delay(date_index=trade_date, session=session)
    print('per_share_cal_time:{}'.format(time.time() - start))
def do_update(start_date, end_date, count):
    """Run every factor-group calculation for each local trading day in range."""
    trade_dates = TradeDate().trade_date_sets_ago(start_date, end_date, count)
    for trade_date in trade_dates:
        print('因子计算日期: %s' % trade_date)
        cash_flow_calculate(trade_date)
        constrain_calculate(trade_date)
        growth_calculate(trade_date)
        earning_calculate(trade_date)
        historical_value_calculate(trade_date)
        per_share_calculate(trade_date)
        print('----->')
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--start_date', type=int, default=20070101)
    parser.add_argument('--end_date', type=int, default=0)
    parser.add_argument('--count', type=int, default=1)
    # FIX: argparse's ``type=bool`` treats any non-empty string (including
    # "False") as True; parse the flag text explicitly instead.
    str2bool = lambda s: str(s).lower() in ('true', '1', 'yes')
    parser.add_argument('--rebuild', type=str2bool, default=False)
    parser.add_argument('--update', type=str2bool, default=False)
    parser.add_argument('--schedule', type=str2bool, default=False)
    args = parser.parse_args()
    if args.end_date == 0:
        # Default end date: today (YYYYMMDD).
        end_date = int(datetime.now().date().strftime('%Y%m%d'))
    else:
        end_date = args.end_date
    if args.rebuild:
        # FIX: use a local name distinct from the imported factor modules.
        # The original code rebound e.g. ``factor_cash_flow`` to an instance,
        # which broke the later ``factor_cash_flow.factor_calculate.delay``
        # calls made inside do_update().
        processor = factor_per_share_indicators.PerShareIndicators('factor_per_share')
        processor.create_dest_tables()
        processor = factor_cash_flow.FactorCashFlow('factor_cash_flow')
        processor.create_dest_tables()
        processor = factor_constrain.FactorConstrain('factor_constrain')
        processor.create_dest_tables()
        processor = factor_growth.Growth('factor_growth')
        processor.create_dest_tables()
        processor = factor_earning.FactorEarning('factor_earning')
        processor.create_dest_tables()
        processor = historical_value.HistoricalValue('factor_historical_value')
        processor.create_dest_tables()
        do_update(args.start_date, end_date, args.count)
    if args.update:
        do_update(args.start_date, end_date, args.count)
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: ??
@author: li
@file: cash_flow.py
@time: 2019-07-16 17:31
"""
import sys
sys.path.append('../')
sys.path.append('../../')
sys.path.append('../../../')
import time
import collections
import argparse
from datetime import datetime, timedelta
from factor import factor_cash_flow
from factor.ttm_fundamental import *
from vision.file_unit.balance import Balance
from vision.file_unit.cash_flow import CashFlow
from vision.file_unit.income import Income
from vision.file_unit.valuation import Valuation
from factor.utillities.trade_date import TradeDate
from ultron.cluster.invoke.cache_data import cache_data
def get_trade_date(trade_date, n):
    """
    Get the date n years before the given trade date; if that day is not a
    trading day, step back to the most recent trading day before it.
    :param trade_date: current trade date (int, YYYYMMDD)
    :param n: number of years to look back
    :return: date as an int (YYYYMMDD)
    """
    _trade_date = TradeDate()
    trade_date_sets = collections.OrderedDict(
        sorted(_trade_date._trade_date_sets.items(), key=lambda t: t[0], reverse=False))
    # Approximate "n years ago" as n * 365 days (leap days ignored).
    time_array = datetime.strptime(str(trade_date), "%Y%m%d")
    time_array = time_array - timedelta(days=365) * n
    date_time = int(datetime.strftime(time_array, "%Y%m%d"))
    if date_time < min(trade_date_sets.keys()):
        # Earlier than the first known trading day: return the raw date as-is.
        # print('date_time %s is outof trade_date_sets' % date_time)
        return date_time
    else:
        # Walk backwards one day at a time until a trading day is found.
        while date_time not in trade_date_sets:
            date_time = date_time - 1
        # print('trade_date pre %s year %s' % (n, date_time))
        return date_time
def get_basic_cash_flow(trade_date):
    """
    Fetch the raw inputs required by the cash-flow factors for one trade date.

    :param trade_date: trade date (int, YYYYMMDD)
    :return: (tp_cash_flow, ttm_cash_flow_sets) — de-duplicated current-period
             data and de-duplicated TTM data merged with valuation.
    """
    cash_flow_sets = get_fundamentals(add_filter_trade(query(CashFlow.__name__,
                                                             [CashFlow.symbol,
                                                              CashFlow.net_operate_cash_flow,
                                                              CashFlow.goods_sale_and_service_render_cash])
                                                       , [trade_date]))
    income_sets = get_fundamentals(add_filter_trade(query(Income.__name__,
                                                          [Income.symbol,
                                                           Income.operating_revenue,
                                                           Income.total_operating_cost,
                                                           Income.total_operating_revenue]), [trade_date]))
    valuation_sets = get_fundamentals(add_filter_trade(query(Valuation.__name__,
                                                             [Valuation.symbol,
                                                              Valuation.market_cap,
                                                              Valuation.circulating_market_cap]), [trade_date]))
    # Merge the current-period frames on symbol.
    tp_cash_flow = pd.merge(cash_flow_sets, income_sets, on="symbol")
    # FIX: invert the boolean mask with ``~``; unary ``-`` on a boolean
    # Series is rejected by modern pandas/numpy.
    tp_cash_flow = tp_cash_flow[~tp_cash_flow.duplicated()]
    # TTM (trailing-twelve-month) factor definitions.
    ttm_factors = {Balance.__name__: [Balance.symbol,
                                      Balance.total_liability,
                                      Balance.shortterm_loan,
                                      Balance.longterm_loan,
                                      Balance.total_current_liability,
                                      Balance.net_liability,
                                      Balance.total_current_assets,
                                      Balance.interest_bearing_liability,
                                      Balance.total_assets],
                   CashFlow.__name__: [CashFlow.symbol,
                                       CashFlow.net_operate_cash_flow,
                                       CashFlow.goods_sale_and_service_render_cash,
                                       CashFlow.cash_and_equivalents_at_end],
                   Income.__name__: [Income.symbol,
                                     Income.operating_revenue,
                                     Income.total_operating_revenue,
                                     Income.total_operating_cost,
                                     Income.net_profit,
                                     Income.np_parent_company_owners]
                   }
    ttm_cash_flow_sets = get_ttm_fundamental([], ttm_factors, trade_date).reset_index()
    ttm_cash_flow_sets = ttm_cash_flow_sets[~ttm_cash_flow_sets.duplicated()]
    # Merge the TTM data with valuation.
    ttm_cash_flow_sets = pd.merge(ttm_cash_flow_sets, valuation_sets, on="symbol")
    return tp_cash_flow, ttm_cash_flow_sets
def prepare_calculate(trade_date):
    """Prepare cash-flow inputs, cache them, and dispatch the factor task."""
    tp_cash_flow, ttm_cash_flow_sets = get_basic_cash_flow(trade_date)
    if len(tp_cash_flow) <= 0 or len(ttm_cash_flow_sets) <= 0:
        print("%s has no data" % trade_date)
        return
    start = time.time()
    # Session id: microsecond timestamp used as a unique cache-key prefix.
    session = str(int(time.time() * 1000000 + datetime.now().microsecond))
    cache_data.set_cache(session + str(trade_date) + "1", trade_date, tp_cash_flow.to_json(orient='records'))
    cache_data.set_cache(session + str(trade_date) + "2", trade_date, ttm_cash_flow_sets.to_json(orient='records'))
    factor_cash_flow.factor_calculate.delay(date_index=trade_date, session=session)
    print('cash_flow_cal_time:{}'.format(time.time() - start))
def do_update(start_date, end_date, count):
    """Run the cash-flow preparation for every local trading day in range."""
    trade_dates = TradeDate().trade_date_sets_ago(start_date, end_date, count)
    for trade_date in trade_dates:
        print('因子计算日期: %s' % trade_date)
        prepare_calculate(trade_date)
        print('----->')
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--start_date', type=int, default=20070101)
    parser.add_argument('--end_date', type=int, default=0)
    parser.add_argument('--count', type=int, default=1)
    # FIX: argparse's ``type=bool`` treats any non-empty string (including
    # "False") as True; parse the flag text explicitly instead.
    str2bool = lambda s: str(s).lower() in ('true', '1', 'yes')
    parser.add_argument('--rebuild', type=str2bool, default=False)
    parser.add_argument('--update', type=str2bool, default=False)
    parser.add_argument('--schedule', type=str2bool, default=False)
    args = parser.parse_args()
    if args.end_date == 0:
        # Default end date: today (YYYYMMDD).
        end_date = int(datetime.now().date().strftime('%Y%m%d'))
    else:
        end_date = args.end_date
    if args.rebuild:
        processor = factor_cash_flow.FactorCashFlow('factor_cash_flow')
        processor.create_dest_tables()
        do_update(args.start_date, end_date, args.count)
    if args.update:
        do_update(args.start_date, end_date, args.count)
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: ??
@author: li
@file: constrain.py
@time: 2019-07-16 19:22
"""
import sys
sys.path.append('../')
sys.path.append('../../')
sys.path.append('../../../')
import time
import collections
import argparse
from datetime import datetime, timedelta
from factor import factor_constrain
from factor.ttm_fundamental import *
from vision.file_unit.balance import Balance
from vision.file_unit.income import Income
from factor.utillities.trade_date import TradeDate
from ultron.cluster.invoke.cache_data import cache_data
def get_trade_date(trade_date, n):
    """
    Get the date n years before the given trade date; if that day is not a
    trading day, step back to the most recent trading day before it.
    :param trade_date: current trade date (int, YYYYMMDD)
    :param n: number of years to look back
    :return: date as an int (YYYYMMDD)
    """
    _trade_date = TradeDate()
    trade_date_sets = collections.OrderedDict(
        sorted(_trade_date._trade_date_sets.items(), key=lambda t: t[0], reverse=False))
    # Approximate "n years ago" as n * 365 days (leap days ignored).
    time_array = datetime.strptime(str(trade_date), "%Y%m%d")
    time_array = time_array - timedelta(days=365) * n
    date_time = int(datetime.strftime(time_array, "%Y%m%d"))
    if date_time < min(trade_date_sets.keys()):
        # Earlier than the first known trading day: return the raw date as-is.
        # print('date_time %s is outof trade_date_sets' % date_time)
        return date_time
    else:
        # Walk backwards one day at a time until a trading day is found.
        while date_time not in trade_date_sets:
            date_time = date_time - 1
        # print('trade_date pre %s year %s' % (n, date_time))
        return date_time
def get_basic_constrain(trade_date):
    """
    Fetch the raw inputs required by the constrain factors.

    :param trade_date: trade date (int, YYYYMMDD)
    :return: (balance_sets, ttm_constrain_sets), both de-duplicated.
    """
    # Current-period balance-sheet data.
    # NOTE(review): this helper uses ``Balance._name_`` while other scripts in
    # this project use ``Balance.__name__`` — confirm both attributes exist.
    balance_sets = get_fundamentals(add_filter_trade(query(Balance._name_,
                                                           [Balance.symbol,
                                                            Balance.total_current_liability,
                                                            Balance.total_liability,
                                                            Balance.total_assets,
                                                            Balance.total_current_assets,
                                                            Balance.fixed_assets,
                                                            Balance.interest_bearing_liability
                                                            ]), [trade_date]))
    # FIX: invert the boolean mask with ``~``; unary ``-`` on a boolean
    # Series is rejected by modern pandas/numpy.
    balance_sets = balance_sets[~balance_sets.duplicated()]
    # TTM (trailing-twelve-month) factor definitions.
    ttm_factors = {Income._name_: [Income.symbol,
                                   Income.operating_cost,
                                   Income.operating_revenue,
                                   Income.operating_tax_surcharges,
                                   Income.total_operating_revenue,
                                   Income.total_operating_cost,
                                   Income.financial_expense,
                                   Income.sale_expense,
                                   Income.administration_expense
                                   ]}
    ttm_constrain_sets = get_ttm_fundamental([], ttm_factors, trade_date).reset_index()
    ttm_constrain_sets = ttm_constrain_sets[~ttm_constrain_sets.duplicated()]
    return balance_sets, ttm_constrain_sets
def prepare_calculate(trade_date):
    """Fetch constrain-factor inputs for `trade_date`, cache them in redis,
    and dispatch the distributed factor_constrain calculation task."""
    balance_sets, ttm_factors_sets = get_basic_constrain(trade_date)
    if len(balance_sets) <= 0 or len(ttm_factors_sets) <= 0:
        print("%s has no data" % trade_date)
        return
    tic = time.time()
    # Unique session key so concurrent runs do not collide in the cache.
    session = str(int(time.time() * 1000000 + datetime.now().microsecond))
    key_prefix = session + str(trade_date)
    cache_data.set_cache(key_prefix + '1', trade_date, balance_sets.to_json(orient='records'))
    cache_data.set_cache(key_prefix + '2', trade_date, ttm_factors_sets.to_json(orient='records'))
    factor_constrain.factor_calculate.delay(date_index=trade_date, session=session)
    print('constrain_cal_time:{}'.format(time.time() - tic))
def do_update(start_date, end_date, count):
    """Run the per-day factor calculation for every trading day in the window
    described by (`start_date`, `end_date`, `count`), using the local calendar."""
    calendar = TradeDate()
    for trade_date in calendar.trade_date_sets_ago(start_date, end_date, count):
        print('因子计算日期: %s' % trade_date)
        prepare_calculate(trade_date)
        print('----->')
if __name__ == '__main__':
    def _parse_bool(text):
        """Parse a CLI flag value into a bool.

        argparse's ``type=bool`` treats any non-empty string — including
        "False" — as True; this converter handles the usual spellings."""
        return str(text).lower() in ('1', 'true', 't', 'yes', 'y')

    parser = argparse.ArgumentParser()
    parser.add_argument('--start_date', type=int, default=20070101)
    parser.add_argument('--end_date', type=int, default=0)
    parser.add_argument('--count', type=int, default=1)
    parser.add_argument('--rebuild', type=_parse_bool, default=False)
    parser.add_argument('--update', type=_parse_bool, default=False)
    parser.add_argument('--schedule', type=_parse_bool, default=False)
    args = parser.parse_args()
    # Default the end of the window to today (YYYYMMDD).
    if args.end_date == 0:
        end_date = int(datetime.now().date().strftime('%Y%m%d'))
    else:
        end_date = args.end_date
    if args.rebuild:
        # Drop/recreate the destination table, then backfill the window.
        processor = factor_constrain.FactorConstrain('factor_constrain')
        processor.create_dest_tables()
        do_update(args.start_date, end_date, args.count)
    if args.update:
        do_update(args.start_date, end_date, args.count)
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: ??
@author: li
@file: earning.py
@time: 2019-07-16 19:44
"""
import sys
sys.path.append('../')
sys.path.append('../../')
sys.path.append('../../../')
import time
import collections
import argparse
from datetime import datetime, timedelta
from factor import factor_earning
from factor.ttm_fundamental import *
from vision.file_unit.balance import Balance
from vision.file_unit.cash_flow import CashFlow
from vision.file_unit.income import Income
from vision.file_unit.valuation import Valuation
from vision.file_unit.indicator import Indicator
from factor.utillities.trade_date import TradeDate
from ultron.cluster.invoke.cache_data import cache_data
def get_trade_date(trade_date, n):
    """Return the trading day closest to (not after) `trade_date` minus n years.

    If the shifted date falls before the earliest known trading day it is
    returned unchanged; otherwise it is walked backwards one integer day at
    a time (YYYYMMDD arithmetic) until it lands on a known trading day.

    :param trade_date: current trade date as an int in YYYYMMDD form
    :param n: number of years to step back
    :return: int trade date in YYYYMMDD form
    """
    calendar = collections.OrderedDict(
        sorted(TradeDate()._trade_date_sets.items(), key=lambda item: item[0]))
    # NOTE(review): 365-day "years" ignore leap days, matching the original.
    shifted = datetime.strptime(str(trade_date), "%Y%m%d") - n * timedelta(days=365)
    candidate = int(shifted.strftime("%Y%m%d"))
    if candidate < min(calendar.keys()):
        # Earlier than any recorded trading day: nothing to snap to.
        return candidate
    while candidate not in calendar:
        candidate -= 1
    return candidate
def get_basic_earning(trade_date):
    """Load the raw inputs for the earning-quality factors on `trade_date`.

    :param trade_date: trade date (int, YYYYMMDD)
    :return: tuple ``(tp_earning, ttm_earning_5y, ttm_earning)`` —
             point-in-time fields, 5-year TTM aggregates, 1-year TTM aggregates.
    """
    # Factors currently involved
    # Point-in-time data
    # NOTE(review): this section uses `Balance.__name__` (dunder) while other
    # sections use `Balance._name_` — confirm which the query layer expects.
    balance_sets = get_fundamentals(add_filter_trade(query(Balance.__name__,
                                                           [Balance.symbol,
                                                            Balance.equities_parent_company_owners])
                                                     , [trade_date]))
    cash_flow_sets = get_fundamentals(add_filter_trade(query(CashFlow.__name__,
                                                             [CashFlow.symbol,
                                                              CashFlow.goods_sale_and_service_render_cash])
                                                       , [trade_date]))
    income_sets = get_fundamentals(add_filter_trade(query(Income.__name__,
                                                          [Income.symbol,
                                                           Income.total_operating_revenue,
                                                           Income.total_operating_cost,
                                                           Income.invest_income_associates,
                                                           Income.non_operating_revenue,
                                                           Income.non_operating_expense,
                                                           Income.total_profit,
                                                           Income.net_profit,
                                                           Income.np_parent_company_owners
                                                           ])
                                                    , [trade_date]))
    valuation_sets = get_fundamentals(add_filter_trade(query(Valuation.__name__,
                                                             [Valuation.symbol,
                                                              Valuation.circulating_market_cap])
                                                       , [trade_date]))
    indicator_sets = get_fundamentals(add_filter_trade(query(Indicator.__name__,
                                                             [Indicator.symbol,
                                                              Indicator.adjusted_profit])
                                                       , [trade_date]))
    # Merge all point-in-time frames on symbol (inner joins).
    tp_earning = pd.merge(cash_flow_sets, balance_sets, on="symbol")
    tp_earning = pd.merge(tp_earning, income_sets, on="symbol")
    tp_earning = pd.merge(tp_earning, valuation_sets, on="symbol")
    tp_earning = pd.merge(tp_earning, indicator_sets, on="symbol")
    # `-duplicated()` keeps rows whose duplicate flag is False.
    tp_earning = tp_earning[-tp_earning.duplicated()]
    # tp_earning.set_index('symbol', inplace=True)
    # TTM data (1-year window)
    ttm_factors = {Balance.__name__: [Balance.symbol,
                                      Balance.total_assets,
                                      Balance.equities_parent_company_owners,
                                      Balance.total_owner_equities
                                      ],
                   CashFlow.__name__: [CashFlow.symbol,
                                       CashFlow.cash_and_equivalents_at_end],
                   Income.__name__: [Income.symbol,
                                     Income.total_operating_revenue,
                                     Income.operating_revenue,
                                     Income.interest_income,
                                     Income.total_operating_cost,
                                     Income.operating_cost,
                                     Income.financial_expense,
                                     Income.invest_income_associates,
                                     Income.operating_profit,
                                     Income.non_operating_revenue,
                                     Income.non_operating_expense,
                                     Income.total_profit,
                                     Income.net_profit,
                                     Income.np_parent_company_owners
                                     ]
                   }
    ttm_earning = get_ttm_fundamental([], ttm_factors, trade_date).reset_index()
    ttm_earning = ttm_earning[-ttm_earning.duplicated()]
    ## 5-year TTM data
    ttm_factors = {Balance.__name__: [Balance.symbol,
                                      Balance.total_assets,
                                      Balance.total_owner_equities],
                   CashFlow.__name__: [CashFlow.symbol,
                                       CashFlow.cash_and_equivalents_at_end],
                   Income.__name__: [Income.symbol,
                                     Income.net_profit,]
                   }
    # Round-tripping through cache_data.set_cache loses the index name,
    # hence the reset_index() here.
    ttm_earning_5y = get_ttm_fundamental([], ttm_factors, trade_date, year=5).reset_index()
    ttm_earning_5y = ttm_earning_5y[-ttm_earning_5y.duplicated()]
    return tp_earning, ttm_earning_5y, ttm_earning
def prepare_calculate(trade_date):
    """Fetch earning-quality inputs for `trade_date`, cache the three frames
    in redis, and dispatch the distributed factor_earning calculation task."""
    tp_earning, ttm_earning_5y, ttm_earning = get_basic_earning(trade_date)
    if len(tp_earning) <= 0 or len(ttm_earning_5y) <= 0 or len(ttm_earning) <= 0:
        print("%s has no data" % trade_date)
        return
    tic = time.time()
    # Unique session key so concurrent runs do not collide in the cache.
    session = str(int(time.time() * 1000000 + datetime.now().microsecond))
    key_prefix = session + str(trade_date)
    for suffix, frame in (("1", tp_earning), ("2", ttm_earning_5y), ("3", ttm_earning)):
        cache_data.set_cache(key_prefix + suffix, trade_date, frame.to_json(orient='records'))
    factor_earning.factor_calculate.delay(date_index=trade_date, session=session)
    print('earning_cal_time:{}'.format(time.time() - tic))
def do_update(start_date, end_date, count):
    """Run the per-day factor calculation for every trading day in the window
    described by (`start_date`, `end_date`, `count`), using the local calendar."""
    calendar = TradeDate()
    for trade_date in calendar.trade_date_sets_ago(start_date, end_date, count):
        print('因子计算日期: %s' % trade_date)
        prepare_calculate(trade_date)
        print('----->')
if __name__ == '__main__':
    def _parse_bool(text):
        """Parse a CLI flag value into a bool.

        argparse's ``type=bool`` treats any non-empty string — including
        "False" — as True; this converter handles the usual spellings."""
        return str(text).lower() in ('1', 'true', 't', 'yes', 'y')

    parser = argparse.ArgumentParser()
    parser.add_argument('--start_date', type=int, default=20070101)
    parser.add_argument('--end_date', type=int, default=0)
    parser.add_argument('--count', type=int, default=1)
    parser.add_argument('--rebuild', type=_parse_bool, default=False)
    parser.add_argument('--update', type=_parse_bool, default=False)
    parser.add_argument('--schedule', type=_parse_bool, default=False)
    args = parser.parse_args()
    # Default the end of the window to today (YYYYMMDD).
    if args.end_date == 0:
        end_date = int(datetime.now().date().strftime('%Y%m%d'))
    else:
        end_date = args.end_date
    if args.rebuild:
        # Drop/recreate the destination table, then backfill the window.
        processor = factor_earning.FactorEarning('factor_earning')
        processor.create_dest_tables()
        do_update(args.start_date, end_date, args.count)
    if args.update:
        do_update(args.start_date, end_date, args.count)
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: 0.1
@author: zzh
@file: factor_scale_value.py
@time: 2019-01-28 11:33
"""
import sys
from vision.file_unit.balance import Balance
from jpy.factor.ttm_fundamental import get_ttm_fundamental
sys.path.append("../")
sys.path.append("../../")
sys.path.append("../../../")
import argparse
import time
import collections
import pandas as pd
from datetime import datetime, timedelta
from jpy.factor.factor_base import FactorBase
from vision.fm.signletion_engine import *
from vision.file_unit.income import Income
from vision.file_unit.valuation import Valuation
from ultron.cluster.invoke.cache_data import cache_data
from factor import factor_scale_value_task
from factor.utillities.trade_date import TradeDate
class FactorScaleValue(FactorBase):
    """Driver for the scale (size) factor family.

    Loads per-day basics (market caps, TTM revenue, total assets), pushes
    them into the redis cache, and dispatches the distributed calculation
    task (``factor_scale_value_task``).
    """
    def __init__(self, name):
        # `name` is the destination table name, e.g. 'factor_scale_value'.
        super(FactorScaleValue, self).__init__(name)
        self._trade_date = TradeDate()
    # Build the destination factor table.
    def create_dest_tables(self):
        """
        Create the database destination table, dropping any existing one.
        :return:
        """
        drop_sql = """drop table if exists `{0}`""".format(self._name)
        create_sql = """create table `{0}`(
                    `id` varchar(32) NOT NULL,
                    `symbol` varchar(24) NOT NULL,
                    `trade_date` date NOT NULL,
                    `mkt_value` decimal(19,4) NOT NULL,
                    `cir_mkt_value` decimal(19,4),
                    `sales_ttm` decimal(19,4),
                    `total_assets` decimal(19,4),
                    `log_of_mkt_value` decimal(19, 4),
                    `log_of_neg_mkt_value` decimal(19,4),
                    `nl_size` decimal(19,4),
                    `log_sales_ttm` decimal(19,4),
                    `log_total_last_qua_assets` decimal(19,4),
                    PRIMARY KEY(`id`,`trade_date`,`symbol`)
                    )ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self._name)
        super(FactorScaleValue, self)._create_tables(create_sql, drop_sql)
    def get_trade_date(self, trade_date, n):
        """
        Return the trading day closest to (not after) `trade_date` minus n
        years; if the shifted date predates the calendar it is returned as-is.
        :param trade_date: current trade date (int, YYYYMMDD)
        :param n: number of years to go back
        :return: int trade date in YYYYMMDD form
        """
        trade_date_sets = collections.OrderedDict(
            sorted(self._trade_date._trade_date_sets.items(), key=lambda t: t[0], reverse=False))
        time_array = datetime.strptime(str(trade_date), "%Y%m%d")
        # NOTE(review): 365-day "years" ignore leap days.
        time_array = time_array - timedelta(days=365) * n
        date_time = int(datetime.strftime(time_array, "%Y%m%d"))
        if date_time < min(trade_date_sets.keys()):
            return date_time
        else:
            # Walk backwards (integer YYYYMMDD) until a known trading day.
            while not date_time in trade_date_sets:
                date_time = date_time - 1
            return date_time
    def get_basic_data(self, trade_date):
        """
        Fetch the per-day basics for every stock on `trade_date`.
        :param trade_date: trade date
        :return: (valuation_sets, ttm_factor_sets, income_sets, balance_set)
        """
        # market_cap, circulating_market_cap, total_operating_revenue
        valuation_sets = get_fundamentals(add_filter_trade(query(Valuation._name_,
                                                                 [Valuation.symbol,
                                                                  Valuation.market_cap,
                                                                  Valuation.circulating_market_cap]), [trade_date]))
        income_sets = get_fundamentals(add_filter_trade(query(Income._name_,
                                                              [Income.symbol,
                                                               Income.total_operating_revenue]), [trade_date]))
        balance_set = get_fundamentals(add_filter_trade(query(Balance._name_,
                                                              [Balance.symbol,
                                                               Balance.total_assets]), [trade_date]))
        # TTM aggregates
        ttm_factors = {Income._name_: [Income.symbol,
                                       Income.total_operating_revenue]
                       }
        ttm_factor_sets = get_ttm_fundamental([], ttm_factors, trade_date).reset_index()
        # TODO: windowed TTM sums still need optimisation
        # ttm_factor_sets_sum = get_ttm_fundamental([], ttm_factors_sum_list, trade_date, 5).reset_index()
        ttm_factor_sets = ttm_factor_sets.drop(columns={"trade_date"})
        return valuation_sets, ttm_factor_sets, income_sets, balance_set
    def prepaer_calculate(self, trade_date):
        # NOTE(review): method name is a typo for "prepare_calculate"; kept
        # because do_update (and possibly external callers) use this spelling.
        valuation_sets, ttm_factor_sets, income_sets, balance_set = self.get_basic_data(trade_date)
        # income_sets is fetched but its merge is currently disabled:
        # valuation_sets = pd.merge(valuation_sets, income_sets, on='symbol')
        valuation_sets = pd.merge(valuation_sets, ttm_factor_sets, on='symbol')
        valuation_sets = pd.merge(valuation_sets, balance_set, on='symbol')
        if len(valuation_sets) <= 0:
            print("%s has no data" % trade_date)
            return
        else:
            # Cache the payload under a unique session key and hand the
            # computation to the distributed worker.
            session = str(int(time.time() * 1000000 + datetime.now().microsecond))
            cache_data.set_cache(session, 'scale' + str(trade_date), valuation_sets.to_json(orient='records'))
            factor_scale_value_task.calculate.delay(factor_name='scale' + str(trade_date), trade_date=trade_date,
                                                    session=session)
    def do_update(self, start_date, end_date, count):
        # Iterate the local trading calendar and launch per-day calculation.
        trade_date_sets = self._trade_date.trade_date_sets_ago(start_date, end_date, count)
        for trade_date in trade_date_sets:
            print('因子计算日期: %s' % trade_date)
            self.prepaer_calculate(trade_date)
            print('----->')
if __name__ == '__main__':
    def _parse_bool(text):
        """Parse a CLI flag value into a bool.

        argparse's ``type=bool`` treats any non-empty string — including
        "False" — as True; this converter handles the usual spellings."""
        return str(text).lower() in ('1', 'true', 't', 'yes', 'y')

    parser = argparse.ArgumentParser()
    parser.add_argument('--start_date', type=int, default=20070101)
    parser.add_argument('--end_date', type=int, default=0)
    parser.add_argument('--count', type=int, default=1)
    parser.add_argument('--rebuild', type=_parse_bool, default=False)
    parser.add_argument('--update', type=_parse_bool, default=False)
    parser.add_argument('--schedule', type=_parse_bool, default=False)
    args = parser.parse_args()
    # Default the end of the window to today (YYYYMMDD).
    if args.end_date == 0:
        end_date = int(datetime.now().date().strftime('%Y%m%d'))
    else:
        end_date = args.end_date
    if args.rebuild:
        # Drop/recreate the destination table, then backfill the window.
        processor = FactorScaleValue('factor_scale_value')
        processor.create_dest_tables()
        processor.do_update(args.start_date, end_date, args.count)
    if args.update:
        processor = FactorScaleValue('factor_scale_value')
        processor.do_update(args.start_date, end_date, args.count)
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: 0.1
@author: zzh
@file: factor_volatility_value.py
@time: 2019-01-28 11:33
"""
import sys
sys.path.append("../")
sys.path.append("../../")
sys.path.append("../../../")
import argparse
import time
import collections
from datetime import datetime, timedelta
from factor.factor_base import FactorBase
from vision.fm.signletion_engine import *
from vision.file_unit.sk_daily_price import SKDailyPrice
from ultron.cluster.invoke.cache_data import cache_data
from factor import factor_volatility_value_task
import json
from factor.utillities.trade_date import TradeDate
class FactorVolatilityValue(FactorBase):
    """Driver for the volatility factor family.

    Loads daily stock prices plus the CSI 300 benchmark, caches both frames
    in redis, and dispatches the distributed calculation task
    (``factor_volatility_value_task``).
    """
    def __init__(self, name):
        # `name` is the destination table name, e.g. 'factor_volatility_value'.
        super(FactorVolatilityValue, self).__init__(name)
        self._trade_date = TradeDate()
    # Build the destination factor table.
    def create_dest_tables(self):
        """
        Create the database destination table, dropping any existing one.
        :return:
        """
        drop_sql = """drop table if exists `{0}`""".format(self._name)
        create_sql = """create table `{0}`(
                    `id` varchar(32) NOT NULL,
                    `symbol` varchar(24) NOT NULL,
                    `trade_date` date NOT NULL,
                    `variance_20d` decimal(19,4) NOT NULL,
                    `variance_60d` decimal(19,4) NOT NULL,
                    `variance_120d` decimal(19,4) NOT NULL,
                    `kurtosis_20d` decimal(19,4) NOT NULL,
                    `kurtosis_60d` decimal(19,4) NOT NULL,
                    `kurtosis_120d` decimal(19,4) NOT NULL,
                    `alpha_20d` decimal(19,4) NOT NULL,
                    `alpha_60d` decimal(19,4) NOT NULL,
                    `alpha_120d` decimal(19,4) NOT NULL,
                    `beta_20d` decimal(19,4) NOT NULL,
                    `beta_60d` decimal(19,4) NOT NULL,
                    `beta_120d` decimal(19,4) NOT NULL,
                    `sharp_20d` decimal(19,4) NOT NULL,
                    `sharp_60d` decimal(19,4) NOT NULL,
                    `sharp_120d` decimal(19,4) NOT NULL,
                    `tr_20d` decimal(19,4) NOT NULL,
                    `tr_60d` decimal(19,4) NOT NULL,
                    `tr_120d` decimal(19,4) NOT NULL,
                    `ir_20d` decimal(19,4) NOT NULL,
                    `ir_60d` decimal(19,4) NOT NULL,
                    `ir_120d` decimal(19,4) NOT NULL,
                    `gain_variance_20d` decimal(19,4) NOT NULL,
                    `gain_variance_60d` decimal(19,4) NOT NULL,
                    `gain_variance_120d` decimal(19,4) NOT NULL,
                    `loss_variance_20d` decimal(19,4) NOT NULL,
                    `loss_variance_60d` decimal(19,4) NOT NULL,
                    `loss_variance_120d` decimal(19,4) NOT NULL,
                    `gain_loss_variance_ratio_20d` decimal(19,4) NOT NULL,
                    `gain_loss_variance_ratio_60d` decimal(19,4) NOT NULL,
                    `gain_loss_variance_ratio_120d` decimal(19,4) NOT NULL,
                    `dastd_252d` decimal(19,4) NOT NULL,
                    `ddnsr_12m` decimal(19,4) NOT NULL,
                    `ddncr_12m` decimal(19,4) NOT NULL,
                    `dvrat` decimal(19,4) NOT NULL,
                    PRIMARY KEY(`id`,`trade_date`,`symbol`)
                    )ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self._name)
        super(FactorVolatilityValue, self)._create_tables(create_sql, drop_sql)
    def get_trade_date(self, trade_date, n):
        """
        Return the trading day closest to (not after) `trade_date` minus n
        years; if the shifted date predates the calendar it is returned as-is.
        :param trade_date: current trade date (int, YYYYMMDD)
        :param n: number of years to go back
        :return: int trade date in YYYYMMDD form
        """
        trade_date_sets = collections.OrderedDict(
            sorted(self._trade_date._trade_date_sets.items(), key=lambda t: t[0], reverse=False))
        time_array = datetime.strptime(str(trade_date), "%Y%m%d")
        # NOTE(review): 365-day "years" ignore leap days.
        time_array = time_array - timedelta(days=365) * n
        date_time = int(datetime.strftime(time_array, "%Y%m%d"))
        if date_time < min(trade_date_sets.keys()):
            return date_time
        else:
            # Walk backwards (integer YYYYMMDD) until a known trading day.
            while not date_time in trade_date_sets:
                date_time = date_time - 1
            return date_time
    def get_basic_data(self, trade_date):
        """
        Fetch the price history needed for volatility factors.
        :param trade_date: trade date
        :return: (sk_daily_price_sets, benchmark price frame trimmed to
                  the most recent `count` rows on or before `trade_date`)
        """
        # `count` is the fixed look-back window of daily bars.
        count = 300
        sk_daily_price_sets = get_sk_history_price([], trade_date, count, [SKDailyPrice.symbol,
                                                                           SKDailyPrice.trade_date, SKDailyPrice.open,
                                                                           SKDailyPrice.close, SKDailyPrice.high,
                                                                           SKDailyPrice.low])
        # CSI 300 index is the benchmark for alpha/beta/IR style factors.
        index_daily_price_sets = get_index_history_price(["000300.XSHG"], trade_date, count,
                                                         ["symbol", "trade_date", "close"])
        temp_price_sets = index_daily_price_sets[index_daily_price_sets.trade_date <= trade_date]
        return sk_daily_price_sets, temp_price_sets[:count]
    def prepaer_calculate(self, trade_date):
        # NOTE(review): method name is a typo for "prepare_calculate"; kept
        # because do_update (and possibly external callers) use this spelling.
        self.trade_date = trade_date
        tp_price_return, temp_price_sets = self.get_basic_data(trade_date)
        if len(tp_price_return) <= 0:
            print("%s has no data" % trade_date)
            return
        else:
            # Cache both frames as one JSON payload under a unique session
            # key, then hand the computation to the distributed worker.
            session = str(int(time.time() * 1000000 + datetime.now().microsecond))
            data = {
                'total_data': tp_price_return.to_json(orient='records'),
                'index_daily_price_sets': temp_price_sets.to_json(orient='records')
            }
            cache_data.set_cache(session, 'volatility' + str(trade_date),
                                 json.dumps(data))
            factor_volatility_value_task.calculate.delay(factor_name='volatility' + str(trade_date), trade_date=trade_date,
                                                         session=session)
    def do_update(self, start_date, end_date, count):
        # Iterate the local trading calendar and launch per-day calculation.
        trade_date_sets = self._trade_date.trade_date_sets_ago(start_date, end_date, count)
        for trade_date in trade_date_sets:
            print('因子计算日期: %s' % trade_date)
            self.prepaer_calculate(trade_date)
            print('----->')
if __name__ == '__main__':
    def _parse_bool(text):
        """Parse a CLI flag value into a bool.

        argparse's ``type=bool`` treats any non-empty string — including
        "False" — as True; this converter handles the usual spellings."""
        return str(text).lower() in ('1', 'true', 't', 'yes', 'y')

    parser = argparse.ArgumentParser()
    parser.add_argument('--start_date', type=int, default=20070101)
    parser.add_argument('--end_date', type=int, default=0)
    parser.add_argument('--count', type=int, default=1)
    parser.add_argument('--rebuild', type=_parse_bool, default=False)
    parser.add_argument('--update', type=_parse_bool, default=False)
    parser.add_argument('--schedule', type=_parse_bool, default=False)
    args = parser.parse_args()
    # Default the end of the window to today (YYYYMMDD).
    if args.end_date == 0:
        end_date = int(datetime.now().date().strftime('%Y%m%d'))
    else:
        end_date = args.end_date
    if args.rebuild:
        # Drop/recreate the destination table, then backfill the window.
        processor = FactorVolatilityValue('factor_volatility_value')
        processor.create_dest_tables()
        processor.do_update(args.start_date, end_date, args.count)
    if args.update:
        processor = FactorVolatilityValue('factor_volatility_value')
        processor.do_update(args.start_date, end_date, args.count)
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: ??
@author: li
@file: growth.py
@time: 2019-07-16 19:38
"""
import sys
sys.path.append('../')
sys.path.append('../../')
sys.path.append('../../../')
import time
import collections
import argparse
from datetime import datetime, timedelta
from factor import factor_growth
from factor.ttm_fundamental import *
from vision.file_unit.balance import Balance
from vision.file_unit.cash_flow import CashFlow
from vision.file_unit.income import Income
from factor.utillities.trade_date import TradeDate
from ultron.cluster.invoke.cache_data import cache_data
def get_trade_date(trade_date, n):
    """Return the trading day closest to (not after) `trade_date` minus n years.

    If the shifted date falls before the earliest known trading day it is
    returned unchanged; otherwise it is walked backwards one integer day at
    a time (YYYYMMDD arithmetic) until it lands on a known trading day.

    :param trade_date: current trade date as an int in YYYYMMDD form
    :param n: number of years to step back
    :return: int trade date in YYYYMMDD form
    """
    calendar = collections.OrderedDict(
        sorted(TradeDate()._trade_date_sets.items(), key=lambda item: item[0]))
    # NOTE(review): 365-day "years" ignore leap days, matching the original.
    shifted = datetime.strptime(str(trade_date), "%Y%m%d") - n * timedelta(days=365)
    candidate = int(shifted.strftime("%Y%m%d"))
    if candidate < min(calendar.keys()):
        # Earlier than any recorded trading day: nothing to snap to.
        return candidate
    while candidate not in calendar:
        candidate -= 1
    return candidate
def get_basic_growth_data(trade_date):
    """Collect the raw inputs for the growth factors on `trade_date`.

    :param trade_date: trade date (int, YYYYMMDD)
    :return: tuple ``(ttm_factor_sets, balance_sets)`` —
             - ttm_factor_sets: current TTM values merged with the TTM values
               of one to five years ago (columns suffixed ``_pre_year`` and
               ``_pre_year_1`` .. ``_pre_year_5``);
             - balance_sets: current balance totals merged with their
               year-ago values (suffixed ``_pre_year``).
    """
    # Trading days one to five years back from `trade_date`.
    pre_dates = [get_trade_date(trade_date, back) for back in range(1, 6)]
    trade_date_pre_year = pre_dates[0]

    def _balance_at(date_index):
        # Total assets and total owner equity as reported on `date_index`.
        return get_fundamentals(add_filter_trade(query(Balance._name_,
                                                       [Balance.symbol,
                                                        Balance.total_assets,
                                                        Balance.total_owner_equities]),
                                                 [date_index]))

    balance_sets = _balance_at(trade_date)
    balance_pre_year = _balance_at(trade_date_pre_year).rename(
        columns={"total_assets": "total_assets_pre_year",
                 "total_owner_equities": "total_owner_equities_pre_year"})
    balance_sets = pd.merge(balance_sets, balance_pre_year, on='symbol')

    # Full TTM field set (current period and one year back).
    ttm_factors = {Income._name_: [Income.symbol,
                                   Income.operating_revenue,
                                   Income.operating_profit,
                                   Income.total_profit,
                                   Income.net_profit,
                                   Income.operating_cost,
                                   Income.np_parent_company_owners  # net profit attributable to parent
                                   ],
                   CashFlow._name_: [CashFlow.symbol,
                                     CashFlow.net_finance_cash_flow,
                                     CashFlow.net_operate_cash_flow,
                                     CashFlow.net_invest_cash_flow,
                                     ]
                   }
    # Reduced TTM field set, tracked for each of the last five years.
    ttm_factor_continue = {Income._name_: [Income.symbol,
                                           Income.net_profit,
                                           Income.operating_revenue,
                                           Income.operating_cost,
                                           Income.np_parent_company_owners,
                                           ]
                           }

    def _ttm_at(factors, date_index):
        # TTM aggregate without the redundant trade_date column.
        frame = get_ttm_fundamental([], factors, date_index).reset_index()
        return frame.drop(columns={"trade_date"})

    ttm_factor_sets = _ttm_at(ttm_factors, trade_date)
    ttm_pre_year = _ttm_at(ttm_factors, trade_date_pre_year).rename(
        columns={"operating_revenue": "operating_revenue_pre_year",
                 "operating_profit": "operating_profit_pre_year",
                 "total_profit": "total_profit_pre_year",
                 "net_profit": "net_profit_pre_year",
                 "operating_cost": "operating_cost_pre_year",
                 "np_parent_company_owners": "np_parent_company_owners_pre_year",
                 "net_finance_cash_flow": "net_finance_cash_flow_pre_year",
                 "net_operate_cash_flow": "net_operate_cash_flow_pre_year",
                 "net_invest_cash_flow": "net_invest_cash_flow_pre_year"
                 })
    ttm_factor_sets = pd.merge(ttm_factor_sets, ttm_pre_year, on="symbol")

    # Merge in the reduced field set for each of the five prior years,
    # suffixing columns with the year offset (_pre_year_1 .. _pre_year_5).
    suffixed_fields = ("operating_revenue", "operating_cost", "net_profit",
                       "np_parent_company_owners")
    for back, date_index in enumerate(pre_dates, start=1):
        yearly = _ttm_at(ttm_factor_continue, date_index).rename(
            columns={field: "%s_pre_year_%d" % (field, back) for field in suffixed_fields})
        ttm_factor_sets = pd.merge(ttm_factor_sets, yearly, on="symbol")
    return ttm_factor_sets, balance_sets
def prepare_calculate(trade_date):
    """Fetch growth inputs for `trade_date`, cache the merged frame in redis,
    and dispatch the distributed factor_growth calculation task."""
    ttm_factor_sets, balance_sets = get_basic_growth_data(trade_date)
    growth_sets = pd.merge(ttm_factor_sets, balance_sets, on='symbol')
    if len(growth_sets) <= 0:
        print("%s has no data" % trade_date)
        return
    tic = time.time()
    # Unique session key so concurrent runs do not collide in the cache.
    session = str(int(time.time() * 1000000 + datetime.now().microsecond))
    cache_data.set_cache(session + str(trade_date), trade_date, growth_sets.to_json(orient='records'))
    factor_growth.factor_calculate.delay(date_index=trade_date, session=session)
    print('growth_cal_time:{}'.format(time.time() - tic))
def do_update(start_date, end_date, count):
    """Run the per-day factor calculation for every trading day in the window
    described by (`start_date`, `end_date`, `count`), using the local calendar."""
    calendar = TradeDate()
    for trade_date in calendar.trade_date_sets_ago(start_date, end_date, count):
        print('因子计算日期: %s' % trade_date)
        prepare_calculate(trade_date)
        print('----->')
if __name__ == '__main__':
    def _parse_bool(text):
        """Parse a CLI flag value into a bool.

        argparse's ``type=bool`` treats any non-empty string — including
        "False" — as True; this converter handles the usual spellings."""
        return str(text).lower() in ('1', 'true', 't', 'yes', 'y')

    parser = argparse.ArgumentParser()
    parser.add_argument('--start_date', type=int, default=20070101)
    parser.add_argument('--end_date', type=int, default=0)
    parser.add_argument('--count', type=int, default=1)
    parser.add_argument('--rebuild', type=_parse_bool, default=False)
    parser.add_argument('--update', type=_parse_bool, default=False)
    parser.add_argument('--schedule', type=_parse_bool, default=False)
    args = parser.parse_args()
    # Default the end of the window to today (YYYYMMDD).
    if args.end_date == 0:
        end_date = int(datetime.now().date().strftime('%Y%m%d'))
    else:
        end_date = args.end_date
    if args.rebuild:
        # Drop/recreate the destination table, then backfill the window.
        processor = factor_growth.Growth('factor_growth')
        processor.create_dest_tables()
        do_update(args.start_date, end_date, args.count)
    if args.update:
        do_update(args.start_date, end_date, args.count)
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: ??
@author: li
@file: historical_value.py
@time: 2019-07-16 19:48
"""
import sys
sys.path.append('../')
sys.path.append('../../')
sys.path.append('../../../')
import time
import collections
import argparse
from datetime import datetime, timedelta
from factor import historical_value
from factor.ttm_fundamental import *
from vision.file_unit.balance import Balance
from vision.file_unit.cash_flow import CashFlow
from vision.file_unit.income import Income
from vision.file_unit.valuation import Valuation
from vision.file_unit.industry import Industry
from vision.file_unit.indicator import Indicator
from factor.utillities.trade_date import TradeDate
from ultron.cluster.invoke.cache_data import cache_data
def get_trade_date(trade_date, n):
    """Return the trading day closest to (not after) `trade_date` minus n years.

    If the shifted date falls before the earliest known trading day it is
    returned unchanged; otherwise it is walked backwards one integer day at
    a time (YYYYMMDD arithmetic) until it lands on a known trading day.

    :param trade_date: current trade date as an int in YYYYMMDD form
    :param n: number of years to step back
    :return: int trade date in YYYYMMDD form
    """
    calendar = collections.OrderedDict(
        sorted(TradeDate()._trade_date_sets.items(), key=lambda item: item[0]))
    # NOTE(review): 365-day "years" ignore leap days, matching the original.
    shifted = datetime.strptime(str(trade_date), "%Y%m%d") - n * timedelta(days=365)
    candidate = int(shifted.strftime("%Y%m%d"))
    if candidate < min(calendar.keys()):
        # Earlier than any recorded trading day: nothing to snap to.
        return candidate
    while candidate not in calendar:
        candidate -= 1
    return candidate
def get_basic_history_value_data(trade_date):
    """Load the raw inputs for the historical-value factors on `trade_date`.

    :param trade_date: trade date (int, YYYYMMDD)
    :return: tuple ``(valuation_sets, ttm_factor_sets, cash_flow_sets,
             income_sets)`` of DataFrames, all keyed by symbol.
    """
    # PS, PE, PB, PCF
    valuation_sets = get_fundamentals(add_filter_trade(query(Valuation._name_,
                                                             [Valuation.symbol,
                                                              Valuation.pe,
                                                              Valuation.ps,
                                                              Valuation.pb,
                                                              Valuation.pcf,
                                                              Valuation.market_cap,
                                                              Valuation.circulating_market_cap]), [trade_date]))
    cash_flow_sets = get_fundamentals(add_filter_trade(query(CashFlow._name_,
                                                             [CashFlow.symbol,
                                                              CashFlow.goods_sale_and_service_render_cash]), [trade_date]))
    income_sets = get_fundamentals(add_filter_trade(query(Income._name_,
                                                          [Income.symbol,
                                                           Income.net_profit]), [trade_date]))
    # Shenwan level-1 industry codes used to restrict the universe below.
    industry_set = ['801010', '801020', '801030', '801040', '801050', '801080', '801110', '801120', '801130',
                    '801140', '801150', '801160', '801170', '801180', '801200', '801210', '801230', '801710',
                    '801720', '801730', '801740', '801750', '801760', '801770', '801780', '801790', '801880',
                    '801890']
    sw_industry = get_fundamentals(add_filter_trade(query(Industry._name_,
                                                          [Industry.symbol,
                                                           Industry.isymbol]), [trade_date]))
    # TTM aggregates
    ttm_factors = {Income._name_: [Income.symbol,
                                   Income.np_parent_company_owners],
                   CashFlow._name_:[CashFlow.symbol,
                                    CashFlow.net_operate_cash_flow]
                   }
    # NOTE(review): only used by the commented-out windowed-sum call below.
    ttm_factors_sum_list = {Income._name_:[Income.symbol,
                                           Income.net_profit,  # net profit
                                           ],}
    trade_date_2y = get_trade_date(trade_date, 2)
    trade_date_3y = get_trade_date(trade_date, 3)
    trade_date_4y = get_trade_date(trade_date, 4)
    trade_date_5y = get_trade_date(trade_date, 5)
    ttm_factor_sets = get_ttm_fundamental([], ttm_factors, trade_date).reset_index()
    ttm_factor_sets_3 = get_ttm_fundamental([], ttm_factors, trade_date_3y).reset_index()
    ttm_factor_sets_5 = get_ttm_fundamental([], ttm_factors, trade_date_5y).reset_index()
    # TODO: windowed TTM sums still need optimisation
    # ttm_factor_sets_sum = get_ttm_fundamental([], ttm_factors_sum_list, trade_date, 5).reset_index()
    # Market caps at the four historical dates, summed per symbol below.
    factor_sets_sum = get_fundamentals(add_filter_trade(query(Valuation._name_,
                                                              [Valuation.symbol,
                                                               Valuation.market_cap,
                                                               Valuation.circulating_market_cap,
                                                               Valuation.trade_date]),
                                                        [trade_date_2y, trade_date_3y, trade_date_4y, trade_date_5y]))
    factor_sets_sum_1 = factor_sets_sum.groupby('symbol')['market_cap'].sum().reset_index().rename(columns={"market_cap": "market_cap_sum",})
    factor_sets_sum_2 = factor_sets_sum.groupby('symbol')['circulating_market_cap'].sum().reset_index().rename(columns={"circulating_market_cap": "circulating_market_cap_sum",})
    # Restrict to Shenwan level-1 industry members.
    sw_industry = sw_industry[sw_industry['isymbol'].isin(industry_set)]
    # Merge valuation data with the Shenwan level-1 industry mapping
    # (inner join: symbols without an industry are dropped).
    valuation_sets = pd.merge(valuation_sets, sw_industry, on='symbol')
    # valuation_sets = pd.merge(valuation_sets, sw_industry, on='symbol', how="outer")
    ttm_factor_sets = ttm_factor_sets.drop(columns={"trade_date"})
    ttm_factor_sets_3 = ttm_factor_sets_3.rename(columns={"np_parent_company_owners": "np_parent_company_owners_3"})
    ttm_factor_sets_3 = ttm_factor_sets_3.drop(columns={"trade_date"})
    ttm_factor_sets_5 = ttm_factor_sets_5.rename(columns={"np_parent_company_owners": "np_parent_company_owners_5"})
    ttm_factor_sets_5 = ttm_factor_sets_5.drop(columns={"trade_date"})
    # ttm_factor_sets_sum = ttm_factor_sets_sum.rename(columns={"net_profit": "net_profit_5"})
    ttm_factor_sets = pd.merge(ttm_factor_sets, ttm_factor_sets_3, on='symbol')
    ttm_factor_sets = pd.merge(ttm_factor_sets, ttm_factor_sets_5, on='symbol')
    # ttm_factor_sets = pd.merge(ttm_factor_sets, ttm_factor_sets_sum, on='symbol')
    ttm_factor_sets = pd.merge(ttm_factor_sets, factor_sets_sum_1, on='symbol')
    ttm_factor_sets = pd.merge(ttm_factor_sets, factor_sets_sum_2, on='symbol')
    return valuation_sets, ttm_factor_sets, cash_flow_sets, income_sets
def prepare_calculate(trade_date):
    """
    Build the merged input frame for the historical-value factor on one
    trade date and dispatch the calculation to a distributed worker.

    Fetches valuation / TTM / cash-flow / income data, inner-joins them on
    'symbol', caches the merged frame, then fires the celery task
    ``historical_value.factor_calculate`` asynchronously.

    :param trade_date: trade date as an int such as 20190716
    """
    # history_value
    valuation_sets, ttm_factor_sets, cash_flow_sets, income_sets = get_basic_history_value_data(trade_date)
    # inner joins on 'symbol': a symbol missing from any source is dropped
    valuation_sets = pd.merge(valuation_sets, income_sets, on='symbol')
    valuation_sets = pd.merge(valuation_sets, ttm_factor_sets, on='symbol')
    valuation_sets = pd.merge(valuation_sets, cash_flow_sets, on='symbol')
    if len(valuation_sets) <= 0:
        print("%s has no data" % trade_date)
        return
    else:
        tic = time.time()
        # microsecond-resolution id; combined with trade_date it forms the cache key
        session = str(int(time.time() * 1000000 + datetime.now().microsecond))
        cache_data.set_cache(session + str(trade_date), trade_date, valuation_sets.to_json(orient='records'))
        # fire-and-forget: the remote worker pulls the cached frame and computes
        historical_value.factor_calculate.delay(date_index=trade_date, session=session)
        time2 = time.time()
        print('history_cal_time:{}'.format(time2 - tic))
def do_update(start_date, end_date, count):
    """Run the historical-value factor calculation for each trading day
    returned by the local calendar for the given range."""
    # load the locally stored trading calendar
    calendar = TradeDate()
    for trade_date in calendar.trade_date_sets_ago(start_date, end_date, count):
        print('因子计算日期: %s' % trade_date)
        prepare_calculate(trade_date)
    print('----->')
if __name__ == '__main__':
    # CLI entry point for the historical-value factor job.
    parser = argparse.ArgumentParser()
    parser.add_argument('--start_date', type=int, default=20070101)
    parser.add_argument('--end_date', type=int, default=0)  # 0 means "today"
    parser.add_argument('--count', type=int, default=1)
    # NOTE(review): argparse type=bool does NOT parse "False" as False --
    # bool('False') is True.  Any non-empty string enables these flags;
    # consider action='store_true' (interface change, so left as-is).
    parser.add_argument('--rebuild', type=bool, default=False)
    parser.add_argument('--update', type=bool, default=False)
    parser.add_argument('--schedule', type=bool, default=False)
    args = parser.parse_args()
    if args.end_date == 0:
        # default the end of the range to today's date (YYYYMMDD int)
        end_date = int(datetime.now().date().strftime('%Y%m%d'))
    else:
        end_date = args.end_date
    if args.rebuild:
        # drop/recreate destination tables, then backfill
        processor = historical_value.HistoricalValue('factor_historical_value')
        processor.create_dest_tables()
        do_update(args.start_date, end_date, args.count)
    if args.update:
        do_update(args.start_date, end_date, args.count)
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: ??
@author: li
@file: per_share_indicator.py
@time: 2019-07-16 19:51
"""
import sys
sys.path.append('../')
sys.path.append('../../')
sys.path.append('../../../')
import time
import collections
import argparse
from datetime import datetime, timedelta
from factor import factor_per_share_indicators
from factor.ttm_fundamental import *
from vision.file_unit.balance import Balance
from vision.file_unit.cash_flow import CashFlow
from vision.file_unit.income import Income
from vision.file_unit.valuation import Valuation
from vision.file_unit.industry import Industry
from vision.file_unit.indicator import Indicator
from factor.utillities.trade_date import TradeDate
from ultron.cluster.invoke.cache_data import cache_data
def get_trade_date(trade_date, n):
    """
    Return the trading day roughly n years before ``trade_date``.

    The date is shifted back by n*365 calendar days; if the result is not a
    trading day, it steps backwards one day at a time until it finds one.
    Dates older than the locally stored calendar are returned unchanged.

    :param trade_date: current trade date as int, e.g. 20190716
    :param n: number of years to go back
    :return: the resulting date as an int
    """
    calendar = TradeDate()
    trade_date_sets = collections.OrderedDict(
        sorted(calendar._trade_date_sets.items(), key=lambda t: t[0], reverse=False))
    shifted = datetime.strptime(str(trade_date), "%Y%m%d") - n * timedelta(days=365)
    date_time = int(datetime.strftime(shifted, "%Y%m%d"))
    if date_time < min(trade_date_sets.keys()):
        # outside the known calendar: return the raw shifted date
        # print('date_time %s is outof trade_date_sets' % date_time)
        return date_time
    # walk backwards day by day until we land on a trading day
    while date_time not in trade_date_sets:
        date_time = date_time - 1
    # print('trade_date pre %s year %s' % (n, date_time))
    return date_time
def get_basic_scale_data(trade_date):
    """
    Fetch the base data for the per-share indicators on one trade date.

    Pulls valuation, cash-flow, income and balance-sheet snapshots for all
    stocks on ``trade_date``, plus a trailing-twelve-month aggregation whose
    columns are renamed with a ``_ttm`` suffix so they can be merged next to
    the point-in-time columns.

    :param trade_date: trade date
    :return: (valuation_sets, ttm_factor_sets, cash_flow_sets, income_sets, balance_sets)
    """
    valuation_sets = get_fundamentals(add_filter_trade(query(Valuation._name_,
                                                             [Valuation.symbol,
                                                              Valuation.market_cap,
                                                              Valuation.capitalization,  # total share capital
                                                              Valuation.circulating_market_cap]),  # circulating market cap
                                                       [trade_date]))
    cash_flow_sets = get_fundamentals(add_filter_trade(query(CashFlow._name_,
                                                             [CashFlow.symbol,
                                                              CashFlow.cash_and_equivalents_at_end,  # cash & equivalents at period end
                                                              CashFlow.cash_equivalent_increase]),  # net increase in cash & equivalents
                                                       [trade_date]))
    income_sets = get_fundamentals(add_filter_trade(query(Income._name_,
                                                          [Income.symbol,
                                                           Income.basic_eps,  # basic earnings per share
                                                           Income.diluted_eps,  # diluted earnings per share
                                                           Income.net_profit,
                                                           Income.operating_revenue,  # operating revenue
                                                           Income.operating_profit,  # operating profit
                                                           Income.total_operating_revenue]),  # total operating revenue
                                                    [trade_date]))
    balance_sets = get_fundamentals(add_filter_trade(query(Balance._name_,
                                                           [Balance.symbol,
                                                            Balance.capital_reserve_fund,  # capital reserve
                                                            Balance.surplus_reserve_fund,  # surplus reserve
                                                            Balance.total_assets,  # total assets
                                                            Balance.dividend_receivable,  # dividends receivable
                                                            Balance.retained_profit,  # retained earnings
                                                            Balance.total_owner_equities]),  # total owners' equity (source labelled it "attributable to parent" -- verify)
                                                     [trade_date]))
    # TTM (trailing-twelve-month) aggregation inputs
    ttm_factors = {Income._name_: [Income.symbol,
                                   Income.operating_revenue,  # operating revenue
                                   Income.operating_profit,  # operating profit
                                   Income.np_parent_company_owners,  # net profit attributable to parent-company owners
                                   Income.total_operating_revenue],  # total operating revenue
                   CashFlow._name_: [CashFlow.symbol,
                                     CashFlow.net_operate_cash_flow]  # net cash flow from operating activities
                   }
    ttm_factor_sets = get_ttm_fundamental([], ttm_factors, trade_date).reset_index()
    # suffix TTM columns so they can coexist with same-named point-in-time columns
    ttm_factor_sets = ttm_factor_sets.rename(columns={"np_parent_company_owners": "np_parent_company_owners_ttm"})
    ttm_factor_sets = ttm_factor_sets.rename(columns={"net_operate_cash_flow": "net_operate_cash_flow_ttm"})
    ttm_factor_sets = ttm_factor_sets.rename(columns={"operating_revenue": "operating_revenue_ttm"})
    ttm_factor_sets = ttm_factor_sets.rename(columns={"operating_profit": "operating_profit_ttm"})
    ttm_factor_sets = ttm_factor_sets.rename(columns={"total_operating_revenue": "total_operating_revenue_ttm"})
    ttm_factor_sets = ttm_factor_sets.drop(columns={"trade_date"})
    return valuation_sets, ttm_factor_sets, cash_flow_sets, income_sets, balance_sets
def prepare_calculate(trade_date):
    """Build the merged input frame for the per-share indicators on one
    trade date and dispatch the calculation to a distributed worker."""
    # per share indicators
    valuation_sets, ttm_factor_sets, cash_flow_sets, income_sets, balance_sets = get_basic_scale_data(trade_date)
    merged = valuation_sets
    # inner joins on 'symbol': a symbol missing from any source is dropped
    for frame in (income_sets, ttm_factor_sets, cash_flow_sets, balance_sets):
        merged = pd.merge(merged, frame, on='symbol')
    if len(merged) == 0:
        print("%s has no data" % trade_date)
        return
    tic = time.time()
    # microsecond-resolution id; combined with trade_date it forms the cache key
    session = str(int(time.time() * 1000000 + datetime.now().microsecond))
    cache_data.set_cache(session + str(trade_date), trade_date, merged.to_json(orient='records'))
    # fire-and-forget: the remote worker pulls the cached frame and computes
    factor_per_share_indicators.factor_calculate.delay(date_index=trade_date, session=session)
    print('per_share_cal_time:{}'.format(time.time() - tic))
def do_update(start_date, end_date, count):
    """Run the per-share factor calculation for each trading day in range.

    :param start_date: first date (int YYYYMMDD)
    :param end_date: last date (int YYYYMMDD)
    :param count: presumably a fallback window size -- confirm against TradeDate
    """
    # load the locally stored trading calendar
    _trade_date = TradeDate()
    trade_date_sets = _trade_date.trade_date_sets_ago(start_date, end_date, count)
    for trade_date in trade_date_sets:
        print('因子计算日期: %s' % trade_date)
        prepare_calculate(trade_date)
    print('----->')
if __name__ == '__main__':
    # CLI entry point for the per-share indicator factor job.
    parser = argparse.ArgumentParser()
    parser.add_argument('--start_date', type=int, default=20070101)
    parser.add_argument('--end_date', type=int, default=0)  # 0 means "today"
    parser.add_argument('--count', type=int, default=1)
    # NOTE(review): argparse type=bool does NOT parse "False" as False --
    # bool('False') is True.  Any non-empty string enables these flags;
    # consider action='store_true' (interface change, so left as-is).
    parser.add_argument('--rebuild', type=bool, default=False)
    parser.add_argument('--update', type=bool, default=False)
    parser.add_argument('--schedule', type=bool, default=False)
    args = parser.parse_args()
    if args.end_date == 0:
        # default the end of the range to today's date (YYYYMMDD int)
        end_date = int(datetime.now().date().strftime('%Y%m%d'))
    else:
        end_date = args.end_date
    if args.rebuild:
        # drop/recreate destination tables, then backfill
        processor = factor_per_share_indicators.PerShareIndicators('factor_per_share')
        processor.create_dest_tables()
        do_update(args.start_date, end_date, args.count)
    if args.update:
        do_update(args.start_date, end_date, args.count)
......@@ -14,4 +14,6 @@ app = create_app('factor', ['factor.factor_growth',
'factor.factor_cash_flow',
'factor.factor_constrain',
'factor.factor_earning',
'factor.factor_per_share_indicators'])
'factor.factor_per_share_indicators',
'factor.factor_volatility_value_task',
'factor.factor_scale_value_task'])
# coding=utf-8
import time
from pandas.io.json import json_normalize, json
import pandas as pd
import math
import sys
sys.path.append("../")
sys.path.append("../../")
sys.path.append("../../../")
from factor import app
from factor.factor_base import FactorBase
from ultron.cluster.invoke.cache_data import cache_data
def lcap(tp_historical_value, factor_scale_value):
    """LCAP: natural log of total market cap.

    Computes ``log_of_mkt_value`` = ln(|market_cap|) from
    ``tp_historical_value`` and inner-merges it into ``factor_scale_value``
    on 'symbol'.

    :param tp_historical_value: frame with 'symbol' and 'market_cap'
    :param factor_scale_value: accumulating factor frame
    :return: factor frame with 'log_of_mkt_value' added
    """
    subset = tp_historical_value.loc[:, ['symbol', 'market_cap']]
    subset['log_of_mkt_value'] = subset['market_cap'].map(lambda v: math.log(abs(v)))
    subset = subset.drop(columns=['market_cap'])
    return pd.merge(factor_scale_value, subset, on="symbol")
def lflo(tp_historical_value, factor_scale_value):
    """LFLO: natural log of circulating (free-float) market cap.

    Computes ``log_of_neg_mkt_value`` = ln(|circulating_market_cap|) and
    inner-merges it into ``factor_scale_value`` on 'symbol'.

    :param tp_historical_value: frame with 'symbol' and 'circulating_market_cap'
    :param factor_scale_value: accumulating factor frame
    :return: factor frame with 'log_of_neg_mkt_value' added
    """
    subset = tp_historical_value.loc[:, ['symbol', 'circulating_market_cap']]
    subset['log_of_neg_mkt_value'] = subset['circulating_market_cap'].map(lambda v: math.log(abs(v)))
    subset = subset.drop(columns=['circulating_market_cap'])
    return pd.merge(factor_scale_value, subset, on="symbol")
def nlsize(tp_historical_value, factor_scale_value):
    """NLSIZE: non-linear size, the cube of the log market cap.

    Computes ``nl_size`` = log_of_mkt_value ** 3 and inner-merges it into
    ``factor_scale_value`` on 'symbol'.  Expects ``lcap`` to have produced
    the 'log_of_mkt_value' column first.

    :param tp_historical_value: frame with 'symbol' and 'log_of_mkt_value'
    :param factor_scale_value: accumulating factor frame
    :return: factor frame with 'nl_size' added
    """
    subset = tp_historical_value.loc[:, ['symbol', 'log_of_mkt_value']]
    subset['nl_size'] = subset['log_of_mkt_value'].map(lambda v: v ** 3)
    subset = subset.drop(columns=['log_of_mkt_value'])
    return pd.merge(factor_scale_value, subset, on="symbol")
def lst(tp_historical_value, factor_scale_value):
    """LST: natural log of TTM total operating revenue.

    Computes ``log_sales_ttm`` = ln(|total_operating_revenue|) and
    inner-merges it into ``factor_scale_value`` on 'symbol'.

    :param tp_historical_value: frame with 'symbol' and 'total_operating_revenue'
    :param factor_scale_value: accumulating factor frame
    :return: factor frame with 'log_sales_ttm' added
    """
    subset = tp_historical_value.loc[:, ['symbol', 'total_operating_revenue']]
    subset['log_sales_ttm'] = subset['total_operating_revenue'].map(lambda v: math.log(abs(v)))
    subset = subset.drop(columns=['total_operating_revenue'])
    return pd.merge(factor_scale_value, subset, on="symbol")
def ltlqa(tp_historical_value, factor_scale_value):
    """LTLQA: natural log of total assets.

    Computes ``log_total_last_qua_assets`` = ln(|total_assets|) and
    inner-merges it into ``factor_scale_value`` on 'symbol'.

    :param tp_historical_value: frame with 'symbol' and 'total_assets'
    :param factor_scale_value: accumulating factor frame
    :return: factor frame with 'log_total_last_qua_assets' added
    """
    subset = tp_historical_value.loc[:, ['symbol', 'total_assets']]
    subset['log_total_last_qua_assets'] = subset['total_assets'].map(lambda v: math.log(abs(v)))
    subset = subset.drop(columns=['total_assets'])
    return pd.merge(factor_scale_value, subset, on="symbol")
@app.task(ignore_result=True)
def calculate(**kwargs):
    """
    Celery task: compute all scale-value factors for one trade date.

    Pulls the cached input frame from the cache, applies each factor
    function in sequence, renames raw columns to their factor names and
    persists the result to the 'factor_scale_value' table.

    :param kwargs: expects 'factor_name', 'session' and 'trade_date'
    """
    fb = FactorBase('factor_scale_value')
    print(kwargs)
    factor_name = kwargs['factor_name']
    session = kwargs['session']
    trade_date = kwargs['trade_date']
    content = cache_data.get_cache(session, factor_name)
    total_data = json_normalize(json.loads(content))
    print(len(total_data))
    # each step merges one new factor column into the accumulating frame;
    # the same frame is passed as both source and accumulator because the
    # raw input columns are carried along by the merges
    factor_scale_value = lcap(total_data, total_data)
    factor_scale_value = lflo(factor_scale_value, factor_scale_value)
    factor_scale_value = nlsize(factor_scale_value, factor_scale_value)
    factor_scale_value = lst(factor_scale_value, factor_scale_value)
    factor_scale_value = ltlqa(factor_scale_value, factor_scale_value)
    # expose the raw inputs under their factor names
    factor_scale_value.rename(columns={'market_cap': 'mkt_value',
                                       'circulating_market_cap': 'cir_mkt_value',
                                       'total_operating_revenue': 'sales_ttm'},
                              inplace=True)
    # keep only the destination-table columns, in order
    factor_scale_value = factor_scale_value[['symbol',
                                             'mkt_value',
                                             'cir_mkt_value',
                                             'sales_ttm',
                                             'total_assets',
                                             'log_of_mkt_value',
                                             'log_of_neg_mkt_value',
                                             'nl_size',
                                             'log_total_last_qua_assets',
                                             'log_sales_ttm'
                                             ]]
    # primary key: symbol + trade date
    factor_scale_value['id'] = factor_scale_value['symbol'] + str(trade_date)
    factor_scale_value['trade_date'] = str(trade_date)
    # super(HistoricalValue, self)._storage_data(factor_scale_value, trade_date)
    fb._storage_data(factor_scale_value, trade_date)
# calculate(factor_name='scale20180202', trade_date=20180202, session='1562054216473773')
# calculate(factor_name='scale20180202', trade_date=20180202, session='1562901137622956')
# coding=utf-8
from pandas.io.json import json_normalize, json
import numpy as np
from scipy import stats
import sys
sys.path.append("..")
from factor import app
from factor.factor_base import FactorBase
from ultron.cluster.invoke.cache_data import cache_data
from vision.fm.signletion_engine import *
# Factor functions to evaluate per symbol; each name is looked up in
# globals() by symbol_calcu, so this list doubles as the output schema.
_columns_list = ['variance_20d', 'variance_60d', 'variance_120d', 'kurtosis_20d',
                 'kurtosis_60d', 'kurtosis_120d', 'alpha_20d', 'alpha_60d',
                 'alpha_120d', 'beta_20d', 'beta_60d', 'beta_120d', 'sharp_20d',
                 'sharp_60d', 'sharp_120d', 'tr_20d', 'tr_60d',
                 'tr_120d', 'ir_20d', 'ir_60d', 'ir_120d',
                 'gain_variance_20d', 'gain_variance_60d', 'gain_variance_120d', 'loss_variance_20d',
                 'loss_variance_60d',
                 'loss_variance_120d', 'gain_loss_variance_ratio_20d', 'gain_loss_variance_ratio_60d',
                 'gain_loss_variance_ratio_120d',
                 'dastd_252d', 'ddnsr_12m', 'ddncr_12m', 'dvrat']
# annual risk-free rate and its per-trading-day equivalent (252 trading days)
rf = 0.04
dayrf = rf / 252
trade_date = None
tp_index = None
# Shared mutable state for the worker process (set by calculate(), read by
# the factor functions).  NOTE: 'golbal' is a typo, kept because the name is
# referenced throughout this module.
golbal_obj = {
    'rf': 0.04,
    'dayrf': dayrf,
    'tp_index': tp_index,
    'trade_date': trade_date
}
def get_index_dict():
    """Group the cached benchmark-index price frame by index symbol.

    :return: {index_symbol: DataFrame sorted by trade_date ascending},
        skipping any index with fewer than 3 rows
    """
    tp_index = golbal_obj['tp_index']
    result = {}
    for key in set(tp_index.index):
        if len(tp_index[tp_index.index == key]) >= 3:
            result[key] = tp_index.loc[key].sort_values(by="trade_date", ascending=True)
    return result
def variancex(tp_price_flow, x):
    """Annualised variance of the last ``x`` daily returns.

    :param tp_price_flow: frame with a 'close' price column
    :param x: look-back window in trading days
    :return: population variance of the last x pct-changes, scaled by 250
    """
    returns = tp_price_flow.loc[:, ["close"]].close.pct_change()
    return np.var(returns.iloc[-x:]) * 250
def variance_20d(tp_price_flow):
    """Annualised variance of the last 20 daily returns."""
    return variancex(tp_price_flow, 20)
def variance_60d(tp_price_flow):
    """Annualised variance of the last 60 daily returns."""
    return variancex(tp_price_flow, 60)
def variance_120d(tp_price_flow):
    """Annualised variance of the last 120 daily returns."""
    return variancex(tp_price_flow, 120)
def kurtosis_xd(tp_price_flow, x):
    """Excess (Fisher) kurtosis of the last ``x`` daily returns.

    :param tp_price_flow: frame with a 'close' price column
    :param x: look-back window in trading days
    :return: scipy.stats.kurtosis of the last x pct-changes
    """
    returns = tp_price_flow.loc[:, ["close"]].close.pct_change()
    return stats.kurtosis(returns.iloc[-x:])
def kurtosis_20d(tp_price_flow):
    """Excess kurtosis of the last 20 daily returns."""
    return kurtosis_xd(tp_price_flow, 20)
def kurtosis_60d(tp_price_flow):
    """Excess kurtosis of the last 60 daily returns."""
    return kurtosis_xd(tp_price_flow, 60)
def kurtosis_120d(tp_price_flow):
    """Excess kurtosis of the last 120 daily returns."""
    return kurtosis_xd(tp_price_flow, 120)
def alpha_xd(tp_price_flow, x):
    """
    CAPM alpha over the last ``x`` days against the "000300.XSHG" index:
    alpha = (E[r] - rf_daily) - beta * E[rm - rf_daily].

    Stock and index rows are aligned by merging on 'trade_date'.

    :param tp_price_flow: frame with 'trade_date' and 'close' columns
    :param x: look-back window, clamped to the available history
    :return: daily alpha estimate
    """
    tp_index_dict = get_index_dict()
    columns_list = ["trade_date", "close"]
    price = tp_price_flow.loc[:, columns_list].reset_index()
    price_close_len = len(price.close.pct_change())
    # clamp the window when there is not enough history
    if (x >= price_close_len):
        x = price_close_len - 1
    index = tp_index_dict["000300.XSHG"].loc[:, columns_list].reset_index()
    # index["trade_date"] = index['trade_date'].apply(lambda x: int(x[0:4] + x[5:7] + x[8:]))
    # close_x = index returns, close_y = stock returns (merge suffixes)
    total_df = pd.merge(index, price, on="trade_date")
    total_df["close_x"] = total_df["close_x"].pct_change()
    total_df["close_y"] = total_df["close_y"].pct_change()
    total_df = total_df[-x:]
    beta = beta_xd(tp_price_flow, x)
    er = np.mean(total_df["close_y"])
    # subtract the daily risk-free rate from the index return
    # (NB: the lambda parameter shadows the outer window variable x)
    total_df["close_x"] = total_df["close_x"].map(lambda x: x - dayrf)
    erm_rf = np.mean(total_df["close_x"])
    alpha = (er - dayrf) - beta * erm_rf
    return alpha
def alpha_20d(tp_price_flow):
    """CAPM alpha over the last 20 days vs the CSI 300 index."""
    return alpha_xd(tp_price_flow, 20)
def alpha_60d(tp_price_flow):
    """CAPM alpha over the last 60 days vs the CSI 300 index."""
    return alpha_xd(tp_price_flow, 60)
def alpha_120d(tp_price_flow):
    """CAPM alpha over the last 120 days vs the CSI 300 index."""
    return alpha_xd(tp_price_flow, 120)
def beta_xd(tp_price_flow, x):
    """
    CAPM beta over the last ``x`` days against the "000300.XSHG" index:
    beta = cov(r, rm) / var(rm).

    NOTE(review): stock and index return series are aligned positionally
    here (no merge on trade_date); this assumes both cover the same trading
    days and np.cov requires the two tails to have equal length -- confirm
    upstream guarantees this.

    :param tp_price_flow: frame with a 'close' price column
    :param x: look-back window, clamped to the available history
    :return: beta estimate
    """
    tp_index_dict = get_index_dict()
    columns_list = ["close"]
    price = tp_price_flow.loc[:, columns_list]
    # drop non-positive closes (suspensions / bad data)
    price = price[price.close > 0]
    index = tp_index_dict["000300.XSHG"].loc[:, columns_list]
    price_close_len = len(price.close.pct_change())
    # clamp the window when there is not enough history
    if (x >= price_close_len):
        x = price_close_len - 1
    cov_mat = np.cov(price.close.pct_change().iloc[-x:], index.close.pct_change().iloc[-x:])
    beta = cov_mat[0][1] / cov_mat[1][1]
    return beta
def beta_20d(tp_price_flow):
    """CAPM beta over the last 20 days vs the CSI 300 index."""
    return beta_xd(tp_price_flow, 20)
def beta_60d(tp_price_flow):
    """CAPM beta over the last 60 days vs the CSI 300 index."""
    return beta_xd(tp_price_flow, 60)
def beta_120d(tp_price_flow):
    """CAPM beta over the last 120 days vs the CSI 300 index."""
    return beta_xd(tp_price_flow, 120)
def sharp_xd(tp_price_flow, x):
    """
    Sharpe-style ratio over the last ``x`` days: (mean return - rf) / std.

    NOTE(review): this divides by the std of the PRICE LEVELS (not of the
    returns) and subtracts the ANNUAL risk-free rate from a mean DAILY
    return -- both look inconsistent with the textbook Sharpe ratio;
    confirm whether this is intentional before relying on the values.

    :param tp_price_flow: frame with a 'close' price column
    :param x: look-back window in trading days
    :return: ratio, or 0.0 when the std is effectively zero
    """
    columns_list = ["close"]
    price = tp_price_flow.loc[:, columns_list]
    avg = np.mean(price.close.iloc[-x:].pct_change())
    stdValue = np.std(price.close.iloc[-x:])
    # guard against division by a (near-)zero std
    if (stdValue > 0.001 or stdValue < -0.001):
        result = (avg - rf) / stdValue
    else:
        result = 0.0
    return result
def sharp_20d(tp_price_flow):
    """Sharpe-style ratio over the last 20 days."""
    return sharp_xd(tp_price_flow, 20)
def sharp_60d(tp_price_flow):
    """Sharpe-style ratio over the last 60 days."""
    return sharp_xd(tp_price_flow, 60)
def sharp_120d(tp_price_flow):
    """Sharpe-style ratio over the last 120 days."""
    return sharp_xd(tp_price_flow, 120)
def tr_xd(tp_price_flow, x):
    """
    Treynor-style ratio over the last ``x`` days: (mean - rf) / beta.

    NOTE(review): the mean here is of the PRICE LEVELS, not of the returns,
    and rf is the annual rate -- this differs from the textbook Treynor
    ratio; confirm intent.  Also divides by beta with no zero guard.

    :param tp_price_flow: frame with a 'close' price column
    :param x: look-back window in trading days
    :return: ratio estimate
    """
    # tp_index_dict = get_index_dict()
    columns_list = ["close"]
    price = tp_price_flow.loc[:, columns_list]
    avg = np.mean(price.close.iloc[-x:])
    bValue = beta_xd(tp_price_flow, x)
    result = (avg - rf) / bValue
    return result
def tr_20d(tp_price_flow):
    """Treynor-style ratio over the last 20 days."""
    return tr_xd(tp_price_flow, 20)
def tr_60d(tp_price_flow):
    """Treynor-style ratio over the last 60 days."""
    return tr_xd(tp_price_flow, 60)
def tr_120d(tp_price_flow):
    """Treynor-style ratio over the last 120 days."""
    return tr_xd(tp_price_flow, 120)
def ir_xd(tp_price_flow, x):
    """
    Information ratio over the last ``x`` days vs the "000300.XSHG" index:
    mean(active return) / std(active return).

    NOTE(review): stock and index returns are aligned positionally (no
    merge on trade_date) and the two tails must have equal length for the
    subtraction -- confirm upstream guarantees this.

    :param tp_price_flow: frame with a 'close' price column
    :param x: look-back window, clamped to the available history
    :return: information ratio, or 0.0 when the tracking std is ~zero
    """
    tp_index_dict = get_index_dict()
    columns_list = ["close"]
    price = tp_price_flow.loc[:, columns_list]
    price_close_len = len(price.close.pct_change())
    # clamp the window when there is not enough history
    if (x >= price_close_len):
        x = price_close_len - 1
    index = tp_index_dict["000300.XSHG"].loc[:, columns_list]
    # active (excess-over-benchmark) daily returns
    diff = price.close.pct_change().values[-x:] - index.close.pct_change().values[-x:]
    diff_mean = np.mean(diff[-x:])
    diff_std = np.std(diff[-x:])
    # guard against division by a (near-)zero tracking error
    if (diff_std > 0.001 or diff_std < -0.001):
        ir = diff_mean / diff_std
    else:
        ir = 0.0
    return ir
def ir_20d(tp_price_flow):
    """Information ratio over the last 20 days vs the CSI 300 index."""
    return ir_xd(tp_price_flow, 20)
def ir_60d(tp_price_flow):
    """Information ratio over the last 60 days vs the CSI 300 index."""
    return ir_xd(tp_price_flow, 60)
def ir_120d(tp_price_flow):
    """Information ratio over the last 120 days vs the CSI 300 index."""
    return ir_xd(tp_price_flow, 120)
def gain_variance_xd(tp_price_flow, x):
    """Annualised variance of the last ``x`` non-negative daily returns.

    Non-positive closes are dropped first; returns below zero are excluded,
    then a population variance (E[r^2] - E[r]^2) is taken over the last x
    remaining values and scaled by 250.

    :param tp_price_flow: frame with a 'close' price column
    :param x: number of gain observations to use
    :return: annualised upside variance
    """
    prices = tp_price_flow.loc[:, ["close"]]
    prices = prices[prices.close > 0]
    gains = prices.close.pct_change()
    gains = gains[gains >= 0]
    mean_gain = np.mean(gains.values[-x:])
    mean_gain_sq = np.mean(np.power(gains, 2).values[-x:])
    return (mean_gain_sq - mean_gain * mean_gain) * 250
def gain_variance_20d(tp_price_flow):
    """Annualised upside variance over the last 20 gain observations."""
    return gain_variance_xd(tp_price_flow, 20)
def gain_variance_60d(tp_price_flow):
    """Annualised upside variance over the last 60 gain observations."""
    return gain_variance_xd(tp_price_flow, 60)
def gain_variance_120d(tp_price_flow):
    """Annualised upside variance over the last 120 gain observations."""
    return gain_variance_xd(tp_price_flow, 120)
def loss_variance_xd(tp_price_flow, x):
    """Annualised variance of the last ``x`` non-positive daily returns.

    Returns above zero are excluded, then a population variance
    (E[r^2] - E[r]^2) is taken over the last x remaining values and scaled
    by 250.  (Unlike gain_variance_xd, non-positive closes are NOT
    filtered out first -- behaviour preserved from the original.)

    :param tp_price_flow: frame with a 'close' price column
    :param x: number of loss observations to use
    :return: annualised downside variance
    """
    losses = tp_price_flow.loc[:, ["close"]].close.pct_change()
    losses = losses[losses <= 0]
    mean_loss = np.mean(losses.values[-x:])
    mean_loss_sq = np.mean(np.power(losses, 2).values[-x:])
    return (mean_loss_sq - mean_loss * mean_loss) * 250
def loss_variance_20d(tp_price_flow):
    """Annualised downside variance over the last 20 loss observations."""
    return loss_variance_xd(tp_price_flow, 20)
def loss_variance_60d(tp_price_flow):
    """Annualised downside variance over the last 60 loss observations."""
    return loss_variance_xd(tp_price_flow, 60)
def loss_variance_120d(tp_price_flow):
    """Annualised downside variance over the last 120 loss observations."""
    return loss_variance_xd(tp_price_flow, 120)
def gain_loss_variance_ratio_20d(tp_price_flow):
    """Ratio of upside to downside return variance (20 observations)."""
    return gain_variance_20d(tp_price_flow) / loss_variance_20d(tp_price_flow)
def gain_loss_variance_ratio_60d(tp_price_flow):
    """Ratio of upside to downside return variance (60 observations)."""
    return gain_variance_60d(tp_price_flow) / loss_variance_60d(tp_price_flow)
def gain_loss_variance_ratio_120d(tp_price_flow):
    """Ratio of upside to downside return variance (120 observations)."""
    return gain_variance_120d(tp_price_flow) / loss_variance_120d(tp_price_flow)
def dastd_252d(tp_price_flow):
    """
    DASTD: standard deviation of the last 252 daily excess returns
    (daily return minus the daily risk-free rate ``dayrf``).

    :param tp_price_flow: frame with a 'close' price column
    :return: std of the excess-return series
    """
    columns_list = ["close"]
    price = tp_price_flow.loc[:, columns_list]
    price['close_pct_change'] = price.close.pct_change()
    # subtract the daily risk-free rate to get excess returns
    price['close_pct_change'] = price['close_pct_change'].map(lambda x: x - dayrf)
    close_pct_change = price['close_pct_change'].iloc[-252:]
    std_value = np.std(close_pct_change)
    return std_value
def ddnsr_12m(tp_price_flow):
    """
    DDNSR: downside standard-deviation ratio over the last 252 aligned
    observations -- std(stock returns) / std(market returns), taken only
    on days when the "000300.XSHG" index return is negative.

    Stock and index rows are aligned by merging on 'trade_date';
    close_x = index, close_y = stock (merge suffixes).

    :param tp_price_flow: frame with 'trade_date' and 'close' columns
    :return: ratio, or 0.0 when the market-side std is effectively zero
    """
    tp_index_dict = get_index_dict()
    columns_list = ["trade_date", "close"]
    price = tp_price_flow.loc[:, columns_list].reset_index()
    index = tp_index_dict["000300.XSHG"].loc[:, columns_list].reset_index()
    # index["trade_date"] = index['trade_date'].apply(lambda x: int(x[0:4] + x[5:7] + x[8:]))
    total_df = pd.merge(index, price, on="trade_date")
    total_df["close_x"] = total_df["close_x"].pct_change()
    total_df["close_y"] = total_df["close_y"].pct_change()
    total_df = total_df[-252:]
    # keep only the market-down days
    total_df = total_df[total_df['close_x'] < 0]
    stdr = np.std(total_df["close_y"])
    stdrm = np.std(total_df["close_x"])
    # guard against division by a (near-)zero market std
    if (stdrm > 0.001 or stdrm < -0.001):
        result = stdr / stdrm
    else:
        result = 0.0
    return result
def ddncr_12m(tp_price_flow):
    """
    DDNCR: downside correlation over the last 252 aligned observations --
    cov(stock, market) / (std(stock) * std(market)), taken only on days
    when the "000300.XSHG" index return is negative.

    Stock and index rows are aligned by merging on 'trade_date';
    close_x = index, close_y = stock (merge suffixes).  Note np.cov uses
    ddof=1 while np.std uses ddof=0, so this is not a textbook Pearson
    correlation coefficient.

    :param tp_price_flow: frame with 'trade_date' and 'close' columns
    :return: correlation-like ratio, or 0.0 when std*std is ~zero
    """
    tp_index_dict = get_index_dict()
    columns_list = ["trade_date", "close"]
    price = tp_price_flow.loc[:, columns_list].reset_index()
    index = tp_index_dict["000300.XSHG"].loc[:, columns_list].reset_index()
    # index["trade_date"] = index['trade_date'].apply(lambda x: int(x[0:4] + x[5:7] + x[8:]))
    total_df = pd.merge(index, price, on="trade_date")
    total_df["close_x"] = total_df["close_x"].pct_change()
    total_df["close_y"] = total_df["close_y"].pct_change()
    total_df = total_df[-252:]
    # keep only the market-down days
    total_df = total_df[total_df['close_x'] < 0]
    stdr = np.std(total_df["close_y"])
    stdrm = np.std(total_df["close_x"])
    cov_mat = np.cov(total_df["close_y"], total_df["close_x"])
    stdValue = stdr * stdrm
    # guard against division by a (near-)zero denominator
    if (stdValue > 0.001 or stdValue < -0.001):
        result = cov_mat[0][1] / stdValue
    else:
        result = 0.0
    return result
def dvrat(tp_price_flow):
    """
    DVRAT: variance ratio of overlapping q-day returns to one-day returns,
    minus 1 (Lo-MacKinlay style), with q=10 over a two-year window.

    A rolling q-day sum of daily returns is squared and accumulated; the
    result is normalised by m = q*(t-q+1)*(1-q/t) and divided by the
    (sum of squared daily returns)/(t-1).

    :param tp_price_flow: frame with a positive 'close' price column
    :return: variance ratio minus one (-1.0 when fewer than q returns exist)
    """
    q = 10        # aggregation window in days
    t = 252 * 2   # look-back: two trading years
    # Lo-MacKinlay normaliser for the overlapping q-period variance
    m = q * (t - q + 1) * (1 - q / t)
    price = tp_price_flow.loc[:, ["close"]]
    # drop non-positive closes (suspensions / bad data)
    price = price[price.close > 0]
    close_pct_change = price['close'].pct_change().iloc[-t:]
    count = 0
    window_sums_sq = []    # squared q-day rolling return sums
    window = []            # current rolling window of daily returns
    squared_returns = []   # squares of every daily return
    for item in close_pct_change:
        if (str(item) == "nan"):   # skip the leading NaN from pct_change
            continue
        squared_returns.append(item * item)
        window.append(item)
        count = count + 1
        # was a magic literal 10; use the declared window size q
        if (count >= q):
            value = np.sum(window)
            if (str(value) != "nan"):
                window_sums_sq.append(value * value)
            window = window[1:]
    e2q = np.sum(window_sums_sq) / m
    e2s = np.sum(squared_returns) / (t - 1)
    result = e2q / e2s - 1
    return result
def symbol_calcu(tp_price):
    """
    Evaluate every factor in ``_columns_list`` for a single symbol.

    The factor functions are looked up by name in this module's globals().
    Assumes ``tp_price`` holds the price history of exactly one symbol
    (the symbol is recovered from index[0]).

    :param tp_price: frame with 'symbol' and 'close' columns
    :return: dict mapping factor name -> value, plus 'symbol'
    """
    tp_price.set_index('symbol', inplace=True)
    # drop non-positive closes (suspensions / bad data)
    tp_price = tp_price[tp_price['close'] > 0]
    price_return = {}
    for func_name in _columns_list:
        # func = getattr(func_name, None)
        func = globals().get(func_name)
        if (func):
            price_return[func_name] = func(tp_price)
    price_return['symbol'] = tp_price.index[0] # .decode("unicode_escape").encode("utf8")
    return price_return
def get_index_history_price_local(universe, end_date, count, entities=None):
    """
    Memoised fetch of index price history: loads 450 rows once, then serves
    slices from the module-level cache.

    NOTE(review): relies on a module-level ``index_daily_price_sets``
    initialised to None somewhere outside this view -- if it is never
    defined, the first call raises NameError; confirm.

    :param universe: index universe passed through to the remote fetch
    :param end_date: inclusive upper bound on trade_date
    :param count: number of rows to return
    :param entities: optional column selection passed through
    :return: first ``count`` cached rows with trade_date <= end_date
    """
    global index_daily_price_sets
    if (index_daily_price_sets is None):
        # fetch a generous 450-row window once and reuse it for later calls
        index_daily_price_sets = get_index_history_price(universe, end_date, 450, entities)
    temp_price_sets = index_daily_price_sets[index_daily_price_sets.trade_date <= end_date]
    return temp_price_sets[:count]
@app.task(ignore_result=True)
def calculate(**kwargs):
    """
    Celery task: compute all volatility-value factors for one trade date.

    Pulls the cached stock prices and index prices, publishes the index
    frame through ``golbal_obj['tp_index']`` for the factor functions, then
    evaluates every factor per symbol and persists the result to the
    'factor_volatility_value' table.

    :param kwargs: expects 'factor_name', 'session' and 'trade_date'
    """
    fb = FactorBase('factor_volatility_value')
    print(kwargs)
    factor_name = kwargs['factor_name']
    session = kwargs['session']
    trade_date = kwargs['trade_date']
    golbal_obj['trade_date'] = trade_date
    content = cache_data.get_cache(session, factor_name)
    data = json.loads(content)
    total_data = json_normalize(json.loads(data['total_data']))
    # local variable: does NOT touch the module-level cache of the same name
    index_daily_price_sets = json_normalize(json.loads(data['index_daily_price_sets']))
    index_daily_price_sets.set_index("symbol", inplace=True)
    # share the index frame with the factor functions via module state
    golbal_obj['tp_index'] = index_daily_price_sets
    # content_a = cache_data.get_cache(session, factor_name+'_a')
    # content_b = cache_data.get_cache(session, factor_name+'_b')
    # total_data = json_normalize(json.loads(content_a))
    # golbal_obj['index_daily_price_sets'] = json_normalize(json.loads(content_b))
    print(len(total_data))
    print(len(golbal_obj['tp_index']))
    total_data.sort_values(by=['symbol', 'trade_date'], ascending=True, inplace=True)
    symbol_sets = list(set(total_data['symbol']))
    symbol_sets.sort()
    factor_list = []
    for symbol in symbol_sets:
        tp_price = total_data[total_data['symbol'] == symbol]
        # skip suspended stocks (last close 0) and short histories (<=120 rows)
        if (tp_price.iloc[-1]['close'] != 0 and len(tp_price) > 120):
            factor_list.append(symbol_calcu(tp_price))
    factor_volatility_value = pd.DataFrame(factor_list)
    # primary key: symbol + trade date
    factor_volatility_value['id'] = factor_volatility_value['symbol'] + str(trade_date)
    factor_volatility_value['trade_date'] = str(trade_date)
    # factor_price_momentum.set_index('symbol', inplace=True)
    fb._storage_data(factor_volatility_value, trade_date)
# calculate(factor_name='volatility20180202', trade_date=20180202, session='1562216985610666')
# calculate(factor_name='volatility20180202', trade_date=20180202, session='1562224570816022')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment