# tf-data
## Tianfeng (TF) data parsing and loading
Data is uploaded to OBS by the data-upload project. Object paths follow the pattern obs://csvdatas/tfzq/fundAuto/20160108/20160108_fundAuto_detail.csv, where:
- 20160108: data date
- fundAuto: data category
- detail: data sub-category; omitted when a category contains only one data set
Every upload is recorded in Redis. Incremental records are fetched from Redis by timestamp, and the values of those records identify the data to extract and synchronize into the database.
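For illustration only (this helper is not part of the repository; the function name is hypothetical), the path rule above can be decoded like this:

```python
# Hypothetical helper, not repo code: split an OBS object key such as
# 'tfzq/fundAuto/20160108/20160108_fundAuto_detail.csv' into its parts.
from typing import Optional, Tuple

def parse_obs_key(key: str) -> Tuple[str, str, Optional[str]]:
    _, category, date, filename = key.split('/')
    parts = filename[:-len('.csv')].split('_')
    sub_category = parts[2] if len(parts) > 2 else None  # absent when the category has only one data set
    return date, category, sub_category

print(parse_obs_key('tfzq/fundAuto/20160108/20160108_fundAuto_detail.csv'))
# -> ('20160108', 'fundAuto', 'detail')
```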
### Code structure
- db: database access classes
- sync: data synchronization tasks
- data_check: data review
- fund_auto: fund data tasks
- industry: industry data tasks
- publish: data publishing
- fd_manager_factor.py: handles the case where a data category contains only one data set
- utillities: utility classes
### Development workflow
1. TF data usually arrives as a self-contained project; for each new group of tasks, create a new package under sync
2. Inside the package, create base_xxx_sync.py and inherit from sync/base_sync.py (see the sketch after this list)
3. Every task must keep the same four parameters
- tmstamp: starting point of the task
- rebuild: whether to rebuild the destination table
- update: force-refresh all data after tmstamp
- schedule: automatically pick up the last synced tmstamp and refresh everything after it
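A minimal skeleton of such a task, as a sketch only (SyncXxx, hb_xxx and rl_xxx are placeholder names; create_table and get_max_tm_log come from sync/base_sync.py, and the four flags mirror the list above rather than any existing script):

```python
# Hypothetical sync/<new_package>/base_xxx_sync.py skeleton; names are placeholders.
import argparse
from base_sync import BaseSync

class SyncXxx(BaseSync):
    def __init__(self):
        super(SyncXxx, self).__init__(source_table='hb_xxx', dest_table='rl_xxx')

    def create_dest_tables(self):
        # drop and recreate rl_xxx via self.create_table(...), as the existing tasks do
        pass

    def do_update(self, tmstamp=0):
        # incremental sync: read rows newer than tmstamp from hb_xxx, write them to rl_xxx
        pass

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--tmstamp', type=int, default=0)    # starting point of the task
    parser.add_argument('--rebuild', action='store_true')    # drop and recreate the destination table
    parser.add_argument('--update', action='store_true')     # force-refresh all data after tmstamp
    parser.add_argument('--schedule', action='store_true')   # resume from the last recorded tmstamp
    args = parser.parse_args()
    processor = SyncXxx()
    if args.rebuild:
        processor.create_dest_tables()
    if args.schedule:
        args.tmstamp = processor.get_max_tm_log()             # position stored in the update_log table
    if args.rebuild or args.update or args.schedule:
        processor.do_update(args.tmstamp)
```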
### Notes
1. Every table has is_verify and flag columns: the review flag and the logical-delete flag
2. is_verify has three states: 0 = newly added, 1 = reviewed, 2 = updated but not yet reviewed
3. flag has two states: 1 = valid, null = deleted; every delete is a logical delete
4. After data is synchronized into the production database it goes through review; only reviewed data is then synchronized to the publish database
5. Every table has a _data_time column recording the batch time; it uniquely identifies each batch, and when a batch is re-published the existing rows are deleted by this column
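To make notes 1–4 concrete, the sketch below (illustration only; the connection URL and demo_table are placeholders, not objects in this repository) shows a logical delete followed by the review step:

```python
# Illustration of the flag / is_verify conventions; demo_table and the URL are placeholders.
import sqlalchemy as sa

engine = sa.create_engine('mysql+mysqlconnector://user:pwd@host:3306/vision')

with engine.begin() as conn:
    # logical delete: keep the row, clear flag, and mark it as "updated, not yet reviewed"
    conn.execute(
        sa.text("update `demo_table` set flag = null, is_verify = 2 "
                "where trade_date = :td and flag = 1"),
        {'td': '20230101'})
    # after review, approved rows are flagged is_verify = 1 and can be published
    conn.execute(sa.text("update `demo_table` set is_verify = 1 where is_verify != 1"))
```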
#!/usr/bin/env python
# coding=utf-8
hb_db_host = '121.37.138.1'
hb_db_port = '13317'
hb_db_database = 'hbdatas'
hb_db_user = 'rl_sync'
hb_db_pwd = 'rl_sync_2019'
rl_db_host = '121.37.138.1'
rl_db_port = '13317'
rl_db_database = 'vision'
rl_db_user = 'rl_sync'
rl_db_pwd = 'rl_sync_2019'
pub_db_host = '121.37.138.1'
pub_db_port = '13317'
pub_db_database = 'vision_product'
pub_db_user = 'rl_sync'
pub_db_pwd = 'rl_sync_2019'
AK = 'ZAGWHZCNMXQVLPFGMNWN'
SK = 'sliMcYsc5ngYRxEfwss6sLYGdgZPejIRDSfloDSX'
server = 'obs.cn-east-3.myhuaweicloud.com'
bucket_name = 'csvdatas'
import os
import time
import sys
import sqlalchemy as sa
from sqlalchemy.orm import sessionmaker, scoped_session
import pandas as pd
sys.path.append('..')
import config
class DB(object):
def __init__(self, source_db=None):
if source_db is None:
source_db = '''mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}'''.format(config.rl_db_user,
config.rl_db_pwd,
config.rl_db_host,
config.rl_db_port,
config.rl_db_database)
        # source database
self._source = sa.create_engine(source_db, pool_size=10)
        # session factory bound to the engine above
self._session = sessionmaker(bind=self._source, autocommit=False, autoflush=True)
self._scoped_session = scoped_session(self._session)
self._clean_columns = set(['flag', 'is_verify', 'create_time', 'update_time'])
def delete_data_by_id(self, ids, table):
start_time = time.time()
i, count = 0, 3000
while i * count < len(ids):
del_ids = ids[i * count:(i + 1) * count]
sql = '''delete from `{0}` where id in {1}'''.format(table, del_ids)
sql = sql.replace('[', '(').replace(']', ')')
self._execute_sql(sql)
i = i + 1
print(f"delete id form {del_ids[0]} to {del_ids[-1]}")
print('delete over.', time.time() - start_time)
def save(self, df, table):
df.to_sql(name=table, con=self._source, if_exists='append', index=False)
def query_sql(self, sql, clean=False):
start = time.perf_counter()
result = pd.read_sql(sql, self._source)
if clean:
clean_columns = list(self._clean_columns & set(result.columns))
result.drop(columns=clean_columns, inplace=True)
print('query cost:', time.perf_counter() - start)
return result
def get_max_column(self, table=None, column='update_time'):
sql = f'select max({column}) as tm from `{table}`'
result = self.query_sql(sql)
result['tm'] = result['tm'].astype('str')
tm = None
if not result.empty:
tm = result['tm'][0]
return tm
def _execute_sql(self, sql):
session = self._session()
session.execute(sql)
session.commit()
session.close()
def _insert_or_update(self, datas, table):
session = self._session()
for i in range(datas.shape[0]):
data = datas.iloc[i]
values = ''
update = ''
title = ''
for j in range(len(data)):
index = data.index[j]
                value = (str(data[j]).replace("'", "\\'").replace("%", "\\%")
                         .replace("(", "\\(").replace(")", "\\)").replace(":", "\\:"))
title += """`{0}`,""".format(index)
values += """'{0}',""".format(value)
update += """`{0}`='{1}',""".format(index, value)
sql = '''insert into {0} ({1}) values({2}) ON DUPLICATE KEY UPDATE {3}'''.format(table,
title[0:-1],
values[0:-1],
update[0:-1]
)
sql = sql.replace("'nan'", 'Null').replace("'None'", 'Null')
try:
session.execute(sql)
except Exception as e:
print(sql)
print(e.orig.msg)
session.commit()
session.close()
def _save_to_mysql(self, df, table):
        df.replace({',': r'\,', '\n': r'\\n', '\r\n': r'\\n', '%': r'\%'}, regex=True, inplace=True)
tmpfile = table + '_mysql.csv'
sep = ','
newline = '\n'
start = time.time()
        self._insert_or_update(df[:0], table)
# df[:0].to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
columns = df.columns.values.tolist()
print('creating ' + tmpfile, 'ok')
with open(tmpfile, mode='w', newline=newline) as fhandle:
df.to_csv(fhandle, na_rep='\\N', index=False, sep=sep, encoding='UTF-8')
# df.to_csv(tmpfile, na_rep='\\N', index=False, sep=sep, encoding='UTF-8')
print('loading ' + tmpfile, 'ok')
sql_load = "LOAD DATA LOCAL INFILE '{}' replace INTO TABLE {} FIELDS TERMINATED BY '{}' LINES TERMINATED BY '{}' IGNORE 1 LINES ({});" \
.format(tmpfile, table, sep, newline, ','.join(columns))
self._source.execute(sql_load)
os.remove(tmpfile)
end = time.time()
print('存储数据库用时(s):', end - start)
return True
# This is a sample Python script.
# Press ⌥⇧X to execute it or replace it with your code.
# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.
def print_hi(name):
# Use a breakpoint in the code line below to debug your script.
print(f'Hi, {name}') # Press ⌃⇧B to toggle the breakpoint.
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
print_hi('PyCharm')
# See PyCharm help at https://www.jetbrains.com/help/pycharm/
import datetime
import os
import sys
import time
sys.path.append('..')
import config
import sqlalchemy as sa
import pandas as pd
from sqlalchemy.orm import sessionmaker
from utillities.internal_code import InternalCode
from utillities.obs_util import ObsUtil
class BaseSync(object):
def __init__(self, source_table=None, dest_table=None):
source_db = '''mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}'''.format(config.hb_db_user, config.hb_db_pwd,
config.hb_db_host, config.hb_db_port,
config.hb_db_database)
destination_db = '''mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}'''.format(config.rl_db_user,
config.rl_db_pwd,
config.rl_db_host,
config.rl_db_port,
config.rl_db_database)
        # source database
self.source = sa.create_engine(source_db)
        # destination database
self.destination = sa.create_engine(destination_db)
        # destination database session factory
self.dest_session = sessionmaker(bind=self.destination, autocommit=False, autoflush=True)
self.source_table = source_table
self.dest_table = dest_table
self.internal_code = InternalCode()
# self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)
self.obs_util = ObsUtil()
def get_start_date(self, table=None, type='trade_date'):
if table is None:
table = self.dest_table
sql = """select max({0}) as trade_date from `{1}` where flag=1;""".format(type, table)
trades_sets = pd.read_sql(sql, self.destination)
td = 20070101
if not trades_sets.empty:
ts = trades_sets['trade_date'][0]
if ts is None or ts == 'None':
return td
td = str(ts).replace('-', '')
return td
def check_data_exist(self, table, filter):
sql = """select count(1) as cou from `{0}` where {1} and flag=1;""".format(table, filter)
trades_sets = pd.read_sql(sql, self.destination)
if not trades_sets.empty:
ts = trades_sets['cou'][0]
if ts > 0:
return True
return False
def get_start_date_by_uplog(self):
sql = """select * from `{0}` where task_name='{1}' and flag=1;""".format('data_update_log', self.dest_table)
trades_sets = pd.read_sql(sql, self.destination)
td = 20070101
if not trades_sets.empty:
ts = trades_sets['max_tag'][0]
if ts is None or ts == 'None':
return td
td = str(ts)
return td
def delete_trade_data(self, trade_date, date_type='trade_date'):
session = self.dest_session()
# session.execute('''delete from `{0}` where trade_date={1}'''.format(self.dest_table, trade_date))
session.execute(
'''update `{0}` set flag = null,is_verify=2 where {1}={2} and flag=1'''.format(self.dest_table, date_type,
trade_date))
session.commit()
def delete_begin_end(self, start_date, end_date):
session = self.dest_session()
sql = f"update `{self.dest_table}` set flag = null,is_verify=2 where trade_date between {start_date} and {end_date} and flag=1"
session.execute(sql)
session.commit()
def create_table(self, create_sql, table=None, model=None, unique_key=None):
if table is None:
table = self.dest_table
drop_sql = """drop table if exists `{0}`;""".format(table)
session = self.dest_session()
session.execute(drop_sql)
session.execute(create_sql)
session.execute(
'''alter table `{0}` add `flag` int DEFAULT 1;'''.format(table))
session.execute(
'''alter table `{0}` add `is_verify` int DEFAULT 0;'''.format(table))
session.execute(
'''alter table `{0}` add `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP;'''.format(table))
session.execute(
'''alter table `{0}` add `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;'''.format(
table))
session.execute('''CREATE INDEX {0}_flag_index ON `{0}` (flag);'''.format(table))
session.execute('''CREATE INDEX {0}_is_verify_index ON `{0}` (is_verify);'''.format(table))
session.execute('''CREATE INDEX {0}_update_time_index ON `{0}` (update_time);'''.format(table))
if model == 'basic_info':
session.execute('''alter table `{0}` add `tmstamp` BIGINT NOT NULL;'''.format(table))
session.execute('''alter table `{0}` add `source_id` varchar(40) NOT NULL;'''.format(table))
session.execute('''CREATE INDEX {0}_tmstamp_index ON `{0}` (tmstamp);'''.format(table))
session.execute('''CREATE UNIQUE INDEX {0}_source_id_flag_index ON `{0}` (source_id,flag);'''.format(table))
if unique_key is not None:
unique_key.append('flag')
self.create_unique_index(unique_key)
session.commit()
session.close()
def create_unique_index(self, keys):
session = self.dest_session()
uni_key = ','.join(keys)
sql = f'CREATE UNIQUE INDEX unique_{self.dest_table}_index ON `{self.dest_table}` ({uni_key})'
session.execute(sql)
session.commit()
session.close()
def get_delete_histroy_by_uplog(self, table, delete_log):
sql = f"select ID,TableName,substring(KeyValue,8,10) as KeyValue from UpLog " \
f"where TableName='{table}' and OperType='D' and ID>{delete_log};"
return pd.read_sql(sql, self.source)
def delete_by_uplog(self, source_table=None, delete_log=0):
session = self.dest_session()
if source_table is None:
source_table = self.source_table
delete_list = self.get_delete_histroy_by_uplog(source_table, delete_log)
i, count = 0, 3000
if not delete_list.empty:
while i * count < len(delete_list):
print(f'正在删除{self.dest_table}表{i * count}-{(i + 1) * count}的数据')
slice_df = delete_list[i * count:(i + 1) * count]
delete_sql = f"update {self.dest_table} set flag = null,is_verify=2 " \
f"where source_id in {list(slice_df['KeyValue'])} and flag=1".replace('[', '(').replace(
']', ')')
session.execute(delete_sql)
session.commit()
i += 1
            self.update_update_log(delete_list['ID'].max(), '_del_' + self.dest_table)
session.close()
def _execute_dest_sql(self, sql):
session = self.dest_session()
session.execute(sql)
session.commit()
session.close()
def delete_data_by_source_id(self, ids):
start_time = time.time()
i, count = 0, 5000
while i * count < len(ids):
del_ids = ids[i * count:(i + 1) * count]
sql = '''update {0} set flag=null,is_verify=2 where source_id in {1} and flag=1'''.format(self.dest_table,
del_ids)
sql = sql.replace('[', '(').replace(']', ')')
self._execute_dest_sql(sql)
i = i + 1
print('delete over.', time.time() - start_time)
def save_update_log(self, task_name, max_tag):
session = self.dest_session()
sql = """insert into data_update_log (data_type,task_name,max_tag) values ('{0}','{1}','{2}')
ON DUPLICATE KEY UPDATE data_type='{0}',task_name='{1}',max_tag='{2}',is_verify=2""".format(
'basic_info',
task_name,
max_tag)
session.execute(sql)
session.commit()
session.close()
def insert_or_update(self, datas):
session = self.dest_session()
for i in range(datas.shape[0]):
data = datas.iloc[i]
values = ''
update = ''
title = ''
for j in range(len(data)):
index = data.index[j]
                value = (str(data[j]).replace("'", "\\'").replace("%", "\\%")
                         .replace("(", "\\(").replace(")", "\\)").replace(":", "\\:"))
title += """`{0}`,""".format(index)
values += """'{0}',""".format(value)
update += """`{0}`='{1}',""".format(index, value)
sql = '''insert into {0} ({1}) values({2}) ON DUPLICATE KEY UPDATE {3}'''.format(self.dest_table,
title[0:-1],
values[0:-1],
update[0:-1]
)
sql = sql.replace("'nan'", 'Null').replace("'None'", 'Null')
try:
session.execute(sql)
except Exception as e:
print(sql)
print(e.orig.msg)
session.commit()
session.close()
def save_to_mysql(self, df):
tmpfile = self.dest_table + '_mysql.csv'
sep = ','
newline = '\n'
start = time.time()
self.insert_or_update(df[:0])
# df[:0].to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
columns = df.columns.values.tolist()
print('creating ' + tmpfile, 'ok')
with open(tmpfile, mode='w', newline=newline) as fhandle:
df.to_csv(fhandle, na_rep='\\N', index=False, sep=sep)
print('loading ' + tmpfile, 'ok')
sql_load = "LOAD DATA LOCAL INFILE '{}' replace INTO TABLE {} FIELDS TERMINATED BY '{}' LINES TERMINATED BY '{}' IGNORE 1 LINES ({});" \
.format(tmpfile, self.dest_table, sep, newline, ','.join(columns))
self.destination.execute(sql_load)
os.remove(tmpfile)
end = time.time()
print('存储数据库用时(s):', end - start)
return True
def get_max_tm_source(self, table=None):
if table is None:
table = self.source_table
sql = 'select max(update_time) as tm from ' + table
trades_sets = pd.read_sql(sql, self.source)
tm = 0
if not trades_sets.empty:
tm = trades_sets['tm'][0]
return tm
def get_max_tm_destation(self, table=None):
        if table is None:
table = self.dest_table
sql = 'select max(tmstamp)as tm from ' + table
trades_sets = pd.read_sql(sql, self.destination)
tm = 0
if not trades_sets.empty:
tm = trades_sets['tm'][0]
return tm
def get_max_tm_log(self, table=None):
if table is None:
table = self.dest_table
sql = """select max_tag from update_log where task_name='{0}'""".format(table)
trades_sets = pd.read_sql(sql, self.destination)
tm = 0
if not trades_sets.empty:
tm = trades_sets['max_tag'][0]
return tm
def update_update_log(self, tm, table=None):
if table is None:
table = self.dest_table
session = self.dest_session()
sql = """insert into update_log (task_name,max_tag) values ('{0}',{1})
ON DUPLICATE KEY UPDATE task_name='{0}',max_tag={1}""".format(table, tm)
sql = sql.replace('\n', '')
session.execute(sql)
session.commit()
session.close()
print('更新', table, '的max_tag为', tm)
def get_fd_secode(self, keep='all'):
sql = f'select security_code,fsymbol,end_date as _end_date from fd_basic_info where flag=1'
result = pd.read_sql(sql, self.destination)
result['_end_date'] = result['_end_date'].replace(datetime.date(1900, 1, 1), datetime.date(2099, 1, 1))
result = result.sort_values(['fsymbol', '_end_date'])
index = result.drop_duplicates(subset=['fsymbol'], keep='last').index
result.loc[index, '_end_date'] = datetime.date(2099, 1, 1)
if keep == 'last':
result = result.drop_duplicates(subset=['fsymbol'], keep='last')
result = result.drop(columns=['_end_date'])
return result
def get_manager_id(self, keep='all'):
sql = f'select distinct ps_code as manager_code,ps_name as manager_name,security_code from fd_manager_info where flag=1'
result = pd.read_sql(sql, self.destination)
return result
import sys
sys.path.append('..')
sys.path.append('../..')
import pandas as pd
import config
from db.mysql_db import DB
# from utillities.sync_util import SyncUtil
class BaseCheck(object):
def __init__(self, table):
source_db = '''mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}'''.format(config.rl_db_user,
config.rl_db_pwd,
config.rl_db_host,
config.rl_db_port,
config.rl_db_database)
self.db = DB(source_db)
self.table = table
# self.sync_util = SyncUtil()
def _rule_and(self, df, columns):
rule_df = pd.DataFrame({'result': [True] * len(df)})
for column in columns:
rule_df['result'] = rule_df['result'] & df[column]
return rule_df['result']
def _rule_gt(self, df, columns, values):
rule_df = df.loc[:, columns] > values
result = self._rule_and(rule_df, columns)
return result
def _rule_gte(self, df, columns, values):
rule_df = df.loc[:, columns] >= values
result = self._rule_and(rule_df, columns)
return result
def _rule_lt(self, df, columns, values):
rule_df = df.loc[:, columns] < values
result = self._rule_and(rule_df, columns)
return result
def _rule_lte(self, df, columns, values):
rule_df = df.loc[:, columns] <= values
result = self._rule_and(rule_df, columns)
return result
def _rule_notin(self, df, columns, values):
rule_df = ~df.loc[:, columns].isin(values)
result = self._rule_and(rule_df, columns)
return result
def _rule_in(self, df, columns, values):
rule_df = df.loc[:, columns].isin(values)
result = self._rule_and(rule_df, columns)
return result
def check_rule(self, df):
i = 0
for rule in self._rules:
i += 1
if rule['relation'] == 'GT':
df['rule' + str(i)] = self._rule_gt(df, rule['columns'], rule['values'])
elif rule['relation'] == 'GTE':
df['rule' + str(i)] = self._rule_gte(df, rule['columns'], rule['values'])
elif rule['relation'] == 'LT':
df['rule' + str(i)] = self._rule_lt(df, rule['columns'], rule['values'])
elif rule['relation'] == 'LTE':
df['rule' + str(i)] = self._rule_lte(df, rule['columns'], rule['values'])
elif rule['relation'] == 'IN':
df['rule' + str(i)] = self._rule_in(df, rule['columns'], rule['values'])
elif rule['relation'] == 'NOTIN':
df['rule' + str(i)] = self._rule_notin(df, rule['columns'], rule['values'])
df['rule'] = self._rule_and(df, self._rule_tag)
return df
#!/usr/bin/env python
# coding=utf-8
import argparse
import time
from base_check import BaseCheck
class DefaultCheck(BaseCheck):
def __init__(self, table):
super(DefaultCheck, self).__init__(table)
def _get_not_check_count(self):
sql = f'select count(1) as cou from `{self.table}` where is_verify != 1;'
result = self.db.query_sql(sql)
return result['cou'][0]
def _check_data(self, check_size):
sql = f'update `{self.table}` set is_verify=1 where is_verify!=1 order by update_time limit {check_size};'
self.db._execute_sql(sql)
def do_check(self):
need_check_count = self._get_not_check_count()
check_size = 200000
if need_check_count > 0:
print('need check count:', need_check_count)
for i in range(int(need_check_count / check_size) + 1):
start = time.time()
self._check_data(check_size)
print(f'check success:{check_size},time:{time.time() - start}')
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--table', type=str, default='rl_fof_fund_multi_asset_config')
    parser.add_argument('--check', action='store_true')
args = parser.parse_args()
processor = DefaultCheck(args.table)
if args.check:
processor.do_check()
,source_id,fsymbol,trade_date,stock_concent,entry_time,is_valid,disable,tmstamp,security_code,end_date,flag
1005,1404355,003477,2019-12-31 00:00:00,1.0,2021-10-31 17:22:52,1,0,2021-10-31 17:22:52,1030008282,2019-12-22,1
2081,1405431,002850,2021-03-31 00:00:00,0.4501,2021-10-31 17:23:02,1,0,2021-10-31 17:23:02,1030006948,2021-03-26,1
2767,1406066,003694,2018-06-30 00:00:00,1.0,2021-10-31 17:23:08,1,0,2021-10-31 17:23:08,1030008426,2018-06-13,1
2875,1406174,002976,2018-09-30 00:00:00,1.0,2021-10-31 17:23:10,1,0,2021-10-31 17:23:10,1030007080,2018-08-21,1
3465,1406740,161721,2021-03-31 00:00:00,0.7751,2021-10-31 17:23:16,1,0,2021-10-31 17:23:16,1030004178,2021-01-01,1
3467,1406741,161721,2021-06-30 00:00:00,0.9958,2021-10-31 17:23:16,1,0,2021-10-31 17:23:16,1030004178,2021-01-01,1
3469,1406742,161721,2021-09-30 00:00:00,0.9938,2021-10-31 17:23:16,1,0,2021-10-31 17:23:16,1030004178,2021-01-01,1
3680,1406952,009547,2021-06-30 00:00:00,0.5948,2021-10-31 17:23:18,1,0,2021-10-31 17:23:18,1030016864,2021-06-05,1
5323,1408570,160632,2021-01-11 00:00:00,0.8038,2021-10-31 17:23:36,1,0,2021-10-31 17:23:36,1030004676,2021-01-01,1
5325,1408571,160632,2021-03-31 00:00:00,0.8087,2021-10-31 17:23:36,1,0,2021-10-31 17:23:36,1030004676,2021-01-01,1
5327,1408572,160632,2021-06-30 00:00:00,0.8078,2021-10-31 17:23:36,1,0,2021-10-31 17:23:36,1030004676,2021-01-01,1
5329,1408573,160632,2021-09-30 00:00:00,0.7966,2021-10-31 17:23:36,1,0,2021-10-31 17:23:36,1030004676,2021-01-01,1
5557,1408778,168203,2021-03-31 00:00:00,0.6146,2021-10-31 17:23:39,1,0,2021-10-31 17:23:39,1030004983,2021-01-01,1
5559,1408779,168203,2021-06-04 00:00:00,0.6099,2021-10-31 17:23:39,1,0,2021-10-31 17:23:39,1030004983,2021-01-01,1
5561,1408780,168203,2021-06-30 00:00:00,0.5942,2021-10-31 17:23:39,1,0,2021-10-31 17:23:39,1030004983,2021-01-01,1
5563,1408781,168203,2021-09-30 00:00:00,0.6191,2021-10-31 17:23:39,1,0,2021-10-31 17:23:39,1030004983,2021-01-01,1
7998,1411203,162308,2017-06-30 00:00:00,1.0,2021-10-31 17:24:05,1,0,2021-10-31 17:24:05,2030004051,2017-04-28,1
8180,1411385,001005,2018-06-30 00:00:00,1.0,2021-10-31 17:24:06,1,0,2021-10-31 17:24:06,1030004415,2018-06-05,1
8765,1411970,002566,2018-06-30 00:00:00,0.9999,2021-10-31 17:24:12,1,0,2021-10-31 17:24:12,1030006669,2018-05-19,1
8905,1412110,003918,2018-06-30 00:00:00,0.9222,2021-10-31 17:24:13,1,0,2021-10-31 17:24:13,1030009156,2018-06-22,1
9897,1413048,165519,2021-01-27 00:00:00,0.5816,2021-10-31 17:24:22,1,0,2021-10-31 17:24:22,1030002379,2021-01-01,1
9899,1413049,165519,2021-03-31 00:00:00,0.5727,2021-10-31 17:24:22,1,0,2021-10-31 17:24:22,1030002379,2021-01-01,1
9901,1413050,165519,2021-06-30 00:00:00,0.6041,2021-10-31 17:24:22,1,0,2021-10-31 17:24:22,1030002379,2021-01-01,1
9903,1413051,165519,2021-09-30 00:00:00,0.6005,2021-10-31 17:24:22,1,0,2021-10-31 17:24:22,1030002379,2021-01-01,1
10060,1413207,160640,2020-09-30 00:00:00,0.465,2021-10-31 17:24:24,1,0,2021-10-31 17:24:24,1030004882,2020-07-08,1
import sys
sys.path.append('..')
sys.path.append('../..')
from db.mysql_db import DB
import config
class BasePublish(object):
def __init__(self, source_table=None, dest_table=None):
source_db = '''mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}'''.format(config.rl_db_user,
config.rl_db_pwd,
config.rl_db_host,
config.rl_db_port,
config.rl_db_database)
destination_db = '''mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}'''.format(config.pub_db_user,
config.pub_db_pwd,
config.pub_db_host,
config.pub_db_port,
config.pub_db_database)
        # source (production) database and publish database
self.db = DB(source_db)
self.pub_db = DB(destination_db)
self.source_table = source_table
self.dest_table = dest_table
def _get_start_date(self):
sql = """select max(trade_date) as trade_date from `{0}`;""".format(self.dest_table)
trades_sets = self.pub_db.query_sql(sql)
td = 20070101
if not trades_sets.empty:
ts = trades_sets['trade_date'][0]
if ts is None or ts == 'None':
return td
td = str(ts).replace('-', '')
return td
def _update_publish_log(self, tm, table=None):
if table is None:
table = self.dest_table
sql = """insert into publish_log (task_name,max_tag) values ('{0}','{1}')
ON DUPLICATE KEY UPDATE task_name='{0}',max_tag='{1}'""".format(table, tm)
sql = sql.replace('\n', '')
self.db._execute_sql(sql)
print(f'更新{table}的max_tag为{tm}')
def _get_source_max_update_time(self, table=None):
if table is None:
table = self.source_table
sql = f'select max(update_time)as tm from `{table}`'
result = self.db.query_sql(sql)
result['tm'] = result['tm'].astype('str')
tm = '0'
if not result.empty:
tm = result['tm'][0]
return tm
def _get_max_tm_log(self, table=None):
if table is None:
table = self.dest_table
sql = """select max_tag from publish_log where task_name='{0}'""".format(table)
trades_sets = self.db.query_sql(sql)
tm = '0'
if not trades_sets.empty:
tm = str(trades_sets['max_tag'][0])
return tm
def get_source_table_sql(self):
sql = f'SHOW CREATE table `{self.source_table}`;'
result = self.db.query_sql(sql)
return result['Create Table'][0]
    # create the destination table
def create_dest_tables(self):
self._update_publish_log(0)
self.pub_db._execute_sql(f'drop table if exists `{self.dest_table}`')
create_sql = self.get_source_table_sql()
self.pub_db._execute_sql(create_sql)
def get_table_columns(self, table, database, db):
sql = """select column_name,column_type
from information_schema.columns
where table_name='{0}' and table_schema='{1}'
order by column_name;
""".format(table, database)
result = db.query_sql(sql)
return result
def check_table_columns(self):
        source_table_column = self.get_table_columns(self.source_table, config.rl_db_database, self.db)
dest_table_column = self.get_table_columns(self.dest_table, config.pub_db_database, self.pub_db)
if dest_table_column.empty:
self.create_dest_tables()
return
from pandas.testing import assert_frame_equal
try:
assert_frame_equal(source_table_column, dest_table_column)
print(f'{self.source_table}与发布库{self.dest_table}表结构一致。')
except Exception as e:
print(f'{self.source_table}与发布库{self.dest_table}表结构不一致,请检查数据库。')
print(source_table_column)
print(dest_table_column)
sys.exit(1)
# self.create_dest_tables()
#!/usr/bin/env python
# coding=utf-8
import argparse
import sys
import time
import pandas as pd
from base_publish import BasePublish
sys.path.append('..')
sys.path.append('../..')
import config
class DefaultPublish(BasePublish):
def __init__(self, source_table, dest_table):
super(DefaultPublish, self).__init__(source_table, dest_table)
def get_datas(self, update_time, max_tm, id=0):
start_time = time.time()
print(f'正在查询 {self.source_table}表,update_time>={update_time},id>{id}')
sql = """select *
from `{0}` a
where 1=1 and a.update_time>'{1}' and a.update_time<='{2}' and id>{3} and is_verify=1
order by a.id
limit 20000; """.format(self.source_table, update_time, max_tm, id)
trades_sets = self.db.query_sql(sql)
print('query data over.', time.time() - start_time)
return trades_sets
def update_table_data(self, tm, max_tm):
id = 0
while True:
result_list = self.get_datas(tm, max_tm, id)
if not result_list.empty:
ids = list(result_list['id'])
self.pub_db.delete_data_by_id(ids, self.dest_table)
start_time = time.time()
result_list.drop(columns=['update_time'], inplace=True)
self.pub_db.save(result_list, self.dest_table)
id = ids[-1]
print('save over.', time.time() - start_time)
else:
self._update_publish_log(max_tm)
print('data publish over.')
break
def do_update(self):
max_tm = self._get_source_max_update_time()
log_tm = self._get_max_tm_log()
if max_tm > log_tm:
self.update_table_data(log_tm, max_tm)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--source_table', type=str, default='rl_fof_fund_multi_asset_config')
parser.add_argument('--dest_table', type=str, default=None)
    parser.add_argument('--rebuild', action='store_true')
    parser.add_argument('--update', action='store_true')
args = parser.parse_args()
if args.dest_table is None:
args.dest_table = args.source_table
processor = DefaultPublish(args.source_table, args.dest_table)
if args.rebuild:
processor.create_dest_tables()
processor.do_update()
elif args.update:
processor.check_table_columns()
processor.do_update()
# coding=utf-8
import argparse
import datetime
import pandas as pd
import numpy as np
from base_sync import BaseSync
class SyncFdManagerFactor(BaseSync):
def __init__(self):
super(SyncFdManagerFactor, self).__init__(source_table='hb_fof_concent', dest_table='rl_fof_concent')
def create_dest_tables(self):
self.update_update_log(0)
create_sql = """CREATE TABLE `{0}` (
`id` INT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
`security_code` varchar(40) DEFAULT NULL,
`fsymbol` varchar(40) DEFAULT NULL,
`trade_date` date DEFAULT NULL COMMENT '日期',
`stock_concent` decimal(20,6) DEFAULT NULL COMMENT '前十大持股占股票市值比',
`entry_time` datetime DEFAULT NULL COMMENT '创建日期',
`is_valid` tinyint(1) DEFAULT '1' COMMENT '是否有效',
`disable` tinyint(1) DEFAULT NULL COMMENT '是否为正式数据 0为正式 1为tmp数据'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='持股集中度';
""".format(self.dest_table)
create_sql = create_sql.replace('\n', '')
create_sql = " ".join(create_sql.split())
self.create_table(create_sql, model='basic_info')
def get_sql(self):
sql = """select
{0} as source_id,
left(HB_FUND_CODE,6) as fsymbol,
TRADE_DT as trade_date,
stock_concent as stock_concent,
create_time as entry_time,
delete_flag as is_valid,
disable,
update_time as tmstamp
from {1} a
where 1=1 and update_time > '{2}' and update_time<='{3}' and {0}>{4}
order by {0} limit 10000"""
return sql
def get_datas(self, tm, max_tm, id=0):
print(f'正在查询 {self.source_table}表,update_time>={tm},id>{id}')
sql = self.get_sql()
sql = sql.format('OBJECT_ID', self.source_table, tm, max_tm, id).replace('\n', '')
trades_sets = pd.read_sql(sql, self.source)
trades_sets['is_valid'] = -(trades_sets['is_valid'] - 1)
# trades_sets['flag'] = 1
return trades_sets
def update_table_data(self, tm, max_tm):
id = 0
fd_secode = self.get_fd_secode()
while True:
result_list = self.get_datas(tm, max_tm, id)
result_list = pd.merge(result_list, fd_secode, on='fsymbol', how='left')
result_list['flag'] = np.where(result_list.security_code.isna(),
None,
1)
result_list._end_date.fillna(datetime.date(2099, 1, 1), inplace=True)
result_list.trade_date = pd.to_datetime(result_list.trade_date)
result_list = result_list[result_list.trade_date <= result_list._end_date]
result_list = result_list.sort_values(by=['_end_date'])
result_list.drop_duplicates(subset=['source_id'], keep='first', inplace=True)
result_list.drop(columns=['_end_date'], inplace=True)
if not result_list.empty:
try:
ids = list(result_list['source_id'])
self.delete_data_by_source_id(ids)
result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
id = ids[-1]
except Exception as e:
print(e.orig.msg)
self.insert_or_update(result_list)
else:
self.update_update_log(max_tm.timestamp())
break
def do_update(self):
max_tm = self.get_max_tm_source()
log_tm = self.get_max_tm_log()
log_tm = pd.to_datetime(log_tm * 1000000000)
if max_tm > log_tm:
self.update_table_data(log_tm, max_tm)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
    parser.add_argument('--rebuild', type=lambda s: str(s).lower() in ('true', '1', 'yes'), default=True)
    parser.add_argument('--update', type=lambda s: str(s).lower() in ('true', '1', 'yes'), default=False)
args = parser.parse_args()
if args.rebuild:
processor = SyncFdManagerFactor()
processor.create_dest_tables()
processor.do_update()
elif args.update:
processor = SyncFdManagerFactor()
processor.do_update()
# coding=utf-8
import argparse
import pandas as pd
import numpy as np
from base_sync import BaseSync
class SyncFdManagerFactor(BaseSync):
def __init__(self):
super(SyncFdManagerFactor, self).__init__(source_table='hb_fof_fund_base_info',
dest_table='rl_fof_fund_base_info')
def create_dest_tables(self):
self.update_update_log(0)
create_sql = """CREATE TABLE `{0}` (
`id` INT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
`security_code` varchar(40) DEFAULT NULL,
`fsymbol` varchar(40) DEFAULT NULL,
`rl_fund_code` varchar(40) default null comment '基金代码',
`rl_primary_class_code` varchar(40) default null comment 'rl一级分类code',
`rl_primary_class_name` varchar(40) default null comment 'rl一级分类name',
`rl_second_class_code` varchar(40) default null comment 'rl二级分类code',
`rl_second_class_name` varchar(40) default null comment 'rl二级分类name',
`entry_time` datetime DEFAULT NULL COMMENT '创建日期',
`is_valid` tinyint(1) DEFAULT '1' COMMENT '是否有效'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='持股集中度';
""".format(self.dest_table)
create_sql = create_sql.replace('\n', '')
create_sql = " ".join(create_sql.split())
self.create_table(create_sql, model='basic_info')
def get_sql(self):
sql = """select
{0} as source_id,
left(HB_FUND_CODE,6) as fsymbol,
hb_fund_code as rl_fund_code,
hb_primary_class_code as rl_primary_class_code,
hb_primary_class_name as rl_primary_class_name,
hb_second_class_code as rl_second_class_code,
hb_second_class_name as rl_second_class_name,
create_time as entry_time,
update_time as tmstamp,
DELETE_FLAG as is_valid
from {1} a
where 1=1 and update_time > '{2}' and update_time<='{3}' and {0}>{4}
order by {0} limit 10000"""
return sql
def get_datas(self, tm, max_tm, id=0):
print(f'正在查询 {self.source_table}表,update_time>={tm},id>{id}')
sql = self.get_sql()
sql = sql.format('ID', self.source_table, tm, max_tm, id).replace('\n', '')
trades_sets = pd.read_sql(sql, self.source)
trades_sets['is_valid'] = -(trades_sets['is_valid'] - 1)
trades_sets.tmstamp = trades_sets.tmstamp.astype(int) / 1000000000
return trades_sets
def update_table_data(self, tm, max_tm):
id = 0
fd_secode = self.get_fd_secode(keep='last')
while True:
result_list = self.get_datas(tm, max_tm, id)
result_list = pd.merge(result_list, fd_secode, on='fsymbol', how='left')
result_list['flag'] = np.where(result_list.security_code.isna(),
None,
1)
result_list.rl_primary_class_code = result_list.rl_primary_class_code.str[2:]
result_list.rl_second_class_code = result_list.rl_second_class_code.str[2:]
if not result_list.empty:
try:
ids = list(result_list['source_id'])
self.delete_data_by_source_id(ids)
result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
id = ids[-1]
except Exception as e:
print(e.orig.msg)
self.insert_or_update(result_list)
else:
self.update_update_log(max_tm.timestamp())
break
def do_update(self):
max_tm = self.get_max_tm_source()
log_tm = self.get_max_tm_log()
log_tm = pd.to_datetime(log_tm * 1000000000)
if max_tm > log_tm:
self.update_table_data(log_tm, max_tm)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
    parser.add_argument('--rebuild', type=lambda s: str(s).lower() in ('true', '1', 'yes'), default=True)
    parser.add_argument('--update', type=lambda s: str(s).lower() in ('true', '1', 'yes'), default=False)
args = parser.parse_args()
if args.rebuild:
processor = SyncFdManagerFactor()
processor.create_dest_tables()
processor.do_update()
elif args.update:
processor = SyncFdManagerFactor()
processor.do_update()
# coding=utf-8
import argparse
import pandas as pd
import numpy as np
from base_sync import BaseSync
class SyncFdManagerFactor(BaseSync):
def __init__(self):
super(SyncFdManagerFactor, self).__init__(source_table='hb_fof_fund_multi_asset_config',
dest_table='rl_fof_fund_multi_asset_config')
def create_dest_tables(self):
self.update_update_log(0)
create_sql = """CREATE TABLE `{0}` (
`id` INT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
`security_code` varchar(40) DEFAULT NULL,
`fsymbol` varchar(40) DEFAULT NULL,
`rl_cycle` varchar(10) default null comment 'rl周期',
`start_date` varchar(8) default null comment '该周期的起始时间',
`end_date` varchar(8) default null comment '该周期的结束时间',
`snwr` decimal(20,6) default null comment '股票占净值比(stock net worth ratio)',
`bnwr` decimal(20,6) default null comment '债券占净值比(bond net worth ratio)',
`fnwr` decimal(20,6) default null comment '基金占净值比(fund net worth ratio)',
`hsnwr` decimal(20,6) default null comment '港股占净值比(hstock net worth ratio)',
`cbnwr` decimal(20,6) default null comment '可转债占净值比(cbond net worth ratio)',
`hssnwr` decimal(20,6) default null comment '港股占权益净值比(hstock stocknet worth ratio)',
`cbbnwr` decimal(20,6) default null comment '可转债占债券净值比(cbond bondnet worth ratio)',
`entry_time` datetime DEFAULT NULL COMMENT '创建日期',
`is_valid` tinyint(1) DEFAULT '1' COMMENT '是否有效',
`disable` tinyint(1) DEFAULT NULL COMMENT '是否为正式数据 0为正式 1为tmp数据'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='持股集中度';
""".format(self.dest_table)
create_sql = create_sql.replace('\n', '')
create_sql = " ".join(create_sql.split())
self.create_table(create_sql, model='basic_info')
def get_sql(self):
sql = """select
{0} as source_id,
left(HB_FUND_CODE,6) as fsymbol,
hb_cycle as rl_cycle,
start_date,
end_date,
snwr,
bnwr,
fnwr,
hsnwr,
cbnwr,
hssnwr,
cbbnwr,
disable,
create_time as entry_time,
update_time as tmstamp,
DELETE_FLAG as is_valid
from {1} a
where 1=1 and update_time > '{2}' and update_time<='{3}' and {0}>{4}
order by {0} limit 10000"""
return sql
def get_datas(self, tm, max_tm, id=0):
print(f'正在查询 {self.source_table}表,update_time>={tm},id>{id}')
sql = self.get_sql()
sql = sql.format('OBJECT_ID', self.source_table, tm, max_tm, id).replace('\n', '')
trades_sets = pd.read_sql(sql, self.source)
trades_sets['is_valid'] = -(trades_sets['is_valid'] - 1)
trades_sets.tmstamp = trades_sets.tmstamp.astype(int) / 1000000000
return trades_sets
def update_table_data(self, tm, max_tm):
id = 0
fd_secode = self.get_fd_secode(keep='last')
while True:
result_list = self.get_datas(tm, max_tm, id)
result_list = pd.merge(result_list, fd_secode, on='fsymbol', how='left')
result_list['flag'] = np.where(result_list.security_code.isna(),
None,
1)
if not result_list.empty:
try:
ids = list(result_list['source_id'])
self.delete_data_by_source_id(ids)
result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
id = ids[-1]
except Exception as e:
print(e.orig.msg)
self.insert_or_update(result_list)
else:
self.update_update_log(max_tm.timestamp())
break
def do_update(self):
max_tm = self.get_max_tm_source()
log_tm = self.get_max_tm_log()
log_tm = pd.to_datetime(log_tm * 1000000000)
if max_tm > log_tm:
self.update_table_data(log_tm, max_tm)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
    parser.add_argument('--rebuild', type=lambda s: str(s).lower() in ('true', '1', 'yes'), default=True)
    parser.add_argument('--update', type=lambda s: str(s).lower() in ('true', '1', 'yes'), default=False)
args = parser.parse_args()
if args.rebuild:
processor = SyncFdManagerFactor()
processor.create_dest_tables()
processor.do_update()
elif args.update:
processor = SyncFdManagerFactor()
processor.do_update()
# coding=utf-8
import argparse
import pandas as pd
import numpy as np
from base_sync import BaseSync
class SyncFdManagerFactor(BaseSync):
def __init__(self):
super(SyncFdManagerFactor, self).__init__(source_table='hb_fof_fund_multi_config_info',
dest_table='rl_fof_fund_multi_config_info')
def create_dest_tables(self):
self.update_update_log(0)
create_sql = """CREATE TABLE `{0}` (
`id` INT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
`security_code` varchar(40) DEFAULT NULL,
`fsymbol` varchar(40) DEFAULT NULL,
`data_type` varchar(8) default null comment '持仓类型(1:mainstockholding,2:allstockholding)',
`rl_cycle` varchar(8) default null comment 'rl周期',
`start_date` varchar(8) default null,
`end_date` varchar(8) default null,
`change_rate` decimal(20,6) default null comment '换手率',
`pe` decimal(20,6) default null comment '市盈率(price earnings ratio)',
`pb` decimal(20,6) default null comment '平均市净率',
`roe` decimal(20,6) default null comment '净资产收益率(return on equity)',
`gronp` decimal(20,6) default null comment '净利润增速(growth rate of net profit)',
`dividend` decimal(20,6) default null comment '股息率',
`entry_time` datetime DEFAULT NULL COMMENT '创建日期',
`is_valid` tinyint(1) DEFAULT '1' COMMENT '是否有效',
`disable` tinyint(1) DEFAULT NULL COMMENT '是否为正式数据 0为正式 1为tmp数据'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='持股集中度';
""".format(self.dest_table)
create_sql = create_sql.replace('\n', '')
create_sql = " ".join(create_sql.split())
self.create_table(create_sql, model='basic_info')
def get_sql(self):
sql = """select
{0} as source_id,
left(HB_FUND_CODE,6) as fsymbol,
data_type,
hb_cycle as rl_cycle,
start_date,
end_date,
change_rate,
pe,
pb,
roe,
gronp,
dividend,
disable,
create_time as entry_time,
update_time as tmstamp,
DELETE_FLAG as is_valid
from {1} a
where 1=1 and update_time > '{2}' and update_time<='{3}' and {0}>{4}
order by {0} limit 10000"""
return sql
def get_datas(self, tm, max_tm, id=0):
print(f'正在查询 {self.source_table}表,update_time>={tm},id>{id}')
sql = self.get_sql()
sql = sql.format('OBJECT_ID', self.source_table, tm, max_tm, id).replace('\n', '')
trades_sets = pd.read_sql(sql, self.source)
trades_sets['is_valid'] = -(trades_sets['is_valid'] - 1)
trades_sets.tmstamp = trades_sets.tmstamp.astype(int) / 1000000000
return trades_sets
def update_table_data(self, tm, max_tm):
id = 0
fd_secode = self.get_fd_secode(keep='last')
while True:
result_list = self.get_datas(tm, max_tm, id)
result_list = pd.merge(result_list, fd_secode, on='fsymbol', how='left')
result_list['flag'] = np.where(result_list.security_code.isna(),
None,
1)
if not result_list.empty:
try:
ids = list(result_list['source_id'])
self.delete_data_by_source_id(ids)
result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
id = ids[-1]
except Exception as e:
print(e.orig.msg)
self.insert_or_update(result_list)
else:
self.update_update_log(max_tm.timestamp())
break
def do_update(self):
max_tm = self.get_max_tm_source()
log_tm = self.get_max_tm_log()
log_tm = pd.to_datetime(log_tm * 1000000000)
if max_tm > log_tm:
self.update_table_data(log_tm, max_tm)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
    parser.add_argument('--rebuild', type=lambda s: str(s).lower() in ('true', '1', 'yes'), default=True)
    parser.add_argument('--update', type=lambda s: str(s).lower() in ('true', '1', 'yes'), default=False)
args = parser.parse_args()
if args.rebuild:
processor = SyncFdManagerFactor()
processor.create_dest_tables()
processor.do_update()
elif args.update:
processor = SyncFdManagerFactor()
processor.do_update()
# coding=utf-8
import argparse
import pandas as pd
import numpy as np
from base_sync import BaseSync
class SyncFdManagerFactor(BaseSync):
def __init__(self):
super(SyncFdManagerFactor, self).__init__(source_table='hb_fof_fund_multi_industry_config',
dest_table='rl_fof_fund_multi_industry_config')
def create_dest_tables(self):
self.update_update_log(0)
create_sql = """CREATE TABLE `{0}` (
`id` INT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
`security_code` varchar(40) DEFAULT NULL,
`fsymbol` varchar(40) DEFAULT NULL,
`data_type` varchar(8) default null comment '持仓类型(1:mainstockholding,2:allstockholding)',
`rl_cycle` varchar(10) default null comment '融量周期',
`start_date` varchar(8) default null,
`end_date` varchar(8) default null,
`top_industry` text,
`ci005001` decimal(20,6) default null comment '石油石化',
`ci005002` decimal(20,6) default null comment '煤炭',
`ci005003` decimal(20,6) default null comment '有色金属',
`ci005004` decimal(20,6) default null comment '电力及公用事业',
`ci005005` decimal(20,6) default null comment '钢铁',
`ci005006` decimal(20,6) default null comment '基础化工',
`ci005007` decimal(20,6) default null,
`ci005008` decimal(20,6) default null,
`ci005009` decimal(20,6) default null,
`ci005010` decimal(20,6) default null,
`ci005011` decimal(20,6) default null,
`ci005012` decimal(20,6) default null,
`ci005013` decimal(20,6) default null,
`ci005014` decimal(20,6) default null,
`ci005015` decimal(20,6) default null,
`ci005016` decimal(20,6) default null,
`ci005017` decimal(20,6) default null,
`ci005018` decimal(20,6) default null,
`ci005019` decimal(20,6) default null,
`ci005020` decimal(20,6) default null,
`ci005021` decimal(20,6) default null,
`ci005022` decimal(20,6) default null,
`ci005023` decimal(20,6) default null,
`ci005024` decimal(20,6) default null,
`ci005025` decimal(20,6) default null,
`ci005026` decimal(20,6) default null,
`ci005027` decimal(20,6) default null,
`ci005028` decimal(20,6) default null,
`ci005029` decimal(20,6) default null,
`ci005030` decimal(20,6) default null,
`entry_time` datetime DEFAULT NULL COMMENT '创建日期',
`is_valid` tinyint(1) DEFAULT '1' COMMENT '是否有效',
`disable` tinyint(1) DEFAULT NULL COMMENT '是否为正式数据 0为正式 1为tmp数据'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='持股集中度';
""".format(self.dest_table)
create_sql = create_sql.replace('\n', '')
create_sql = " ".join(create_sql.split())
self.create_table(create_sql, model='basic_info')
def get_sql(self):
sql = """select
{0} as source_id,
left(HB_FUND_CODE,6) as fsymbol,
hb_cycle as rl_cycle,
data_type,
start_date,
end_date,
top_industry,
ci005001,
ci005002,
ci005003,
ci005004,
ci005005,
ci005006,
ci005007,
ci005008,
ci005009,
ci005010,
ci005011,
ci005012,
ci005013,
ci005014,
ci005015,
ci005016,
ci005017,
ci005018,
ci005019,
ci005020,
ci005021,
ci005022,
ci005023,
ci005024,
ci005025,
ci005026,
ci005027,
ci005028,
ci005029,
ci005030,
disable,
create_time as entry_time,
update_time as tmstamp,
DELETE_FLAG as is_valid
from {1} a
where 1=1 and update_time > '{2}' and update_time<='{3}' and {0}>{4}
order by {0} limit 10000"""
return sql
def get_datas(self, tm, max_tm, id=0):
print(f'正在查询 {self.source_table}表,update_time>={tm},id>{id}')
sql = self.get_sql()
sql = sql.format('OBJECT_ID', self.source_table, tm, max_tm, id).replace('\n', '')
trades_sets = pd.read_sql(sql, self.source)
trades_sets['is_valid'] = -(trades_sets['is_valid'] - 1)
trades_sets.tmstamp = trades_sets.tmstamp.astype(int) / 1000000000
return trades_sets
def update_table_data(self, tm, max_tm):
id = 0
fd_secode = self.get_fd_secode(keep='last')
while True:
result_list = self.get_datas(tm, max_tm, id)
result_list = pd.merge(result_list, fd_secode, on='fsymbol', how='left')
result_list['flag'] = np.where(result_list.security_code.isna(),
None,
1)
if not result_list.empty:
try:
ids = list(result_list['source_id'])
self.delete_data_by_source_id(ids)
result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
id = ids[-1]
except Exception as e:
print(e.orig.msg)
self.insert_or_update(result_list)
else:
self.update_update_log(max_tm.timestamp())
break
def do_update(self):
max_tm = self.get_max_tm_source()
log_tm = self.get_max_tm_log()
log_tm = pd.to_datetime(log_tm * 1000000000)
if max_tm > log_tm:
self.update_table_data(log_tm, max_tm)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
    parser.add_argument('--rebuild', type=lambda s: str(s).lower() in ('true', '1', 'yes'), default=True)
    parser.add_argument('--update', type=lambda s: str(s).lower() in ('true', '1', 'yes'), default=False)
args = parser.parse_args()
if args.rebuild:
processor = SyncFdManagerFactor()
processor.create_dest_tables()
processor.do_update()
elif args.update:
processor = SyncFdManagerFactor()
processor.do_update()
# coding=utf-8
import argparse
import pandas as pd
import numpy as np
from base_sync import BaseSync
class SyncFdManagerFactor(BaseSync):
def __init__(self):
super(SyncFdManagerFactor, self).__init__(source_table='hb_fof_fund_multi_stock_style',
dest_table='rl_fof_fund_multi_stock_style')
def create_dest_tables(self):
self.update_update_log(0)
create_sql = """CREATE TABLE `{0}` (
`id` INT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
`security_code` varchar(40) DEFAULT NULL,
`fsymbol` varchar(40) DEFAULT NULL,
`data_type` varchar(8) default null comment '持仓类型(1:mainstockholding,2:allstockholding)',
`rl_cycle` varchar(10) default null comment '融量周期',
`start_date` varchar(8) default null,
`end_date` varchar(8) default null,
`indicator_name` varchar(40) default null comment '因子名称',
`expose_value` decimal(20,6) default null comment '组合暴露度',
`expose_score` decimal(20,6) default null comment '组合暴露度得分',
`entry_time` datetime DEFAULT NULL COMMENT '创建日期',
`is_valid` tinyint(1) DEFAULT '1' COMMENT '是否有效',
`disable` tinyint(1) DEFAULT NULL COMMENT '是否为正式数据 0为正式 1为tmp数据'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='权益类基金十大重仓因子得分';
""".format(self.dest_table)
create_sql = create_sql.replace('\n', '')
create_sql = " ".join(create_sql.split())
self.create_table(create_sql, model='basic_info')
def get_sql(self):
sql = """select
{0} as source_id,
left(HB_FUND_CODE,6) as fsymbol,
hb_cycle as rl_cycle,
data_type,
start_date,
end_date,
indicator_name,
expose_value,
expose_score,
disable,
create_time as entry_time,
update_time as tmstamp,
DELETE_FLAG as is_valid
from {1} a
where 1=1 and update_time > '{2}' and update_time<='{3}' and {0}>{4}
order by {0} limit 10000"""
return sql
def get_datas(self, tm, max_tm, id=0):
print(f'正在查询 {self.source_table}表,update_time>={tm},id>{id}')
sql = self.get_sql()
sql = sql.format('OBJECT_ID', self.source_table, tm, max_tm, id).replace('\n', '')
trades_sets = pd.read_sql(sql, self.source)
trades_sets['is_valid'] = -(trades_sets['is_valid'] - 1)
trades_sets.tmstamp = trades_sets.tmstamp.astype(int) / 1000000000
return trades_sets
def update_table_data(self, tm, max_tm):
id = 0
fd_secode = self.get_fd_secode(keep='last')
while True:
result_list = self.get_datas(tm, max_tm, id)
result_list = pd.merge(result_list, fd_secode, on='fsymbol', how='left')
result_list['flag'] = np.where(result_list.security_code.isna(),
None,
1)
if not result_list.empty:
try:
ids = list(result_list['source_id'])
self.delete_data_by_source_id(ids)
result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
id = ids[-1]
except Exception as e:
print(e.orig.msg)
self.insert_or_update(result_list)
else:
self.update_update_log(max_tm.timestamp())
break
def do_update(self):
max_tm = self.get_max_tm_source()
log_tm = self.get_max_tm_log()
log_tm = pd.to_datetime(log_tm * 1000000000)
if max_tm > log_tm:
self.update_table_data(log_tm, max_tm)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
    parser.add_argument('--rebuild', type=lambda s: str(s).lower() in ('true', '1', 'yes'), default=True)
    parser.add_argument('--update', type=lambda s: str(s).lower() in ('true', '1', 'yes'), default=False)
args = parser.parse_args()
if args.rebuild:
processor = SyncFdManagerFactor()
processor.create_dest_tables()
processor.do_update()
elif args.update:
processor = SyncFdManagerFactor()
processor.do_update()
# coding=utf-8
import argparse
import datetime
import pandas as pd
import numpy as np
from base_sync import BaseSync
class SyncFdManagerFactor(BaseSync):
def __init__(self):
super(SyncFdManagerFactor, self).__init__(source_table='hb_fof_fund_performance_score',
dest_table='rl_fof_fund_performance_score')
def create_dest_tables(self):
self.update_update_log(0)
create_sql = """CREATE TABLE `{0}` (
`id` INT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
`security_code` varchar(40) DEFAULT NULL,
`fsymbol` varchar(40) DEFAULT NULL,
`rl_cycle` varchar(10) default null comment '周期',
`fd_type` int(11) default null comment '分类 1、一级分类 2、二级分类 3、大盘vs小盘 4、价值vs成长',
`trade_date` varchar(8) default null comment '日期',
`returnrate_score` decimal(20,6) default null comment '收益率得分',
`returnrate_rank` decimal(20,6) default null comment '收益率排名',
`returnrate_count` decimal(20,6) default null comment '收益率总样本数',
`sharp_score` decimal(20,6) default null comment '夏普得分',
`sharp_rank` decimal(20,6) default null comment '夏普排名',
`sharp_count` decimal(20,6) default null comment '夏普总样本数',
`vol_score` decimal(20,6) default null comment '年化波动率得分',
`vol_rank` decimal(20,6) default null comment '年化波动率排名',
`vol_count` decimal(20,6) default null comment '年化波动率样本数',
`karma_score` decimal(20,6) default null comment '卡玛比率得分',
`karma_rank` decimal(20,6) default null comment '卡玛比率排名',
`karma_count` decimal(20,6) default null comment '卡玛比率样本数',
`info_ratio_score` decimal(20,6) default null comment '信息比率得分',
`info_ratio_rank` decimal(20,6) default null comment '信息比率排名',
`info_ratio_count` decimal(20,6) default null comment '信息比率样本数',
`annualreturn_rate_score` decimal(20,6) default null comment '年化收益率得分',
`annualreturn_rate_rank` decimal(20,6) default null comment '年化收益率排名',
`annualreturn_rate_count` decimal(20,6) default null comment '年化收益率样本数',
`entry_time` datetime DEFAULT NULL COMMENT '创建日期',
`is_valid` tinyint(1) DEFAULT '1' COMMENT '是否有效',
`disable` tinyint(1) DEFAULT NULL COMMENT '是否为正式数据 0为正式 1为tmp数据'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='持股集中度';
""".format(self.dest_table)
create_sql = create_sql.replace('\n', '')
create_sql = " ".join(create_sql.split())
self.create_table(create_sql, model='basic_info')
def get_sql(self):
sql = """select
{0} as source_id,
left(HB_FUND_CODE,6) as fsymbol,
hb_cycle as rl_cycle,
type as fd_type,
trade_dt as trade_date,
returnrate_score,
returnrate_rank,
returnrate_count,
sharp_score,
sharp_rank,
sharp_count,
vol_score,
vol_rank,
vol_count,
karma_score,
karma_rank,
karma_count,
info_ratio_score,
info_ratio_rank,
info_ratio_count,
annualreturn_rate_score,
annualreturn_rate_rank,
annualreturn_rate_count,
disable,
create_time as entry_time,
update_time as tmstamp,
DELETE_FLAG as is_valid
from {1} a
where 1=1 and update_time > '{2}' and update_time<='{3}' and {0}>'{4}'
order by {0} limit 10000"""
return sql
def get_datas(self, tm, max_tm, id=0):
print(f'正在查询 {self.source_table}表,update_time>={tm},id>{id}')
sql = self.get_sql()
sql = sql.format('OBJECT_ID', self.source_table, tm, max_tm, id).replace('\n', '')
trades_sets = pd.read_sql(sql, self.source)
trades_sets['is_valid'] = -(trades_sets['is_valid'] - 1)
trades_sets.tmstamp = trades_sets.tmstamp.astype(int) / 1000000000
return trades_sets
def update_table_data(self, tm, max_tm):
id = 0
fd_secode = self.get_fd_secode()
while True:
result_list = self.get_datas(tm, max_tm, id)
result_list = pd.merge(result_list, fd_secode, on='fsymbol', how='left')
result_list['flag'] = np.where(result_list.security_code.isna(),
None,
1)
result_list._end_date.fillna(datetime.date(2099, 1, 1), inplace=True)
result_list.trade_date = pd.to_datetime(result_list.trade_date)
result_list = result_list[result_list.trade_date <= result_list._end_date]
result_list = result_list.sort_values(by=['_end_date'])
result_list.drop_duplicates(subset=['source_id'], keep='first', inplace=True)
result_list.drop(columns=['_end_date'], inplace=True)
if not result_list.empty:
try:
ids = list(result_list['source_id'])
self.delete_data_by_source_id(ids)
result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
id = ids[-1]
except Exception as e:
print(e.orig.msg)
self.insert_or_update(result_list)
else:
self.update_update_log(max_tm.timestamp())
break
def do_update(self):
max_tm = self.get_max_tm_source()
log_tm = self.get_max_tm_log()
log_tm = pd.to_datetime(log_tm * 1000000000)
if max_tm > log_tm:
self.update_table_data(log_tm, max_tm)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
    parser.add_argument('--rebuild', type=lambda s: str(s).lower() in ('true', '1', 'yes'), default=True)
    parser.add_argument('--update', type=lambda s: str(s).lower() in ('true', '1', 'yes'), default=False)
args = parser.parse_args()
if args.rebuild:
processor = SyncFdManagerFactor()
processor.create_dest_tables()
processor.do_update()
elif args.update:
processor = SyncFdManagerFactor()
processor.do_update()
# coding=utf-8
import argparse
import datetime
import pandas as pd
import numpy as np
from base_sync import BaseSync
class SyncFdManagerFactor(BaseSync):
def __init__(self):
super(SyncFdManagerFactor, self).__init__(source_table='hb_fof_fund_performance_value',
dest_table='rl_fof_fund_performance_value')
def create_dest_tables(self):
self.update_update_log(0)
create_sql = """CREATE TABLE `{0}` (
`id` INT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
`security_code` varchar(40) DEFAULT NULL,
`fsymbol` varchar(40) DEFAULT NULL,
`rl_cycle` varchar(10) default null comment '融量周期',
`start_date` varchar(8) default null,
`end_date` varchar(8) default null,
`trade_date` varchar(8) default null comment '日期',
`returnrate` decimal(20,6) default null comment '区间收益率',
`sharp` decimal(20,6) default null comment '夏普比率',
`vol` decimal(20,6) default null comment '年化波动率',
`karma` decimal(20,6) default null comment '卡玛比率',
`info_ratio` decimal(20,6) default null comment '信息比率',
`annualreturn_rate` decimal(20,6) default null comment '年化收益率',
`entry_time` datetime DEFAULT NULL COMMENT '创建日期',
`is_valid` tinyint(1) DEFAULT '1' COMMENT '是否有效',
`disable` tinyint(1) DEFAULT NULL COMMENT '是否为正式数据 0为正式 1为tmp数据'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='持股集中度';
""".format(self.dest_table)
create_sql = create_sql.replace('\n', '')
create_sql = " ".join(create_sql.split())
self.create_table(create_sql, model='basic_info')
def get_sql(self):
sql = """select
{0} as source_id,
left(HB_FUND_CODE,6) as fsymbol,
hb_cycle as rl_cycle,
start_date,
end_date,
trade_dt as trade_date,
returnrate,
sharp,
vol,
karma,
info_ratio,
annualreturn_rate,
disable,
create_time as entry_time,
update_time as tmstamp,
DELETE_FLAG as is_valid
from {1} a
where 1=1 and update_time > '{2}' and update_time<='{3}' and {0}>'{4}'
order by {0} limit 10000"""
return sql
def get_datas(self, tm, max_tm, id=0):
print(f'正在查询 {self.source_table}表,update_time>={tm},id>{id}')
sql = self.get_sql()
sql = sql.format('OBJECT_ID', self.source_table, tm, max_tm, id).replace('\n', '')
trades_sets = pd.read_sql(sql, self.source)
trades_sets['is_valid'] = -(trades_sets['is_valid'] - 1)
trades_sets.tmstamp = trades_sets.tmstamp.astype(int) / 1000000000
return trades_sets
def update_table_data(self, tm, max_tm):
id = 0
fd_secode = self.get_fd_secode()
while True:
result_list = self.get_datas(tm, max_tm, id)
result_list = pd.merge(result_list, fd_secode, on='fsymbol', how='left')
result_list['flag'] = np.where(result_list.security_code.isna(),
None,
1)
result_list._end_date.fillna(datetime.date(2099, 1, 1), inplace=True)
result_list.trade_date = pd.to_datetime(result_list.trade_date)
result_list = result_list[result_list.trade_date <= result_list._end_date]
result_list = result_list.sort_values(by=['_end_date'])
result_list.drop_duplicates(subset=['source_id'], keep='first', inplace=True)
result_list.drop(columns=['_end_date'], inplace=True)
if not result_list.empty:
try:
ids = list(result_list['source_id'])
self.delete_data_by_source_id(ids)
result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
id = ids[-1]
except Exception as e:
print(e.orig.msg)
self.insert_or_update(result_list)
else:
self.update_update_log(max_tm.timestamp())
break
def do_update(self):
max_tm = self.get_max_tm_source()
log_tm = self.get_max_tm_log()
log_tm = pd.to_datetime(log_tm * 1000000000)
if max_tm > log_tm:
self.update_table_data(log_tm, max_tm)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
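# Note: argparse's type=bool converts any non-empty string to True, so '--rebuild False' still enables the rebuild.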
parser.add_argument('--rebuild', type=bool, default=True)
parser.add_argument('--update', type=bool, default=False)
args = parser.parse_args()
if args.rebuild:
processor = SyncFdManagerFactor()
processor.create_dest_tables()
processor.do_update()
elif args.update:
processor = SyncFdManagerFactor()
processor.do_update()
# coding=utf-8
import argparse
import pandas as pd
import numpy as np
from base_sync import BaseSync
class SyncFdManagerFactor(BaseSync):
def __init__(self):
super(SyncFdManagerFactor, self).__init__(source_table='hb_fof_manager_product',
dest_table='rl_fof_manager_product')
def create_dest_tables(self):
self.update_update_log(0)
create_sql = """CREATE TABLE `{0}` (
`id` INT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
`security_code` varchar(40) DEFAULT NULL,
`fsymbol` varchar(40) DEFAULT NULL,
`manager_code` varchar(40) default null comment '基金经理id',
`manager_name` varchar(40) default null comment '基金经理名称',
`year_value` decimal(20,6) default null comment '年化收益率',
`start_date` varchar(8) default null comment '任职开始日期',
`end_date` varchar(8) default null comment '任职结束日期',
`entry_time` datetime DEFAULT NULL COMMENT '创建日期',
`is_valid` tinyint(1) DEFAULT '1' COMMENT '是否有效',
`disable` tinyint(1) DEFAULT NULL COMMENT '是否为正式数据 0为正式 1为tmp数据'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='持股集中度';
""".format(self.dest_table)
create_sql = create_sql.replace('\n', '')
create_sql = " ".join(create_sql.split())
self.create_table(create_sql, model='basic_info')
def get_sql(self):
sql = """select
{0} as source_id,
left(HB_FUND_CODE,6) as fsymbol,
manager_id,
manager_name,
year_value,
startdate as start_date,
enddate as end_date,
disable,
create_time as entry_time,
update_time as tmstamp,
DELETE_FLAG as is_valid
from {1} a
where 1=1 and update_time > '{2}' and update_time<='{3}' and {0}>'{4}'
order by {0} limit 10000"""
return sql
def get_datas(self, tm, max_tm, id=0):
print(f'Querying {self.source_table}: update_time >= {tm}, id > {id}')
sql = self.get_sql()
sql = sql.format('OBJECT_ID', self.source_table, tm, max_tm, id).replace('\n', '')
trades_sets = pd.read_sql(sql, self.source)
trades_sets['is_valid'] = -(trades_sets['is_valid'] - 1)
trades_sets.tmstamp = trades_sets.tmstamp.astype(int) / 1000000000
return trades_sets
def get_all_fsymbol_mapping_manager(self):
sql = f'select distinct left(hb_fund_code,6) as fsymbol,manager_id,manager_name from hb_fof_manager_product'
data = pd.read_sql(sql, self.source)
return data
def get_fill_manager_mapping(self):
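# Hard-coded manager_code -> manager_id pairs used to patch records whose manager names did not match
# (see the note where this mapping is applied below).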
return pd.DataFrame({'manager_code': ['30037859', '30040261', '30048946', '30055079', '30069892', '30132536',
'30477859', '30547580', '30561092', '30597376', '30631813', '30690081',
'30723675', '30070365', '30220330'],
'manager_id': ['00041', '00120', '00731', '00824', '01046', '01148', '13C27C8', '141CBAC',
'14260E6', '14FADEF', '151EC3D', '155A769', '156A25C', '170363', '17F6FD']},
)
def update_table_data(self, tm, max_tm):
id = 0
fd_secode = self.get_fd_secode(keep='last')
manager = self.get_manager_id()
fsymbol_mapping_manager = self.get_all_fsymbol_mapping_manager()
# wy_index = fsymbol_mapping_manager[
# (fsymbol_mapping_manager.fsymbol.isin(['003013', '003014'])) & (
# fsymbol_mapping_manager.manager_name == '王玥')].index
# fsymbol_mapping_manager.loc[wy_index, 'manager_id'] = fsymbol_mapping_manager.iloc[
# wy_index, 'manager_id'] + '_2'
fsymbol_mapping_manager = pd.merge(fsymbol_mapping_manager, fd_secode, on='fsymbol', how='left')
manager = pd.merge(manager, fsymbol_mapping_manager, on=['security_code', 'manager_name'], how='inner')
# Patch in manager ids that failed to match because of name formatting issues
manager = manager.append(self.get_fill_manager_mapping())
manager = manager[['manager_id', 'manager_code']].drop_duplicates(subset=['manager_id', 'manager_code'],
keep='last')
while True:
result_list = self.get_datas(tm, max_tm, id)
result_list = pd.merge(result_list, fd_secode, on='fsymbol', how='left')
result_list = pd.merge(result_list, manager, on=['manager_id'], how='left')
result_list['flag'] = np.where(result_list.security_code.isna() | result_list.manager_id.isna(),
None,
1)
result_list.drop(columns=['manager_id'], inplace=True)
if not result_list.empty:
try:
ids = list(result_list['source_id'])
self.delete_data_by_source_id(ids)
result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
id = ids[-1]
except Exception as e:
print(e.orig.msg)
self.insert_or_update(result_list)
else:
self.update_update_log(max_tm.timestamp())
break
def do_update(self):
max_tm = self.get_max_tm_source()
log_tm = self.get_max_tm_log()
log_tm = pd.to_datetime(log_tm * 1000000000)
if max_tm > log_tm:
self.update_table_data(log_tm, max_tm)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--rebuild', type=bool, default=True)
parser.add_argument('--update', type=bool, default=False)
args = parser.parse_args()
if args.rebuild:
processor = SyncFdManagerFactor()
processor.create_dest_tables()
processor.do_update()
elif args.update:
processor = SyncFdManagerFactor()
processor.do_update()
# coding=utf-8
import argparse
import pandas as pd
import numpy as np
from base_sync import BaseSync
class SyncFdManagerFactor(BaseSync):
def __init__(self):
super(SyncFdManagerFactor, self).__init__(source_table='hb_fof_multi_brinson_industry_attr_total',
dest_table='rl_fof_multi_brinson_industry_attr_total')
def create_dest_tables(self):
self.update_update_log(0)
create_sql = """CREATE TABLE `{0}` (
`id` INT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
`security_code` varchar(40) DEFAULT NULL,
`fsymbol` varchar(40) DEFAULT NULL,
`benchmark` varchar(20) default null comment '基准',
`data_type` varchar(8) default null comment '持仓类型(1:mainstockholding,2:allstockholding)',
`fd_type` tinyint(1) default null comment '0 普通 1指数增强 2行业主题',
`hb_cycle` varchar(8) default null comment '华宝周期',
`level` tinyint(1) default null comment '行业级别',
`start_date` varchar(8) default null comment '开始日期',
`end_date` varchar(8) default null comment '结束日期',
`ar` decimal(20,6) default null,
`sr` decimal(20,6) default null,
`ir` decimal(20,6) default null,
`tr` decimal(20,6) default null,
`entry_time` datetime DEFAULT NULL COMMENT '创建日期',
`is_valid` tinyint(1) DEFAULT '1' COMMENT '是否有效',
`disable` tinyint(1) DEFAULT NULL COMMENT '是否为正式数据 0为正式 1为tmp数据'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='持股集中度';
""".format(self.dest_table)
create_sql = create_sql.replace('\n', '')
create_sql = " ".join(create_sql.split())
self.create_table(create_sql, model='basic_info')
def get_sql(self):
sql = """select
{0} as source_id,
left(HB_FUND_CODE,6) as fsymbol,
benchmark,
data_type,
type as fd_type,
hb_cycle,
level,
start_date,
end_date,
ar,
sr,
ir,
tr,
disable,
create_time as entry_time,
update_time as tmstamp,
DELETE_FLAG as is_valid
from {1} a
where 1=1 and update_time > '{2}' and update_time<='{3}' and {0}>{4}
order by {0} limit 10000"""
return sql
def get_datas(self, tm, max_tm, id=0):
print(f'Querying {self.source_table}: update_time >= {tm}, id > {id}')
sql = self.get_sql()
sql = sql.format('OBJECT_ID', self.source_table, tm, max_tm, id).replace('\n', '')
trades_sets = pd.read_sql(sql, self.source)
trades_sets['is_valid'] = -(trades_sets['is_valid'] - 1)
trades_sets.tmstamp = trades_sets.tmstamp.astype(int) / 1000000000
return trades_sets
def update_table_data(self, tm, max_tm):
id = 0
fd_secode = self.get_fd_secode(keep='last')
while True:
result_list = self.get_datas(tm, max_tm, id)
result_list = pd.merge(result_list, fd_secode, on='fsymbol', how='left')
result_list['flag'] = np.where(result_list.security_code.isna(),
None,
1)
if not result_list.empty:
try:
ids = list(result_list['source_id'])
self.delete_data_by_source_id(ids)
result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
id = ids[-1]
except Exception as e:
print(e.orig.msg)
self.insert_or_update(result_list)
else:
self.update_update_log(max_tm.timestamp())
break
def do_update(self):
max_tm = self.get_max_tm_source()
log_tm = self.get_max_tm_log()
log_tm = pd.to_datetime(log_tm * 1000000000)
if max_tm > log_tm:
self.update_table_data(log_tm, max_tm)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--rebuild', type=bool, default=True)
parser.add_argument('--update', type=bool, default=False)
args = parser.parse_args()
if args.rebuild:
processor = SyncFdManagerFactor()
processor.create_dest_tables()
processor.do_update()
elif args.update:
processor = SyncFdManagerFactor()
processor.do_update()
# coding=utf-8
import argparse
import pandas as pd
import numpy as np
from base_sync import BaseSync
class SyncFdManagerFactor(BaseSync):
def __init__(self):
super(SyncFdManagerFactor, self).__init__(source_table='hb_fof_multi_concent',
dest_table='rl_fof_multi_concent')
def create_dest_tables(self):
self.update_update_log(0)
create_sql = """CREATE TABLE `{0}` (
`id` INT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
`security_code` varchar(40) DEFAULT NULL,
`fsymbol` varchar(40) DEFAULT NULL,
`rl_cycle` varchar(10) default null comment '融量周期',
`start_date` varchar(8) default null comment '该周期的起始时间',
`end_date` varchar(8) default null comment '该周期的结束时间',
`stock_concent` decimal(20,6) default null comment '前十大持股占股票市值比',
`entry_time` datetime DEFAULT NULL COMMENT '创建日期',
`is_valid` tinyint(1) DEFAULT '1' COMMENT '是否有效',
`disable` tinyint(1) DEFAULT NULL COMMENT '是否为正式数据 0为正式 1为tmp数据'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='持股集中度';
""".format(self.dest_table)
create_sql = create_sql.replace('\n', '')
create_sql = " ".join(create_sql.split())
self.create_table(create_sql, model='basic_info')
def get_sql(self):
sql = """select
{0} as source_id,
left(HB_FUND_CODE,6) as fsymbol,
hb_cycle as rl_cycle,
start_date,
end_date,
stock_concent,
disable,
create_time as entry_time,
update_time as tmstamp,
DELETE_FLAG as is_valid
from {1} a
where 1=1 and update_time > '{2}' and update_time<='{3}' and {0}>{4}
order by {0} limit 10000"""
return sql
def get_datas(self, tm, max_tm, id=0):
print(f'Querying {self.source_table}: update_time >= {tm}, id > {id}')
sql = self.get_sql()
sql = sql.format('OBJECT_ID', self.source_table, tm, max_tm, id).replace('\n', '')
trades_sets = pd.read_sql(sql, self.source)
trades_sets['is_valid'] = -(trades_sets['is_valid'] - 1)
trades_sets.tmstamp = trades_sets.tmstamp.astype(int) / 1000000000
return trades_sets
def update_table_data(self, tm, max_tm):
id = 0
fd_secode = self.get_fd_secode(keep='last')
while True:
result_list = self.get_datas(tm, max_tm, id)
result_list = pd.merge(result_list, fd_secode, on='fsymbol', how='left')
result_list['flag'] = np.where(result_list.security_code.isna(),
None,
1)
if not result_list.empty:
try:
ids = list(result_list['source_id'])
self.delete_data_by_source_id(ids)
result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
id = ids[-1]
except Exception as e:
print(e.orig.msg)
self.insert_or_update(result_list)
else:
self.update_update_log(max_tm.timestamp())
break
def do_update(self):
max_tm = self.get_max_tm_source()
log_tm = self.get_max_tm_log()
log_tm = pd.to_datetime(log_tm * 1000000000)
if max_tm > log_tm:
self.update_table_data(log_tm, max_tm)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--rebuild', type=bool, default=False)
parser.add_argument('--update', type=bool, default=False)
args = parser.parse_args()
if args.rebuild:
processor = SyncFdManagerFactor()
processor.create_dest_tables()
processor.do_update()
elif args.update:
processor = SyncFdManagerFactor()
processor.do_update()
#!/usr/bin/env python
# coding=utf-8
import numpy as np
import pandas as pd
import time
import warnings
warnings.filterwarnings("ignore")
class CalcTools(object):
@classmethod
def is_zero(cls, data_frame):
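# Treat values whose absolute value is below 1e-6 as zero.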
return np.where(data_frame > -0.000001,
np.where(data_frame < 0.000001, True, False)
, False)
def change_single(params):
fundamentals_sets_year = params['fundamentals_sets_year']
sub_columns = params['sub_columns']
columns = fundamentals_sets_year.columns.values.tolist()
miss_columns = list(set(columns) - set(sub_columns))
def year_update(df):
# df.loc[df.index, sub_columns] = df[sub_columns] - df[sub_columns].shift(-1).fillna(0)
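# Single-period value = current cumulative value minus the previous row's cumulative value; the first row
# keeps its cumulative value because the shifted frame is filled with 0.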
new_sub_df = df[sub_columns] - df[sub_columns].shift(1).fillna(0)
new_df = df[miss_columns].join(new_sub_df)
return new_df
stock_list = list(set(fundamentals_sets_year['company_id']))
new_fundamentals_sets = pd.DataFrame()
i = 0
for stock in stock_list:
i += 1
if i % 100 == 0:
print(params['report_year'], ':', i, '/', len(stock_list))
new_fundamentals_sets = new_fundamentals_sets.append(year_update(
fundamentals_sets_year[
fundamentals_sets_year['company_id'] == stock]))
return new_fundamentals_sets
def change_single_by_symbol(params):
fundamentals_sets = params['fundamentals_sets_symbol']
fundamentals_sets.reset_index(drop=True, inplace=True)
sub_columns = params['sub_columns']
columns = fundamentals_sets.columns.values.tolist()
miss_columns = list(set(columns) - set(sub_columns))
def year_update(df):
# df.loc[df.index, sub_columns] = df[sub_columns] - df[sub_columns].shift(-1).fillna(0)
new_sub_df = df[sub_columns] - df[sub_columns].shift(1).fillna(0)
new_df = df[miss_columns].join(new_sub_df)
return new_df
new_fundamentals_sets = pd.DataFrame()
year_list = list(set(fundamentals_sets['report_year']))
year_list.sort(reverse=True)
stock_list = list(set(fundamentals_sets['company_id']))
i = 0
start = time.time()
for stock in stock_list:
i += 1
if i % 100 == 0:
end = time.time()
print('cpu', params['cpu'], ':', i, '/', len(stock_list), ';time(s):', end - start)
start = end
for year in year_list:
fundamentals_sets_stock = fundamentals_sets[fundamentals_sets['company_id'] == stock]
new_fundamentals_sets = new_fundamentals_sets.append(year_update(
fundamentals_sets_stock[
fundamentals_sets_stock['report_year'] == year]))
return new_fundamentals_sets
# df must be sorted by PUBLISHDATE in descending order
def get_line(df, pub_date):
df = df[df['PUBLISHDATE'] <= pub_date]
if df.empty:
return pd.Series()
return df.iloc[0]
# Compute MRQ (single-quarter) values by publish date
@classmethod
def df_calc_mrq_by_publish_date(self, df1, df2, static_columns, sub_columns):
df = pd.DataFrame()
pub_date_list = list(set(df1['PUBLISHDATE']).union(df2['PUBLISHDATE']))
for pub_date in pub_date_list:
line1 = self.get_line(df1, pub_date)
line2 = self.get_line(df2, pub_date)
if not line1.empty:
if not line2.empty:
line = pd.concat([line1[static_columns], line1[sub_columns] - line2[sub_columns]])
line['PUBLISHDATE'] = pub_date
if line1['REPORTTYPE'] == '1' and line2['REPORTTYPE'] == '1':
line['REPORTTYPE'] = '1'
else:
line['REPORTTYPE'] = '3'
# If there is no previous-quarter data, blank out the single-quarter fields but keep the record
else:
line1[sub_columns] = None
line = line1
df = df.append(line, ignore_index=True)
return df
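# Worked example (hypothetical figures): if the cumulative half-year value published on 20200830 is 300
# and the cumulative Q1 value published on 20200430 is 100, the row emitted for publish date 20200830
# carries the single-quarter (Q2) value 300 - 100 = 200.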
# Fill data by publish date; if report_type 1 and 3 were published on the same day (the data is identical), keep only the type-1 record
@classmethod
def fill_by_order_publish_date(self, df1, df2, columns):
if df2.empty or df1.empty:
return df1
else:
df = pd.DataFrame()
pub_date_list = list(set(df1['PUBLISHDATE']).union(df2['PUBLISHDATE']))
for pub_date in pub_date_list:
line1 = self.get_line(df1, pub_date)
line2 = self.get_line(df2, pub_date)
if not line1.empty:
if not line2.empty:
line1[columns] = line2[columns]
line1['PUBLISHDATE'] = pub_date
df = df.append(line1)
return df
def fill_na_by_before(params):
fundamentals_sets = params['fundamentals_sets_symbol']
half_fill_date_arr = params['half_fill_date_arr']
year_fill_date_arr = params['year_fill_date_arr']
half_year_columns = params['half_year_columns']
year_columns = params['year_columns']
fundamentals_sets.sort_values(by=['COMPCODE', 'ENDDATE', 'PUBLISHDATE', 'REPORTTYPE'],
ascending=[False, False, False, True],
inplace=True)
fundamentals_groupd = fundamentals_sets.groupby(['COMPCODE'])
# Record the reporting periods that were not filled but still need to be appended
supplement_half, supplement_year = [], []
df = pd.DataFrame(half_fill_date_arr)
if not df.empty:
supplement_half = list(set(df[1]))
df = pd.DataFrame(year_fill_date_arr)
if not df.empty:
supplement_year = list(set(df[1]))
df = pd.DataFrame()
i = 0
start = time.time()
for k, g in fundamentals_groupd:
i += 1
if i % 100 == 0:
end = time.time()
print('cpu', params['cpu'], ':', i, '/', len(fundamentals_groupd), ';time(s):', end - start)
start = end
df_fill_half_year = pd.DataFrame()
for fill_date in half_fill_date_arr:
try:
fill_df = CalcTools.fill_by_order_publish_date(g[g['ENDDATE'] == fill_date[0]],
g[g['ENDDATE'] == fill_date[1]],
half_year_columns)
df_fill_half_year = df_fill_half_year.append(fill_df)
except Exception as e:
print(e)
# fill_df only contains reporting periods that were filled; periods that need no filling are appended in bulk here
df_fill_half_year = df_fill_half_year.append(g[g['ENDDATE'].isin(supplement_half)])
df_fill_year = pd.DataFrame()
for fill_date in year_fill_date_arr:
try:
fill_df = CalcTools.fill_by_order_publish_date(
df_fill_half_year[df_fill_half_year['ENDDATE'] == fill_date[0]],
df_fill_half_year[df_fill_half_year['ENDDATE'] == fill_date[1]],
year_columns)
df_fill_year = df_fill_year.append(fill_df)
except Exception as e:
print(e)
# fill_df only contains reporting periods that were filled; periods that need no filling are appended in bulk here
df_fill_year = df_fill_year.append(g[g['ENDDATE'].isin(supplement_year)])
if not df_fill_year.empty:
df = df.append(df_fill_year)
else:
df = df.append(df_fill_half_year)
return df
def mrq_by_symbol(params):
fundamentals_sets = params['fundamentals_sets_symbol']
fundamentals_sets.reset_index(drop=True, inplace=True)
sub_columns = params['sub_columns']
half_year_sub_columns = params['half_year_columns']
year_sub_columns = params['year_columns']
mrq_sub_columns = list(set(params['sub_columns']) - set(half_year_sub_columns) - set(year_sub_columns))
columns = fundamentals_sets.columns.values.tolist()
static_columns_out_sub = list(set(columns) - set(mrq_sub_columns))
miss_columns_out_half_sub = list(set(columns) - set(half_year_sub_columns))
# fundamentals_sets.sort_values(by=['COMPCODE', 'ENDDATE', 'PUBLISHDATE'], ascending=False, inplace=True)
fundamentals_sets.sort_values(by=['COMPCODE', 'ENDDATE', 'PUBLISHDATE', 'REPORTTYPE'],
ascending=[False, False, False, True],
inplace=True)
def year_update(df):
ds = df.sort_values(by='REPORTDATETYPE')
report_date_type = list(set(df['REPORTDATETYPE']))
new_sub_df = pd.DataFrame(columns=sub_columns)
if '1' in report_date_type:
new_sub_df = new_sub_df.append(ds[ds['REPORTDATETYPE'] == '1'])
report_date_type.remove('1')
for i in report_date_type:
try:
# Compute every single-quarter combination that can occur
sub_s = CalcTools.df_calc_mrq_by_publish_date(ds[ds['REPORTDATETYPE'] == i],
ds[ds['REPORTDATETYPE'] == str(int(i) - 1)],
static_columns_out_sub,
mrq_sub_columns)
# Compute single-quarter values for the half-year-only fields
if i == '4' and not sub_s.empty:
if '2' in report_date_type:
sub_s = CalcTools.df_calc_mrq_by_publish_date(sub_s, ds[ds['REPORTDATETYPE'] == '2'],
miss_columns_out_half_sub,
half_year_sub_columns)
else:
sub_s[half_year_sub_columns] = None
new_sub_df = new_sub_df.append(sub_s, ignore_index=True)
except Exception as e:
print(e)
new_sub_df = new_sub_df.append({}, ignore_index=True)
new_sub_df.reset_index(inplace=True, drop=True)
return new_sub_df
new_fundamentals_sets = pd.DataFrame()
year_list = list(set(fundamentals_sets['REPORTYEAR']))
year_list.sort(reverse=True)
stock_list = list(set(fundamentals_sets['COMPCODE']))
i = 0
start = time.time()
for stock in stock_list:
i += 1
if i % 100 == 0:
end = time.time()
print('cpu', params['cpu'], ':', i, '/', len(stock_list), ';time(s):', end - start)
start = end
for year in year_list:
fundamentals_sets_stock = fundamentals_sets[fundamentals_sets['COMPCODE'] == stock]
new_fundamentals_sets = new_fundamentals_sets.append(year_update(
fundamentals_sets_stock[
fundamentals_sets_stock['REPORTYEAR'] == year]))
return new_fundamentals_sets
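# TTM by publish date: current-period cumulative value + last year's annual value - the same period of last year.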
@classmethod
def df_calc_ttm_by_publish_date(self, df1, df2, df3, static_columns, sub_columns):
df = pd.DataFrame()
pub_date_list = list(set(df1['PUBLISHDATE']).union(df2['PUBLISHDATE']).union(df3['PUBLISHDATE']))
for pub_date in pub_date_list:
line1 = self.get_line(df1, pub_date)
line2 = self.get_line(df2, pub_date)
line3 = self.get_line(df3, pub_date)
if not line1.empty:
if not line2.empty and not line3.empty:
line = pd.concat(
[line1[static_columns], line1[sub_columns] + line2[sub_columns] - line3[sub_columns]])
line['PUBLISHDATE'] = pub_date
if line1['REPORTTYPE'] == '1' and line2['REPORTTYPE'] == '1' and line3['REPORTTYPE'] == '1':
line['REPORTTYPE'] = '1'
else:
line['REPORTTYPE'] = '3'
# If there is no previous-quarter data, blank out the single-quarter fields but keep the record
else:
line1[sub_columns] = None
line = line1
df = df.append(line, ignore_index=True)
return df
def ttm_by_symbol(params):
fundamentals_sets = params['fundamentals_sets_symbol']
fundamentals_sets.reset_index(drop=True, inplace=True)
# fundamentals_sets = fundamentals_sets.sort_values(by='ENDDATE', ascending=False)
fundamentals_sets.sort_values(by=['COMPCODE', 'ENDDATE', 'PUBLISHDATE', 'REPORTTYPE'],
ascending=[False, False, False, True],
inplace=True)
sub_columns = params['sub_columns']
columns = fundamentals_sets.columns.values.tolist()
miss_columns = list(set(columns) - set(sub_columns))
def year_update(df, pre_year_df):
report_date_type = list(df['REPORTDATETYPE'])
pre_report_date_type = list(pre_year_df['REPORTDATETYPE'])
new_sub_df = pd.DataFrame()
if '4' in report_date_type:
new_sub_df = new_sub_df.append(df[df['REPORTDATETYPE'] == '4'])
report_date_type.remove('4')
if '4' in pre_report_date_type:
for i in report_date_type:
try:
sub_s = CalcTools.df_calc_ttm_by_publish_date(df[df['REPORTDATETYPE'] == i],
pre_year_df[pre_year_df['REPORTDATETYPE'] == '4'],
pre_year_df[pre_year_df['REPORTDATETYPE'] == i],
miss_columns,
sub_columns)
new_sub_df = new_sub_df.append(sub_s, ignore_index=True)
except Exception as e:
print(e)
else:
sub_s = df[df['REPORTDATETYPE'] != '4']
sub_s[sub_columns] = None
new_sub_df = new_sub_df.append(sub_s, ignore_index=True)
new_sub_df.reset_index(drop=True, inplace=True)
return new_sub_df
new_fundamentals_sets = pd.DataFrame()
year_list = list(set(fundamentals_sets['REPORTYEAR']))
year_list.sort(reverse=True)
stock_list = list(set(fundamentals_sets['COMPCODE']))
i = 0
start = time.time()
for stock in stock_list:
i += 1
if i % 100 == 0:
end = time.time()
print('cpu', params['cpu'], ':', i, '/', len(stock_list), ';time(s):', end - start)
start = end
fundamentals_sets_stock = fundamentals_sets[fundamentals_sets['COMPCODE'] == stock]
for j in range(len(year_list) - 1):
new_fundamentals_sets = new_fundamentals_sets.append(
year_update(fundamentals_sets_stock[fundamentals_sets_stock['REPORTYEAR'] == year_list[j]],
fundamentals_sets_stock[fundamentals_sets_stock['REPORTYEAR'] == year_list[j + 1]]))
return new_fundamentals_sets
# df must be sorted by publish_date in descending order
def get_fin_line(df, pub_date):
df = df[df['publish_date'] <= pub_date]
if df.empty:
return pd.Series()
return df.iloc[0]
@classmethod
def df_calc_fin_ttm_by_publish_date(self, df1, df2, df3, static_columns, sub_columns):
df = pd.DataFrame()
pub_date_list = list(set(df1['publish_date']).union(df2['publish_date']).union(df3['publish_date']))
for pub_date in pub_date_list:
line1 = self.get_fin_line(df1, pub_date)
line2 = self.get_fin_line(df2, pub_date)
line3 = self.get_fin_line(df3, pub_date)
if not line1.empty:
if not line2.empty:
if not line3.empty:
line = pd.concat(
[line1[static_columns], line1[sub_columns] + line2[sub_columns] - line3[sub_columns]])
line['publish_date'] = pub_date
line['report_type'] = line1['report_type']
# If last year's annual report is present, fill null TTM values from it
line[sub_columns] = line[sub_columns].fillna(line2[sub_columns])
df = df.append(line, ignore_index=True)
# Change on 2020-05-11: if last year's annual report or the same period of last year is missing, do not compute TTM for the current period
"""
else:
line = pd.concat([line1[static_columns], line2[sub_columns]])
line['publish_date'] = pub_date
line['report_type'] = line1['report_type']
# print('无去年同期数据;', line1['company_id'], ',', line1['report_date'])
# 若无去年年报数据,则将需要计算单季的字段置空,记录保留
else:
line1[sub_columns] = None
line1['publish_date'] = pub_date
line = line1
df = df.append(line, ignore_index=True)
"""
return df
def fin_ttm_by_symbol(params):
fundamentals_sets = params['fundamentals_sets_symbol']
fundamentals_sets.sort_values(by=['company_id', 'report_date', 'publish_date', 'report_type'],
ascending=[False, False, False, True],
inplace=True)
fundamentals_sets.reset_index(drop=True, inplace=True)
sub_columns = params['sub_columns']
columns = fundamentals_sets.columns.values.tolist()
miss_columns = list(set(columns) - set(sub_columns))
def year_update(df, pre_year_df):
report_date_type = list(set(df['report_date_type']))
pre_report_date_type = list(set(pre_year_df['report_date_type']))
new_sub_df = pd.DataFrame()
if '4' in report_date_type:
new_sub_df = new_sub_df.append(df[df['report_date_type'] == '4'])
report_date_type.remove('4')
if '4' in pre_report_date_type:
for i in report_date_type:
try:
# Compute TTM separately for consolidated and parent-company statements
for report_types in ['1', '2']:
sub_s = CalcTools.df_calc_fin_ttm_by_publish_date(
df[(df['report_date_type'] == i) & (df['report_type'] == report_types)],
pre_year_df[
(pre_year_df['report_date_type'] == '4') & (
pre_year_df['report_type'] == report_types)],
pre_year_df[
(pre_year_df['report_date_type'] == i) & (
pre_year_df['report_type'] == report_types)],
miss_columns,
sub_columns)
new_sub_df = new_sub_df.append(sub_s, ignore_index=True)
except Exception as e:
print(e)
# else:
# sub_s = df[df['report_date_type'] != '4']
# sub_s[sub_columns] = None
# new_sub_df = new_sub_df.append(sub_s, ignore_index=True)
new_sub_df.reset_index(drop=True, inplace=True)
return new_sub_df
new_fundamentals_sets = pd.DataFrame()
year_list = list(set(fundamentals_sets['report_year']))
year_list.sort(reverse=True)
stock_list = list(set(fundamentals_sets['company_id']))
i = 0
start = time.time()
for stock in stock_list:
i += 1
if i % 100 == 0:
end = time.time()
print('cpu', params['cpu'], ':', i, '/', len(stock_list), ';time(s):', end - start)
start = end
fundamentals_sets_stock = fundamentals_sets[fundamentals_sets['company_id'] == stock]
for j in range(len(year_list) - 1):
new_fundamentals_sets = new_fundamentals_sets.append(
year_update(fundamentals_sets_stock[fundamentals_sets_stock['report_year'] == year_list[j]],
fundamentals_sets_stock[fundamentals_sets_stock['report_year'] == year_list[j + 1]]))
return new_fundamentals_sets
@classmethod
def df_calc_fin_mrq_by_publish_date(self, df1, df2, static_columns, sub_columns):
df = pd.DataFrame()
pub_date_list = list(set(df1['publish_date']).union(df2['publish_date']))
for pub_date in pub_date_list:
line1 = self.get_fin_line(df1, pub_date)
line2 = self.get_fin_line(df2, pub_date)
if not line1.empty:
if not line2.empty:
line = pd.concat(
[line1[static_columns], line1[sub_columns] - line2[sub_columns]])
line['publish_date'] = pub_date
line['report_type'] = line1['report_type']
df = df.append(line, ignore_index=True)
# If there is no previous-quarter data, blank out the single-quarter fields but keep the record
# Change on 2020-05-11: if last year's annual report data is missing, do not compute the current single-quarter data
"""
else:
line1[sub_columns] = None
line = line1
"""
return df
def fin_mrq_by_symbol(params):
fundamentals_sets = params['fundamentals_sets_symbol']
fundamentals_sets.sort_values(by=['company_id', 'report_date', 'publish_date', 'report_type'],
ascending=[False, False, False, True],
inplace=True)
fundamentals_sets.reset_index(drop=True, inplace=True)
sub_columns = params['sub_columns']
columns = fundamentals_sets.columns.values.tolist()
miss_columns = list(set(columns) - set(sub_columns))
def year_update(df):
report_date_type = list(set(df['report_date_type']))
new_sub_df = pd.DataFrame()
if '1' in report_date_type:
new_sub_df = new_sub_df.append(df[df['report_date_type'] == '1'])
report_date_type.remove('1')
for i in report_date_type:
try:
# Compute the single-quarter (MRQ) values separately for consolidated and parent-company statements
for report_types in ['1', '2']:
sub_s = CalcTools.df_calc_fin_mrq_by_publish_date(
df[(df['report_date_type'] == i) & (df['report_type'] == report_types)],
df[(df['report_date_type'] == str(int(i) - 1)) & (df['report_type'] == report_types)],
miss_columns,
sub_columns)
new_sub_df = new_sub_df.append(sub_s, ignore_index=True)
except Exception as e:
print(e)
new_sub_df.reset_index(drop=True, inplace=True)
return new_sub_df
new_fundamentals_sets = pd.DataFrame()
year_list = list(set(fundamentals_sets['report_year']))
year_list.sort(reverse=True)
stock_list = list(set(fundamentals_sets['company_id']))
i = 0
start = time.time()
for stock in stock_list:
i += 1
if i % 100 == 0:
end = time.time()
print('cpu', params['cpu'], ':', i, '/', len(stock_list), ';time(s):', end - start)
start = end
fundamentals_sets_stock = fundamentals_sets[fundamentals_sets['company_id'] == stock]
for j in range(len(year_list) - 1):
new_fundamentals_sets = new_fundamentals_sets.append(
year_update(fundamentals_sets_stock[fundamentals_sets_stock['report_year'] == year_list[j]]))
return new_fundamentals_sets
def mrq_by_symbol(params):
fundamentals_sets = params['fundamentals_sets_symbol']
fundamentals_sets.reset_index(drop=True, inplace=True)
sub_columns = params['sub_columns']
half_year_sub_columns = params['half_year_columns']
year_sub_columns = params['year_columns']
mrq_sub_columns = list(set(params['sub_columns']) - set(half_year_sub_columns) - set(year_sub_columns))
columns = fundamentals_sets.columns.values.tolist()
static_columns_out_sub = list(set(columns) - set(mrq_sub_columns))
miss_columns_out_half_sub = list(set(columns) - set(half_year_sub_columns))
# fundamentals_sets.sort_values(by=['COMPCODE', 'ENDDATE', 'PUBLISHDATE'], ascending=False, inplace=True)
fundamentals_sets.sort_values(by=['COMPCODE', 'ENDDATE', 'PUBLISHDATE', 'REPORTTYPE'],
ascending=[False, False, False, True],
inplace=True)
def year_update(df):
ds = df.sort_values(by='REPORTDATETYPE')
report_date_type = list(set(df['REPORTDATETYPE']))
new_sub_df = pd.DataFrame(columns=sub_columns)
if '1' in report_date_type:
new_sub_df = new_sub_df.append(ds[ds['REPORTDATETYPE'] == '1'])
report_date_type.remove('1')
for i in report_date_type:
try:
# Compute every single-quarter combination that can occur
sub_s = CalcTools.df_calc_mrq_by_publish_date(ds[ds['REPORTDATETYPE'] == i],
ds[ds['REPORTDATETYPE'] == str(int(i) - 1)],
static_columns_out_sub,
mrq_sub_columns)
# Compute single-quarter values for the half-year-only fields
if i == '4' and not sub_s.empty:
if '2' in report_date_type:
sub_s = CalcTools.df_calc_mrq_by_publish_date(sub_s, ds[ds['REPORTDATETYPE'] == '2'],
miss_columns_out_half_sub,
half_year_sub_columns)
else:
sub_s[half_year_sub_columns] = None
new_sub_df = new_sub_df.append(sub_s, ignore_index=True)
except Exception as e:
print(e)
new_sub_df = new_sub_df.append({}, ignore_index=True)
new_sub_df.reset_index(inplace=True, drop=True)
return new_sub_df
new_fundamentals_sets = pd.DataFrame()
year_list = list(set(fundamentals_sets['REPORTYEAR']))
year_list.sort(reverse=True)
stock_list = list(set(fundamentals_sets['COMPCODE']))
i = 0
start = time.time()
for stock in stock_list:
i += 1
if i % 100 == 0:
end = time.time()
print('cpu', params['cpu'], ':', i, '/', len(stock_list), ';time(s):', end - start)
start = end
for year in year_list:
fundamentals_sets_stock = fundamentals_sets[fundamentals_sets['COMPCODE'] == stock]
new_fundamentals_sets = new_fundamentals_sets.append(year_update(
fundamentals_sets_stock[
fundamentals_sets_stock['REPORTYEAR'] == year]))
return new_fundamentals_sets
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: ??
@author: zzh
@file: sqlengine.py
@time: 2019-08-29 16:22
"""
import datetime
import sys
sys.path.append('../..')
import config
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
from PyFin.api import advanceDateByCalendar
class InternalCode(object):
def __init__(self):
source_db = '''mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}'''.format(config.rl_db_user,
config.rl_db_pwd,
config.rl_db_host,
config.rl_db_port,
config.rl_db_database)
# source database
self.source = sa.create_engine(source_db)
# database session factory
self.session = sessionmaker(bind=self.source, autocommit=False, autoflush=True)
self.internal_code_table = 'gl_internal_code'
self.sw_industry_table = 'sw_industry_daily'
def get_sw_industry_daily(self, dates=None):
trade_dates = {}
for date in dates:
trade_date = advanceDateByCalendar('china.sse', date, '-0b')
trade_dates[trade_date] = date
pass
def get_Ashare_internal_code(self, trade_date=None, type=[], security_type=[101]):
if trade_date == None:
sql = """select security_code,symbol as code,exchange,company_id,security_name from `{0}`
where exchange in (001002,001003) and security_type in {1} and flag=1
and security_code!=2010004233""".format(
self.internal_code_table, security_type).replace('[', '(').replace(']', ')')
if 'security_code' in (type) or 'company_id' in (type):
sql = sql + ' and is_valid=1; '
else:
sql = """select security_code,symbol as code,exchange,company_id,security_name,begin_date,end_date from `{0}`
where exchange in (001002,001003) and security_type in {1} and flag=1;""".format(
self.internal_code_table, security_type).replace('[', '(').replace(']', ')')
result_list = pd.read_sql(sql, self.source)
if not result_list.empty:
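# Exchange code 001002 is taken to be Shanghai (symbols suffixed .XSHG) and 001003 Shenzhen (.XSHE).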
result_list['symbol'] = np.where(result_list['exchange'] == '001002',
result_list['code'] + '.XSHG',
result_list['code'] + '.XSHE')
result_list.drop(['exchange'], axis=1, inplace=True)
return result_list
def get_Ashare_internal_code_list(self, trade_dates=[], security_type=[101]):
if len(trade_dates) == 0:
trade_dates.append(datetime.date.today())  # append in place; list.append returns None, so do not reassign
result_list = self.get_Ashare_internal_code(trade_dates, security_type=security_type)
df = pd.DataFrame()
if not result_list.empty:
for trade_date in trade_dates:
if not isinstance(trade_date, datetime.date):
trade_date = datetime.datetime.strptime(str(trade_date), '%Y%m%d').date()
trade_date_code = result_list[
((result_list['begin_date'] <= trade_date) & (result_list['end_date'] > trade_date)) | (
(result_list['begin_date'] <= trade_date) & (
result_list['end_date'] == datetime.datetime.strptime('19000101', '%Y%m%d').date()))]
trade_date_code['trade_date'] = trade_date
df = df.append(trade_date_code)
df.drop(columns=['begin_date', 'end_date'], axis=1, inplace=True)
df.reset_index(drop=True, inplace=True)
return df
def get_fd_internal_code(self, trade_date=None, type=[], security_type=[302]):
if trade_date == None:
sql = """select security_code,symbol as code,company_id,exchange,security_name from `{0}`
where exchange in (001002,001003,000091) and security_type in {1} and flag=1
""".format(self.internal_code_table, security_type).replace('[', '(').replace(']', ')')
else:
sql = """select security_code,symbol as code,exchange,company_id,security_name,begin_date,end_date from `{0}`
where exchange in (001002,001003,000091) and security_type in {1} and flag=1
""".format(self.internal_code_table, security_type).replace('[', '(').replace(']', ')')
result_list = pd.read_sql(sql, self.source)
if not result_list.empty:
result_list['symbol'] = np.where(result_list['exchange'] == '001002',
result_list['code'] + '.XSHG',
result_list['code'])
result_list['symbol'] = np.where(result_list['exchange'] == '001003',
result_list['code'] + '.XSHE',
result_list['symbol'])
result_list['symbol'] = np.where(result_list['exchange'] == '000091',
result_list['code'] + '.OFCN',
result_list['symbol'])
result_list.drop(['exchange'], axis=1, inplace=True)
return result_list
def get_fd_internal_code_list(self, trade_dates=[], security_type=[302]):
if len(trade_dates) == 0:
trade_dates.append(datetime.date.today())  # append in place; list.append returns None, so do not reassign
result_list = self.get_Ashare_internal_code(trade_dates, security_type=security_type)
df = pd.DataFrame()
if not result_list.empty:
for trade_date in trade_dates:
if not isinstance(trade_date, datetime.date):
trade_date = datetime.datetime.strptime(str(trade_date), '%Y%m%d').date()
trade_date_code = result_list[
((result_list['begin_date'] <= trade_date) & (result_list['end_date'] > trade_date)) | (
(result_list['begin_date'] <= trade_date) & (
result_list['end_date'] == datetime.datetime.strptime('19000101', '%Y%m%d').date()))]
trade_date_code['trade_date'] = trade_date
df = df.append(trade_date_code)
df.drop(columns=['begin_date', 'end_date'], axis=1, inplace=True)
df.reset_index(drop=True, inplace=True)
return df
def join_internal_code(self, df, left=[], right=[], include_name=False, security_type=[101], data_type='stk',
style='basic'):
if data_type == 'stk':
get_internal_code_list = self.get_Ashare_internal_code_list
get_internal_code = self.get_Ashare_internal_code
else:
get_internal_code_list = self.get_fd_internal_code_list
get_internal_code = self.get_fd_internal_code
security_type = [302]
if not df.empty:
if ('trade_date' in left):
trade_dates = list(set(df['trade_date']))
trade_dates.sort()
internal_code_sets = get_internal_code_list(trade_dates, security_type=security_type)
else:
internal_code_sets = get_internal_code(type=left, security_type=security_type)
if not include_name:
internal_code_sets.drop(columns=['security_name'], inplace=True)
internal_code_sets.drop(columns=['company_id'], inplace=True)
df = pd.merge(df, internal_code_sets, left_on=left, right_on=right)
if style == 'basic':
df = df.drop(columns=['symbol']).rename(columns={'code': 'symbol'})
return df
def transfer_internal_code(self, column, value, security_type=101, trade_date=None):
date_str = ""
if trade_date is not None:
date_str = f" and ((begin_date<='{trade_date}' and end_date>'{trade_date}') " \
f"or (begin_date<='{trade_date}' and end_date='19000101')"
sql = f"select company_id,security_code,symbol,exchange from {self.internal_code_table} " \
f"where exchange in (001002,001003) and security_type={security_type} and is_valid=1 and flag=1 " \
f"and {column}={value} {date_str}; "
result = pd.read_sql(sql, self.source)
if result.empty:
return None
else:
result['symbol'] = np.where(result['exchange'] == '001002',
result['symbol'] + '.XSHG',
result['symbol'] + '.XSHE')
return {
'company_id': result['company_id'][0],
'security_code': result['security_code'][0],
'symbol': result['symbol'][0]
}
if __name__ == '__main__':
internal = InternalCode()
# sets = internal.get_Ashare_internal_code('20190108')
# print(sets)
# sets = internal.get_Ashare_internal_code_list(['2010-07-31'])
# sets = internal.get_Ashare_internal_code_list(['20190108'])
combine_df = pd.read_csv('../files/tf-6.csv', encoding='utf-8', sep=",")
combine_df['trade_date'] = pd.to_datetime(combine_df['times']).dt.date
combine_df['code'] = combine_df['codes'].apply(lambda x: x[0:6])
combine_df_t = internal.join_internal_code(combine_df, left=['trade_date', 'code'],
right=['trade_date', 'code'])
print(combine_df_t)
# -*- coding: utf-8 -*-
import os, logging, datetime, inspect, time
from config import default_config
logger_dict = {'info': logging.INFO,
'warning': logging.WARNING,
'critical': logging.CRITICAL,
'debug': logging.DEBUG,
'error': logging.ERROR}
project_name = 'factor-choice-server'
class MLCLogger(object):
def __init__(self, logger_name, log_level='info',
log_format=None, log_dir=None):
default_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
log_format = default_format if log_format is None else log_format
formatter = logging.Formatter(log_format)
self.logger = logging.getLogger(logger_name)
console = logging.StreamHandler()
console.setFormatter(formatter)
self.logger.addHandler(console)
self.set_level(log_level)
if log_dir is not None:
self._config_file(log_level=log_level, logger_name=logger_name,
log_dir=log_dir, log_format=log_format)
def _config_file(self, log_level, logger_name, log_format,
log_dir):
dir_name = os.path.join(log_dir, logger_name, log_level)
if not os.path.exists(dir_name):
os.makedirs(dir_name)
filename = datetime.datetime.now().date().strftime('%Y-%m-%d') + '.log'
logging.basicConfig(level=logger_dict[log_level.lower()],
format=log_format,
datefmt='%m-%d %H:%M',
filename=os.path.join(dir_name, filename),
filemode='a')
def set_level(self, log_level):
self.logger.setLevel(logger_dict[log_level.lower()])
def info(self, msg):
self.logger.info(msg)
def warning(self, msg):
self.logger.warning(msg)
def critical(self, msg):
self.logger.critical(msg)
def debug(self, msg):
self.logger.debug(msg)
def error(self, msg):
self.logger.error(msg)
class RDLogger(object):
def __init__(self, logger_name, log_level='info'):
self._logger_name = logger_name
self.logger = logging.getLogger(logger_name)
console = logging.StreamHandler()
self.logger.addHandler(console)
self.set_level(log_level)
def set_level(self, log_level):
self.logger.setLevel(logger_dict[log_level.lower()])
def message(self, level, message):
_, file_name, line_no, function_name, _, _ = inspect.stack()[2]
return "[%s line:%s] %s - %s - %s - %s" % (file_name, line_no,
time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),
self._logger_name, level.upper(), message)
def info(self, msg):
self.logger.info(self.message('info', msg))
def warning(self, msg):
self.logger.info(self.message('warning', msg))
def critical(self, msg):
self.logger.info(self.message('critical', msg))
def debug(self, msg):
self.logger.info(self.message('debug', msg))
def error(self, msg):
self.logger.info(self.message('error', msg))
class LoggerFactory(object):
@classmethod
def create_engine(self, name='custom'):
if name == 'rd':
return RDLogger(project_name)
else:
return MLCLogger(project_name, log_dir=default_config.log_dir)
log = LoggerFactory.create_engine()
if __name__ == '__main__':
a = {'a': '1111'}
log.info((a['a']+'12'))
import argparse
import re
from io import BytesIO
from obs import *
import time
import pandas as pd
'''
This sample demonstrates how to do bucket-related operations
(such as do bucket ACL/CORS/Lifecycle/Logging/Website/Location/Tagging/OPTIONS)
on OBS using the OBS SDK for Python.
'''
config = {
'AK': 'QATS9AJH5NZVTREENE25',
'SK': 'jCIfH6U5Atv7xwBB4yY7xcynf7aimnhqlGcDoiDD',
'server': 'obs.cn-east-3.myhuaweicloud.com',
'bucket_name': 'csvstore'
}
class ObsUtil():
def __init__(self):
self._obs_client = ObsClient(access_key_id=config['AK'], secret_access_key=config['SK'],
server=config['server'])
# self._bucket_client = self._obs_client.bucketClient(config['bucket_name'])
def push_df_to_bucket(self, df, file_path):
df_string = BytesIO()
df_string.write(df.to_csv(index=False, sep=',', encoding='UTF-8').encode('utf-8'))
df_string.seek(0, 0)
return self.push_stream_to_bucket(df_string, file_path)
def push_stream_to_bucket(self, stream, file_path):
resp = self._obs_client.putContent(config['bucket_name'], file_path, content=stream)
if resp.status < 300:
print('requestId:' + resp.requestId)
print(f'save {file_path} success!')
return True
else:
print('errorCode:' + resp.errorCode)
print('errorMessage:' + resp.errorMessage)
print(f'save {file_path} failed!')
return False
def exist_obj(self, file_path):
resp = self._obs_client.getObjectMetadata(config['bucket_name'], file_path)
if resp.status < 300:
return True
else:
return False
def push_df_to_bucket_from_file(self, df, file_path, bucketname=None):
file_name = f'/tmp/{str(time.time())}.csv'
df.to_csv(file_name, index=False, sep=',', encoding='UTF-8')
return self.push_file_to_bucket(file_name, file_path, bucketname)
def push_file_to_bucket(self, file_path, obs_file_path, bucketname=None):
if bucketname is None:
bucketname = config['bucket_name']
resp = self._obs_client.putFile(bucketname, obs_file_path, file_path)
if resp.status < 300:
print('requestId:' + resp.requestId)
print(f'save {file_path} success!')
return True
else:
print('errorCode:' + resp.errorCode)
print('errorMessage:' + resp.errorMessage)
print(f'save {file_path} failed!')
return False
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--path', type=str, default='/Users/zzh/Downloads/20210404.csv')
args = parser.parse_args()
util = ObsUtil()
# df = pd.DataFrame({
# "a": [1, 2, 3],
# "b": [4, 5, 6],
# "c": [7, 8, 9]
# })
# test path
# util.push_df_to_bucket(df, 'test/20200101.csv')
# production path
# util.push_df_to_bucket(df, 'TFZQ/tf_exception/20200101.csv')
obs_path = re.split('[/\\\\]', args.path)
obs_path = [i for i in obs_path if i != '']
obs_path = ['TFZQ'] + obs_path[1:]
obs_path = '/'.join(obs_path)
util.push_file_to_bucket(args.path, obs_path, 'csvstore')
from io import BytesIO
from obs import *
import config
import pandas as pd
import time
'''
This sample demonstrates how to do bucket-related operations
(such as do bucket ACL/CORS/Lifecycle/Logging/Website/Location/Tagging/OPTIONS)
on OBS using the OBS SDK for Python.
'''
# log = LoggerFactory.create_engine()
class ObsUtil():
def __init__(self):
self._obs_client = ObsClient(access_key_id=config.AK, secret_access_key=config.SK,
server=config.server)
# self._bucket_client = self._obs_client.bucketClient(config.bucket_name)
def push_df_to_bucket(self, df, file_path):
df_string = BytesIO()
df_string.write(df.to_csv(index=False, sep=',', encoding='UTF-8').encode('utf-8'))
df_string.seek(0, 0)
return self.push_stream_to_bucket(df_string, file_path)
def push_df_to_bucket_from_file(self, df, file_path, bucketname=None):
file_name = f'/tmp/{str(time.time())}.csv'
df.to_csv(file_name, index=False, sep=',', encoding='utf_8_sig')
return self.push_file_to_bucket(file_name, file_path, bucketname)
def push_stream_to_bucket(self, stream, file_path):
resp = self._obs_client.putContent(config.bucket_name, file_path, content=stream)
if resp.status < 300:
print('requestId:' + resp.requestId)
print(f'save {file_path} success!')
return True
else:
print('errorCode:' + resp.errorCode)
print('errorMessage:' + resp.errorMessage)
print(f'save {file_path} failed!')
return False
def push_file_to_bucket(self, file_path, obs_file_path, bucketname=None):
if bucketname is None:
bucketname = config.bucket_name
resp = self._obs_client.putFile(bucketname, obs_file_path, file_path)
if resp.status < 300:
print('requestId:' + resp.requestId)
print(f'save {file_path} success!')
return True
else:
print('errorCode:' + resp.errorCode)
print('errorMessage:' + resp.errorMessage)
print(f'save {file_path} failed!')
return False
# Download object content into memory (binary)
def get_bitdata_from_bucket(self, file_path):
resp = self._obs_client.getObject(config.bucket_name, file_path, loadStreamInMemory=True)
if resp.status < 300:
print('url:' + resp.body.url)
# df = pd.read_csv(io.BytesIO(resp.body.buffer), encoding='gb2312', sep=",")
# df = pd.read_excel(io.BytesIO(resp.body.buffer), encoding='gb2312', sep=",")
else:
print('errorCode:' + resp.errorCode)
print('errorMessage:' + resp.errorMessage)
return resp.body.buffer
# Download an object to a local file
def get_file_from_bucket(self, file_path, save_path):
resp = self._obs_client.getObject(config.bucket_name, file_path, downloadPath=save_path)
if resp.status < 300:
print('requestId:' + resp.requestId)
print('url:' + resp.body.url)
return True
else:
print('errorCode:' + resp.errorCode)
print('errorMessage:' + resp.errorMessage)
return False
def get_stream_from_bucket(self, file_path):
result = b''
resp = self._obs_client.getObject(config.bucket_name, file_path, loadStreamInMemory=False)
if resp.status < 300:
print('requestId:' + resp.requestId)
# read the object content in chunks
while True:
chunk = resp.body.response.read(10485760)
if not chunk:
break
else:
result += chunk
resp.body.response.close()
else:
print('errorCode:' + resp.errorCode)
print('errorMessage:' + resp.errorMessage)
return result
def get_dataframe_from_bucket(self, file_path, type='csv', encoding='utf-8'):
df = pd.DataFrame()
stream = self.get_stream_from_bucket(file_path)
if len(stream) > 0:
if type == 'csv':
df = pd.read_csv(BytesIO(stream), encoding=encoding, sep=",")
elif type == 'excel':
df = pd.read_excel(BytesIO(stream), encoding=encoding)
return df
def get_obj_metadata(self, file_path):
resp = self._obs_client.getObjectMetadata(config.bucket_name, file_path)
if resp.status < 300:
print('requestId:', resp.requestId)
print('contentType:', resp.body.contentType)
print('contentLength:', resp.body.contentLength)
print('property:', dict(resp.header).get('property'))
else:
print('requestId:', resp.requestId)
def exist_obj(self, file_path):
resp = self._obs_client.getObjectMetadata(config.bucket_name, file_path)
if resp.status < 300:
return True
else:
return False
if __name__ == '__main__':
import time
util = ObsUtil()
start = time.time()
# util.get_stream_from_bucket('factor_tf_securities_init.xlsx')
# result = util.get_stream_from_bucket('0617.csv')
# df = pd.read_csv(io.BytesIO(result), encoding='gb2312', sep=",")
# util.get_dataframe_from_bucket('stand_factor_data/factor_inhouse_000985_1w.csv')
# util.get_dataframe_from_bucket('TFZQ/tf_fund_factor/fund_product/20201130.csv',encoding='GBK')
# util.get_dataframe_from_bucket('QSJG/QSJG202011.csv', encoding='GBK')
# print(time.time() - start)
# start = time.time()
# util.get_file_from_bucket('factor_tf_securities_init.xlsx', './factor_tf_securities_init.xlsx')
# print(time.time() - start)
# util.exist_obj('stand_factor_data1/')
util.get_dataframe_from_bucket('tfzq/industry/20210420/20210420_industry_data1.csv')
#!/usr/bin/env python
# coding=utf-8
import sys
import sqlalchemy as sa
import pandas as pd
from datetime import datetime, timedelta
from PyFin.api import advanceDateByCalendar, makeSchedule
from redis import Redis
sys.path.append('..')
import config
class SyncUtil(object):
def __init__(self):
self.r = Redis(host=config.redis_host, port=config.redis_port, db=config.redis_db,
password=config.redis_password)
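# Incremental file discovery: read the obs file paths recorded in redis with a score (upload timestamp)
# greater than tmstamp, keep the ones matching the given project and data_name, and return them together
# with the largest score seen so the caller can advance its watermark.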
def get_file_list(self, tmstamp, project, data_name):
max_tag = tmstamp
file_history = self.r.zrangebyscore(config.data_root, '(' + str(int(float(tmstamp))), 9999999999999,
withscores=True)
update_file_list = []
for file in file_history:
file_name = str(file[0], encoding="utf-8")
if f'/{project}/' in file_name and f'_{data_name}.' in file_name:
update_file_list.append(file_name)
max_tag = file[1]
return update_file_list, max_tag
# TTM reporting periods over the given number of years
def ttm_report_date_by_year(self, end_date, year):
end_date = str(end_date).replace('-', '')
end_datetime = datetime.strptime(end_date, '%Y%m%d')
ttm_report_list = []
start_year = end_datetime.year - year + 1
pos_year = end_datetime.year
while pos_year >= start_year:
ttm_report_list += self.ttm_report_date(
str(pos_year) + '-' + str(end_datetime.month) + '-' + str(end_datetime.day))
pos_year -= 1
ttm_report_list.sort(reverse=True)
return ttm_report_list
# TTM reporting-period calculation
def ttm_report_date(self, end_date):
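# The 0501 / 0901 / 1101 cut-offs appear to follow A-share disclosure deadlines (annual and Q1 reports by
# the end of April, half-year reports by the end of August, Q3 reports by the end of October), so the four
# periods returned are the latest ones that should already be published as of end_date.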
end_datetime = datetime.strptime(end_date, '%Y-%m-%d')
ttm_report_list = []
if end_datetime.month * 100 + end_datetime.day < 501:
ttm_report_list = [(end_datetime.year - 2) * 10000 + 1231,
(end_datetime.year - 1) * 10000 + 331,
(end_datetime.year - 1) * 10000 + 630,
(end_datetime.year - 1) * 10000 + 930]
elif 501 <= (end_datetime.month * 100 + end_datetime.day) < 901:
ttm_report_list = [(end_datetime.year - 1) * 10000 + 331,
(end_datetime.year - 1) * 10000 + 630,
(end_datetime.year - 1) * 10000 + 930,
(end_datetime.year) * 10000 + 331]
elif 901 <= (end_datetime.month * 100 + end_datetime.day) < 1101:
ttm_report_list = [(end_datetime.year - 1) * 10000 + 630,
(end_datetime.year - 1) * 10000 + 930,
(end_datetime.year) * 10000 + 331,
(end_datetime.year) * 10000 + 630]
elif 1101 <= (end_datetime.month * 100 + end_datetime.day):
ttm_report_list = [(end_datetime.year - 1) * 10000 + 1231,
(end_datetime.year) * 10000 + 331,
(end_datetime.year) * 10000 + 630,
(end_datetime.year) * 10000 + 930]
return ttm_report_list
# Build the list of report dates between min_year and max_year
def create_report_date(self, min_year, max_year):
report_date_list = []
start_date = min_year - 1
if start_date < 2007:
start_date = 2007
while start_date <= max_year:
report_date_list.append(start_date * 10000 + 331)
report_date_list.append(start_date * 10000 + 630)
report_date_list.append(start_date * 10000 + 930)
report_date_list.append(start_date * 10000 + 1231)
start_date += 1
report_date_list.sort()
return report_date_list
# Step back num reporting periods from trade_date and return that report date
def get_before_report_date(self, trade_date, num):
year_size = int(num / 4) + 1
current_year = int(trade_date[:4])
all_report_date = []
for year in range(current_year - year_size, current_year + 1):
all_report_date.append(year * 10000 + 331)
all_report_date.append(year * 10000 + 630)
all_report_date.append(year * 10000 + 930)
all_report_date.append(year * 10000 + 1231)
all_report_date.sort(reverse=True)
for report_date in all_report_date:
if report_date < int(trade_date):
num -= 1
if num == 0:
return report_date
# Get the reporting-period range
def every_report_range(self, trade_date, report_date_list):
report_date_list.sort(reverse=True)
start_flag = 0
start_count = 0
for report_date in report_date_list:
if int(trade_date) >= report_date:
start_flag = 1
if start_flag == 1:
start_count += 1
if start_count == 2:
return (trade_date, report_date)
return (0, 0)
# Convert a financial report date into year and report_type
def plus_year(self, row):
# 0331 -> 1, 0630 -> 2, 0930 -> 3, 1231 -> 4
row['year'] = row['report_date'].year
if row['report_date'].month * 100 + row['report_date'].day == 331:
row['report_type'] = 1
elif row['report_date'].month * 100 + row['report_date'].day == 630:
row['report_type'] = 2
elif row['report_date'].month * 100 + row['report_date'].day == 930:
row['report_type'] = 3
elif row['report_date'].month * 100 + row['report_date'].day == 1231:
row['report_type'] = 4
return row
# Whether trade_date is the last trading day of the current week or month
def is_month_or_week_end_trade_date(self, trade_date, period):
later_trade_date = advanceDateByCalendar('china.sse', trade_date, '1b')
if period == '1m':
if later_trade_date.month != trade_date.month:
return True
else:
return False
else:
if later_trade_date.isocalendar()[1] != trade_date.isocalendar()[1]:
return True
else:
return False
# Whether the date is the last calendar day of the month
def is_month_end_date(self, date):
next_month = date.replace(day=28) + timedelta(days=4) # this will never fail
return date == (next_month - timedelta(days=next_month.day))
def get_trade_month(self, start_date, end_date):
month_dates = []
dates = makeSchedule(datetime.strptime(str(start_date), '%Y%m%d'), datetime.strptime(str(end_date), '%Y%m%d'),
'1b', 'china.sse')
for date in dates:
# if self.sync_util.is_month_end_date(date):
if self.is_month_or_week_end_trade_date(date, '1m'):
month_dates.append(str(date.date()).replace('-', ''))
return month_dates
if __name__ == "__main__":
# pdb.set_trace()
sync = SyncUtil()
# test = sync.ttm_report_date_by_year('2018-06-10', 5)
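# Example (hypothetical date): sync.ttm_report_date('2018-06-10') -> [20170331, 20170630, 20170930, 20180331]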
# test = sync.get_before_report_date('20180701', 2)
# test = sync.get_trades_ago('001002', '20180610', '20180615', 1, order='DESC')
test = sync.is_month_end_date(datetime(2020, 10, 31))
print(test)
#!/usr/bin/env python
# coding=utf-8
import pdb
import sys
import os
import pandas as pd
from collections import OrderedDict
import collections
sys.path.append("..")
import config
class TradeDate(object):
def __init__(self):
self._all_trade_file = config.RECORD_BASE_DIR + 'trade_date/' + 'trade_date.csv'
self._trade_date_sets = OrderedDict()
self._load_trade_date()
def _load_trade_date(self):
if os.path.exists(self._all_trade_file):
trade_date = pd.read_csv(self._all_trade_file, index_col=0)
for index in trade_date.index:
self._trade_date_sets[int(trade_date.loc[index].values[0])] = int(trade_date.loc[index].values[0])
self._trade_date_sets = collections.OrderedDict(sorted(self._trade_date_sets.items(),
key=lambda t: t[0], reverse=False))
def trade_date_sets_ago(self, start_date, end_date, count):
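# Walk trade dates from newest to oldest starting at end_date and collect at most `count` dates that are
# not earlier than start_date; with count=-1 the limit is never hit, so every date in the range is returned.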
sub_trade_date = []
trade_date_sets = collections.OrderedDict(
sorted(self._trade_date_sets.items(), key=lambda t: t[0], reverse=True))
start_flag = 0
start_count = 0
for trade_date, values in trade_date_sets.items():
# print(trade_date, start_date, end_date)
if trade_date <= end_date:
start_flag = 1
if start_flag == 1:
if start_date <= trade_date and start_count != count:
sub_trade_date.append(trade_date)
start_count += 1
else:
break
return sub_trade_date
def trade_date_sets(self, start_date, end_date):
sub_trade_date = []
trade_date_sets = collections.OrderedDict(
sorted(self._trade_date_sets.items(), key=lambda t: t[0], reverse=False))
start_flag = 0
for trade_date, values in trade_date_sets.items():
print(trade_date, start_date, end_date)
if trade_date == start_date:
start_flag = 1
if start_flag == 1:
sub_trade_date.append(trade_date)
if end_date <= trade_date:
break
return sub_trade_date
def trade_date_sets_range(self, start_date, range_day, flag=1):
start_count = 0
sub_trade_date = []
if flag == 0:
trade_date_sets = collections.OrderedDict(
sorted(self._trade_date_sets.items(), key=lambda t: t[0], reverse=False))
else:
trade_date_sets = collections.OrderedDict(
sorted(self._trade_date_sets.items(), key=lambda t: t[0], reverse=True))
start_flag = 0
for trade_date, values in trade_date_sets.items():
if trade_date == start_date:
start_flag = 1
if start_flag == 1:
sub_trade_date.append(trade_date)
start_count += 1
if start_count >= range_day:
break
return sub_trade_date
if __name__ == '__main__':
tr = TradeDate()
result = tr.trade_date_sets_ago(20070101, 20190101, -1)
print(result)