Commit b2dea9f5 authored by 李煜's avatar 李煜

code update

parent 2353e349
......@@ -377,7 +377,6 @@ def get_basic_cash_flow(trade_date):
}
ttm_factor_sets = get_ttm_fundamental([], ttm_factors, trade_date).reset_index()
ttm_factor_sets = ttm_factor_sets[-ttm_factor_sets.duplicated()]
# 合并
ttm_factor_sets = pd.merge(ttm_factor_sets, valuation_sets, on="symbol")
......@@ -409,7 +408,7 @@ def get_basic_constrain(trade_date):
Income.administration_expense
]}
ttm_factors_sets = get_ttm_fundamental([], ttm_factors, trade_date)
ttm_factors_sets = get_ttm_fundamental([], ttm_factors, trade_date).reset_index()
ttm_factors_sets = ttm_factors_sets[-ttm_factors_sets.duplicated()]
return balance_sets, ttm_factors_sets
......@@ -525,7 +524,7 @@ if __name__ == '__main__':
history_value = historical_value.HistoricalValue('factor_historical_value')
history_value.create_dest_tables()
# per shapre
# per share indicator
per_share = factor_per_share_indicators.PerShareIndicators('factor_per_share')
per_share.create_dest_tables()
......@@ -582,8 +581,8 @@ if __name__ == '__main__':
# constrain
balance_sets, ttm_factors_sets = get_basic_constrain(date_index)
constrain_sets = pd.merge(balance_sets, ttm_factors_sets, on='symbol')
cache_data.set_cache(session5, date_index, constrain_sets.to_json(orient='records'))
cache_data.set_cache(session5 + '1', date_index, balance_sets.to_json(orient='records'))
cache_data.set_cache(session5 + '2', date_index, ttm_factors_sets.to_json(orient='records'))
factor_contrarian.factor_calculate(date_index=date_index, session=session5)
time5 = time.time()
print('constrain_cal_time:{}'.format(time5 - time4))
......
......@@ -9,21 +9,21 @@ from sqlalchemy.orm import sessionmaker
sys.path.append('..')
from factor.utillities.trade_date import TradeDate
from factor import config
from factor import factor_config
class FactorBase(object):
def __init__(self, name):
destination_db = '''mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}'''.format(config.destination_db_user,
config.destination_db_pwd,
config.destination_db_host,
config.destination_db_port,
config.destination_db_database)
destination_db = '''mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}'''.format(factor_config.destination_db_user,
factor_config.destination_db_pwd,
factor_config.destination_db_host,
factor_config.destination_db_port,
factor_config.destination_db_database)
self._name = name
self._destination = sa.create_engine(destination_db)
self._dest_session = sessionmaker(bind=self._destination, autocommit=False, autoflush=True)
self._trade_date = TradeDate()
self._dir = config.RECORD_BASE_DIR + 'factor/' + str(self._name)
self._dir = factor_config.RECORD_BASE_DIR + 'factor/' + str(self._name)
def _create_index(self):
session = self._dest_session()
......
......@@ -269,7 +269,7 @@ def factor_calculate(**kwargs):
ttm_factor_sets = json_normalize(json.loads(str(content2, encoding='utf8')))
tp_cash_flow.set_index('symbol', inplace=True)
ttm_factor_sets.set_index('symbol', inplace=True)
print("len_tp_cash_flow_data {}".format(len(tp_cash_flow)))
print("len_ttm_cash_flow_data {}".format(len(ttm_factor_sets)))
total_cash_flow_data = {'tp_cash_flow': tp_cash_flow, 'ttm_factor_sets': ttm_factor_sets}
print("len_total_cash_flow_data {}".format(len(total_cash_flow_data)))
calculate(date_index, total_cash_flow_data, cash_flow)
......@@ -11,7 +11,7 @@ source_db_pwd = 'read'
destination_db_host = '10.15.97.128'
destination_db_port = '3306'
destination_db_database = 'vision'
destination_db_database = 'test'
destination_db_user = 'root'
destination_db_pwd = '1234'
......
......@@ -158,36 +158,42 @@ class FactorContrarian(FactorBase):
return factor_contrarian
def calculate(trade_date, constrain_sets, constrain): # 计算对应因子
print(trade_date)
def calculate(trade_date, total_constrain_data_dic, constrain): # 计算对应因子
balance_sets = total_constrain_data_dic['balance_sets']
ttm_factors_sets = total_constrain_data_dic['ttm_factors_sets']
factor_contrarian = pd.DataFrame()
tp_contrarian = constrain_sets
factor_contrarian['symbol'] = tp_contrarian['symbol']
tp_contrarian.set_index('symbol', inplace=True)
factor_contrarian = constrain.inte_bear_debt_to_total_capital_latest(constrain_sets, factor_contrarian)
factor_contrarian = constrain.debts_asset_ratio_latest(constrain_sets, factor_contrarian)
factor_contrarian = constrain.debt_tangible_equity_ratio_latest(constrain_sets, factor_contrarian)
factor_contrarian = constrain.sales_cost_ratio_ttm(constrain_sets, factor_contrarian)
factor_contrarian = constrain.tax_ratio_ttm(constrain_sets, factor_contrarian)
factor_contrarian = constrain.financial_expense_rate_ttm(constrain_sets, factor_contrarian)
factor_contrarian = constrain.operating_expense_rate_ttm(constrain_sets, factor_contrarian)
factor_contrarian = constrain.admini_expense_rate_ttm(constrain_sets, factor_contrarian)
factor_contrarian = constrain.period_costs_rate_ttm(constrain_sets, factor_contrarian)
factor_contrarian['symbol'] = balance_sets.index
# 非TTM计算
factor_contrarian = constrain.inte_bear_debt_to_total_capital_latest(balance_sets, factor_contrarian)
factor_contrarian = constrain.debts_asset_ratio_latest(balance_sets, factor_contrarian)
factor_contrarian = constrain.debt_tangible_equity_ratio_latest(balance_sets, factor_contrarian)
# TTM计算
factor_contrarian = constrain.sales_cost_ratio_ttm(ttm_factors_sets, factor_contrarian)
factor_contrarian = constrain.tax_ratio_ttm(ttm_factors_sets, factor_contrarian)
factor_contrarian = constrain.financial_expense_rate_ttm(ttm_factors_sets, factor_contrarian)
factor_contrarian = constrain.operating_expense_rate_ttm(ttm_factors_sets, factor_contrarian)
factor_contrarian = constrain.admini_expense_rate_ttm(ttm_factors_sets, factor_contrarian)
factor_contrarian = constrain.period_costs_rate_ttm(ttm_factors_sets, factor_contrarian)
factor_contrarian['id'] = factor_contrarian['symbol'] + str(trade_date)
factor_contrarian['trade_date'] = str(trade_date)
constrain._storage_data(factor_contrarian, trade_date)
@app.task()
def factor_calculate(**kwargs):
print("constrain_kwargs: {}".format(kwargs))
date_index = kwargs['date_index']
session = kwargs['session']
constrain = FactorContrarian('factor_constrain') # 注意, 这里的name要与client中新建table时的name一致, 不然回报错
content = cache_data.get_cache(session, date_index)
total_constrain_data = json_normalize(json.loads(str(content, encoding='utf8')))
print("len_total_constrain_data {}".format(len(total_constrain_data)))
calculate(date_index, total_constrain_data, constrain)
\ No newline at end of file
content1 = cache_data.get_cache(session + '1', date_index)
content2 = cache_data.get_cache(session + '2', date_index)
balance_sets = json_normalize(json.loads(str(content1, encoding='utf8')))
ttm_factors_sets = json_normalize(json.loads(str(content2, encoding='utf8')))
balance_sets.set_index('symbol', inplace=True)
ttm_factors_sets.set_index('symbol', inplace=True)
print("len_constrain_data {}".format(len(balance_sets)))
print("len_ttm_constrain_data {}".format(len(ttm_factors_sets)))
total_constrain_data_dic = {'balance_sets': balance_sets, 'ttm_factors_sets': ttm_factors_sets}
calculate(date_index, total_constrain_data_dic, constrain)
......@@ -687,15 +687,6 @@ def calculate(trade_date, growth_sets, growth):
growth._storage_data(factor_historical_growth, trade_date)
def do_update(growth, growth_sets, start_date, end_date, count):
# 读取本地交易日
_trade_date = td.TradeDate()
trade_date_sets = _trade_date.trade_date_sets_ago(start_date, end_date, count)
for trade_date in trade_date_sets:
calculate(trade_date, growth_sets, growth)
print('----->')
@app.task()
def factor_calculate(**kwargs):
print("growth_kwargs: {}".format(kwargs))
......
......@@ -11,20 +11,16 @@ import sys
from datetime import datetime
sys.path.append("..")
import json
import math
import numpy as np
from vision.fm.signletion_engine import *
from factor.utillities.calc_tools import CalcTools
import json
from pandas.io.json import json_normalize
from factor import app
from factor.factor_base import FactorBase
from factor.ttm_fundamental import *
from vision.fm.signletion_engine import *
from factor.utillities import trade_date as td
from factor.utillities.calc_tools import CalcTools
from ultron.cluster.invoke.cache_data import cache_data
......
......@@ -8,13 +8,13 @@ from collections import OrderedDict
import collections
sys.path.append("../../")
from factor import config
from factor import factor_config
class TradeDate(object):
def __init__(self):
self._all_trade_file = config.RECORD_BASE_DIR + 'trade_date/' + 'trade_date.csv'
self._all_trade_file = factor_config.RECORD_BASE_DIR + 'trade_date/' + 'trade_date.csv'
self._trade_date_sets = OrderedDict()
self._load_trade_date()
......
from ultron.config import config_setting
config_setting.set_queue(qtype='redis', host='10.15.5.34', port=6379, pwd='', db=1)
config_setting.update()
# coding=utf-8
import sys
import gevent.monkey; gevent.monkey.patch_all()
from twisted.internet import reactor
sys.path.append('../..')
from ultron.cluster.work.work_engine import WorkEngine
if __name__ == "__main__":
reactor.__init__()
work_engine = WorkEngine()
reactor.run()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment