Commit 5a11cfb6 authored by 李煜

add Earning Quality factor

parent d42da92f
# FactorCalculate # FactorCalculate
- /factor
该文件目录下保存的是因子计算的task文件, 以及每个task所依赖的因子计算文件。
- client.py
...@@ -5,13 +5,15 @@ import time ...@@ -5,13 +5,15 @@ import time
from pandas.io.json import json_normalize from pandas.io.json import json_normalize
from datetime import datetime, timedelta from datetime import datetime, timedelta
from factor import factor_growth, historical_value, factor_per_share_indicators from factor import factor_growth, historical_value, factor_per_share_indicators, factor_cash_flow, factor_contrarian, factor_earning
from factor.ttm_fundamental import * from factor.ttm_fundamental import *
from vision.file_unit.balance import Balance from vision.file_unit.balance import Balance
from vision.file_unit.cash_flow import CashFlow from vision.file_unit.cash_flow import CashFlow
from vision.file_unit.income import Income from vision.file_unit.income import Income
from vision.file_unit.valuation import Valuation from vision.file_unit.valuation import Valuation
from vision.file_unit.industry import Industry from vision.file_unit.industry import Industry
from vision.file_unit.indicator import Indicator
from factor.utillities.trade_date import TradeDate from factor.utillities.trade_date import TradeDate
from ultron.cluster.invoke.cache_data import cache_data from ultron.cluster.invoke.cache_data import cache_data
from ultron.utilities.short_uuid import unique_machine, decode from ultron.utilities.short_uuid import unique_machine, decode
...@@ -337,14 +339,181 @@ def get_basic_scale_data(trade_date): ...@@ -337,14 +339,181 @@ def get_basic_scale_data(trade_date):
return valuation_sets, ttm_factor_sets, cash_flow_sets, income_sets, balance_sets return valuation_sets, ttm_factor_sets, cash_flow_sets, income_sets, balance_sets
def get_basic_cash_flow(trade_date):
    """
    Load the inputs used by the cash-flow factors for one trade date.

    :param trade_date: trade date used to filter the fundamentals queries and
                       passed on to ``get_ttm_fundamental``
    :return: tuple ``(tp_cash_flow, ttm_factor_sets)``: the per-date cash-flow
             and income columns merged on ``symbol``, and the TTM columns
             merged with the valuation columns on ``symbol``
    """

    def _load(table, columns):
        # One filtered fundamentals query for a single table on this trade date.
        return get_fundamentals(add_filter_trade(query(table.__name__, columns), [trade_date]))

    latest_cash = _load(CashFlow, [CashFlow.symbol,
                                   CashFlow.net_operate_cash_flow,
                                   CashFlow.goods_sale_and_service_render_cash])
    latest_income = _load(Income, [Income.symbol,
                                   Income.operating_revenue,
                                   Income.total_operating_cost,
                                   Income.total_operating_revenue])
    valuation = _load(Valuation, [Valuation.symbol,
                                  Valuation.market_cap,
                                  Valuation.circulating_market_cap])

    # Merge the per-date tables and discard fully duplicated rows.
    tp_cash_flow = pd.merge(latest_cash, latest_income, on="symbol").drop_duplicates()

    # Columns requested from the TTM loader, keyed by table name.
    ttm_columns = {
        Balance.__name__: [Balance.symbol,
                           Balance.total_liability,
                           Balance.shortterm_loan,
                           Balance.longterm_loan,
                           Balance.total_current_liability,
                           Balance.net_liability,
                           Balance.total_current_assets,
                           Balance.interest_bearing_liability,
                           Balance.total_assets],
        CashFlow.__name__: [CashFlow.symbol,
                            CashFlow.net_operate_cash_flow,
                            CashFlow.goods_sale_and_service_render_cash,
                            CashFlow.cash_and_equivalents_at_end],
        Income.__name__: [Income.symbol,
                          Income.operating_revenue,
                          Income.total_operating_revenue,
                          Income.total_operating_cost,
                          Income.net_profit,
                          Income.np_parent_company_owners],
    }
    ttm_frame = get_ttm_fundamental([], ttm_columns, trade_date).reset_index().drop_duplicates()

    # Attach the valuation columns to the TTM frame.
    ttm_factor_sets = pd.merge(ttm_frame, valuation, on="symbol")
    return tp_cash_flow, ttm_factor_sets
def get_basic_constrain(trade_date):
    """
    Load the inputs used by the "constrain" factors for one trade date.

    :param trade_date: trade date used to filter the balance-sheet query and
                       passed on to ``get_ttm_fundamental``
    :return: tuple ``(balance_sets, ttm_factors_sets)``: the de-duplicated
             balance-sheet columns and the de-duplicated TTM income columns
    """
    # Balance-sheet columns for the given trade date.
    # The table name must be ``__name__`` (as in the sibling loaders), not ``_name_``.
    balance_sets = get_fundamentals(add_filter_trade(query(Balance.__name__,
                                                           [Balance.symbol,
                                                            Balance.total_current_liability,
                                                            Balance.total_liability,
                                                            Balance.total_assets,
                                                            Balance.total_current_assets,
                                                            Balance.fixed_assets,
                                                            Balance.interest_bearing_liability
                                                            ]), [trade_date]))
    balance_sets = balance_sets.drop_duplicates()

    # TTM income columns, keyed by table name.
    ttm_factors = {Income.__name__: [Income.symbol, Income.operating_cost,
                                     Income.operating_revenue,
                                     Income.operating_tax_surcharges,
                                     Income.total_operating_revenue,
                                     Income.total_operating_cost,
                                     Income.financial_expense,
                                     Income.sale_expense,
                                     Income.administration_expense
                                     ]}
    # NOTE(review): the sibling loaders call ``.reset_index()`` on this result
    # before de-duplicating; here the frame is de-duplicated as returned.
    # Confirm whether that difference is intentional.
    ttm_factors_sets = get_ttm_fundamental([], ttm_factors, trade_date)
    ttm_factors_sets = ttm_factors_sets.drop_duplicates()
    return balance_sets, ttm_factors_sets
def get_basic_earning(trade_date):
    """
    Load the inputs used by the earning factors for one trade date.

    :param trade_date: trade date used to filter the fundamentals queries and
                       passed on to ``get_ttm_fundamental``
    :return: tuple ``(tp_earning, ttm_earning_5y, ttm_earning)``: the merged
             per-date frame, the TTM frame requested with ``year=5``, and the
             default TTM frame
    """

    def _load(table, columns):
        # One filtered fundamentals query for a single table on this trade date.
        return get_fundamentals(add_filter_trade(query(table.__name__, columns), [trade_date]))

    # Per-date data.
    latest_balance = _load(Balance, [Balance.symbol,
                                     Balance.equities_parent_company_owners])
    latest_cash = _load(CashFlow, [CashFlow.symbol,
                                   CashFlow.goods_sale_and_service_render_cash])
    latest_income = _load(Income, [Income.symbol,
                                   Income.total_operating_revenue,
                                   Income.total_operating_cost,
                                   Income.invest_income_associates,
                                   Income.non_operating_revenue,
                                   Income.non_operating_expense,
                                   Income.total_profit,
                                   Income.net_profit,
                                   Income.np_parent_company_owners])
    latest_valuation = _load(Valuation, [Valuation.symbol,
                                         Valuation.circulating_market_cap])
    latest_indicator = _load(Indicator, [Indicator.symbol,
                                         Indicator.adjusted_profit])

    # Merge everything on symbol, starting from the cash-flow frame.
    tp_earning = latest_cash
    for other in (latest_balance, latest_income, latest_valuation, latest_indicator):
        tp_earning = pd.merge(tp_earning, other, on="symbol")
    tp_earning = tp_earning.drop_duplicates()

    # TTM data.
    ttm_columns = {
        Balance.__name__: [Balance.symbol,
                           Balance.total_assets,
                           Balance.equities_parent_company_owners,
                           Balance.total_owner_equities],
        CashFlow.__name__: [CashFlow.symbol,
                            CashFlow.cash_and_equivalents_at_end],
        Income.__name__: [Income.symbol,
                          Income.total_operating_revenue,
                          Income.operating_revenue,
                          Income.interest_income,
                          Income.total_operating_cost,
                          Income.operating_cost,
                          Income.financial_expense,
                          Income.invest_income_associates,
                          Income.operating_profit,
                          Income.non_operating_revenue,
                          Income.non_operating_expense,
                          Income.total_profit,
                          Income.net_profit,
                          Income.np_parent_company_owners],
    }
    ttm_earning = get_ttm_fundamental([], ttm_columns, trade_date).reset_index().drop_duplicates()

    # Five-year TTM data.
    ttm_columns_5y = {
        Balance.__name__: [Balance.symbol,
                           Balance.total_assets,
                           Balance.total_owner_equities],
        CashFlow.__name__: [CashFlow.symbol,
                            CashFlow.cash_and_equivalents_at_end],
        Income.__name__: [Income.symbol,
                          Income.net_profit],
    }
    # Going through cache_data.set_cache loses the index name, so the index is
    # turned back into a column here.
    ttm_earning_5y = get_ttm_fundamental([], ttm_columns_5y, trade_date, year=5).reset_index().drop_duplicates()

    return tp_earning, ttm_earning_5y, ttm_earning
if __name__ == '__main__': if __name__ == '__main__':
session1 = str('156099868869460811') session1 = str('156099868869460811')
session2 = str('156099868869460812') session2 = str('156099868869460812')
# session3 = str('156099868869460813') # session3 = str('156099868869460813')
session4 = str('156099868869460814')
session5 = str('156099868869460815')
session6 = str('156099868869460816')
# session = str(int(time.time() * 1000000 + datetime.now().microsecond)) # session = str(int(time.time() * 1000000 + datetime.now().microsecond))
start_date = 20100101 start_date = 20100101
end_date = 20190101 end_date = 20190101
count = 10 count = 5
rebuild = True rebuild = True
_trade_date = TradeDate() _trade_date = TradeDate()
trade_date_sets = _trade_date.trade_date_sets_ago(start_date, end_date, count) trade_date_sets = _trade_date.trade_date_sets_ago(start_date, end_date, count)
...@@ -352,21 +521,34 @@ if __name__ == '__main__': ...@@ -352,21 +521,34 @@ if __name__ == '__main__':
# growth # growth
growth = factor_growth.Growth('factor_growth') growth = factor_growth.Growth('factor_growth')
growth.create_dest_tables() growth.create_dest_tables()
# historical value # historical value
history_value = historical_value.HistoricalValue('factor_historical_value') history_value = historical_value.HistoricalValue('factor_historical_value')
history_value.create_dest_tables() history_value.create_dest_tables()
# scale # scale
# scale = factor_per_share_indicators.PerShareIndicators('factor_scale') # scale = factor_per_share_indicators.PerShareIndicators('factor_scale')
# scale.create_dest_tables() # scale.create_dest_tables()
# cash flow
cash_flow = factor_cash_flow.FactorCashFlow('factor_cash_flow')
cash_flow.create_dest_tables()
# constrain
constrain = factor_contrarian.FactorContrarian('factor_constrain')
constrain.create_dest_tables()
# earning
earning = factor_earning.FactorEarning('factor_earning')
earning.create_dest_tables()
for date_index in trade_date_sets: for date_index in trade_date_sets:
# factor_growth # factor_growth
start_time = time.time() start_time = time.time()
ttm_factor_sets, balance_sets = get_basic_growth_data(date_index) ttm_factor_sets, balance_sets = get_basic_growth_data(date_index)
growth_sets = pd.merge(ttm_factor_sets, balance_sets, on='symbol') growth_sets = pd.merge(ttm_factor_sets, balance_sets, on='symbol')
cache_data.set_cache(session1, date_index, growth_sets.to_json(orient='records')) cache_data.set_cache(session1, date_index, growth_sets.to_json(orient='records'))
factor_growth.factor_calculate(date_index=date_index, factor_growth.factor_calculate(date_index=date_index, session=session1)
session=session1)
time1 = time.time() time1 = time.time()
print('growth_cal_time:{}'.format(time1 - start_time)) print('growth_cal_time:{}'.format(time1 - start_time))
...@@ -376,18 +558,41 @@ if __name__ == '__main__': ...@@ -376,18 +558,41 @@ if __name__ == '__main__':
valuation_sets = pd.merge(valuation_sets, ttm_factor_sets, on='symbol') valuation_sets = pd.merge(valuation_sets, ttm_factor_sets, on='symbol')
valuation_sets = pd.merge(valuation_sets, cash_flow_sets, on='symbol') valuation_sets = pd.merge(valuation_sets, cash_flow_sets, on='symbol')
cache_data.set_cache(session2, date_index, valuation_sets.to_json(orient='records')) cache_data.set_cache(session2, date_index, valuation_sets.to_json(orient='records'))
historical_value.factor_calculate(date_index=date_index, historical_value.factor_calculate(date_index=date_index, session=session2)
session=session2) time2 = time.time()
print('history_cal_time:{}'.format(time.time() - time1)) print('history_cal_time:{}'.format(time2 - time1))
# scale # # scale
# valuation_sets, ttm_factor_sets, cash_flow_sets, income_sets, balance_sets = get_basic_scale_data(date_index) # valuation_sets, ttm_factor_sets, cash_flow_sets, income_sets, balance_sets = get_basic_scale_data(date_index)
# valuation_sets = pd.merge(valuation_sets, income_sets, on='symbol') # valuation_sets = pd.merge(valuation_sets, income_sets, on='symbol')
# valuation_sets = pd.merge(valuation_sets, ttm_factor_sets, on='symbol') # valuation_sets = pd.merge(valuation_sets, ttm_factor_sets, on='symbol')
# valuation_sets = pd.merge(valuation_sets, cash_flow_sets, on='symbol') # valuation_sets = pd.merge(valuation_sets, cash_flow_sets, on='symbol')
# valuation_sets = pd.merge(valuation_sets, balance_sets, on='symbol') # valuation_sets = pd.merge(valuation_sets, balance_sets, on='symbol')
# cache_data.set_cache(session3, date_index, valuation_sets.to_json(orient='records')) # cache_data.set_cache(session3, date_index, valuation_sets.to_json(orient='records'))
# factor_per_share_indicators.factor_calculate(date_index=date_index, # factor_per_share_indicators.factor_calculate(date_index=date_index, session=session3)
# session=session3)
# cash flow
tp_cash_flow, ttm_factor_sets = get_basic_cash_flow(date_index)
cache_data.set_cache(session4 + "1", date_index, tp_cash_flow.to_json(orient='records'))
cache_data.set_cache(session4 + "2", date_index, ttm_factor_sets.to_json(orient='records'))
factor_cash_flow.factor_calculate(date_index=date_index, session=session4)
time4 = time.time()
print('cash_flow_cal_time:{}'.format(time4 - time2))
# constrain
balance_sets, ttm_factors_sets = get_basic_constrain(date_index)
constrain_sets = pd.merge(balance_sets, ttm_factors_sets, on='symbol')
cache_data.set_cache(session5, date_index, constrain_sets.to_json(orient='records'))
factor_contrarian.factor_calculate(date_index=date_index, session=session5)
time5 = time.time()
print('constrain_cal_time:{}'.format(time5 - time4))
# earning
tp_earning, ttm_earning_5y, ttm_earning = get_basic_earning(date_index)
cache_data.set_cache(session6 + "1", date_index, tp_earning.to_json(orient='records'))
cache_data.set_cache(session6 + "2", date_index, ttm_earning_5y.to_json(orient='records'))
cache_data.set_cache(session6 + "3", date_index, ttm_earning.to_json(orient='records'))
factor_earning.factor_calculate(date_index=date_index, session=session6)
time6 = time.time()
print('earning_cal_time:{}'.format(time6 - time5))
print('---------------------->') print('---------------------->')
...@@ -340,7 +340,7 @@ def get_basic_scale_data(trade_date): ...@@ -340,7 +340,7 @@ def get_basic_scale_data(trade_date):
if __name__ == '__main__': if __name__ == '__main__':
session1 = str('156099868869460811') session1 = str('156099868869460811')
session2 = str('156099868869460812') session2 = str('156099868869460812')
session3 = str('156099868869460813') # session3 = str('156099868869460813')
# session = str(int(time.time() * 1000000 + datetime.now().microsecond)) # session = str(int(time.time() * 1000000 + datetime.now().microsecond))
start_date = 20100101 start_date = 20100101
end_date = 20190101 end_date = 20190101
...@@ -349,14 +349,15 @@ if __name__ == '__main__': ...@@ -349,14 +349,15 @@ if __name__ == '__main__':
_trade_date = TradeDate() _trade_date = TradeDate()
trade_date_sets = _trade_date.trade_date_sets_ago(start_date, end_date, count) trade_date_sets = _trade_date.trade_date_sets_ago(start_date, end_date, count)
if rebuild is True: if rebuild is True:
# growth
growth = factor_growth.Growth('factor_growth') growth = factor_growth.Growth('factor_growth')
growth.create_dest_tables() growth.create_dest_tables()
# historical value
history_value = historical_value.HistoricalValue('factor_historical_value') history_value = historical_value.HistoricalValue('factor_historical_value')
history_value.create_dest_tables() history_value.create_dest_tables()
# scale
scale = factor_per_share_indicators.PerShareIndicators('factor_scale') # scale = factor_per_share_indicators.PerShareIndicators('factor_scale')
scale.create_dest_tables() # scale.create_dest_tables()
for date_index in trade_date_sets: for date_index in trade_date_sets:
# factor_growth # factor_growth
...@@ -380,13 +381,13 @@ if __name__ == '__main__': ...@@ -380,13 +381,13 @@ if __name__ == '__main__':
print('history_cal_time:{}'.format(time.time() - time1)) print('history_cal_time:{}'.format(time.time() - time1))
# scale # scale
valuation_sets, ttm_factor_sets, cash_flow_sets, income_sets, balance_sets = get_basic_scale_data(date_index) # valuation_sets, ttm_factor_sets, cash_flow_sets, income_sets, balance_sets = get_basic_scale_data(date_index)
valuation_sets = pd.merge(valuation_sets, income_sets, on='symbol') # valuation_sets = pd.merge(valuation_sets, income_sets, on='symbol')
valuation_sets = pd.merge(valuation_sets, ttm_factor_sets, on='symbol') # valuation_sets = pd.merge(valuation_sets, ttm_factor_sets, on='symbol')
valuation_sets = pd.merge(valuation_sets, cash_flow_sets, on='symbol') # valuation_sets = pd.merge(valuation_sets, cash_flow_sets, on='symbol')
valuation_sets = pd.merge(valuation_sets, balance_sets, on='symbol') # valuation_sets = pd.merge(valuation_sets, balance_sets, on='symbol')
cache_data.set_cache(session3, date_index, valuation_sets.to_json(orient='records')) # cache_data.set_cache(session3, date_index, valuation_sets.to_json(orient='records'))
factor_per_share_indicators.factor_calculate(date_index=date_index, # factor_per_share_indicators.factor_calculate(date_index=date_index,
session=session3) # session=session3)
print('---------------------->') print('---------------------->')
...@@ -4,9 +4,9 @@ ...@@ -4,9 +4,9 @@
""" """
@version: ?? @version: ??
@author: li @author: li
@file: __init__.py.py @file: __init__.py
@time: 2019-06-30 19:04 @time: 2019-06-30 19:04
""" """
from ultron.cluster.invoke.app_engine import create_app from ultron.cluster.invoke.app_engine import create_app
app = create_app('factor', ['factor.factor_growth', 'factor.historical_value']) app = create_app('factor', ['factor.factor_growth', 'factor.historical_value', 'factor.factor_cash_flow'])
#!/usr/bin/env python
# coding=utf-8
import numpy as np
import json
from factor import app
from pandas.io.json import json_normalize
from factor.ttm_fundamental import *
from factor.factor_base import FactorBase
from vision.fm.signletion_engine import *
from vision.utillities.calc_tools import CalcTools
from ultron.cluster.invoke.cache_data import cache_data
class FactorCashFlow(FactorBase):
    """
    Cash-flow factor calculator.

    Every factor method takes a source frame and the accumulated factor frame,
    computes one ratio column from the source, and returns the factor frame
    merged with that column on ``symbol``. The source frame is expected to
    carry ``symbol`` as its index (``factor_calculate`` sets it that way).
    Wherever ``CalcTools.is_zero`` flags the denominator, the ratio is 0.
    """

    def __init__(self, name):
        super(FactorCashFlow, self).__init__(name)

    def create_dest_tables(self):
        """Drop and recreate the destination factor table named ``self._name``."""
        drop_sql = """drop table if exists `{0}`""".format(self._name)
        create_sql = """create table `{0}`(
`id` varchar(32) NOT NULL,
`symbol` varchar(24) NOT NULL,
`trade_date` date NOT NULL,
`nocf_to_t_liability_ttm` decimal(19,4),
`nocf_to_interest_bear_debt_ttm` decimal(19,4),
`nocf_to_net_debt_ttm` decimal(19,4),
`sale_service_cash_to_or_ttm` decimal(19,4),
`cash_rate_of_sales_ttm` decimal(19,4),
`nocf_to_operating_ni_ttm` decimal(19,4),
`oper_cash_in_to_current_liability_ttm` decimal(19,4),
`cash_to_current_liability_ttm` decimal(19,4),
`cfo_to_ev_ttm` decimal(19,4),
`acca_ttm` decimal(19,4),
`net_profit_cash_cover_ttm` decimal(19,4),
`oper_cash_in_to_asset_ttm` decimal(19,4),
`sales_service_cash_to_or_latest` decimal(19,4),
`cash_rate_of_sales_latest` decimal(19,4),
`nocf_to_operating_ni_latest` decimal(19,4),
PRIMARY KEY(`id`,`trade_date`,`symbol`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self._name)
        super(FactorCashFlow, self)._create_tables(create_sql, drop_sql)

    @staticmethod
    def _merge_ratio(source, factor_cash_flow, factor_name, numerator, denominator):
        """
        Build one ratio column and merge it into the factor frame.

        :param source: frame the numerator/denominator arrays were taken from;
                       its index is reused for the new column
        :param factor_cash_flow: accumulated factor frame with a ``symbol`` column
        :param factor_name: name of the ratio column to add
        :param numerator: array of numerator values aligned with ``source``
        :param denominator: array of denominator values aligned with ``source``
        :return: ``factor_cash_flow`` merged with the new column on ``symbol``
        """
        # A fresh one-column frame avoids assigning into a slice of the caller's data.
        ratio = pd.DataFrame(
            {factor_name: np.where(CalcTools.is_zero(denominator), 0, numerator / denominator)},
            index=source.index)
        return pd.merge(factor_cash_flow, ratio, on="symbol")

    def nocf_to_t_liability_ttm(self, ttm_cash_flow, factor_cash_flow):
        """Net operating cash flow (TTM) / total liabilities (TTM)."""
        return self._merge_ratio(ttm_cash_flow, factor_cash_flow, 'nocf_to_t_liability_ttm',
                                 ttm_cash_flow['net_operate_cash_flow'].values,
                                 ttm_cash_flow['total_liability'].values)

    def nocf_to_interest_bear_debt_ttm(self, ttm_cash_flow, factor_cash_flow):
        """Net operating cash flow (TTM) / interest-bearing liabilities (TTM)."""
        return self._merge_ratio(ttm_cash_flow, factor_cash_flow, 'nocf_to_interest_bear_debt_ttm',
                                 ttm_cash_flow['net_operate_cash_flow'].values,
                                 ttm_cash_flow['interest_bearing_liability'].values)

    def nocf_to_net_debt_ttm(self, ttm_cash_flow, factor_cash_flow):
        """Net operating cash flow (TTM) / net liabilities (TTM)."""
        return self._merge_ratio(ttm_cash_flow, factor_cash_flow, 'nocf_to_net_debt_ttm',
                                 ttm_cash_flow['net_operate_cash_flow'].values,
                                 ttm_cash_flow['net_liability'].values)

    def sale_service_cash_to_or_ttm(self, ttm_cash_flow, factor_cash_flow):
        """Cash received from sales of goods and services (TTM) / operating revenue (TTM)."""
        return self._merge_ratio(ttm_cash_flow, factor_cash_flow, 'sale_service_cash_to_or_ttm',
                                 ttm_cash_flow['goods_sale_and_service_render_cash'].values,
                                 ttm_cash_flow['operating_revenue'].values)

    def cash_rate_of_sales_ttm(self, ttm_cash_flow, factor_cash_flow):
        """Net operating cash flow (TTM) / operating revenue (TTM)."""
        return self._merge_ratio(ttm_cash_flow, factor_cash_flow, 'cash_rate_of_sales_ttm',
                                 ttm_cash_flow['net_operate_cash_flow'].values,
                                 ttm_cash_flow['operating_revenue'].values)

    def nocf_to_operating_ni_ttm(self, ttm_cash_flow, factor_cash_flow):
        """Net operating cash flow (TTM) / (total operating revenue (TTM) - total operating cost (TTM))."""
        operating_income = (ttm_cash_flow['total_operating_revenue'].values
                            - ttm_cash_flow['total_operating_cost'].values)
        return self._merge_ratio(ttm_cash_flow, factor_cash_flow, 'nocf_to_operating_ni_ttm',
                                 ttm_cash_flow['net_operate_cash_flow'].values,
                                 operating_income)

    def oper_cash_in_to_current_liability_ttm(self, ttm_cash_flow, factor_cash_flow):
        """Net operating cash flow (TTM) / total current liabilities (TTM)."""
        return self._merge_ratio(ttm_cash_flow, factor_cash_flow, 'oper_cash_in_to_current_liability_ttm',
                                 ttm_cash_flow['net_operate_cash_flow'].values,
                                 ttm_cash_flow['total_current_liability'].values)

    def cash_to_current_liability_ttm(self, ttm_cash_flow, factor_cash_flow):
        """Cash and cash equivalents at period end (TTM) / total current liabilities (TTM)."""
        # The denominator is current liabilities, as the factor name states;
        # the earlier code divided by total_current_assets by mistake.
        return self._merge_ratio(ttm_cash_flow, factor_cash_flow, 'cash_to_current_liability_ttm',
                                 ttm_cash_flow['cash_and_equivalents_at_end'].values,
                                 ttm_cash_flow['total_current_liability'].values)

    def cfo_to_ev_ttm(self, ttm_cash_flow, factor_cash_flow):
        """
        Net operating cash flow (TTM) / (long-term loans (TTM) + short-term loans (TTM)
        + market cap - cash and cash equivalents at period end (TTM)).
        """
        enterprise_value = (ttm_cash_flow['longterm_loan'].values
                            + ttm_cash_flow['shortterm_loan'].values
                            + ttm_cash_flow['market_cap'].values
                            - ttm_cash_flow['cash_and_equivalents_at_end'].values)
        return self._merge_ratio(ttm_cash_flow, factor_cash_flow, 'cfo_to_ev_ttm',
                                 ttm_cash_flow['net_operate_cash_flow'].values,
                                 enterprise_value)

    def acca_ttm(self, ttm_cash_flow, factor_cash_flow):
        """(Net operating cash flow (TTM) - net profit (TTM)) / total assets (TTM)."""
        accrual = (ttm_cash_flow['net_operate_cash_flow'].values
                   - ttm_cash_flow['net_profit'].values)
        return self._merge_ratio(ttm_cash_flow, factor_cash_flow, 'acca_ttm',
                                 accrual,
                                 ttm_cash_flow['total_assets'].values)

    def net_profit_cash_cover_ttm(self, ttm_cash_flow, factor_cash_flow):
        """Net operating cash flow (TTM) / net profit attributable to parent-company owners (TTM)."""
        return self._merge_ratio(ttm_cash_flow, factor_cash_flow, 'net_profit_cash_cover_ttm',
                                 ttm_cash_flow['net_operate_cash_flow'].values,
                                 ttm_cash_flow['np_parent_company_owners'].values)

    def oper_cash_in_to_asset_ttm(self, ttm_cash_flow, factor_cash_flow):
        """Net operating cash flow (TTM) / total assets (TTM)."""
        return self._merge_ratio(ttm_cash_flow, factor_cash_flow, 'oper_cash_in_to_asset_ttm',
                                 ttm_cash_flow['net_operate_cash_flow'].values,
                                 ttm_cash_flow['total_assets'].values)

    def sales_service_cash_to_or_latest(self, tp_cash_flow, factor_cash_flow):
        """Cash received from sales of goods and services (latest) / operating revenue (latest)."""
        return self._merge_ratio(tp_cash_flow, factor_cash_flow, 'sales_service_cash_to_or_latest',
                                 tp_cash_flow['goods_sale_and_service_render_cash'].values,
                                 tp_cash_flow['operating_revenue'].values)

    def cash_rate_of_sales_latest(self, tp_cash_flow, factor_cash_flow):
        """Net operating cash flow (latest) / operating revenue (latest)."""
        return self._merge_ratio(tp_cash_flow, factor_cash_flow, 'cash_rate_of_sales_latest',
                                 tp_cash_flow['net_operate_cash_flow'].values,
                                 tp_cash_flow['operating_revenue'].values)

    def nocf_to_operating_ni_latest(self, tp_cash_flow, factor_cash_flow):
        """Net operating cash flow (latest) / (total operating revenue (latest) - total operating cost (latest))."""
        operating_income = (tp_cash_flow['total_operating_revenue'].values
                            - tp_cash_flow['total_operating_cost'].values)
        return self._merge_ratio(tp_cash_flow, factor_cash_flow, 'nocf_to_operating_ni_latest',
                                 tp_cash_flow['net_operate_cash_flow'].values,
                                 operating_income)
def calculate(trade_date, cash_flow_dic, cash_flow):
    """
    Compute every cash-flow factor for one trade date and store the result.

    :param trade_date: trade date; printed, stringified into ``id`` and
                       ``trade_date``, and passed to ``_storage_data``
    :param cash_flow_dic: dict with the frames ``'tp_cash_flow'`` and
                          ``'ttm_factor_sets'``
    :param cash_flow: object providing the factor methods and ``_storage_data``
    :return: None
    """
    print(trade_date)
    # Inputs currently used by the factors.
    latest_frame = cash_flow_dic['tp_cash_flow']
    ttm_frame = cash_flow_dic['ttm_factor_sets']

    result = pd.DataFrame()
    result['symbol'] = latest_frame.index

    # Non-TTM factors, applied in this order.
    latest_steps = (
        'nocf_to_operating_ni_latest',
        'cash_rate_of_sales_latest',
        'sales_service_cash_to_or_latest',
    )
    for step_name in latest_steps:
        result = getattr(cash_flow, step_name)(latest_frame, result)

    # TTM factors, applied in this order.
    ttm_steps = (
        'nocf_to_t_liability_ttm',
        'nocf_to_interest_bear_debt_ttm',
        'nocf_to_net_debt_ttm',
        'sale_service_cash_to_or_ttm',
        'cash_rate_of_sales_ttm',
        'nocf_to_operating_ni_ttm',
        'oper_cash_in_to_current_liability_ttm',
        'cash_to_current_liability_ttm',
        'cfo_to_ev_ttm',
        'acca_ttm',
        'net_profit_cash_cover_ttm',
        'oper_cash_in_to_asset_ttm',
    )
    for step_name in ttm_steps:
        result = getattr(cash_flow, step_name)(ttm_frame, result)

    date_text = str(trade_date)
    result['id'] = result['symbol'] + date_text
    result['trade_date'] = date_text
    cash_flow._storage_data(result, trade_date)
def do_update(self, start_date, end_date, count):
    """
    Run ``self.calculate`` for every trade date returned by the trade-date helper.

    :param start_date: passed through to ``trade_date_sets_ago``
    :param end_date: passed through to ``trade_date_sets_ago``
    :param count: passed through to ``trade_date_sets_ago``
    """
    # NOTE(review): ``self.calculate`` is called with the trade date only, while
    # the module-level ``calculate`` takes three arguments. Confirm this
    # function is still in use.
    # NOTE(review): the separator print is assumed to run once per trade date.
    # Read the local list of trade dates.
    dates = self._trade_date.trade_date_sets_ago(start_date, end_date, count)
    for current_date in dates:
        self.calculate(current_date)
        print('----->')
@app.task()
def factor_calculate(**kwargs):
    """
    Task entry point: read the two cached input frames and run ``calculate``.

    Expects the keyword arguments ``date_index`` and ``session``. The cached
    payloads are read from the keys ``session + "1"`` and ``session + "2"``.
    """
    print("cash_flow_kwargs: {}".format(kwargs))
    date_index, session = kwargs['date_index'], kwargs['session']
    # The name must match the table name created in the client, otherwise an error is raised.
    calculator = FactorCashFlow('factor_cash_flow')

    # Fetch both payloads first, then decode them.
    raw_payloads = [cache_data.get_cache(session + suffix, date_index) for suffix in ("1", "2")]
    frames = [json_normalize(json.loads(str(raw, encoding='utf8'))) for raw in raw_payloads]
    for frame in frames:
        frame.set_index('symbol', inplace=True)
    tp_cash_flow, ttm_factor_sets = frames

    total_cash_flow_data = {'tp_cash_flow': tp_cash_flow, 'ttm_factor_sets': ttm_factor_sets}
    print("len_total_cash_flow_data {}".format(len(total_cash_flow_data)))
    calculate(date_index, total_cash_flow_data, calculator)
#!/usr/bin/env python
# coding=utf-8
import numpy as np
import json
from factor import app
from pandas.io.json import json_normalize
from factor.ttm_fundamental import *
from factor.factor_base import FactorBase
from vision.fm.signletion_engine import *
from vision.utillities.calc_tools import CalcTools
from ultron.cluster.invoke.cache_data import cache_data
class FactorCashFlow(FactorBase):
def __init__(self, name):
    """Initialise the calculator by forwarding ``name`` to the parent initialiser."""
    super().__init__(name)
# 构建因子表
def create_dest_tables(self):
    """Drop and recreate the destination factor table named ``self._name``."""
    create_template = """create table `{0}`(
`id` varchar(32) NOT NULL,
`symbol` varchar(24) NOT NULL,
`trade_date` date NOT NULL,
`nocf_to_t_liability_ttm` decimal(19,4),
`nocf_to_interest_bear_debt_ttm` decimal(19,4),
`nocf_to_net_debt_ttm` decimal(19,4),
`sale_service_cash_to_or_ttm` decimal(19,4),
`cash_rate_of_sales_ttm` decimal(19,4),
`nocf_to_operating_ni_ttm` decimal(19,4),
`oper_cash_in_to_current_liability_ttm` decimal(19,4),
`cash_to_current_liability_ttm` decimal(19,4),
`cfo_to_ev_ttm` decimal(19,4),
`acca_ttm` decimal(19,4),
`net_profit_cash_cover_ttm` decimal(19,4),
`oper_cash_in_to_asset_ttm` decimal(19,4),
`sales_service_cash_to_or_latest` decimal(19,4),
`cash_rate_of_sales_latest` decimal(19,4),
`nocf_to_operating_ni_latest` decimal(19,4),
PRIMARY KEY(`id`,`trade_date`,`symbol`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;"""
    drop_statement = """drop table if exists `{0}`""".format(self._name)
    create_statement = create_template.format(self._name)
    super(FactorCashFlow, self)._create_tables(create_statement, drop_statement)
# 经营活动净现金流(TTM)/负债(TTM)
def nocf_to_t_liability_ttm(self, ttm_cash_flow, factor_cash_flow):
    """
    Net operating cash flow (TTM) / total liabilities.

    Returns ``factor_cash_flow`` merged on ``symbol`` with the new
    ``nocf_to_t_liability_ttm`` column. The ratio is 0 where
    ``CalcTools.is_zero`` flags the denominator.
    """
    needed = ['net_operate_cash_flow_ttm', 'total_liability']
    subset = ttm_cash_flow.loc[:, needed]
    liabilities = subset.total_liability.values
    subset['nocf_to_t_liability_ttm'] = np.where(
        CalcTools.is_zero(liabilities), 0,
        subset.net_operate_cash_flow_ttm.values / liabilities)
    ratio_only = subset.drop(needed, axis=1)
    return pd.merge(factor_cash_flow, ratio_only, on="symbol")
# 经营活动净现金流(TTM)/带息负债(TTM)
def nocf_to_interest_bear_debt_ttm(self, ttm_cash_flow, factor_cash_flow):
columns_list = ['net_operate_cash_flow_ttm', 'total_liability', 'interest_bearing_liability']
cash_flow = ttm_cash_flow.loc[:, columns_list]
cash_flow['nocf_to_interest_bear_debt_ttm'] = np.where(
CalcTools.is_zero(cash_flow.interest_bearing_liability.values), 0,
cash_flow.net_operate_cash_flow_ttm.values / cash_flow.interest_bearing_liability.values)
cash_flow = cash_flow.drop(columns_list, axis=1)
factor_cash_flow = pd.merge(factor_cash_flow, cash_flow, on="symbol")
return factor_cash_flow
# 经营活动净现金流(TTM)/净负债(TTM)
def nocf_to_net_debt_ttm(self, ttm_cash_flow, factor_cash_flow):
columns_list = ['net_operate_cash_flow_ttm', 'net_liability']
cash_flow = ttm_cash_flow.loc[:, columns_list]
cash_flow['nocf_to_net_debt_ttm'] = np.where(CalcTools.is_zero(cash_flow.net_liability.values), 0,
cash_flow.net_operate_cash_flow_ttm.values / cash_flow.net_liability.values)
cash_flow = cash_flow.drop(columns_list, axis=1)
factor_cash_flow = pd.merge(factor_cash_flow, cash_flow, on="symbol")
return factor_cash_flow
# 销售商品和提供劳务收到的现金(TTM)/营业收入(TTM)
def sale_service_cash_to_or_ttm(self, ttm_cash_flow, factor_cash_flow):
columns_list = ['goods_sale_and_service_render_cash_ttm', 'operating_revenue_ttm']
cash_flow = ttm_cash_flow.loc[:, columns_list]
cash_flow['sale_service_cash_to_or_ttm'] = np.where(
CalcTools.is_zero(cash_flow.operating_revenue_ttm.values), 0,
cash_flow.goods_sale_and_service_render_cash_ttm.values / cash_flow.operating_revenue_ttm.values)
cash_flow = cash_flow.drop(columns_list, axis=1)
factor_cash_flow = pd.merge(factor_cash_flow, cash_flow, on="symbol")
return factor_cash_flow
# 经营活动产生的现金流量净额(TTM)/营业收入(TTM)
def cash_rate_of_sales_ttm(self, ttm_cash_flow, factor_cash_flow):
columns_list = ['net_operate_cash_flow_ttm', 'operating_revenue_ttm']
cash_flow = ttm_cash_flow.loc[:, columns_list]
cash_flow['cash_rate_of_sales_ttm'] = np.where(
CalcTools.is_zero(cash_flow.operating_revenue_ttm.values), 0,
cash_flow.net_operate_cash_flow_ttm.values / cash_flow.operating_revenue_ttm.values)
cash_flow = cash_flow.drop(columns_list, axis=1)
factor_cash_flow = pd.merge(factor_cash_flow, cash_flow, on="symbol")
return factor_cash_flow
# 经营活动产生的现金流量净额(TTM)/(营业总收入(TTM)-营业总成本(TTM))
def nocf_to_operating_ni_ttm(self, ttm_cash_flow, factor_cash_flow):
columns_list = ['net_operate_cash_flow_ttm', 'total_operating_revenue_ttm', 'total_operating_cost_ttm']
cash_flow = ttm_cash_flow.loc[:, columns_list]
cash_flow['nocf_to_operating_ni_ttm'] = np.where(
CalcTools.is_zero(cash_flow.total_operating_revenue_ttm.values - cash_flow.total_operating_cost_ttm.values),
0, cash_flow.net_operate_cash_flow_ttm.values / (
cash_flow.total_operating_revenue_ttm.values - cash_flow.total_operating_cost_ttm.values))
cash_flow = cash_flow.drop(columns_list, axis=1)
factor_cash_flow = pd.merge(factor_cash_flow, cash_flow, on="symbol")
return factor_cash_flow
# 经营活动产生的现金流量净额(TTM)/流动负债(TTM)
def oper_cash_in_to_current_liability_ttm(self, ttm_cash_flow, factor_cash_flow):
columns_list = ['net_operate_cash_flow_ttm', 'total_current_liability']
cash_flow = ttm_cash_flow.loc[:, columns_list]
cash_flow['oper_cash_in_to_current_liability_ttm'] = np.where(
CalcTools.is_zero(cash_flow.total_current_liability.values), 0,
cash_flow.net_operate_cash_flow_ttm.values / cash_flow.total_current_liability.values)
cash_flow = cash_flow.drop(columns_list, axis=1)
factor_cash_flow = pd.merge(factor_cash_flow, cash_flow, on="symbol")
return factor_cash_flow
# 期末现金及现金等价物余额(TTM)/流动负债(TTM)
def cash_to_current_liability_ttm(self, ttm_cash_flow, factor_cash_flow):
columns_list = ['cash_and_equivalents_at_end', 'total_current_assets']
cash_flow = ttm_cash_flow.loc[:, columns_list]
cash_flow['cash_to_current_liability_ttm'] = np.where(CalcTools.is_zero(cash_flow.total_current_assets.values),
0,
cash_flow.cash_and_equivalents_at_end.values / cash_flow.total_current_assets.values)
cash_flow = cash_flow.drop(columns_list, axis=1)
factor_cash_flow = pd.merge(factor_cash_flow, cash_flow, on="symbol")
return factor_cash_flow
# 经营活动产生的现金流量净额(TTM)/(长期借款(TTM)+ 短期借款(TTM)+ 总市值 - 期末现金及现金等价物(TTM)
def cfo_to_ev_ttm(self, ttm_cash_flow, factor_cash_flow):
columns_list = ['net_operate_cash_flow_ttm', 'longterm_loan', 'shortterm_loan', 'market_cap',
'cash_and_equivalents_at_end']
cash_flow = ttm_cash_flow.loc[:, columns_list]
cash_flow['cfo_to_ev_ttm'] = np.where(CalcTools.is_zero(
cash_flow.longterm_loan.values + cash_flow.shortterm_loan.values + \
cash_flow.market_cap.values - cash_flow.cash_and_equivalents_at_end.values), 0,
cash_flow.net_operate_cash_flow_ttm.values / (cash_flow.longterm_loan.values + cash_flow.shortterm_loan.values + \
cash_flow.market_cap.values - cash_flow.cash_and_equivalents_at_end.values))
cash_flow = cash_flow.drop(columns_list, axis=1)
factor_cash_flow = pd.merge(factor_cash_flow, cash_flow, on="symbol")
return factor_cash_flow
# (经营活动产生的金流量净额(TTM) - 净利润(TTM)) /总资产(TTM)
def acca_ttm(self, ttm_cash_flow, factor_cash_flow):
columns_list = ['net_operate_cash_flow_ttm', 'net_profit', 'total_assets']
cash_flow = ttm_cash_flow.loc[:, columns_list]
cash_flow['acca_ttm'] = np.where(CalcTools.is_zero(cash_flow.total_assets.values), 0,
(cash_flow.net_operate_cash_flow_ttm.values - cash_flow.net_profit.values) / (
cash_flow.total_assets.values))
cash_flow = cash_flow.drop(columns_list, axis=1)
factor_cash_flow = pd.merge(factor_cash_flow, cash_flow, on="symbol")
return factor_cash_flow
# 经营活动产生的现金流量净额(TTM)/归属于母公司所有者的净利润(TTM)
def net_profit_cash_cover_ttm(self, ttm_cash_flow, factor_cash_flow):
columns_list = ['net_operate_cash_flow_ttm', 'np_parent_company_owners']
cash_flow = ttm_cash_flow.loc[:, columns_list]
cash_flow['net_profit_cash_cover_ttm'] = np.where(
CalcTools.is_zero(cash_flow.np_parent_company_owners.values), 0,
cash_flow.net_operate_cash_flow_ttm.values / cash_flow.np_parent_company_owners.values)
cash_flow = cash_flow.drop(columns_list, axis=1)
factor_cash_flow = pd.merge(factor_cash_flow, cash_flow, on="symbol")
return factor_cash_flow
# 经营活动产生的现金流量净额(TTM)/总资产(TTM)
def oper_cash_in_to_asset_ttm(self, ttm_cash_flow, factor_cash_flow):
columns_list = ['net_operate_cash_flow_ttm', 'total_assets']
cash_flow = ttm_cash_flow.loc[:, columns_list]
cash_flow['oper_cash_in_to_asset_ttm'] = np.where(CalcTools.is_zero(cash_flow.total_assets.values),
0,
cash_flow.net_operate_cash_flow_ttm.values / cash_flow.total_assets.values)
cash_flow = cash_flow.drop(columns_list, axis=1)
factor_cash_flow = pd.merge(factor_cash_flow, cash_flow, on="symbol")
return factor_cash_flow
# 销售商品和提供劳务收到的现金(Latest)/营业收入(Latest)
def sales_service_cash_to_or_latest(self, tp_cash_flow, factor_cash_flow):
columns_list = ['goods_sale_and_service_render_cash', 'operating_revenue']
cash_flow = tp_cash_flow.loc[:, columns_list]
cash_flow['sales_service_cash_to_or_latest'] = np.where(CalcTools.is_zero(cash_flow.operating_revenue.values),
0,
cash_flow.goods_sale_and_service_render_cash.values / cash_flow.operating_revenue.values)
cash_flow = cash_flow.drop(columns_list, axis=1)
factor_cash_flow = pd.merge(factor_cash_flow, cash_flow, on="symbol")
return factor_cash_flow
# 经验活动产生的现金流量净额 / 营业收入
def cash_rate_of_sales_latest(self, tp_cash_flow, factor_cash_flow):
columns_list = ['net_operate_cash_flow', 'operating_revenue']
cash_flow = tp_cash_flow.loc[:, columns_list]
cash_flow['cash_rate_of_sales_latest'] = np.where(CalcTools.is_zero(cash_flow.operating_revenue.values),
0,
cash_flow.net_operate_cash_flow.values / cash_flow.operating_revenue.values)
cash_flow = cash_flow.drop(columns_list, axis=1)
factor_cash_flow = pd.merge(factor_cash_flow, cash_flow, on="symbol")
return factor_cash_flow
# 经营活动产生的现金流量净额(Latest)/(营业总收入(Latest)-营业总成本(Latest))
def nocf_to_operating_ni_latest(self, tp_cash_flow, factor_cash_flow):
columns_list = ['net_operate_cash_flow', 'total_operating_revenue',
'total_operating_cost']
cash_flow = tp_cash_flow.loc[:, columns_list]
cash_flow['nocf_to_operating_ni_latest'] = np.where(
CalcTools.is_zero((cash_flow.total_operating_revenue.values - cash_flow.total_operating_cost.values)), 0,
cash_flow.net_operate_cash_flow.values / (
cash_flow.total_operating_revenue.values - cash_flow.total_operating_cost.values))
cash_flow = cash_flow.drop(columns_list, axis=1)
factor_cash_flow = pd.merge(factor_cash_flow, cash_flow, on="symbol")
return factor_cash_flow
def calculate(trade_date, tp_cash_flow, cash_flow):  # compute the cash-flow factors
    """Compute every cash-flow factor for one trade date and store the result.

    :param trade_date: trade date; its ``str()`` form is appended to each symbol
        to build ``id`` and is written to the ``trade_date`` column
    :param tp_cash_flow: frame with a ``symbol`` column plus the input columns
        read by the factor methods; it is not modified
    :param cash_flow: calculator object providing the factor methods and
        ``_storage_data``
    :return: None; the factor frame is handed to ``cash_flow._storage_data``
    """
    print(trade_date)
    factor_cash_flow = pd.DataFrame()
    factor_cash_flow['symbol'] = tp_cash_flow['symbol']
    # Work on a symbol-indexed copy so the caller's frame keeps its layout.
    indexed = tp_cash_flow.set_index('symbol')

    # The order of the steps fixes the column order of the stored frame.
    steps = (
        # latest-period factors
        cash_flow.nocf_to_operating_ni_latest,
        cash_flow.cash_rate_of_sales_latest,
        cash_flow.sales_service_cash_to_or_latest,
        # TTM factors
        cash_flow.nocf_to_t_liability_ttm,
        cash_flow.nocf_to_interest_bear_debt_ttm,
        cash_flow.nocf_to_net_debt_ttm,
        cash_flow.sale_service_cash_to_or_ttm,
        cash_flow.cash_rate_of_sales_ttm,
        cash_flow.nocf_to_operating_ni_ttm,
        cash_flow.oper_cash_in_to_current_liability_ttm,
        cash_flow.cash_to_current_liability_ttm,
        cash_flow.cfo_to_ev_ttm,
        cash_flow.acca_ttm,
        cash_flow.net_profit_cash_cover_ttm,
        cash_flow.oper_cash_in_to_asset_ttm,
    )
    for step in steps:
        factor_cash_flow = step(indexed, factor_cash_flow)

    factor_cash_flow['id'] = factor_cash_flow['symbol'] + str(trade_date)
    factor_cash_flow['trade_date'] = str(trade_date)
    cash_flow._storage_data(factor_cash_flow, trade_date)
def do_update(self, start_date, end_date, count):
    """Call ``self.calculate`` for each trade date returned by ``self._trade_date.trade_date_sets_ago``.

    :param start_date: forwarded to ``trade_date_sets_ago``
    :param end_date: forwarded to ``trade_date_sets_ago``
    :param count: forwarded to ``trade_date_sets_ago``
    """
    # NOTE(review): this calls self.calculate with one argument, while the
    # calculate() visible in this file is a three-argument function — confirm
    # which implementation this is meant to resolve to.
    # Load the local trade dates.
    trade_date_sets = self._trade_date.trade_date_sets_ago(start_date, end_date, count)
    for trade_date in trade_date_sets:
        self.calculate(trade_date)
        # NOTE(review): the original indentation was lost; this progress marker
        # is assumed to sit inside the loop — confirm.
        print('----->')
@app.task()
def factor_calculate(**kwargs):
    """Task entry point: load the cached input frame and run ``calculate`` on it.

    Reads ``date_index`` and ``session`` from ``kwargs``, fetches the cached
    payload for that pair, decodes it as UTF-8 JSON and flattens it into a
    frame before delegating to ``calculate``.
    """
    print("cash_flow_kwargs: {}".format(kwargs))
    date_index, session = kwargs['date_index'], kwargs['session']
    # Note: this name must match the table name created by the client,
    # otherwise an error is raised.
    calculator = FactorCashFlow('factor_cash_flow')
    payload = cache_data.get_cache(session, date_index)
    records = json.loads(str(payload, encoding='utf8'))
    total_cash_flow_data = json_normalize(records)
    print("len_total_cash_flow_data {}".format(len(total_cash_flow_data)))
    calculate(date_index, total_cash_flow_data, calculator)
#!/usr/bin/env python
# coding=utf-8
import pdb
import sys
import numpy as np
import pandas as pd
import argparse
from datetime import datetime
sys.path.append("..")
import json
from pandas.io.json import json_normalize
from factor import app
from factor.factor_base import FactorBase
from factor.ttm_fundamental import *
from vision.fm.signletion_engine import *
from vision.utillities.calc_tools import CalcTools
from ultron.cluster.invoke.cache_data import cache_data
class FactorContrarian(FactorBase):
    """Expense-ratio and leverage factor calculator.

    Every public factor method takes a source frame and the running factor
    frame, computes one ratio column, and returns the factor frame merged with
    that column on ``symbol``. Source frames are expected to carry ``symbol``
    as their index name (the module-level ``calculate`` sets it). Rows whose
    denominator is flagged by ``CalcTools.is_zero`` get 0 instead of the
    quotient.
    """

    def __init__(self, name):
        super(FactorContrarian, self).__init__(name)

    # Build the destination factor table.
    def create_dest_tables(self):
        """Build the drop/create statements for ``self._name`` and pass them to the parent's ``_create_tables``."""
        drop_sql = """drop table if exists `{0}`""".format(self._name)
        create_sql = """create table `{0}`(
`id` varchar(32) NOT NULL,
`symbol` varchar(24) NOT NULL,
`trade_date` date NOT NULL,
`sales_cost_ratio_ttm` decimal(19,4),
`tax_ratio_ttm` decimal(19,4),
`financail_expense_rate_ttm` decimal(19,4),
`operating_expense_rate_ttm` decimal(19,4),
`admini_expense_rate_ttm` decimal(19,4),
`period_costs_rate_ttm` decimal(19,4),
`debt_tangible_equity_ratio_latest` decimal(19,4),
`debts_asset_ratio_latest` decimal(19,4),
`inte_bear_debt_to_total_capital_latest` decimal(19,4),
PRIMARY KEY(`id`,`trade_date`,`symbol`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self._name)
        super(FactorContrarian, self)._create_tables(create_sql, drop_sql)

    @staticmethod
    def _merge_ratio(factor_contrarian, data, factor_name, numerator, denominator):
        """Merge ``numerator / denominator`` into the factor frame as ``factor_name``.

        :param factor_contrarian: running factor frame holding a ``symbol`` column
        :param data: source slice whose index labels the computed values
        :param factor_name: name of the new factor column
        :param numerator: numerators aligned with ``data``
        :param denominator: denominators aligned with ``data``
        :return: ``factor_contrarian`` merged on ``symbol`` with the new column
        """
        # np.where evaluates the quotient for every row, including the ones that
        # are replaced by 0, so silence the divide-by-zero warnings it triggers.
        with np.errstate(divide='ignore', invalid='ignore'):
            values = np.where(CalcTools.is_zero(denominator), 0, numerator / denominator)
        ratio = pd.DataFrame({factor_name: values}, index=data.index)
        return pd.merge(factor_contrarian, ratio, on="symbol")

    def sales_cost_ratio_ttm(self, tp_contrarian, factor_contrarian):
        """Cost-of-sales ratio: operating cost / operating revenue."""
        data = tp_contrarian.loc[:, ['operating_cost', 'operating_revenue']]
        return self._merge_ratio(factor_contrarian, data, 'sales_cost_ratio_ttm',
                                 data['operating_cost'], data['operating_revenue'])

    def tax_ratio_ttm(self, tp_contrarian, factor_contrarian):
        """Sales tax ratio: operating taxes and surcharges / operating revenue."""
        data = tp_contrarian.loc[:, ['operating_tax_surcharges', 'operating_revenue']]
        return self._merge_ratio(factor_contrarian, data, 'tax_ratio_ttm',
                                 data['operating_tax_surcharges'], data['operating_revenue'])

    def financial_expense_rate_ttm(self, tp_contrarian, factor_contrarian):
        """Financial expense / total operating revenue."""
        data = tp_contrarian.loc[:, ['financial_expense', 'total_operating_revenue']]
        # The output column keeps the 'financail' spelling used by the table schema.
        return self._merge_ratio(factor_contrarian, data, 'financail_expense_rate_ttm',
                                 data['financial_expense'], data['total_operating_revenue'])

    def operating_expense_rate_ttm(self, tp_contrarian, factor_contrarian):
        """Selling expense / total operating revenue."""
        data = tp_contrarian.loc[:, ['sale_expense', 'total_operating_revenue']]
        # The zero guard is applied to the same column that is used as divisor.
        return self._merge_ratio(factor_contrarian, data, 'operating_expense_rate_ttm',
                                 data['sale_expense'], data['total_operating_revenue'])

    def admini_expense_rate_ttm(self, tp_contrarian, factor_contrarian):
        """Administration expense / total operating revenue."""
        data = tp_contrarian.loc[:, ['administration_expense', 'total_operating_revenue']]
        return self._merge_ratio(factor_contrarian, data, 'admini_expense_rate_ttm',
                                 data['administration_expense'], data['total_operating_revenue'])

    def period_costs_rate_ttm(self, tp_contrarian, factor_contrarian):
        """Period cost ratio: (financial + selling + administration expense) / operating revenue."""
        data = tp_contrarian.loc[:, ['financial_expense', 'sale_expense', 'administration_expense',
                                     'operating_revenue']]
        period_costs = data['financial_expense'] + data['sale_expense'] + data['administration_expense']
        return self._merge_ratio(factor_contrarian, data, 'period_costs_rate_ttm',
                                 period_costs, data['operating_revenue'])

    def debt_tangible_equity_ratio_latest(self, tp_contrarian, factor_contrarian):
        """Total liabilities / tangible assets, with tangible assets = current assets + fixed assets."""
        data = tp_contrarian.loc[:, ['total_liability', 'total_current_assets', 'fixed_assets']]
        tangible_assets = data['total_current_assets'] + data['fixed_assets']
        return self._merge_ratio(factor_contrarian, data, 'debt_tangible_equity_ratio_latest',
                                 data['total_liability'], tangible_assets)

    def debts_asset_ratio_latest(self, tp_contrarian, factor_contrarian):
        """Debt-to-assets ratio: total liabilities / total assets."""
        data = tp_contrarian.loc[:, ['total_liability', 'total_assets']]
        return self._merge_ratio(factor_contrarian, data, 'debts_asset_ratio_latest',
                                 data['total_liability'], data['total_assets'])

    def inte_bear_debt_to_total_capital_latest(self, tp_contrarian, factor_contrarian):
        """Interest-bearing debt / total capital.

        Total capital = fixed assets + net working capital, and net working
        capital = current assets - current liabilities.
        """
        data = tp_contrarian.loc[:, ['interest_bearing_liability', 'fixed_assets',
                                     'total_current_assets', 'total_current_liability']]
        total_capital = (data['fixed_assets'] + data['total_current_assets']
                         - data['total_current_liability'])
        return self._merge_ratio(factor_contrarian, data, 'inte_bear_debt_to_total_capital_latest',
                                 data['interest_bearing_liability'], total_capital)
def calculate(trade_date, constrain_sets, constrain):  # compute the contrarian factors
    """Compute every contrarian factor for one trade date and store the result.

    :param trade_date: trade date; its ``str()`` form is appended to each symbol
        to build ``id`` and is written to the ``trade_date`` column
    :param constrain_sets: frame with a ``symbol`` column plus the input columns
        read by the factor methods; it is not modified
    :param constrain: calculator object providing the factor methods and
        ``_storage_data``
    :return: None; the factor frame is handed to ``constrain._storage_data``
    """
    print(trade_date)
    factor_contrarian = pd.DataFrame()
    factor_contrarian['symbol'] = constrain_sets['symbol']
    # Work on a symbol-indexed copy so the caller's frame keeps its layout.
    tp_contrarian = constrain_sets.set_index('symbol')

    # The order of the steps fixes the column order of the stored frame.
    steps = (
        constrain.inte_bear_debt_to_total_capital_latest,
        constrain.debts_asset_ratio_latest,
        constrain.debt_tangible_equity_ratio_latest,
        constrain.sales_cost_ratio_ttm,
        constrain.tax_ratio_ttm,
        constrain.financial_expense_rate_ttm,
        constrain.operating_expense_rate_ttm,
        constrain.admini_expense_rate_ttm,
        constrain.period_costs_rate_ttm,
    )
    for step in steps:
        factor_contrarian = step(tp_contrarian, factor_contrarian)

    factor_contrarian['id'] = factor_contrarian['symbol'] + str(trade_date)
    factor_contrarian['trade_date'] = str(trade_date)
    constrain._storage_data(factor_contrarian, trade_date)
@app.task()
def factor_calculate(**kwargs):
    """Task entry point: load the cached input frame and run ``calculate`` on it.

    Reads ``date_index`` and ``session`` from ``kwargs``, fetches the cached
    payload for that pair, decodes it as UTF-8 JSON and flattens it into a
    frame before delegating to ``calculate``.
    """
    print("constrain_kwargs: {}".format(kwargs))
    date_index, session = kwargs['date_index'], kwargs['session']
    # Note: this name must match the table name created by the client,
    # otherwise an error is raised.
    calculator = FactorContrarian('factor_constrain')
    payload = cache_data.get_cache(session, date_index)
    records = json.loads(str(payload, encoding='utf8'))
    total_constrain_data = json_normalize(records)
    print("len_total_constrain_data {}".format(len(total_constrain_data)))
    calculate(date_index, total_constrain_data, calculator)
\ No newline at end of file
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version:
@author: Wang
@file: factor_management.py
@time: 2019-05-31
"""
import pdb
import argparse
import time
from datetime import datetime, timedelta
import collections
import sys
import numpy as np
import pandas as pd
sys.path.append('..')
from factor.ttm_fundamental import *
from factor.factor_base import FactorBase
from vision.fm.signletion_engine import *
from vision.file_unit.cash_flow import CashFlow
from vision.file_unit.income import Income
from vision.file_unit.balance import Balance
from vision.file_unit.valuation import Valuation
from vision.file_unit.indicator import Indicator
from vision.utillities.calc_tools import CalcTools
import json
from pandas.io.json import json_normalize
from factor import app
from ultron.cluster.invoke.cache_data import cache_data
class FactorEarning(FactorBase):
def __init__(self, name):
    """Forward ``name`` to the parent class initializer."""
    super(FactorEarning, self).__init__(name)
def create_dest_tables(self):
    """
    Build the drop/create statements for the table named ``self._name`` and
    pass them to the parent's ``_create_tables``.

    :return: None
    """
    table = self._name
    drop_statement = """drop table if exists `{0}`""".format(table)
    create_statement = """create table `{0}`(
`id` varchar(32) NOT NULL,
`symbol` varchar(24) NOT NULL,
`trade_date` date NOT NULL,
`net_profit_ratio` decimal(19,4),
`operating_profit_ratio` decimal(19,4),
`np_to_tor` decimal(19,4),
`operating_profit_to_tor` decimal(19,4),
`gross_income_ratio` decimal(19,4),
`ebit_to_tor` decimal(19,4),
`roa` decimal(19,4),
`roa5` decimal(19,4),
`roe` decimal(19,4),
`roe5` decimal(19,4),
`roe_diluted` decimal(19,4),
`roe_avg` decimal(19,4),
`roe_cut` decimal(19,4),
`roa_ebit_ttm` decimal(19,4),
`operating_ni_to_tp_ttm` decimal(19,4),
`operating_ni_to_tp_latest` decimal(19,4),
`invest_r_associates_to_tp_ttm` decimal(19,4),
`invest_r_associates_to_tp_latest` decimal(19,4),
`npcut_to_np` decimal(19,4),
`interest_cover_ttm` decimal(19,4),
`net_non_oi_to_tp_ttm` decimal(19,4),
`net_non_oi_to_tp_latest` decimal(19,4),
PRIMARY KEY(`id`,`trade_date`,`symbol`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(table)
    super(FactorEarning, self)._create_tables(create_statement, drop_statement)
def net_profit_ratio(self, ttm_earning, factor_earning):
    """
    Net profit margin: net profit / operating revenue.

    :param ttm_earning: frame providing ``net_profit`` and ``operating_revenue``;
        presumably indexed by ``symbol`` (the merge key) — confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with a ``net_profit_ratio`` column
    """
    inputs = ['net_profit', 'operating_revenue']
    frame = ttm_earning.loc[:, inputs]
    revenue = frame.operating_revenue.values
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['net_profit_ratio'] = np.where(
        CalcTools.is_zero(revenue), 0, frame.net_profit.values / revenue)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def operating_profit_ratio(self, ttm_earning, factor_earning):
    """
    Operating profit margin: operating profit / operating revenue.

    :param ttm_earning: frame providing ``operating_profit`` and ``operating_revenue``;
        presumably indexed by ``symbol`` (the merge key) — confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with an ``operating_profit_ratio`` column
    """
    inputs = ['operating_profit', 'operating_revenue']
    frame = ttm_earning.loc[:, inputs]
    revenue = frame.operating_revenue.values
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['operating_profit_ratio'] = np.where(
        CalcTools.is_zero(revenue), 0, frame.operating_profit.values / revenue)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def np_to_tor(self, ttm_earning, factor_earning):
    """
    Net profit to total operating revenue: net profit / total operating revenue.

    :param ttm_earning: frame providing ``net_profit`` and ``total_operating_revenue``;
        presumably indexed by ``symbol`` (the merge key) — confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with an ``np_to_tor`` column
    """
    inputs = ['net_profit', 'total_operating_revenue']
    frame = ttm_earning.loc[:, inputs]
    total_revenue = frame.total_operating_revenue.values
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['np_to_tor'] = np.where(
        CalcTools.is_zero(total_revenue), 0, frame.net_profit.values / total_revenue)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def operating_profit_to_tor(self, ttm_earning, factor_earning):
    """
    Operating profit to total operating revenue: operating profit / total operating revenue.

    :param ttm_earning: frame providing ``operating_profit`` and ``total_operating_revenue``;
        presumably indexed by ``symbol`` (the merge key) — confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with an ``operating_profit_to_tor`` column
    """
    inputs = ['operating_profit', 'total_operating_revenue']
    frame = ttm_earning.loc[:, inputs]
    total_revenue = frame.total_operating_revenue.values
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['operating_profit_to_tor'] = np.where(
        CalcTools.is_zero(total_revenue), 0, frame.operating_profit.values / total_revenue)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def gross_income_ratio(self, ttm_earning, factor_earning):
    """
    Gross margin: (operating revenue - operating cost) / operating revenue.

    :param ttm_earning: frame providing ``operating_revenue`` and ``operating_cost``;
        presumably indexed by ``symbol`` (the merge key) — confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with a ``gross_income_ratio`` column
    """
    inputs = ['operating_revenue', 'operating_cost']
    frame = ttm_earning.loc[:, inputs]
    revenue = frame.operating_revenue.values
    gross_income = revenue - frame.operating_cost.values
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['gross_income_ratio'] = np.where(
        CalcTools.is_zero(revenue), 0, gross_income / revenue)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def ebit_to_tor(self, ttm_earning, factor_earning):
    """
    EBIT to total operating revenue:
    (total profit + interest expense - interest income) / total operating revenue.
    ``financial_expense`` is the column used as the interest expense.

    :param ttm_earning: frame providing ``total_profit``, ``financial_expense``,
        ``interest_income`` and ``total_operating_revenue``; presumably indexed by
        ``symbol`` (the merge key) — confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with an ``ebit_to_tor`` column
    """
    inputs = ['total_profit', 'financial_expense',
              'interest_income', 'total_operating_revenue']
    frame = ttm_earning.loc[:, inputs]
    total_revenue = frame.total_operating_revenue.values
    ebit = (frame.total_profit.values
            + frame.financial_expense.values
            - frame.interest_income.values)
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['ebit_to_tor'] = np.where(
        CalcTools.is_zero(total_revenue), 0, ebit / total_revenue)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def roa(self, ttm_earning, factor_earning):
    """
    Return on assets: net profit / total assets.

    :param ttm_earning: frame providing ``net_profit`` and ``total_assets``;
        presumably indexed by ``symbol`` (the merge key) — confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with a ``roa`` column
    """
    inputs = ['net_profit', 'total_assets']
    frame = ttm_earning.loc[:, inputs]
    assets = frame.total_assets.values
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['roa'] = np.where(
        CalcTools.is_zero(assets), 0, frame.net_profit.values / assets)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def roa5(self, ttm_earning_5y, factor_earning):
    """
    Five-year return on assets: net profit / total assets / 4, computed on the
    five-year frame.

    :param ttm_earning_5y: frame providing ``net_profit`` and ``total_assets``;
        presumably indexed by ``symbol`` (the merge key) — confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with a ``roa5`` column
    """
    inputs = ['net_profit', 'total_assets']
    frame = ttm_earning_5y.loc[:, inputs]
    assets = frame.total_assets.values
    # NOTE(review): ``roa`` does not apply the ``/ 4`` scaling used here — confirm
    # which of the two is intended.
    scaled_return = frame.net_profit.values / assets / 4
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['roa5'] = np.where(CalcTools.is_zero(assets), 0, scaled_return)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def roe(self, ttm_earning, factor_earning):
    """
    Return on equity: net profit / total owner equities / 4.

    :param ttm_earning: frame providing ``net_profit`` and ``total_owner_equities``;
        presumably indexed by ``symbol`` (the merge key) — confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with a ``roe`` column
    """
    inputs = ['net_profit', 'total_owner_equities']
    frame = ttm_earning.loc[:, inputs]
    equity = frame.total_owner_equities.values
    # NOTE(review): the original formula comment has no ``/ 4`` term — confirm the scaling.
    scaled_return = frame.net_profit.values / equity / 4
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['roe'] = np.where(CalcTools.is_zero(equity), 0, scaled_return)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def roe5(self, ttm_earning_5y, factor_earning):
    """
    Five-year return on equity: net profit / total owner equities / 4, computed
    on the five-year frame.

    :param ttm_earning_5y: frame providing ``net_profit`` and ``total_owner_equities``;
        presumably indexed by ``symbol`` (the merge key) — confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with a ``roe5`` column
    """
    inputs = ['net_profit', 'total_owner_equities']
    frame = ttm_earning_5y.loc[:, inputs]
    equity = frame.total_owner_equities.values
    scaled_return = frame.net_profit.values / equity / 4
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['roe5'] = np.where(CalcTools.is_zero(equity), 0, scaled_return)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def roe_diluted(self, tp_earning, factor_earning):
    """
    Diluted return on equity: net profit attributable to the parent company /
    period-end equity attributable to the parent company, divided by 4.

    :param tp_earning: frame providing ``np_parent_company_owners`` and
        ``equities_parent_company_owners``; presumably indexed by ``symbol``
        (the merge key) — confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with a ``roe_diluted`` column
    """
    inputs = ['np_parent_company_owners',
              'equities_parent_company_owners']
    frame = tp_earning.loc[:, inputs]
    parent_equity = frame.equities_parent_company_owners.values
    # NOTE(review): the original formula comment has no ``/ 4`` term — confirm the scaling.
    scaled_return = frame.np_parent_company_owners.values / parent_equity / 4
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['roe_diluted'] = np.where(CalcTools.is_zero(parent_equity), 0, scaled_return)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def roe_avg(self, ttm_earning, factor_earning):
    """
    Average return on equity.

    Computed here as net profit attributable to the parent company /
    equity attributable to the parent company / 4.
    NOTE(review): the original description defines it as
    parent net profit * 2 / (closing parent equity + opening parent equity) * 100%;
    the code does not average opening and closing equity — confirm which is intended.

    :param ttm_earning: frame providing ``np_parent_company_owners`` and
        ``equities_parent_company_owners``; presumably indexed by ``symbol``
        (the merge key) — confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with a ``roe_avg`` column
    """
    inputs = ['np_parent_company_owners',
              'equities_parent_company_owners']
    frame = ttm_earning.loc[:, inputs]
    parent_equity = frame.equities_parent_company_owners.values
    scaled_return = frame.np_parent_company_owners.values / parent_equity / 4
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['roe_avg'] = np.where(CalcTools.is_zero(parent_equity), 0, scaled_return)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def roe_weighted(self, ttm_earning, factor_earning):
    """
    Placeholder for the ``roe_weighted`` factor; nothing is computed.

    :param ttm_earning: unused
    :param factor_earning: unused
    :return: None
    """
    return None
def roe_cut(self, tp_earning, factor_earning):
    """
    Adjusted profit / equity attributable to the parent company / 4.

    :param tp_earning: frame providing ``adjusted_profit`` and
        ``equities_parent_company_owners``; presumably indexed by ``symbol``
        (the merge key) — confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with a ``roe_cut`` column
    """
    inputs = ['adjusted_profit', 'equities_parent_company_owners']
    frame = tp_earning.loc[:, inputs]
    parent_equity = frame.equities_parent_company_owners.values
    scaled_return = frame.adjusted_profit.values / parent_equity / 4
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['roe_cut'] = np.where(CalcTools.is_zero(parent_equity), 0, scaled_return)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def roe_cut_weighted(self, ttm_earning, factor_earning):
    """
    Placeholder for the ``roe_cut_weighted`` factor; nothing is computed.

    :param ttm_earning: unused
    :param factor_earning: unused
    :return: None
    """
    return None
def roic(self, ttm_earning, factor_earning):
    """
    Placeholder for the ``roic`` factor; nothing is computed.

    :param ttm_earning: unused
    :param factor_earning: unused
    :return: None
    """
    return None
def roa_ebit(self, ttm_earning, factor_earning):
    """
    Placeholder for the ``roa_ebit`` factor; nothing is computed.

    :param ttm_earning: unused
    :param factor_earning: unused
    :return: None
    """
    return None
def roa_ebit_ttm(self, ttm_earning, factor_earning):
    """
    Return on total assets based on EBIT.

    The original description gives ROAEBIT = EBIT * 2 / (opening total assets +
    closing total assets), noting that the mean of the last four quarters'
    assets is used. The code computes
    (total profit + financial expense - interest income) / total assets / 4.
    NOTE(review): whether ``total_assets`` is already a four-quarter aggregate
    cannot be seen from here — confirm the ``/ 4`` scaling.

    :param ttm_earning: frame providing ``total_profit``, ``financial_expense``,
        ``interest_income`` and ``total_assets``; presumably indexed by ``symbol``
        (the merge key) — confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with a ``roa_ebit_ttm`` column
    """
    inputs = ['total_profit', 'financial_expense',
              'interest_income', 'total_assets']
    frame = ttm_earning.loc[:, inputs]
    assets = frame.total_assets.values
    ebit = (frame.total_profit.values
            + frame.financial_expense.values
            - frame.interest_income.values)
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['roa_ebit_ttm'] = np.where(CalcTools.is_zero(assets), 0, ebit / assets / 4)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def operating_ni_to_tp_ttm(self, ttm_earning, factor_earning):
    """
    Net operating income / total profit.

    Net operating income is computed the non-financial-company way:
    total operating revenue - total operating cost. (For financial companies the
    original description defines it as operating revenue - fair-value gains -
    investment income - exchange gains - operating expenditure; that variant is
    not implemented here.)

    :param ttm_earning: frame providing ``total_operating_revenue``,
        ``total_operating_cost`` and ``total_profit``; presumably indexed by
        ``symbol`` (the merge key) — confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with an ``operating_ni_to_tp_ttm`` column
    """
    inputs = ['total_operating_revenue', 'total_operating_cost',
              'total_profit']
    frame = ttm_earning.loc[:, inputs]
    total_profit = frame.total_profit.values
    operating_ni = (frame.total_operating_revenue.values
                    - frame.total_operating_cost.values)
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['operating_ni_to_tp_ttm'] = np.where(
        CalcTools.is_zero(total_profit), 0, operating_ni / total_profit)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def operating_ni_to_tp_latest(self, tp_earning, factor_earning):
    """
    Net operating income / total profit, on the latest-period frame.

    Net operating income is computed the non-financial-company way:
    total operating revenue - total operating cost. (For financial companies the
    original description defines it as operating revenue - fair-value gains -
    investment income - exchange gains - operating expenditure; that variant is
    not implemented here.)

    :param tp_earning: frame providing ``total_operating_revenue``,
        ``total_operating_cost`` and ``total_profit``; presumably indexed by
        ``symbol`` (the merge key) — confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with an ``operating_ni_to_tp_latest`` column
    """
    inputs = ['total_operating_revenue', 'total_operating_cost',
              'total_profit']
    frame = tp_earning.loc[:, inputs]
    total_profit = frame.total_profit.values
    operating_ni = (frame.total_operating_revenue.values
                    - frame.total_operating_cost.values)
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['operating_ni_to_tp_latest'] = np.where(
        CalcTools.is_zero(total_profit), 0, operating_ni / total_profit)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def invest_r_associates_to_tp_ttm(self, ttm_earning, factor_earning):
    """
    Investment income from associates and joint ventures / total profit.

    :param ttm_earning: frame providing ``invest_income_associates`` and
        ``total_profit``; presumably indexed by ``symbol`` (the merge key) —
        confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with an
        ``invest_r_associates_to_tp_ttm`` column
    """
    inputs = ['invest_income_associates', 'total_profit']
    frame = ttm_earning.loc[:, inputs]
    total_profit = frame.total_profit.values
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['invest_r_associates_to_tp_ttm'] = np.where(
        CalcTools.is_zero(total_profit), 0,
        frame.invest_income_associates.values / total_profit)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def invest_r_associates_to_tp_latest(self, tp_earning, factor_earning):
    """
    Investment income from associates and joint ventures / total profit, on the
    latest-period frame.

    :param tp_earning: frame providing ``invest_income_associates`` and
        ``total_profit``; presumably indexed by ``symbol`` (the merge key) —
        confirm against the caller
    :param factor_earning: running factor frame to extend
    :return: ``factor_earning`` merged on ``symbol`` with an
        ``invest_r_associates_to_tp_latest`` column
    """
    inputs = ['invest_income_associates', 'total_profit']
    frame = tp_earning.loc[:, inputs]
    total_profit = frame.total_profit.values
    # Rows flagged by CalcTools.is_zero get 0 instead of the quotient.
    frame['invest_r_associates_to_tp_latest'] = np.where(
        CalcTools.is_zero(total_profit), 0,
        frame.invest_income_associates.values / total_profit)
    return pd.merge(factor_earning, frame.drop(inputs, axis=1), on="symbol")
def npcut_to_np(self, tp_earning, factor_earning):
    """
    Net profit after deducting non-recurring gains and losses / net profit.

    :param tp_earning: frame providing 'adjusted_profit' and 'net_profit'
        (presumably indexed by symbol -- confirm against caller)
    :param factor_earning: factor frame accumulated so far
    :return: factor_earning merged on "symbol" with the new
        'npcut_to_np' column
    """
    source_columns = ['adjusted_profit', 'net_profit']
    frame = tp_earning.loc[:, source_columns]

    adjusted_profit = frame['adjusted_profit'].values
    net_profit = frame['net_profit'].values

    # Rows whose net profit is flagged by CalcTools.is_zero get 0
    # instead of the quotient.
    frame['npcut_to_np'] = np.where(
        CalcTools.is_zero(net_profit), 0,
        adjusted_profit / net_profit)

    frame = frame.drop(columns=source_columns)
    return pd.merge(factor_earning, frame, on="symbol")
def interest_cover_ttm(self, ttm_earning, factor_earning):
    """
    Interest coverage ratio (TTM).

    InterestCover = (TP + INT_EXP - INT_COME) / (INT_EXP - INT_COME)
    i.e. EBIT / interest expense, where EBIT = total profit + interest
    expense and interest expense = interest paid - interest income.
    In this implementation 'financial_expense' is the column used for
    the INT_EXP term and 'interest_income' for INT_COME.

    :param ttm_earning: frame providing 'total_profit', 'financial_expense'
        and 'interest_income' (presumably indexed by symbol -- confirm
        against caller)
    :param factor_earning: factor frame accumulated so far
    :return: factor_earning merged on "symbol" with the new
        'interest_cover_ttm' column
    """
    source_columns = ['total_profit', 'financial_expense', 'interest_income']
    frame = ttm_earning.loc[:, source_columns]

    total_profit = frame['total_profit'].values
    financial_expense = frame['financial_expense'].values
    interest_income = frame['interest_income'].values

    net_interest = financial_expense - interest_income
    # Evaluated left to right exactly as (TP + INT_EXP) - INT_COME.
    ebit = total_profit + financial_expense - interest_income

    # Rows whose net interest is flagged by CalcTools.is_zero get 0
    # instead of the quotient.
    frame['interest_cover_ttm'] = np.where(
        CalcTools.is_zero(net_interest), 0,
        ebit / net_interest)

    frame = frame.drop(columns=source_columns)
    return pd.merge(factor_earning, frame, on="symbol")
def degm(self, ttm_earning, ttm_earning_p1y, factor_earning):
    """
    Change in gross margin versus the same period one year earlier.

    Gross margin is (operating_revenue - operating_cost) / operating_revenue,
    computed separately for both input frames; the factor is the current
    margin minus the prior-year margin.

    :param ttm_earning: current-period frame providing 'operating_revenue'
        and 'operating_cost'
    :param ttm_earning_p1y: frame with the same columns for the period
        one year earlier
    :param factor_earning: factor frame accumulated so far
    :return: factor_earning merged on "symbol" with the new 'degm' column
    """
    source_columns = ['operating_revenue', 'operating_cost']

    def _gross_margin(frame):
        # Rows whose revenue is flagged by CalcTools.is_zero get 0
        # instead of the quotient.
        revenue = frame['operating_revenue'].values
        cost = frame['operating_cost'].values
        return np.where(CalcTools.is_zero(revenue), 0,
                        (revenue - cost) / revenue)

    current = ttm_earning.loc[:, source_columns]
    previous = ttm_earning_p1y.loc[:, source_columns]

    current['gross_income_ratio'] = _gross_margin(current)
    previous['gross_income_ratio'] = _gross_margin(previous)

    # Series subtraction: pandas aligns the two margins on index labels.
    current["degm"] = (current["gross_income_ratio"]
                       - previous["gross_income_ratio"])

    # Drop the inputs and the intermediate margin; only 'degm' is merged.
    current = current.drop(columns=source_columns + ['gross_income_ratio'])
    return pd.merge(factor_earning, current, on="symbol")
def net_non_oi_to_tp_ttm(self, ttm_earning, factor_earning):
    """
    Net non-operating income / total profit (TTM).

    Net non-operating income is non-operating revenue minus
    non-operating expense.

    :param ttm_earning: frame providing 'total_profit',
        'non_operating_revenue' and 'non_operating_expense'
        (presumably indexed by symbol -- confirm against caller)
    :param factor_earning: factor frame accumulated so far
    :return: factor_earning merged on "symbol" with the new
        'net_non_oi_to_tp_ttm' column
    """
    columns_list = ['total_profit', 'non_operating_revenue', 'non_operating_expense']
    # Explicit copy so that adding the factor column never touches
    # (or warns about) the caller's frame.
    earning = ttm_earning.loc[:, columns_list].copy()

    total_profit = earning['total_profit'].values
    # The net amount is revenue MINUS expense; the previous code added
    # the two, which overstated the numerator.
    # NOTE(review): assumes non_operating_expense is stored as a positive
    # amount -- confirm against the data source.
    net_non_operating = (earning['non_operating_revenue'].values -
                         earning['non_operating_expense'].values)

    # Rows whose total profit is flagged by CalcTools.is_zero get 0
    # instead of the quotient.
    earning['net_non_oi_to_tp_ttm'] = np.where(
        CalcTools.is_zero(total_profit), 0,
        net_non_operating / total_profit)

    earning = earning.drop(columns_list, axis=1)
    factor_earning = pd.merge(factor_earning, earning, on="symbol")
    return factor_earning
def net_non_oi_to_tp_latest(self, tp_earning, factor_earning):
    """
    Net non-operating income / total profit (latest report).

    Net non-operating income is non-operating revenue minus
    non-operating expense.

    :param tp_earning: frame providing 'total_profit',
        'non_operating_revenue' and 'non_operating_expense'
        (presumably indexed by symbol -- confirm against caller)
    :param factor_earning: factor frame accumulated so far
    :return: factor_earning merged on "symbol" with the new
        'net_non_oi_to_tp_latest' column
    """
    columns_list = ['total_profit', 'non_operating_revenue', 'non_operating_expense']
    # Explicit copy so that adding the factor column never touches
    # (or warns about) the caller's frame.
    earning = tp_earning.loc[:, columns_list].copy()

    total_profit = earning['total_profit'].values
    # The net amount is revenue MINUS expense; the previous code added
    # the two, which overstated the numerator.
    # NOTE(review): assumes non_operating_expense is stored as a positive
    # amount -- confirm against the data source.
    net_non_operating = (earning['non_operating_revenue'].values -
                         earning['non_operating_expense'].values)

    # Rows whose total profit is flagged by CalcTools.is_zero get 0
    # instead of the quotient.
    earning['net_non_oi_to_tp_latest'] = np.where(
        CalcTools.is_zero(total_profit), 0,
        net_non_operating / total_profit)

    earning = earning.drop(columns_list, axis=1)
    factor_earning = pd.merge(factor_earning, earning, on="symbol")
    return factor_earning
def calculate(trade_date, earning_sets_dic, earning):
    """
    Compute the earning factors for one trade date and store them.

    :param trade_date: trade date; printed, appended to each symbol to
        build the 'id' column, and written to the 'trade_date' column
    :param earning_sets_dic: dict holding the 'tp_earning', 'ttm_earning'
        and 'ttm_earning_5y' frames
    :param earning: object exposing the factor methods called below and
        `_storage_data(factor_frame, trade_date)`
    :return: None
    """
    print(trade_date)
    tp_earning = earning_sets_dic['tp_earning']
    ttm_earning = earning_sets_dic['ttm_earning']
    ttm_earning_5y = earning_sets_dic['ttm_earning_5y']

    # Seed the result with the symbols found in the latest-report data.
    factor_earning = pd.DataFrame()
    factor_earning['symbol'] = tp_earning.index

    # (factor method name, input frame), applied strictly in this order.
    # Factors currently left out of the pipeline: roe_weighted,
    # roe_cut_weighted, roic, roa_ebit and degm (degm also needs the
    # prior-year TTM frame, which is not read here).
    pipeline = (
        ('net_profit_ratio', ttm_earning),
        ('operating_profit_ratio', ttm_earning),
        ('np_to_tor', ttm_earning),
        ('operating_profit_to_tor', ttm_earning),
        ('gross_income_ratio', ttm_earning),
        ('ebit_to_tor', ttm_earning),
        ('roa', ttm_earning),
        ('roa5', ttm_earning_5y),
        ('roe', ttm_earning),
        ('roe5', ttm_earning_5y),
        ('roe_diluted', tp_earning),
        ('roe_avg', ttm_earning),
        ('roe_cut', tp_earning),
        ('roa_ebit_ttm', ttm_earning),
        ('operating_ni_to_tp_ttm', ttm_earning),
        ('operating_ni_to_tp_latest', tp_earning),
        ('invest_r_associates_to_tp_ttm', ttm_earning),
        ('invest_r_associates_to_tp_latest', tp_earning),
        ('npcut_to_np', tp_earning),
        ('interest_cover_ttm', ttm_earning),
        ('net_non_oi_to_tp_ttm', ttm_earning),
        ('net_non_oi_to_tp_latest', tp_earning),
    )
    for method_name, dataset in pipeline:
        factor_earning = getattr(earning, method_name)(dataset, factor_earning)

    date_text = str(trade_date)
    factor_earning['id'] = factor_earning['symbol'] + date_text
    factor_earning['trade_date'] = date_text
    earning._storage_data(factor_earning, trade_date)
@app.task()
def factor_calculate(**kwargs):
    """
    Task entry point: load the cached earning data and compute the factors.

    :param kwargs: must contain 'date_index' (passed to the cache lookup
        and on to `calculate`) and 'session' (prefix of the cache keys)
    :return: None
    """
    # Label fixed: this is the earning task, not the constrain task.
    print("earning_kwargs: {}".format(kwargs))
    date_index = kwargs['date_index']
    session = kwargs['session']
    # NOTE: the name here must match the table name created in the client,
    # otherwise an error is raised.
    earning = FactorEarning('factor_earning')

    # Cache-key suffix -> dataset name
    # (presumably mirrors how the client cached the data -- confirm).
    cache_layout = (
        ("1", 'tp_earning'),
        ("2", 'ttm_earning_5y'),
        ("3", 'ttm_earning'),
    )
    total_earning_data = {}
    for suffix, dataset_name in cache_layout:
        content = cache_data.get_cache(session + suffix, date_index)
        frame = json_normalize(json.loads(str(content, encoding='utf8')))
        # cache_data.get_cache loses the index name, so the index has to
        # be restored from the 'symbol' column.
        frame.set_index('symbol', inplace=True)
        total_earning_data[dataset_name] = frame

    calculate(date_index, total_earning_data, earning)
...@@ -8,7 +8,6 @@ ...@@ -8,7 +8,6 @@
@time: 2019-02-12 10:03 @time: 2019-02-12 10:03
""" """
import json import json
from datetime import datetime, timedelta
from sklearn import linear_model from sklearn import linear_model
from pandas.io.json import json_normalize from pandas.io.json import json_normalize
...@@ -21,6 +20,9 @@ from ultron.cluster.invoke.cache_data import cache_data ...@@ -21,6 +20,9 @@ from ultron.cluster.invoke.cache_data import cache_data
class Growth(FactorBase): class Growth(FactorBase):
"""
成长类因子
"""
def __init__(self, name): def __init__(self, name):
super(Growth, self).__init__(name) super(Growth, self).__init__(name)
...@@ -694,20 +696,6 @@ def do_update(growth, growth_sets, start_date, end_date, count): ...@@ -694,20 +696,6 @@ def do_update(growth, growth_sets, start_date, end_date, count):
print('----->') print('----->')
def run(args):
    """
    Rebuild and/or update the growth factors for a date range.

    :param args: object with attributes `start_date`, `end_date` (0 means
        "today", formatted as an int YYYYMMDD), `count`, `growth_sets`,
        `rebuild` and `update`
    :return: None
    """
    end_date = args.end_date
    if end_date == 0:
        end_date = int(datetime.now().date().strftime('%Y%m%d'))

    # do_update is declared as
    #   do_update(growth, growth_sets, start_date, end_date, count)
    # so the arguments are passed by keyword; the former positional calls
    # supplied them in a different order.
    if args.rebuild:
        processor_growth = Growth('factor_growth')
        processor_growth.create_dest_tables()
        do_update(growth=processor_growth, growth_sets=args.growth_sets,
                  start_date=args.start_date, end_date=end_date,
                  count=args.count)
    if args.update:
        processor_growth = Growth('factor_growth')
        do_update(growth=processor_growth, growth_sets=args.growth_sets,
                  start_date=args.start_date, end_date=end_date,
                  count=args.count)
@app.task() @app.task()
def factor_calculate(**kwargs): def factor_calculate(**kwargs):
print("growth_kwargs: {}".format(kwargs)) print("growth_kwargs: {}".format(kwargs))
......
...@@ -29,6 +29,9 @@ from ultron.cluster.invoke.cache_data import cache_data ...@@ -29,6 +29,9 @@ from ultron.cluster.invoke.cache_data import cache_data
class HistoricalValue(FactorBase): class HistoricalValue(FactorBase):
"""
价值类因子
"""
def __init__(self, name): def __init__(self, name):
super(HistoricalValue, self).__init__(name) super(HistoricalValue, self).__init__(name)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment