Commit c007ed3f authored by 李煜's avatar 李煜

remove trash file

parent 4795dcd5
import sys
from tm_import_utils import TmImportUtils
sys.path.append('..')
import config
import sqlalchemy as sa
import pandas as pd
from sqlalchemy.orm import sessionmaker
class BaseSync(object):
    """Base helper for syncing a source (MSSQL) table into a destination (MySQL) table.

    Sub-classes set ``dest_table`` and use the engines / session factory
    created here.
    """

    def __init__(self, dest_table):
        """Build both engines from the project-level ``config`` module.

        :param dest_table: name of the destination (MySQL) table.
        """
        source_db = '''mssql+pymssql://{0}:{1}@{2}:{3}/{4}'''.format(
            config.source_db_user, config.source_db_pwd,
            config.source_db_host, config.source_db_port,
            config.source_db_database)
        destination_db = '''mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}'''.format(
            config.destination_db_user, config.destination_db_pwd,
            config.destination_db_host, config.destination_db_port,
            config.destination_db_database)
        # source database engine
        self.source = sa.create_engine(source_db)
        # destination database engine
        self.destination = sa.create_engine(destination_db)
        # session factory bound to the destination database
        self.dest_session = sessionmaker(bind=self.destination, autocommit=False, autoflush=True)
        self.dest_table = dest_table

    def get_start_date(self):
        """Return the latest ``trade_date`` in the destination table as a
        ``YYYYMMDD`` string, falling back to ``'20070101'`` when no data exists.
        """
        sql = """select max(trade_date) as trade_date from `{0}`;""".format(self.dest_table)
        trades_sets = pd.read_sql(sql, self.destination)
        td = 20070101
        if not trades_sets.empty:
            max_date = trades_sets['trade_date'][0]
            # BUG FIX: ``select max(...)`` always returns one row, so an empty
            # table yields a non-empty frame holding NULL; the original code
            # then returned the literal string 'None' instead of the default.
            if max_date is not None and not pd.isnull(max_date):
                td = max_date
        return str(td).replace('-', '')

    def delete_trade_data(self, trade_date):
        """Delete every destination row whose trade_date equals ``trade_date``."""
        session = self.dest_session()
        try:
            session.execute('''delete from `{0}` where trade_date={1}'''.format(self.dest_table, trade_date))
            session.commit()
        finally:
            # BUG FIX: the original never closed the session, leaking a
            # connection on every call.
            session.close()

    def create_table(self, create_sql):
        """Drop the destination table if present, recreate it via ``create_sql``
        and append the creat_time/update_time audit columns.
        """
        drop_sql = """drop table if exists `{0}`;""".format(self.dest_table)
        session = self.dest_session()
        try:
            session.execute(drop_sql)
            session.execute(create_sql)
            # NOTE: 'creat_time' (sic) is kept as-is; renaming the column
            # would break consumers of the existing schema.
            session.execute(
                '''alter table `{0}` add `creat_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP;'''.format(
                    self.dest_table))
            session.execute(
                '''alter table `{0}` add `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;'''.format(
                    self.dest_table))
            session.commit()
        finally:
            session.close()

    def insert_or_update(self, datas):
        """Upsert every row of the DataFrame ``datas`` into the destination
        table using ``INSERT ... ON DUPLICATE KEY UPDATE``.

        Single quotes and percent signs in values are escaped; stringified
        'nan'/'None' values are rewritten to SQL NULL.
        """
        session = self.dest_session()
        try:
            for i in range(datas.shape[0]):
                data = datas.iloc[i]
                values = ''
                update = ''
                title = ''
                for j in range(len(data)):
                    index = data.index[j]
                    # BUG FIX: use positional ``.iloc``; plain ``data[j]`` is
                    # label-based for non-integer indexes in modern pandas.
                    value = str(data.iloc[j]).replace("'", "\\'").replace("%", "\\%")
                    title += """`{0}`,""".format(index)
                    values += """'{0}',""".format(value)
                    update += """`{0}`='{1}',""".format(index, value)
                sql = '''insert into {0} ({1}) values({2}) ON DUPLICATE KEY UPDATE {3}'''.format(
                    self.dest_table, title[0:-1], values[0:-1], update[0:-1])
                sql = sql.replace("'nan'", 'Null').replace("'None'", 'Null')
                session.execute(sql)
            session.commit()
        finally:
            # BUG FIX: close the session even when an execute fails.
            session.close()
This diff is collapsed.
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
import config
class SyncStkCapitalChange(BaseSync):
    """Sync share-structure changes (TQ_SK_SHARESTRUCHG) into stk_capital_change."""

    def __init__(self, source=None, destination=None):
        self.source_table = 'TQ_SK_SHARESTRUCHG'
        self.dest_table = 'stk_capital_change'
        super(SyncStkCapitalChange, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    # create the destination table
    def create_dest_tables(self):
        """Reset the update log and (re)create the destination table."""
        self.utils.update_update_log(0)
        create_sql = """create table {0}
            (
                id INT,
                symbol VARCHAR(20),
                company_id VARCHAR(10),
                pub_date Date,
                begin_date Date,
                end_date Date,
                total_shares NUMERIC(15,6),
                floating_shares NUMERIC(15,6),
                floating_ashares NUMERIC(15,6),
                floating_bshares NUMERIC(15,6),
                floating_hshares NUMERIC(15,6),
                other_floating_shares NUMERIC(15,6),
                restrict_floating_shares NUMERIC(15,6),
                restrict_floating_ashares NUMERIC(15,6),
                non_floating_ashares NUMERIC(15,6),
                free_floating_shares NUMERIC(15,6),
                b_shares NUMERIC(15,6),
                exdividend_date Date,
                `explain` VARCHAR(100),
                change_type VARCHAR(2),
                change_reason VARCHAR(1000),
                is_valid INT,
                entry_date DATE,
                entry_time VARCHAR(8),
                tmstamp bigint not null,
                PRIMARY KEY(`symbol`,`begin_date`,`exdividend_date`)
            )
            ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        create_sql = create_sql.replace('\n', '')
        self.create_table(create_sql)

    def get_sql(self, type):
        """Return the extraction SQL; ``type`` is 'report' (full extract) or
        'update' (incremental, leaves {0}/{1}/{2} placeholders for the caller).
        """
        sql = """select {0}
            a.ID as id,
            a.COMPCODE as company_id,
            a.PUBLISHDATE as pub_date,
            a.BEGINDATE as begin_date,
            a.ENDDATE as end_date,
            a.TOTALSHARE as total_shares,
            a.CIRCSKAMT as floating_shares,
            a.CIRCAAMT as floating_ashares,
            a.CIRCBAMT as floating_bshares,
            a.CIRCHAMT as floating_hshares,
            a.OTHERCIRCAMT as other_floating_shares,
            a.LIMSKAMT as restrict_floating_shares,
            a.RECIRCAAMT as restrict_floating_ashares,
            a.NCIRCAMT as non_floating_ashares,
            a.FCIRCAAMT as free_floating_shares,
            a.BSK as b_shares,
            a.EXRIGHTDATE as exdividend_date,
            a.EXRIGHTEXP as explain,
            a.SKCHGTYPE as change_type,
            a.SHCHGRSN as change_reason,
            a.ISVALID as is_valid,
            a.ENTRYDATE as entry_date,
            a.ENTRYTIME as entry_time,
            cast(a.tmstamp as bigint) as tmstamp,
            b.Symbol as code,
            b.Exchange
            from {1} a
            left join FCDB.dbo.SecurityCode as b
            on b.CompanyCode = a.COMPCODE
            where b.SType ='EQA' and b.Enabled=0 and b.Status=0 """
        if type == 'report':
            sql = sql.format('', self.source_table)
            return sql
        elif type == 'update':
            sql += 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
            return sql
        # BUG FIX: previously fell through returning None for unknown types.
        raise ValueError('unsupported sql type: {0}'.format(type))

    def get_datas(self, tm):
        """Fetch the next batch (top 2000) of source rows with tmstamp > ``tm``."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 2000', self.source_table, tm).replace('\n', '')
        trades_sets = pd.read_sql(sql, self.source)
        return trades_sets

    def update_table_data(self, tm):
        """Incrementally pull batches after ``tm`` until the source is drained."""
        while True:
            result_list = self.get_datas(tm)
            if result_list.empty:
                break
            # map the exchange code to a suffixed symbol
            result_list['symbol'] = np.where(result_list['Exchange'] == 'CNSESH',
                                             result_list['code'] + '.XSHG',
                                             result_list['code'] + '.XSHE')
            result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
            try:
                result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
            except Exception as e:
                # BUG FIX: only DBAPI errors carry ``.orig``; e.orig.msg raised
                # AttributeError for any other exception type.
                print(getattr(getattr(e, 'orig', e), 'msg', e))
                self.insert_or_update(result_list)
            # rows arrive ordered by tmstamp, so the last row holds the max
            max_tm = result_list['tmstamp'].iloc[-1]
            self.utils.update_update_log(max_tm)
            tm = max_tm

    def do_update(self):
        """Run an incremental sync when the source is ahead of the log."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Refresh report statistics via the shared utils helper."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # BUG FIX: argparse's ``type=bool`` converts any non-empty string (even
    # "False") to True; parse the text explicitly so '--rebuild False' works.
    parser.add_argument('--rebuild', type=lambda s: s.lower() in ('true', '1', 'yes'), default=False)
    parser.add_argument('--update', type=lambda s: s.lower() in ('true', '1', 'yes'), default=False)
    args = parser.parse_args()
    if args.rebuild:
        processor = SyncStkCapitalChange()
        processor.create_dest_tables()
        processor.do_update()
    elif args.update:
        processor = SyncStkCapitalChange()
        processor.do_update()
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from sync.base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncCompanyInfo(BaseSync):
    """Sync company master data (TQ_COMP_INFO) into stk_company_info."""

    def __init__(self, source=None, destination=None):
        self.source_table = 'TQ_COMP_INFO'
        self.dest_table = 'stk_company_info'
        super(SyncCompanyInfo, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    # create the destination table
    def create_dest_tables(self):
        """Reset the update log and (re)create the destination table."""
        self.utils.update_update_log(0)
        create_sql = """create table {0}
            (
                id INT NOT NULL,
                symbol VARCHAR(15) NOT NULL,
                pub_date Date DEFAULT NULL,
                company_id VARCHAR(10) NOT NULL,
                full_name VARCHAR(200) DEFAULT NULL,
                short_name VARCHAR(100) DEFAULT NULL,
                english_name_full VARCHAR(300) DEFAULT NULL,
                english_name VARCHAR(300) DEFAULT NULL,
                type1 VARCHAR(10) DEFAULT NULL,
                type2 VARCHAR(10) DEFAULT NULL,
                islist INT DEFAULT NULL,
                isbranche INT DEFAULT NULL,
                establish_date Date DEFAULT NULL,
                type VARCHAR(10) DEFAULT NULL,
                reg_capital NUMERIC(19,2) DEFAULT NULL,
                auth_share NUMERIC(19,0) DEFAULT NULL,
                currency VARCHAR(10) DEFAULT NULL,
                org_code VARCHAR(20) DEFAULT NULL,
                region VARCHAR(10) DEFAULT NULL,
                country VARCHAR(10) DEFAULT NULL,
                chairman VARCHAR(100) DEFAULT NULL,
                ceo VARCHAR(100) DEFAULT NULL,
                leger VARCHAR(100) DEFAULT NULL,
                secretary VARCHAR(50) DEFAULT NULL,
                secretary_phone VARCHAR(100) DEFAULT NULL,
                secretary_email VARCHAR(100) DEFAULT NULL,
                security_representative VARCHAR(50) DEFAULT NULL,
                lawfirm VARCHAR(100) DEFAULT NULL,
                cpafirm VARCHAR(100) DEFAULT NULL,
                business_scale VARCHAR(10) DEFAULT NULL,
                register_location VARCHAR(200) DEFAULT NULL,
                zipcode VARCHAR(20) DEFAULT NULL,
                office VARCHAR(200) DEFAULT NULL,
                telephone VARCHAR(100) DEFAULT NULL,
                fax VARCHAR(100) DEFAULT NULL,
                email VARCHAR(100) DEFAULT NULL,
                website VARCHAR(100) DEFAULT NULL,
                pub_url VARCHAR(100) DEFAULT NULL,
                description TEXT DEFAULT NULL,
                business_scope TEXT DEFAULT NULL,
                main_business TEXT DEFAULT NULL,
                license_number VARCHAR(50) DEFAULT NULL,
                live_status VARCHAR(10) DEFAULT NULL,
                live_begindate Date DEFAULT NULL,
                live_enddate Date DEFAULT NULL,
                is_valid INT NOT NULL,
                entry_date DATE NOT NULL,
                entry_time VARCHAR(8) NOT NULL,
                total_employees INT DEFAULT NULL,
                tmstamp bigint not null,
                PRIMARY KEY(`symbol`)
            )
            ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        create_sql = create_sql.replace('\n', '')
        self.create_table(create_sql)

    def get_sql(self, type):
        """Return the extraction SQL; ``type`` is 'report' (full extract) or
        'update' (incremental, leaves {0}/{1}/{2} placeholders for the caller).
        """
        sql = """select {0}
            a.ID as id,
            b.Symbol as code,
            b.Exchange,
            a.PUBLISHDATE as pub_date,
            a.COMPCODE as company_id,
            a.COMPNAME as full_name,
            a.COMPSNAME as short_name,
            a.ENGNAME as english_name_full,
            a.COMPSNAME as english_name,
            a.COMPTYPE1 as type1,
            a.COMPTYPE2 as type2,
            a.ISLIST as islist,
            a.ISBRANCH as isbranche,
            a.FOUNDDATE as establish_date,
            a.ORGTYPE as type,
            a.REGCAPITAL as reg_capital,
            a.AUTHCAPSK as auth_share,
            a.CUR as currency,
            a.ORGCODE as org_code,
            a.REGION as region,
            a.COUNTRY as country,
            a.CHAIRMAN as chairman,
            a.MANAGER as ceo,
            a.LEGREP as leger,
            a.BSECRETARY as secretary,
            a.BSECRETARYTEL as secretary_phone,
            a.BSECRETARYMAIL as secretary_email,
            a.SEAFFREPR as security_representative,
            a.LECONSTANT as lawfirm,
            a.ACCFIRM as cpafirm,
            a.BIZSCALE as business_scale,
            a.REGADDR as register_location,
            a.REGPTCODE as zipcode,
            a.OFFICEADDR as office,
            a.COMPTEL as telephone,
            a.COMPFAX as fax,
            a.COMPEMAIL as email,
            a.COMPURL as website,
            a.DISURL as pub_url,
            a.COMPINTRO as description,
            a.BIZSCOPE as business_scope,
            a.MAJORBIZ as main_business,
            a.BIZLICENSENO as license_number,
            a.COMPSTATUS as live_status,
            a.EXISTBEGDATE as live_begindate,
            a.EXISTENDDATE as live_enddate,
            a.ISVALID as is_valid,
            a.ENTRYDATE as entry_date,
            a.ENTRYTIME as entry_time,
            a.WORKFORCE as total_employees,
            cast(a.tmstamp as bigint) as tmstamp
            from {1} a
            left join FCDB.dbo.SecurityCode as b
            on b.CompanyCode = a.COMPCODE
            where b.SType ='EQA' and b.Enabled=0 and b.status=0 """
        # NOTE(review): COMPSNAME feeds both short_name and english_name —
        # ENGSNAME was possibly intended for english_name; confirm upstream.
        if type == 'report':
            sql = sql.format('', self.source_table)
            return sql
        elif type == 'update':
            sql += 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
            return sql
        # BUG FIX: previously fell through returning None for unknown types.
        raise ValueError('unsupported sql type: {0}'.format(type))

    def get_datas(self, tm):
        """Fetch the next batch (top 10000) of source rows with tmstamp > ``tm``."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 10000', self.source_table, tm).replace('\n', '')
        trades_sets = pd.read_sql(sql, self.source)
        return trades_sets

    def update_table_data(self, tm):
        """Incrementally pull batches after ``tm`` until the source is drained."""
        while True:
            result_list = self.get_datas(tm)
            if result_list.empty:
                break
            # map the exchange code to a suffixed symbol
            result_list['symbol'] = np.where(result_list['Exchange'] == 'CNSESH',
                                             result_list['code'] + '.XSHG',
                                             result_list['code'] + '.XSHE')
            result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
            try:
                result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
            except Exception as e:
                # BUG FIX: only DBAPI errors carry ``.orig``; e.orig.msg raised
                # AttributeError for any other exception type.
                print(getattr(getattr(e, 'orig', e), 'msg', e))
                self.insert_or_update(result_list)
            # rows arrive ordered by tmstamp, so the last row holds the max
            max_tm = result_list['tmstamp'].iloc[-1]
            self.utils.update_update_log(max_tm)
            tm = max_tm

    def do_update(self):
        """Run an incremental sync when the source is ahead of the log."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Refresh report statistics via the shared utils helper."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # BUG FIX: argparse's ``type=bool`` converts any non-empty string (even
    # "False") to True; parse the text explicitly so '--rebuild False' works.
    parser.add_argument('--rebuild', type=lambda s: s.lower() in ('true', '1', 'yes'), default=False)
    parser.add_argument('--update', type=lambda s: s.lower() in ('true', '1', 'yes'), default=False)
    args = parser.parse_args()
    if args.rebuild:
        processor = SyncCompanyInfo()
        processor.create_dest_tables()
        processor.do_update()
    elif args.update:
        processor = SyncCompanyInfo()
        processor.do_update()
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from sync.base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncStkEmployeeInfo(BaseSync):
    """Sync employee statistics (TQ_COMP_EMPLOYEE) into stk_employee_info."""

    def __init__(self, source=None, destination=None):
        self.source_table = 'TQ_COMP_EMPLOYEE'
        self.dest_table = 'stk_employee_info'
        super(SyncStkEmployeeInfo, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    # create the destination table
    def create_dest_tables(self):
        """Reset the update log and (re)create the destination table."""
        self.utils.update_update_log(0)
        create_sql = """create table {0}
            (
                id int NOT NULL,
                symbol VARCHAR(20) NOT NULL,
                company_name VARCHAR(200) NOT NULL,
                company_id varchar(10) NOT NULL,
                end_date Date NOT NULL,
                pub_date Date DEFAULT NULL,
                work_dorce int DEFAULT NULL,
                product_num int DEFAULT NULL,
                sales_num int DEFAULT NULL,
                financial_num int DEFAULT NULL,
                tech_num int DEFAULT NULL,
                reasearch_num int DEFAULT NULL,
                admin_nunm int DEFAULT NULL,
                retire_num int DEFAULT NULL,
                other_num int DEFAULT NULL,
                doctor_num int DEFAULT NULL,
                posrg_num int DEFAULT NULL,
                tmstamp bigint not null,
                PRIMARY KEY(`symbol`,`end_date`)
            )
            ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        create_sql = create_sql.replace('\n', '')
        create_sql = " ".join(create_sql.split())
        self.create_table(create_sql)

    def get_sql(self, type):
        """Return the extraction SQL; ``type`` is 'report' (full extract) or
        'update' (incremental, leaves {0}/{1}/{2} placeholders for the caller).
        """
        sql = """select {0}
            a.ID as id,
            a.COMPNAME as company_name,
            a.COMPCODE as company_id,
            a.ENDDATE as end_date,
            a.DECLAREDATE as pub_date,
            a.WORKFORCE as work_dorce,
            a.PRODUCTIONSTAFF as product_num,
            a.SALESPERSONS as sales_num,
            a.FINANCIALSTAFF as financial_num,
            a.TECHNICALSTAFF as tech_num,
            a.RESEARCHSTAFF as reasearch_num,
            a.ADMTRATIVESTAFF as admin_nunm,
            a.RETIREESTAFF as retire_num,
            a.OTHERSTAFF as other_num,
            a.DRNUM as doctor_num,
            a.POSTGRAD as posrg_num,
            cast(a.tmstamp as bigint) as tmstamp,
            b.Symbol as code,
            b.Exchange
            from {1} a
            left join FCDB.dbo.SecurityCode as b
            on b.CompanyCode = a.COMPCODE
            where b.SType ='EQA' and b.Enabled=0 and b.Status=0 """
        if type == 'report':
            sql = sql.format('', self.source_table)
            return sql
        elif type == 'update':
            sql += 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
            return sql
        # BUG FIX: previously fell through returning None for unknown types.
        raise ValueError('unsupported sql type: {0}'.format(type))

    def get_datas(self, tm):
        """Fetch the next batch (top 10000) of source rows with tmstamp > ``tm``."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 10000', self.source_table, tm).replace('\n', '')
        trades_sets = pd.read_sql(sql, self.source)
        return trades_sets

    def update_table_data(self, tm):
        """Incrementally pull batches after ``tm`` until the source is drained."""
        while True:
            result_list = self.get_datas(tm)
            if result_list.empty:
                break
            # map the exchange code to a suffixed symbol
            result_list['symbol'] = np.where(result_list['Exchange'] == 'CNSESH',
                                             result_list['code'] + '.XSHG',
                                             result_list['code'] + '.XSHE')
            result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
            try:
                result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
            except Exception as e:
                # BUG FIX: only DBAPI errors carry ``.orig``; e.orig.msg raised
                # AttributeError for any other exception type.
                print(getattr(getattr(e, 'orig', e), 'msg', e))
                self.insert_or_update(result_list)
            # rows arrive ordered by tmstamp, so the last row holds the max
            max_tm = result_list['tmstamp'].iloc[-1]
            self.utils.update_update_log(max_tm)
            tm = max_tm

    def do_update(self):
        """Run an incremental sync when the source is ahead of the log."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Refresh report statistics via the shared utils helper."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # BUG FIX: argparse's ``type=bool`` converts any non-empty string (even
    # "False") to True; parse the text explicitly so '--rebuild False' works.
    parser.add_argument('--rebuild', type=lambda s: s.lower() in ('true', '1', 'yes'), default=False)
    parser.add_argument('--update', type=lambda s: s.lower() in ('true', '1', 'yes'), default=False)
    args = parser.parse_args()
    if args.rebuild:
        processor = SyncStkEmployeeInfo()
        processor.create_dest_tables()
        processor.do_update()
    elif args.update:
        processor = SyncStkEmployeeInfo()
        processor.do_update()
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncStkExd(BaseSync):
    """Sync ex-dividend factors (TQ_SK_XDRY) into stk_exd."""

    def __init__(self, source=None, destination=None):
        self.dest_table = 'stk_exd'
        self.source_table = 'TQ_SK_XDRY'
        super(SyncStkExd, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    # create the destination table
    def create_dest_tables(self):
        """Reset the update log and (re)create the destination table."""
        self.utils.update_update_log(0)
        create_sql = """create table {0}
            (
                id INT NOT NULL,
                symbol VARCHAR(20) NOT NULL,
                begin_date DATE NOT NULL,
                end_date DATE NOT NULL,
                exd_factor NUMERIC(32,19) DEFAULT NULL,
                back_exd NUMERIC(29,16) DEFAULT NULL,
                direct_exd NUMERIC(29,16) DEFAULT NULL,
                is_valid INT NOT NULL,
                tmstamp bigint not null,
                PRIMARY KEY(`symbol`,`begin_date`)
            )
            ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        create_sql = create_sql.replace('\n', '')
        create_sql = " ".join(create_sql.split())
        self.create_table(create_sql)

    def get_sql(self, type):
        """Return the extraction SQL; ``type`` is 'report' (full extract) or
        'update' (incremental, leaves {0}/{1}/{2} placeholders for the caller).
        """
        # CONSISTENCY FIX: the FROM clause previously hard-coded TQ_SK_XDRY
        # even though {1} is always formatted with self.source_table.
        sql = """select {0}
            a.ID as id,
            a.BEGINDATE as begin_date,
            a.ENDDATE as end_date,
            a.XDY as exd_factor,
            a.LTDXDY as back_exd,
            a.THELTDXDY as direct_exd,
            a.ISVALID as is_valid,
            cast(a.tmstamp as bigint) as tmstamp,
            b.Exchange,
            b.SYMBOL as code
            from {1} a
            left join TQ_OA_STCODE as b
            on b.SECODE = a.SECODE
            where b.ISVALID=1 and b.LISTSTATUS=1 """
        if type == 'report':
            sql = sql.format('', self.source_table)
            return sql
        elif type == 'update':
            sql += 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
            return sql
        # BUG FIX: previously fell through returning None for unknown types.
        raise ValueError('unsupported sql type: {0}'.format(type))

    def get_datas(self, tm):
        """Fetch the next batch (top 10000) of source rows with tmstamp > ``tm``."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 10000', self.source_table, tm).replace('\n', '')
        trades_sets = pd.read_sql(sql, self.source)
        return trades_sets

    def update_table_data(self, tm):
        """Incrementally pull batches after ``tm`` until the source is drained."""
        while True:
            result_list = self.get_datas(tm)
            if result_list.empty:
                break
            # map the exchange code to a suffixed symbol
            result_list['symbol'] = np.where(result_list['Exchange'] == 'CNSESH',
                                             result_list['code'] + '.XSHG',
                                             result_list['code'] + '.XSHE')
            result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
            try:
                result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
            except Exception as e:
                # BUG FIX: only DBAPI errors carry ``.orig``; e.orig.msg raised
                # AttributeError for any other exception type.
                print(getattr(getattr(e, 'orig', e), 'msg', e))
                self.insert_or_update(result_list)
            # rows arrive ordered by tmstamp, so the last row holds the max
            max_tm = result_list['tmstamp'].iloc[-1]
            self.utils.update_update_log(max_tm)
            tm = max_tm

    def do_update(self):
        """Run an incremental sync when the source is ahead of the log."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Refresh report statistics via the shared utils helper."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # BUG FIX: argparse's ``type=bool`` converts any non-empty string (even
    # "False") to True; parse the text explicitly so '--rebuild False' works.
    parser.add_argument('--rebuild', type=lambda s: s.lower() in ('true', '1', 'yes'), default=False)
    parser.add_argument('--update', type=lambda s: s.lower() in ('true', '1', 'yes'), default=False)
    args = parser.parse_args()
    if args.rebuild:
        processor = SyncStkExd()
        processor.create_dest_tables()
        processor.do_update()
    elif args.update:
        processor = SyncStkExd()
        processor.do_update()
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from sync.base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncStkFinForcast(BaseSync):
    """Sync earnings forecasts (TQ_SK_EXPTPERFORMANCE) into stk_fin_forcast."""

    def __init__(self, source=None, destination=None):
        self.source_table = 'TQ_SK_EXPTPERFORMANCE'
        self.dest_table = 'stk_fin_forcast'
        super(SyncStkFinForcast, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    # create the destination table
    def create_dest_tables(self):
        """Reset the update log and (re)create the destination table."""
        self.utils.update_update_log(0)
        create_sql = """create table {0}
            (
                id INT NOT NULL,
                symbol VARCHAR(20) NOT NULL,
                company_id VARCHAR(10) NOT NULL,
                pub_date DATE NOT NULL,
                source VARCHAR(10) NOT NULL,
                begin_date DATE NOT NULL,
                end_date DATE NOT NULL,
                base_begin_date DATE NOT NULL,
                base_end_date DATE NOT NULL,
                operating_income_estimate NUMERIC(18,6) DEFAULT NULL,
                operating_income_increas_estimate NUMERIC(15,6) DEFAULT NULL,
                operating_income_text VARCHAR(400) DEFAULT NULL,
                operating_income_mark VARCHAR(10) DEFAULT NULL,
                operating_profit_estimate NUMERIC(18,6) DEFAULT NULL,
                operating_profit_increase_estimate NUMERIC(15,6) DEFAULT NULL,
                operating_profit_text VARCHAR(400) DEFAULT NULL,
                operating_profit_mark VARCHAR(10) DEFAULT NULL,
                net_profit_top NUMERIC(18,6) DEFAULT NULL,
                net_profit_bottom NUMERIC(18,6) DEFAULT NULL,
                net_profit_increas_top NUMERIC(18,6) DEFAULT NULL,
                net_profit_increas_bottom NUMERIC(18,6) DEFAULT NULL,
                net_profit_estimate_top VARCHAR(10) DEFAULT NULL,
                net_profit_estimate_bottom VARCHAR(10) DEFAULT NULL,
                net_profit_estimate_text VARCHAR(400) DEFAULT NULL,
                eps_top NUMERIC(15,6) DEFAULT NULL,
                eps_bottom NUMERIC(15,6) DEFAULT NULL,
                eps_estimate_top VARCHAR(10) DEFAULT NULL,
                eps_estimate_bottom VARCHAR(10) DEFAULT NULL,
                isvalid INT DEFAULT NULL,
                entry_date DATE NOT NULL,
                entry_time VARCHAR(8) NOT NULL,
                currency VARCHAR(10) DEFAULT NULL,
                eps_increase_estimate_top NUMERIC(15,6) DEFAULT NULL,
                eps_increase_estimate_bottom NUMERIC(15,6) DEFAULT NULL,
                exp_year VARCHAR(4) DEFAULT NULL,
                exp_type VARCHAR(10) DEFAULT NULL,
                report_type VARCHAR(10) DEFAULT NULL,
                estimate_origin_text TEXT DEFAULT NULL,
                tmstamp bigint not null,
                PRIMARY KEY(`symbol`,`pub_date`,`source`,`begin_date`,`end_date`,`base_begin_date`,`base_end_date`)
            )
            ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        create_sql = create_sql.replace('\n', '')
        self.create_table(create_sql)

    def get_sql(self, type):
        """Return the extraction SQL; ``type`` is 'report' (full extract) or
        'update' (incremental, leaves {0}/{1}/{2} placeholders for the caller).
        """
        sql = """select {0}
            a.ID as id,
            a.COMPCODE as company_id,
            a.PUBLISHDATE as pub_date,
            a.DATASOURCE as source,
            a.SESSIONBEGDATE as begin_date,
            a.SESSIONENDDATE as end_date,
            a.BASESSIONBEGDATE as base_begin_date,
            a.BASESSIONENDDATE as base_end_date,
            a.OPERMINCOME as operating_income_estimate,
            a.OPERMINCOMEINC as operating_income_increas_estimate,
            a.OPERMINCOMEDES as operating_income_text,
            a.OPERMINCOMEMK as operating_income_mark,
            a.OPERMPROFIT as operating_profit_estimate,
            a.OPERMPROFITINC as operating_profit_increase_estimate,
            a.OPERMPROFITDES as operating_profit_text,
            a.OPERMPROFITMK as operating_profit_mark,
            a.RETAMAXPROFITS as net_profit_top,
            a.RETAMINPROFITS as net_profit_bottom,
            a.RETAMAXPROFITSINC as net_profit_increas_top,
            a.RETAMINPROFITSINC as net_profit_increas_bottom,
            a.RETAMAXPROFITSMK as net_profit_estimate_top,
            a.RETAMINPROFITSMK as net_profit_estimate_bottom,
            a.RETAPROFITSDES as net_profit_estimate_text,
            a.EPSMAXFORE as eps_top,
            a.EPSMINFORE as eps_bottom,
            a.EPSMAXFOREMK as eps_estimate_top,
            a.EPSMINFOREMK as eps_estimate_bottom,
            a.ISVALID as isvalid,
            a.ENTRYDATE as entry_date,
            a.ENTRYTIME as entry_time,
            a.CUR as currency,
            a.EPSMAXFOREINC as eps_increase_estimate_top,
            a.EPSMINFOREINC as eps_increase_estimate_bottom,
            a.EXPTYEAR as exp_year,
            a.EXPTTYPE as exp_type,
            a.GLOBALEXPTMOD as report_type,
            a.EXPTORIGTEXT as estimate_origin_text,
            cast(a.tmstamp as bigint) as tmstamp,
            b.Symbol as code,
            b.Exchange
            from {1} a
            left join FCDB.dbo.SecurityCode as b
            on b.CompanyCode = a.COMPCODE
            where b.SType ='EQA' and b.Enabled=0 and b.Status=0 """
        if type == 'report':
            sql = sql.format('', self.source_table)
            return sql
        elif type == 'update':
            sql += 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
            return sql
        # BUG FIX: previously fell through returning None for unknown types.
        raise ValueError('unsupported sql type: {0}'.format(type))

    def get_datas(self, tm):
        """Fetch the next batch (top 10000) of source rows with tmstamp > ``tm``."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 10000', self.source_table, tm).replace('\n', '')
        trades_sets = pd.read_sql(sql, self.source)
        return trades_sets

    def update_table_data(self, tm):
        """Incrementally pull batches after ``tm`` until the source is drained."""
        while True:
            result_list = self.get_datas(tm)
            if result_list.empty:
                break
            # map the exchange code to a suffixed symbol
            result_list['symbol'] = np.where(result_list['Exchange'] == 'CNSESH',
                                             result_list['code'] + '.XSHG',
                                             result_list['code'] + '.XSHE')
            result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
            try:
                result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
            except Exception as e:
                # BUG FIX: only DBAPI errors carry ``.orig``; e.orig.msg raised
                # AttributeError for any other exception type.
                print(getattr(getattr(e, 'orig', e), 'msg', e))
                self.insert_or_update(result_list)
            # rows arrive ordered by tmstamp, so the last row holds the max
            max_tm = result_list['tmstamp'].iloc[-1]
            self.utils.update_update_log(max_tm)
            tm = max_tm

    def do_update(self):
        """Run an incremental sync when the source is ahead of the log."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Refresh report statistics via the shared utils helper."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # BUG FIX: argparse's ``type=bool`` converts any non-empty string (even
    # "False") to True; parse the text explicitly so '--rebuild False' works.
    parser.add_argument('--rebuild', type=lambda s: s.lower() in ('true', '1', 'yes'), default=False)
    parser.add_argument('--update', type=lambda s: s.lower() in ('true', '1', 'yes'), default=False)
    args = parser.parse_args()
    if args.rebuild:
        processor = SyncStkFinForcast()
        processor.create_dest_tables()
        processor.do_update()
    elif args.update:
        processor = SyncStkFinForcast()
        processor.do_update()
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from sync.base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncStkHolderNum(BaseSync):
    """Sync shareholder counts (TQ_SK_SHAREHOLDERNUM) into stk_holder_num."""

    def __init__(self, source=None, destination=None):
        self.source_table = 'TQ_SK_SHAREHOLDERNUM'
        self.dest_table = 'stk_holder_num'
        super(SyncStkHolderNum, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    # create the destination table
    def create_dest_tables(self):
        """Reset the update log and (re)create the destination table."""
        self.utils.update_update_log(0)
        create_sql = """create table {0}
            (
                id INT NOT NULL,
                symbol VARCHAR(15) NOT NULL,
                company_id VARCHAR(10) NOT NULL,
                end_date DATE NOT NULL,
                pub_date DATE DEFAULT NULL,
                share_holders NUMERIC(10,0) DEFAULT NULL,
                total_share NUMERIC(19,0) DEFAULT NULL,
                share_holders_ave NUMERIC(19,4) DEFAULT NULL,
                share_holders_ave_ratio NUMERIC(10,6) DEFAULT NULL,
                a_share_holders NUMERIC(10,0) DEFAULT NULL,
                a_share NUMERIC(19,0) DEFAULT NULL,
                a_share_holders_ave NUMERIC(19,4) DEFAULT NULL,
                a_share_holders_ave_ratio NUMERIC(10,6) DEFAULT NULL,
                b_share_holders NUMERIC(10,0) DEFAULT NULL,
                h_share_holders NUMERIC(10,0) DEFAULT NULL,
                memo VARCHAR(400) DEFAULT NULL,
                is_valid INT DEFAULT NULL,
                entry_date DATE NOT NULL,
                entry_time VARCHAR(8) NOT NULL,
                tmstamp bigint not null,
                PRIMARY KEY(`symbol`,`end_date`)
            )
            ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        create_sql = create_sql.replace('\n', '')
        self.create_table(create_sql)

    def get_sql(self, type):
        """Return the extraction SQL; ``type`` is 'report' (full extract) or
        'update' (incremental, leaves {0}/{1}/{2} placeholders for the caller).
        """
        sql = """select {0}
            a.ID as id,
            b.Symbol as code,
            b.Exchange,
            a.COMPCODE as company_id,
            a.ENDDATE as end_date,
            a.PUBLISHDATE as pub_date,
            a.TOTALSHAMT as share_holders,
            a.TOTALSHARE as total_share,
            a.KAVGSH as share_holders_ave,
            a.HOLDPROPORTIONPACC as share_holders_ave_ratio,
            a.ASKSHAMT as a_share_holders,
            a.ASK as a_share,
            a.ASKAVGSH as a_share_holders_ave,
            a.AHOLDPROPORTIONPACC as a_share_holders_ave_ratio,
            a.BSKSHAMT as b_share_holders,
            a.HSKSHAMT as h_share_holders,
            a.MEMO as memo,
            a.ISVALID as is_valid,
            a.ENTRYDATE as entry_date,
            a.ENTRYTIME as entry_time,
            cast(a.tmstamp as bigint) as tmstamp
            from {1} a
            left join FCDB.dbo.SecurityCode as b
            on b.CompanyCode = a.COMPCODE
            where b.SType ='EQA' and b.Enabled=0 and b.Status=0 """
        if type == 'report':
            sql = sql.format('', self.source_table)
            return sql
        elif type == 'update':
            sql += 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
            return sql
        # BUG FIX: previously fell through returning None for unknown types.
        raise ValueError('unsupported sql type: {0}'.format(type))

    def get_datas(self, tm):
        """Fetch the next batch (top 10000) of source rows with tmstamp > ``tm``."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 10000', self.source_table, tm).replace('\n', '')
        trades_sets = pd.read_sql(sql, self.source)
        return trades_sets

    def update_table_data(self, tm):
        """Incrementally pull batches after ``tm`` until the source is drained."""
        while True:
            result_list = self.get_datas(tm)
            if result_list.empty:
                break
            # map the exchange code to a suffixed symbol
            result_list['symbol'] = np.where(result_list['Exchange'] == 'CNSESH',
                                             result_list['code'] + '.XSHG',
                                             result_list['code'] + '.XSHE')
            result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
            try:
                result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
            except Exception as e:
                # BUG FIX: only DBAPI errors carry ``.orig``; e.orig.msg raised
                # AttributeError for any other exception type.
                print(getattr(getattr(e, 'orig', e), 'msg', e))
                self.insert_or_update(result_list)
            # rows arrive ordered by tmstamp, so the last row holds the max
            max_tm = result_list['tmstamp'].iloc[-1]
            self.utils.update_update_log(max_tm)
            tm = max_tm

    def do_update(self):
        """Run an incremental sync when the source is ahead of the log."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Refresh report statistics via the shared utils helper."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # BUG FIX: argparse's ``type=bool`` converts any non-empty string (even
    # "False") to True; parse the text explicitly so '--rebuild False' works.
    parser.add_argument('--rebuild', type=lambda s: s.lower() in ('true', '1', 'yes'), default=False)
    parser.add_argument('--update', type=lambda s: s.lower() in ('true', '1', 'yes'), default=False)
    args = parser.parse_args()
    if args.rebuild:
        processor = SyncStkHolderNum()
        processor.create_dest_tables()
        processor.do_update()
    elif args.update:
        processor = SyncStkHolderNum()
        processor.do_update()
This diff is collapsed.
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from sync.base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncStkManagementInfo(BaseSync):
    """Sync executive/board records (TQ_COMP_MANAGER) into stk_management_info."""

    def __init__(self, source=None, destination=None):
        self.source_table = 'TQ_COMP_MANAGER'
        self.dest_table = 'stk_management_info'
        super(SyncStkManagementInfo, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    # create the destination table
    def create_dest_tables(self):
        """Reset the update log and (re)create the destination table."""
        self.utils.update_update_log(0)
        create_sql = """create table {0}
            (
                id INT NOT NULL,
                symbol VARCHAR(15) NOT NULL,
                update_date DATE NOT NULL,
                company_code VARCHAR(20) NOT NULL,
                Job_attribute VARCHAR(10) NOT NULL,
                job_code VARCHAR(10) NOT NULL,
                job_mode VARCHAR(10) NOT NULL,
                job_name VARCHAR(100) NOT NULL,
                person_code VARCHAR(20) NOT NULL,
                name VARCHAR(100) NOT NULL,
                board_session INT DEFAULT NULL,
                employment_session INT DEFAULT NULL,
                status VARCHAR(10) DEFAULT NULL,
                begin_date DATE NOT NULL,
                end_date DATE NOT NULL,
                dimission_reason VARCHAR(10) DEFAULT NULL,
                is_dimission INT DEFAULT NULL,
                memo VARCHAR(1000) DEFAULT NULL,
                is_valid INT NOT NULL,
                entry_date DATE NOT NULL,
                entry_time VARCHAR(8) NOT NULL,
                tmstamp bigint not null,
                PRIMARY KEY(`symbol`,`person_code`,`job_name`,`begin_date`)
            )
            ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        create_sql = create_sql.replace('\n', '')
        self.create_table(create_sql)

    def get_sql(self, type):
        """Return the extraction SQL; ``type`` is 'report' (full extract) or
        'update' (incremental, leaves {0}/{1}/{2} placeholders for the caller).
        """
        sql = """select {0}
            a.ID as id,
            a.UPDATEDATE as update_date,
            a.COMPCODE as company_code,
            a.POSTYPE as Job_attribute,
            a.DUTYCODE as job_code,
            a.DUTYMOD as job_mode,
            a.ACTDUTYNAME as job_name,
            a.PERSONALCODE as person_code,
            a.CNAME as name,
            a.MGENTRYS as board_session,
            a.DENTRYS as employment_session,
            a.NOWSTATUS as status,
            a.BEGINDATE as begin_date,
            a.ENDDATE as end_date,
            a.DIMREASON as dimission_reason,
            a.ISRELDIM as is_dimission,
            a.MEMO as memo,
            a.ISVALID as is_valid,
            a.ENTRYDATE as entry_date,
            a.ENTRYTIME as entry_time,
            cast(a.tmstamp as bigint) as tmstamp,
            b.Symbol as code,
            b.Exchange
            from {1} a
            left join FCDB.dbo.SecurityCode as b
            on b.CompanyCode = a.COMPCODE
            where b.SType ='EQA' and b.Enabled=0 and b.Status=0 """
        if type == 'report':
            sql = sql.format('', self.source_table)
            return sql
        elif type == 'update':
            sql += 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
            return sql
        # BUG FIX: previously fell through returning None for unknown types.
        raise ValueError('unsupported sql type: {0}'.format(type))

    def get_datas(self, tm):
        """Fetch the next batch (top 10000) of source rows with tmstamp > ``tm``."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 10000', self.source_table, tm).replace('\n', '')
        trades_sets = pd.read_sql(sql, self.source)
        return trades_sets

    def update_table_data(self, tm):
        """Incrementally pull batches after ``tm`` until the source is drained."""
        while True:
            result_list = self.get_datas(tm)
            if result_list.empty:
                break
            # map the exchange code to a suffixed symbol
            result_list['symbol'] = np.where(result_list['Exchange'] == 'CNSESH',
                                             result_list['code'] + '.XSHG',
                                             result_list['code'] + '.XSHE')
            result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
            try:
                result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
            except Exception as e:
                # BUG FIX: only DBAPI errors carry ``.orig``; e.orig.msg raised
                # AttributeError for any other exception type.
                print(getattr(getattr(e, 'orig', e), 'msg', e))
                self.insert_or_update(result_list)
            # rows arrive ordered by tmstamp, so the last row holds the max
            max_tm = result_list['tmstamp'].iloc[-1]
            self.utils.update_update_log(max_tm)
            tm = max_tm

    def do_update(self):
        """Run an incremental sync when the source is ahead of the log."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Refresh report statistics via the shared utils helper."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # FIX: argparse `type=bool` treats ANY non-empty string (even "False") as True.
    # A tolerant converter fixes that while keeping the `--rebuild True` call style.
    str2bool = lambda v: str(v).lower() in ('1', 'true', 'yes', 'y')
    parser.add_argument('--rebuild', type=str2bool, default=False)
    parser.add_argument('--update', type=str2bool, default=False)
    args = parser.parse_args()
    if args.rebuild:
        processor = SyncStkManagementInfo()
        processor.create_dest_tables()
        processor.do_update()
    elif args.update:
        processor = SyncStkManagementInfo()
        processor.do_update()
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from sync.base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncSecurityInfo(BaseSync):
    """Incrementally sync A-share security master data (TQ_SK_BASICINFO -> stk_security_info)."""

    def __init__(self, source=None, destination=None):
        # NOTE(review): source/destination are accepted for call compatibility but
        # unused — BaseSync builds its own engines from config.
        self.source_table = 'TQ_SK_BASICINFO'
        self.dest_table = 'stk_security_info'
        super(SyncSecurityInfo, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    def create_dest_tables(self):
        """Drop/recreate the destination table and reset the sync watermark to 0."""
        self.utils.update_update_log(0)
        create_sql = """create table {0}
                    (
                    id INT NOT NULL,
                    company_id VARCHAR(20) NOT NULL,
                    symbol VARCHAR(20) NOT NULL,
                    exchange VARCHAR(10) NOT NULL,
                    security_type VARCHAR(10) NOT NULL,
                    short_name VARCHAR(100) NOT NULL,
                    english_name VARCHAR(100) DEFAULT NULL,
                    decnum INT DEFAULT NULL,
                    currency VARCHAR(10) NOT NULL,
                    isin_code VARCHAR(20) DEFAULT NULL,
                    sedol_code VARCHAR(20) DEFAULT NULL,
                    pairvalue NUMERIC(19,2) DEFAULT NULL,
                    total_shares NUMERIC(15,6) DEFAULT NULL,
                    lists_tatus VARCHAR(10) NOT NULL,
                    list_date DATE NOT NULL,
                    ipo_price NUMERIC(9,4) DEFAULT NULL,
                    delist_date DATE NOT NULL,
                    delist_price NUMERIC(9,4) DEFAULT NULL,
                    sfc_industry1_code VARCHAR(10) DEFAULT NULL,
                    sfc_industry1_name VARCHAR(100) DEFAULT NULL,
                    sfc_industry2_code VARCHAR(10) DEFAULT NULL,
                    sfc_industry2_name VARCHAR(100) DEFAULT NULL,
                    gics_industry1_code VARCHAR(10) DEFAULT NULL,
                    gics_industry1_name VARCHAR(100) DEFAULT NULL,
                    gics_industry2_code VARCHAR(10) DEFAULT NULL,
                    gics_industry2_name VARCHAR(100) DEFAULT NULL,
                    sw_industry1_code VARCHAR(10) DEFAULT NULL,
                    sw_industry1_name VARCHAR(100) DEFAULT NULL,
                    sw_industry2_code VARCHAR(10) DEFAULT NULL,
                    sw_industry2_name VARCHAR(100) DEFAULT NULL,
                    csi_industry1_code VARCHAR(10) DEFAULT NULL,
                    csi_industry1_name VARCHAR(100) DEFAULT NULL,
                    csi_industry2_code VARCHAR(10) DEFAULT NULL,
                    csi_industry2_name VARCHAR(100) DEFAULT NULL,
                    province_code VARCHAR(10) DEFAULT NULL,
                    province_name VARCHAR(100) DEFAULT NULL,
                    city_code VARCHAR(10) DEFAULT NULL,
                    city_name VARCHAR(100) DEFAULT NULL,
                    isvalid INT DEFAULT NULL,
                    entry_date DATE NOT NULL,
                    entry_time VARCHAR(8) DEFAULT NULL,
                    value_currency VARCHAR(10) DEFAULT NULL,
                    tmstamp bigint not null,
                    PRIMARY KEY(`symbol`)
                    )
                    ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        create_sql = create_sql.replace('\n', '')
        self.create_table(create_sql)

    def get_sql(self, type):
        """Return the source SELECT: 'report' -> formatted statement,
        'update' -> template with {0} limit / {1} table / {2} watermark.
        Raises ValueError for any other type (previously returned None)."""
        sql = """select {0}
                    a.ID as id,
                    a.COMPCODE as company_id,
                    a.SYMBOL as code,
                    a.EXCHANGE as exchange,
                    a.SETYPE as security_type,
                    a.SESNAME as short_name,
                    a.SEENGNAME as english_name,
                    a.DECNUM as decnum,
                    a.CUR as currency,
                    a.ISINCODE as isin_code,
                    a.SEDOLCODE as sedol_code,
                    a.PARVALUE as pairvalue,
                    a.TOTALSHARE as total_shares,
                    a.LISTSTATUS as lists_tatus,
                    a.LISTDATE as list_date,
                    a.LISTOPRICE as ipo_price,
                    a.DELISTDATE as delist_date,
                    a.DELISTCPRICE as delist_price,
                    a.CSRCLEVEL1CODE as sfc_industry1_code,
                    a.CSRCLEVEL1NAME as sfc_industry1_name,
                    a.CSRCLEVEL2CODE as sfc_industry2_code,
                    a.CSRCLEVEL2NAME as sfc_industry2_name,
                    a.GICSLEVEL1CODE as gics_industry1_code,
                    a.GICSLEVEL1NAME as gics_industry1_name,
                    a.GICSLEVEL2CODE as gics_industry2_code,
                    a.GICSLEVEL2NAME as gics_industry2_name,
                    a.SWLEVEL1CODE as sw_industry1_code,
                    a.SWLEVEL1NAME as sw_industry1_name,
                    a.SWLEVEL2CODE as sw_industry2_code,
                    a.SWLEVEL2NAME as sw_industry2_name,
                    a.CSILEVEL1CODE as csi_industry1_code,
                    a.CSILEVEL1NAME as csi_industry1_name,
                    a.CSILEVEL2CODE as csi_industry2_code,
                    a.CSILEVEL2NAME as csi_industry2_name,
                    a.PROVINCECODE as province_code,
                    a.PROVINCENAME as province_name,
                    a.CITYCODE as city_code,
                    a.CITYNAME as city_name,
                    a.ISVALID as isvalid,
                    a.ENTRYDATE as entry_date,
                    a.ENTRYTIME as entry_time,
                    a.VALUECUR as value_currency,
                    cast(a.tmstamp as bigint) as tmstamp
                    from {1} a
                    where a.LISTSTATUS=1 and (a.EXCHANGE='001002' or a.EXCHANGE='001003') """
        if type == 'report':
            return sql.format('', self.source_table)
        elif type == 'update':
            return sql + 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
        raise ValueError('unknown sql type: {0!r}'.format(type))

    def get_datas(self, tm):
        """Fetch the next batch (top 10000 rows) with tmstamp > tm."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 10000', self.source_table, tm).replace('\n', '')
        return pd.read_sql(sql, self.source)

    def update_table_data(self, tm):
        """Batch-sync rows with tmstamp > tm until the source is exhausted."""
        while True:
            result_list = self.get_datas(tm)
            if result_list.empty:
                break
            # 001002 = Shanghai (.XSHG); 001003 (the only other value selected) = Shenzhen (.XSHE).
            result_list['symbol'] = np.where(result_list['exchange'] == '001002',
                                             result_list['code'] + '.XSHG',
                                             result_list['code'] + '.XSHE')
            result_list.drop(['code'], axis=1, inplace=True)
            try:
                result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
            except Exception as e:
                # FIX: `.orig.msg` only exists on SQLAlchemy DBAPIError wrappers.
                print(getattr(e, 'orig', e))
                self.insert_or_update(result_list)
            # FIX: positional access instead of assuming a default RangeIndex.
            max_tm = result_list['tmstamp'].iloc[-1]
            self.utils.update_update_log(max_tm)
            tm = max_tm

    def do_update(self):
        """Run an incremental sync only when the source is ahead of the logged watermark."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Refresh the import report using the full report query."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # FIX: argparse `type=bool` treats ANY non-empty string (even "False") as True.
    str2bool = lambda v: str(v).lower() in ('1', 'true', 'yes', 'y')
    parser.add_argument('--rebuild', type=str2bool, default=False)
    parser.add_argument('--update', type=str2bool, default=False)
    args = parser.parse_args()
    if args.rebuild:
        processor = SyncSecurityInfo()
        processor.create_dest_tables()
        processor.do_update()
    elif args.update:
        processor = SyncSecurityInfo()
        processor.do_update()
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from sync.base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncStkShareholderFloatingTop10(BaseSync):
    """Incrementally sync the top-10 floating shareholders (TQ_SK_OTSHOLDER)."""

    def __init__(self, source=None, destination=None):
        # NOTE(review): source/destination are accepted for call compatibility but unused.
        self.source_table = 'TQ_SK_OTSHOLDER'
        self.dest_table = 'stk_shareholder_floating_top10'
        super(SyncStkShareholderFloatingTop10, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    def create_dest_tables(self):
        """Drop/recreate the destination table and reset the sync watermark to 0."""
        self.utils.update_update_log(0)
        create_sql = """create table {0}
                    (
                    id INT NOT NULL,
                    symbol VARCHAR(20) NOT NULL,
                    pub_date DATE NOT NULL,
                    end_date DATE NOT NULL,
                    company_id VARCHAR(10) NOT NULL,
                    shareholder_id VARCHAR(10) DEFAULT NULL,
                    shareholder_name VARCHAR(200) NOT NULL,
                    shareholder_class VARCHAR(10) DEFAULT NULL,
                    sharesnature VARCHAR(10) DEFAULT NULL,
                    shareholder_rank NUMERIC(10,0) NOT NULL,
                    share_number NUMERIC(26,2) DEFAULT NULL,
                    total_share_ratio NUMERIC(12,6) DEFAULT NULL,
                    a_share_ratio NUMERIC(8,4) DEFAULT NULL,
                    a_share_number NUMERIC(26,2) DEFAULT NULL,
                    b_share_number NUMERIC(26,2) DEFAULT NULL,
                    h_share_number NUMERIC(26,2) DEFAULT NULL,
                    share_number_change NUMERIC(16,0) DEFAULT NULL,
                    share_pledge NUMERIC(16,0) DEFAULT NULL,
                    share_freeze NUMERIC(16,0) DEFAULT NULL,
                    freeze_reason VARCHAR(200) DEFAULT NULL,
                    a1_share_ratio NUMERIC(12,6) DEFAULT NULL,
                    is_history INT DEFAULT NULL,
                    update_date DATE DEFAULT NULL,
                    tmstamp bigint not null,
                    PRIMARY KEY(`symbol`,`end_date`,`shareholder_name`,`shareholder_rank`)
                    )
                    ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        create_sql = create_sql.replace('\n', '')
        self.create_table(create_sql)

    def get_sql(self, type):
        """Return the source SELECT: 'report' -> formatted, 'update' -> {0}/{1}/{2} template.
        Raises ValueError for any other type (previously returned None)."""
        sql = """select {0}
                    a.ID as id,
                    a.PUBLISHDATE as pub_date,
                    a.ENDDATE as end_date,
                    a.COMPCODE as company_id,
                    a.SHHOLDERCODE as shareholder_id,
                    a.SHHOLDERNAME as shareholder_name,
                    a.SHHOLDERTYPE as shareholder_class,
                    a.SHHOLDERNATURE as sharesnature,
                    a.RANK as shareholder_rank,
                    a.HOLDERAMT as share_number,
                    a.PCTOFFLOATSHARES as total_share_ratio,
                    a.HOLDERRTO as a_share_ratio,
                    a.HOLDERANUM as a_share_number,
                    a.HOLDERBNUM as b_share_number,
                    a.HOLDERHNUM as h_share_number,
                    a.HOLDERSUMCHG as share_number_change,
                    a.PLEDGEINVOLVEDSUM as share_pledge,
                    a.FREEZEINVOLVEDSUM as share_freeze,
                    a.PFSTATEMENT as freeze_reason,
                    a.PCTOFFLOTSHARES as a1_share_ratio,
                    a.ISHIS as is_history,
                    a.UPDATEDATE as update_date,
                    cast(a.tmstamp as bigint) as tmstamp,
                    b.Symbol as code,
                    b.Exchange
                    from {1} a
                    left join FCDB.dbo.SecurityCode as b
                    on b.CompanyCode = a.COMPCODE
                    where b.SType ='EQA' and b.Enabled=0 and b.Status=0 """
        if type == 'report':
            return sql.format('', self.source_table)
        elif type == 'update':
            return sql + 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
        raise ValueError('unknown sql type: {0!r}'.format(type))

    def get_datas(self, tm):
        """Fetch the next batch (top 10000 rows) with tmstamp > tm."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 10000', self.source_table, tm).replace('\n', '')
        return pd.read_sql(sql, self.source)

    def update_table_data(self, tm):
        """Batch-sync rows with tmstamp > tm until the source is exhausted."""
        while True:
            result_list = self.get_datas(tm)
            if result_list.empty:
                break
            # CNSESH -> Shanghai (.XSHG); anything else -> Shenzhen (.XSHE).
            result_list['symbol'] = np.where(result_list['Exchange'] == 'CNSESH',
                                             result_list['code'] + '.XSHG',
                                             result_list['code'] + '.XSHE')
            result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
            try:
                result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
            except Exception as e:
                # FIX: `.orig.msg` only exists on SQLAlchemy DBAPIError wrappers.
                print(getattr(e, 'orig', e))
                self.insert_or_update(result_list)
            # FIX: positional access instead of assuming a default RangeIndex.
            max_tm = result_list['tmstamp'].iloc[-1]
            self.utils.update_update_log(max_tm)
            tm = max_tm

    def do_update(self):
        """Run an incremental sync only when the source is ahead of the logged watermark."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Refresh the import report using the full report query."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # FIX: argparse `type=bool` treats ANY non-empty string (even "False") as True.
    str2bool = lambda v: str(v).lower() in ('1', 'true', 'yes', 'y')
    parser.add_argument('--rebuild', type=str2bool, default=False)
    parser.add_argument('--update', type=str2bool, default=False)
    args = parser.parse_args()
    if args.rebuild:
        processor = SyncStkShareholderFloatingTop10()
        processor.create_dest_tables()
        processor.do_update()
    elif args.update:
        processor = SyncStkShareholderFloatingTop10()
        processor.do_update()
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from sync.base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncStkShareholderTop10(BaseSync):
    """Incrementally sync the top-10 shareholders (TQ_SK_SHAREHOLDER)."""

    def __init__(self, source=None, destination=None):
        # NOTE(review): source/destination are accepted for call compatibility but unused.
        self.source_table = 'TQ_SK_SHAREHOLDER'
        self.dest_table = 'stk_shareholder_top10'
        super(SyncStkShareholderTop10, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    def create_dest_tables(self):
        """Drop/recreate the destination table and reset the sync watermark to 0."""
        self.utils.update_update_log(0)
        create_sql = """create table {0}
                    (
                    id INT NOT NULL,
                    symbol VARCHAR(20) NOT NULL,
                    pub_date DATE NOT NULL,
                    end_date DATE NOT NULL,
                    company_id VARCHAR(10) NOT NULL,
                    shareholder_id VARCHAR(10) DEFAULT NULL,
                    shareholder_name VARCHAR(200) NOT NULL,
                    shareholder_class VARCHAR(10) NOT NULL,
                    sharesnature VARCHAR(10) NOT NULL,
                    shareholder_rank NUMERIC(10,0) NOT NULL,
                    sharesnature_id VARCHAR(100) DEFAULT NULL,
                    share_number NUMERIC(26,2) DEFAULT NULL,
                    share_ratio NUMERIC(8,4) DEFAULT NULL,
                    share_pledge_freeze NUMERIC(26,2) DEFAULT NULL,
                    share_change NUMERIC(26,2) DEFAULT NULL,
                    is_history INT DEFAULT NULL,
                    update_date DATE DEFAULT NULL,
                    tmstamp bigint not null,
                    PRIMARY KEY(`symbol`,`end_date`,`shareholder_name`,`shareholder_rank`)
                    )
                    ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        create_sql = create_sql.replace('\n', '')
        self.create_table(create_sql)

    def get_sql(self, type):
        """Return the source SELECT: 'report' -> formatted, 'update' -> {0}/{1}/{2} template.
        Raises ValueError for any other type (previously returned None)."""
        sql = """select {0}
                    a.ID as id,
                    a.PUBLISHDATE as pub_date,
                    a.ENDDATE as end_date,
                    a.COMPCODE as company_id,
                    a.SHHOLDERCODE as shareholder_id,
                    a.SHHOLDERNAME as shareholder_name,
                    a.SHHOLDERTYPE as shareholder_class,
                    a.SHHOLDERNATURE as sharesnature,
                    a.RANK as shareholder_rank,
                    a.SHARESTYPE as sharesnature_id,
                    a.HOLDERAMT as share_number,
                    a.HOLDERRTO as share_ratio,
                    a.PFHOLDERAMT as share_pledge_freeze,
                    a.CURCHG as share_change,
                    a.ISHIS as is_history,
                    a.UPDATEDATE as update_date,
                    cast(a.tmstamp as bigint) as tmstamp,
                    b.Symbol as code,
                    b.Exchange
                    from {1} a
                    left join FCDB.dbo.SecurityCode as b
                    on b.CompanyCode = a.COMPCODE
                    where b.SType ='EQA' and b.Enabled=0 and b.Status=0 """
        if type == 'report':
            return sql.format('', self.source_table)
        elif type == 'update':
            return sql + 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
        raise ValueError('unknown sql type: {0!r}'.format(type))

    def get_datas(self, tm):
        """Fetch the next batch (top 10000 rows) with tmstamp > tm."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 10000', self.source_table, tm).replace('\n', '')
        return pd.read_sql(sql, self.source)

    def update_table_data(self, tm):
        """Batch-sync rows with tmstamp > tm until the source is exhausted."""
        while True:
            result_list = self.get_datas(tm)
            if result_list.empty:
                break
            # CNSESH -> Shanghai (.XSHG); anything else -> Shenzhen (.XSHE).
            result_list['symbol'] = np.where(result_list['Exchange'] == 'CNSESH',
                                             result_list['code'] + '.XSHG',
                                             result_list['code'] + '.XSHE')
            result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
            try:
                result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
            except Exception as e:
                # FIX: `.orig.msg` only exists on SQLAlchemy DBAPIError wrappers.
                print(getattr(e, 'orig', e))
                self.insert_or_update(result_list)
            # FIX: positional access instead of assuming a default RangeIndex.
            max_tm = result_list['tmstamp'].iloc[-1]
            self.utils.update_update_log(max_tm)
            tm = max_tm

    def do_update(self):
        """Run an incremental sync only when the source is ahead of the logged watermark."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Refresh the import report using the full report query."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # FIX: argparse `type=bool` treats ANY non-empty string (even "False") as True.
    str2bool = lambda v: str(v).lower() in ('1', 'true', 'yes', 'y')
    parser.add_argument('--rebuild', type=str2bool, default=False)
    parser.add_argument('--update', type=str2bool, default=False)
    args = parser.parse_args()
    if args.rebuild:
        processor = SyncStkShareholderTop10()
        processor.create_dest_tables()
        processor.do_update()
    elif args.update:
        processor = SyncStkShareholderTop10()
        processor.do_update()
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from sync.base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncStkStatusChange(BaseSync):
    """Incrementally sync company info-change records (TQ_COMP_INFOCHG)."""

    def __init__(self, source=None, destination=None):
        # NOTE(review): source/destination are accepted for call compatibility but unused.
        self.source_table = 'TQ_COMP_INFOCHG'
        self.dest_table = 'stk_status_change'
        super(SyncStkStatusChange, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    def create_dest_tables(self):
        """Drop/recreate the destination table and reset the sync watermark to 0."""
        self.utils.update_update_log(0)
        create_sql = """create table {0}
                    (
                    id INT NOT NULL,
                    symbol VARCHAR(15) NOT NULL,
                    company_id VARCHAR(10),
                    pub_date DATE,
                    change_type_id VARCHAR(10),
                    begin_change DATE,
                    end_change DATE,
                    change_before TEXT DEFAULT NULL,
                    change_after TEXT DEFAULT NULL,
                    change_reason TEXT DEFAULT NULL,
                    isvalid INT,
                    entry_date DATE NOT NULL,
                    entry_time VARCHAR(8) DEFAULT NULL,
                    tmstamp bigint not null,
                    PRIMARY KEY(`symbol`,`pub_date`,`change_type_id`,`begin_change`)
                    )
                    ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        create_sql = create_sql.replace('\n', '')
        self.create_table(create_sql)

    def get_sql(self, type):
        """Return the source SELECT: 'report' -> formatted, 'update' -> {0}/{1}/{2} template.
        Raises ValueError for any other type (previously returned None)."""
        sql = """select {0}
                    a.ID as id,
                    a.COMPCODE as company_id,
                    a.PUBLISHDATE as pub_date,
                    a.CHGTYPE as change_type_id,
                    a.BEGINDATE as begin_change,
                    a.ENDDATE as end_change,
                    a.BECHG as change_before,
                    a.AFCHG as change_after,
                    a.CHGEXP as change_reason,
                    a.ISVALID as isvalid,
                    a.ENTRYDATE as entry_date,
                    a.ENTRYTIME as entry_time,
                    cast(a.tmstamp as bigint) as tmstamp,
                    b.Symbol as code,
                    b.Exchange
                    from {1} a
                    left join FCDB.dbo.SecurityCode as b
                    on b.CompanyCode = a.COMPCODE
                    where b.SType ='EQA' and b.Enabled=0 and b.Status=0 """
        if type == 'report':
            return sql.format('', self.source_table)
        elif type == 'update':
            return sql + 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
        raise ValueError('unknown sql type: {0!r}'.format(type))

    def get_datas(self, tm):
        """Fetch the next batch (top 10000 rows) with tmstamp > tm."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 10000', self.source_table, tm).replace('\n', '')
        return pd.read_sql(sql, self.source)

    def update_table_data(self, tm):
        """Batch-sync rows with tmstamp > tm until the source is exhausted."""
        while True:
            result_list = self.get_datas(tm)
            if result_list.empty:
                break
            # CNSESH -> Shanghai (.XSHG); anything else -> Shenzhen (.XSHE).
            result_list['symbol'] = np.where(result_list['Exchange'] == 'CNSESH',
                                             result_list['code'] + '.XSHG',
                                             result_list['code'] + '.XSHE')
            result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
            try:
                result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
            except Exception as e:
                # FIX: `.orig.msg` only exists on SQLAlchemy DBAPIError wrappers.
                print(getattr(e, 'orig', e))
                self.insert_or_update(result_list)
            # FIX: positional access instead of assuming a default RangeIndex.
            max_tm = result_list['tmstamp'].iloc[-1]
            self.utils.update_update_log(max_tm)
            tm = max_tm

    def do_update(self):
        """Run an incremental sync only when the source is ahead of the logged watermark."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Refresh the import report using the full report query."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # FIX: argparse `type=bool` treats ANY non-empty string (even "False") as True.
    str2bool = lambda v: str(v).lower() in ('1', 'true', 'yes', 'y')
    parser.add_argument('--rebuild', type=str2bool, default=False)
    parser.add_argument('--update', type=str2bool, default=False)
    args = parser.parse_args()
    if args.rebuild:
        processor = SyncStkStatusChange()
        processor.create_dest_tables()
        processor.do_update()
    elif args.update:
        processor = SyncStkStatusChange()
        processor.do_update()
#!/usr/bin/env python
# coding=utf-8
import argparse
import sys
import numpy as np
import pandas as pd
sys.path.append('..')
sys.path.append('../..')
from sync.base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncStkXrXd(BaseSync):
    """Incrementally sync ex-rights / ex-dividend records (TQ_SK_PRORIGHTS)."""

    def __init__(self, source=None, destination=None):
        # NOTE(review): source/destination are accepted for call compatibility but unused.
        self.source_table = 'TQ_SK_PRORIGHTS'
        self.dest_table = 'stk_xr_xd'
        super(SyncStkXrXd, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    def create_dest_tables(self):
        """Drop/recreate the destination table and reset the sync watermark to 0."""
        self.utils.update_update_log(0)
        create_sql = """create table {0}
                    (
                    id INT NOT NULL,
                    pub_date DATE,
                    update_date DATE,
                    sec_code VARCHAR(20) NOT NULL,
                    symbol VARCHAR(20) NOT NULL,
                    company_id VARCHAR(20) NOT NULL,
                    divdence_year VARCHAR(20) NOT NULL,
                    date_type VARCHAR(10) NOT NULL,
                    divdence_type VARCHAR(10) NOT NULL,
                    rank_num INT NOT NULL,
                    issue_object_type VARCHAR(10) NOT NULL,
                    issue_object VARCHAR(400) DEFAULT NULL,
                    project_type VARCHAR(10) NOT NULL,
                    currency VARCHAR(10) NOT NULL,
                    equity_base_date DATE,
                    equity_base NUMERIC(19,0) DEFAULT NULL,
                    record_date DATE,
                    xdr_date DATE,
                    lasttrade_date DATE,
                    aftertax_earning NUMERIC(19,6) DEFAULT NULL,
                    qfii_aftertax_earning NUMERIC(19,6) DEFAULT NULL,
                    cash_begindate DATE,
                    cash_enddate DATE,
                    share_deliveryratio NUMERIC(19,10) DEFAULT NULL,
                    capital_transferratio NUMERIC(19,10) DEFAULT NULL,
                    share_donationratio NUMERIC(19,10) DEFAULT NULL,
                    share_arrivaldate DATE,
                    list_date DATE,
                    buyback_date DATE,
                    buyback_deadline date,
                    sharereform_date DATE,
                    meeting_pubdate DATE,
                    is_newplan INT NOT NULL,
                    xdr_statement VARCHAR(2000) DEFAULT NULL,
                    is_valid INT DEFAULT NULL,
                    entry_date DATE,
                    entry_time VARCHAR(8) DEFAULT NULL,
                    tmstamp bigint not null,
                    PRIMARY KEY(`symbol`,`sec_code`,`divdence_year`,`date_type`,`divdence_type`,`rank_num`,`project_type`)
                    )
                    ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        create_sql = create_sql.replace('\n', '')
        self.create_table(create_sql)

    def get_sql(self, type):
        """Return the source SELECT: 'report' -> formatted, 'update' -> {0}/{1}/{2} template.
        Raises ValueError for any other type (previously returned None)."""
        sql = """select {0}
                    a.ID as id,
                    b.Exchange,
                    a.PUBLISHDATE as pub_date,
                    a.UPDATEDATE as update_date,
                    a.SECODE as sec_code,
                    a.SYMBOL as code,
                    a.COMPCODE as company_id,
                    a.DIVIYEAR as divdence_year,
                    a.DATETYPE as date_type,
                    a.DIVITYPE as divdence_type,
                    a.RANKNUM as rank_num,
                    a.GRAOBJTYPE as issue_object_type,
                    a.GRAOBJ as issue_object,
                    a.PROJECTTYPE as project_type,
                    a.CUR as currency,
                    a.SHCAPBASEDATE as equity_base_date,
                    a.SHCAPBASEQTY as equity_base,
                    a.EQURECORDDATE as record_date,
                    a.XDRDATE as xdr_date,
                    a.LASTTRADDAE as lasttrade_date,
                    a.AFTTAXCASHDVCNY as aftertax_earning,
                    a.AFTTAXCASHDVCNYQFII as qfii_aftertax_earning,
                    a.CASHDVARRBEGDATE as cash_begindate,
                    a.CASHDVARRENDDATE as cash_enddate,
                    a.PROBONUSRT as share_deliveryratio,
                    a.TRANADDRT as capital_transferratio,
                    a.BONUSRT as share_donationratio,
                    a.SHARRDATE as share_arrivaldate,
                    a.LISTDATE as list_date,
                    a.REPUBEGDATE as buyback_date,
                    a.REPUENDDATE as buyback_deadline,
                    a.ASSREPLACDATE as sharereform_date,
                    a.SHHDMEETRESPUBDATE as meeting_pubdate,
                    a.ISNEWEST as is_newplan,
                    a.DIVIEXPMEMO as xdr_statement,
                    a.ISVALID as is_valid,
                    a.ENTRYDATE as entry_date,
                    a.ENTRYTIME as entry_time,
                    cast(a.tmstamp as bigint) as tmstamp
                    from {1} a
                    left join FCDB.dbo.SecurityCode as b
                    on b.CompanyCode = a.COMPCODE
                    where b.SType ='EQA' and b.Enabled=0 and b.status=0 """
        if type == 'report':
            return sql.format('', self.source_table)
        elif type == 'update':
            return sql + 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
        raise ValueError('unknown sql type: {0!r}'.format(type))

    def get_datas(self, tm):
        """Fetch the next batch (top 10000 rows) with tmstamp > tm."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 10000', self.source_table, tm).replace('\n', '')
        return pd.read_sql(sql, self.source)

    def update_table_data(self, tm):
        """Batch-sync rows with tmstamp > tm until the source is exhausted."""
        while True:
            result_list = self.get_datas(tm)
            if result_list.empty:
                break
            # NOTE(review): Exchange here comes from FCDB.dbo.SecurityCode, whose values
            # sibling sync classes compare against 'CNSESH', not '001002' — if FCDB never
            # yields '001002', every row maps to .XSHE. Verify against the source data
            # before changing; behavior is preserved as-is.
            result_list['symbol'] = np.where(result_list['Exchange'] == '001002',
                                             result_list['code'] + '.XSHG',
                                             result_list['code'] + '.XSHE')
            result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
            try:
                result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
            except Exception as e:
                # FIX: `.orig.msg` only exists on SQLAlchemy DBAPIError wrappers.
                print(getattr(e, 'orig', e))
                self.insert_or_update(result_list)
            # FIX: positional access instead of assuming a default RangeIndex.
            max_tm = result_list['tmstamp'].iloc[-1]
            self.utils.update_update_log(max_tm)
            tm = max_tm

    def do_update(self):
        """Run an incremental sync only when the source is ahead of the logged watermark."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Refresh the import report using the full report query."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # FIX: argparse `type=bool` treats ANY non-empty string (even "False") as True.
    str2bool = lambda v: str(v).lower() in ('1', 'true', 'yes', 'y')
    parser.add_argument('--rebuild', type=str2bool, default=False)
    parser.add_argument('--update', type=str2bool, default=False)
    args = parser.parse_args()
    if args.rebuild:
        processor = SyncStkXrXd()
        processor.create_dest_tables()
        processor.do_update()
    elif args.update:
        processor = SyncStkXrXd()
        processor.do_update()
#!/usr/bin/env python
# coding=utf-8
import pdb
import sys
import os
import sqlalchemy as sa
import pandas as pd
import numpy as np
import collections
import argparse
from base_sync import BaseSync
sys.path.append('..')
from datetime import datetime, date
from sqlalchemy.orm import sessionmaker
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
import config
from utillities.sync_util import SyncUtil
class SyncIndex(BaseSync):
    """Sync index constituent weights (FCDB issweight) into the partitioned `index` table."""

    def __init__(self):
        self.sync_util = SyncUtil()
        super(SyncIndex, self).__init__('index')
        # Weights live in FCDB on a dedicated server; override the engine BaseSync built.
        self.source = sa.create_engine("mssql+pymssql://read:read@192.168.100.87:1433/FCDB")
        self.dir = config.RECORD_BASE_DIR + self.dest_table + '/'

    def create_dest_tables(self):
        """Create the range-partitioned destination table, its indexes and history partitions."""
        create_sql = """create table `{0}`(
                        `id` varchar(128) NOT NULL,
                        `isymbol` varchar(24) NOT NULL,
                        `trade_date` date NOT NULL,
                        `iname` varchar(128) CHARACTER SET 'utf8' COLLATE 'utf8_general_ci' NOT NULL,
                        `symbol` varchar(32) NOT NULL,
                        `sname` varchar(128) CHARACTER SET 'utf8' COLLATE 'utf8_general_ci' NOT NULL,
                        `weighing` decimal(8,2) DEFAULT NULL,
                        PRIMARY KEY(`id`,`trade_date`,`isymbol`)
                        )ENGINE=InnoDB DEFAULT CHARSET=utf8
                        PARTITION BY RANGE (to_days(trade_date))
                        (PARTITION p0 VALUES LESS THAN (TO_DAYS('20000101')) ENGINE = InnoDB);
                        """.format(self.dest_table)
        self.create_table(create_sql)
        self.create_index()
        self.build_history_partion(2000)

    def create_index(self):
        """Add the (trade_date, isymbol) and (trade_date, symbol) secondary indexes."""
        session = self.dest_session()
        indexs = [
            'CREATE INDEX index_trade_date_isymbol_index ON `index` (trade_date, isymbol);',
            'CREATE INDEX index_trade_date_symbol_index ON `index` (trade_date, symbol);'
        ]
        for sql in indexs:
            session.execute(sql)
        session.commit()
        session.close()

    def build_history_partion(self, start_year):
        """Create one monthly partition per month from *start_year* up to the current month.

        (Name kept as-is — 'partion' — for caller compatibility.)
        """
        print('正在生成', start_year, '年之后的历史分区表')
        current_year = datetime.now().year
        current_month = datetime.now().month
        session = self.dest_session()
        for i in range(start_year, current_year):
            print(i)
            for j in range(1, 13):
                if j < 10:
                    j = '0' + str(j)
                session.execute(
                    '''call SP_TABLE_PARTITION_AUTO('index','{0}','par_index');'''
                        .format(str(i) + str(j))
                )
        for j in range(1, current_month):
            if j < 10:
                j = '0' + str(j)
            session.execute(
                '''call SP_TABLE_PARTITION_AUTO('index','{0}','par_index');'''
                    .format(str(current_year) + str(j))
            )
        # FIX: the session was previously neither committed nor closed (leaked connection,
        # pending work on an autocommit=False session).
        session.commit()
        session.close()

    def get_index_sets(self, trade_date):
        """Read the constituent weights of the tracked indexes for one trade date."""
        sql = """SELECT Isymbol as icode, Iexchange as iexchange, Iname as iname, Tdate as trade_date,
                Symbol as code ,Exchange , Sname as sname, Weighing as weighing from FCDB.dbo.issweight
                where Isymbol in ('000300','000906','000985','399005','399006','000852','000905','399102','000016')
                and  Tdate = '{0}';""".format(trade_date)
        return pd.read_sql(sql, self.source)

    def do_update(self, start_date, end_date, count, order='DESC'):
        """Sync weights for every trade date in [start_date, end_date] (count/order per SyncUtil).

        Each day is dumped to CSV under self.dir and upserted into the destination table.
        """
        # 读取交易日
        trade_sets = self.sync_util.get_trades_ago('001002', start_date, end_date, count, order)
        trade_list = list(trade_sets['TRADEDATE'])
        session = self.dest_session()
        for trade_date in trade_list:
            print(trade_date)
            index_sets = self.get_index_sets(trade_date)
            if index_sets.empty:
                continue
            try:
                # CNSESH -> Shanghai (.XSHG); anything else -> Shenzhen (.XSHE).
                index_sets['symbol'] = np.where(index_sets['Exchange'] == 'CNSESH',
                                                index_sets['code'] + '.XSHG',
                                                index_sets['code'] + '.XSHE')
                index_sets['isymbol'] = np.where(index_sets['iexchange'] == 'CNSESH',
                                                 index_sets['icode'] + '.XSHG',
                                                 index_sets['icode'] + '.XSHE')
                index_sets['id'] = index_sets['symbol'] + str(trade_date) + index_sets['iexchange'] + index_sets[
                    'isymbol']
                index_sets.drop(['iexchange', 'Exchange', 'code', 'icode'], axis=1, inplace=True)
                # 本地保存 (local CSV snapshot)
                if not os.path.exists(self.dir):
                    os.makedirs(self.dir)
                file_name = self.dir + str(trade_date) + '.csv'
                if os.path.exists(str(file_name)):
                    os.remove(str(file_name))
                index_sets.to_csv(file_name, encoding='UTF-8')
                # 数据库保存: ensure the month's partition exists before inserting.
                session.execute('''call SP_TABLE_PARTITION_AUTO('index','{0}','par_index');'''.format(trade_date))
                try:
                    self.delete_trade_data(trade_date)
                    index_sets.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
                except Exception as sql_err:
                    # FIX: `.orig.msg` only exists on SQLAlchemy DBAPIError wrappers.
                    print(getattr(sql_err, 'orig', sql_err))
                    self.insert_or_update(index_sets)
            except Exception as e:
                # Best-effort per-day sync: log and continue with the next trade date.
                print(e)
        # FIX: release the session that was previously leaked.
        session.close()
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # FIX: argparse `type=bool` treats ANY non-empty string (even "False") as True.
    str2bool = lambda v: str(v).lower() in ('1', 'true', 'yes', 'y')
    parser.add_argument('--start_date', type=int, default=20070101)
    parser.add_argument('--end_date', type=int, default=0)
    parser.add_argument('--count', type=int, default=2)
    parser.add_argument('--rebuild', type=str2bool, default=False)
    parser.add_argument('--update', type=str2bool, default=False)
    parser.add_argument('--schedule', type=str2bool, default=False)
    args = parser.parse_args()
    # end_date 0 means "today" (YYYYMMDD).
    if args.end_date == 0:
        end_date = int(datetime.now().date().strftime('%Y%m%d'))
    else:
        end_date = args.end_date
    if args.rebuild:
        processor = SyncIndex()
        processor.create_dest_tables()
        processor.do_update(args.start_date, end_date, args.count)
    if args.update:
        processor = SyncIndex()
        processor.do_update(args.start_date, end_date, args.count)
    if args.schedule:
        processor = SyncIndex()
        start_date = processor.get_start_date()
        print('running schedule task, start date:', start_date, ';end date:', end_date)
        processor.do_update(start_date, end_date, -1, '')
#!/usr/bin/env python
# coding=utf-8
import pdb
import sys
import os
import sqlalchemy as sa
import pandas as pd
import numpy as np
import collections
import argparse
from datetime import datetime, date
from sqlalchemy.orm import sessionmaker
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from base_sync import BaseSync
sys.path.append('..')
import config
from utillities.sync_util import SyncUtil
class SyncIndexDailyPrice(BaseSync):
    """Sync daily index quotes (TQ_QT_INDEX) into the index_daily_price table."""

    def __init__(self):
        self.sync_util = SyncUtil()
        super(SyncIndexDailyPrice, self).__init__('index_daily_price')
        self.dir = config.RECORD_BASE_DIR + self.dest_table + '/'

    def create_dest_tables(self):
        """Drop and recreate the destination table."""
        create_sql = """create table `{0}`(
                        `id` varchar(32) NOT NULL,
                        `symbol` varchar(32) NOT NULL,
                        `trade_date` date NOT NULL,
                        `name` varchar(50) CHARACTER SET 'utf8' COLLATE 'utf8_general_ci' NOT NULL,
                        `pre_close` decimal(15,6) DEFAULT NULL,
                        `open` decimal(15,6) DEFAULT NULL,
                        `close` decimal(15,6) DEFAULT NULL,
                        `high` decimal(15,6) DEFAULT NULL,
                        `low` decimal(15,6) DEFAULT NULL,
                        `volume` decimal(20,2) DEFAULT NULL,
                        `money` decimal(18,3) DEFAULT NULL,
                        `deals` decimal(10,0) DEFAULT NULL,
                        `change` decimal(9,4) DEFAULT NULL,
                        `change_pct` decimal(8,4) DEFAULT NULL,
                        `tot_mkt_cap` decimal(18,4) DEFAULT NULL,
                        PRIMARY KEY(`id`,`trade_date`,`symbol`)
                        )ENGINE=InnoDB DEFAULT CHARSET=utf8;
                        """.format(self.dest_table)
        self.create_table(create_sql)

    def get_index_sets(self, trade_date):
        """Read the daily quotes of SH/SZ indexes for one trade date from the source DB."""
        sql = """SELECT TRADEDATE as trade_date,
                i.exchange as Exchange,
                s.symbol as code,
                INDEXNAME as name,
                LCLOSE as pre_close,
                TOPEN as 'open',
                TCLOSE as 'close',
                THIGH as high,
                TLOW as low,
                VOL as volume,
                AMOUNT as money,
                DEALS as deals,
                CHANGE as change,
                PCHG as change_pct,
                TOTMKTCAP as tot_mkt_cap
                from QADB.dbo.TQ_QT_INDEX i
                left join TQ_OA_STCODE s on i.SECODE = s.secode
                where (i.exchange = '001002' or i.exchange = '001003') and i.ISVALID = 1 and s.ISVALID = 1 and TRADEDATE = '{0}';""".format(
            trade_date)
        return pd.read_sql(sql, self.source)

    def do_update(self, start_date, end_date, count, order='DESC'):
        """Sync quotes for every trade date in [start_date, end_date] (count/order per SyncUtil).

        Each day is dumped to CSV under self.dir and upserted into the destination table.
        """
        # 读取交易日
        trade_sets = self.sync_util.get_trades_ago('001002', start_date, end_date, count, order)
        trade_list = list(trade_sets['TRADEDATE'])
        for trade_date in trade_list:
            print(trade_date)
            index_sets = self.get_index_sets(trade_date)
            if index_sets.empty:
                continue
            try:
                # 001002 = Shanghai (.XSHG); 001003 (the only other value selected) = Shenzhen (.XSHE).
                index_sets['symbol'] = np.where(index_sets['Exchange'] == '001002',
                                                index_sets['code'] + '.XSHG',
                                                index_sets['code'] + '.XSHE')
                index_sets['id'] = index_sets['symbol'] + str(trade_date)
                index_sets.drop(['Exchange', 'code'], axis=1, inplace=True)
                # 本地保存 (local CSV snapshot)
                if not os.path.exists(self.dir):
                    os.makedirs(self.dir)
                file_name = self.dir + str(trade_date) + '.csv'
                if os.path.exists(str(file_name)):
                    os.remove(str(file_name))
                index_sets.to_csv(file_name, encoding='UTF-8')
                # 数据库保存
                try:
                    self.delete_trade_data(trade_date)
                    index_sets.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
                except Exception as sql_err:
                    # FIX: `.orig.msg` only exists on SQLAlchemy DBAPIError wrappers.
                    print(getattr(sql_err, 'orig', sql_err))
                    self.insert_or_update(index_sets)
            except Exception as e:
                # Best-effort per-day sync: log and continue with the next trade date.
                print(e)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--start_date', type=int, default=20070101)
parser.add_argument('--end_date', type=int, default=0)
parser.add_argument('--count', type=int, default=2)
parser.add_argument('--rebuild', type=bool, default=False)
parser.add_argument('--update', type=bool, default=False)
parser.add_argument('--schedule', type=bool, default=False)
args = parser.parse_args()
if args.end_date == 0:
end_date = int(datetime.now().date().strftime('%Y%m%d'))
else:
end_date = args.end_date
if args.rebuild:
processor = SyncIndexDailyPrice()
processor.create_dest_tables()
processor.do_update(args.start_date, end_date, args.count)
if args.update:
processor = SyncIndexDailyPrice()
processor.do_update(args.start_date, end_date, args.count)
if args.schedule:
processor = SyncIndexDailyPrice()
start_date = processor.get_start_date()
print('running schedule task, start date:', start_date, ';end date:', end_date)
processor.do_update(start_date, end_date, -1, '')
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: ??
@author: li
@file: __init__.py.py
@time: 2019-07-02 16:09
"""
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
#!/usr/bin/env python
# coding=utf-8
import collections
import numpy as np
import pdb
from datetime import datetime
import sqlalchemy as sa
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
import argparse
from sync_fundamentals import SyncFundamentals
class SyncIncome(object):
    """Sync income-statement fundamentals into the ``income`` and
    ``income_report`` tables via the generic ``SyncFundamentals`` engine.

    ``create_columns`` must be called before any update method: it registers
    the destination column list, the source SQL and the single-quarter
    columns with the underlying ``SyncFundamentals`` instance.
    """

    def __init__(self):
        self._sync_fun = SyncFundamentals(None, None, 'income')

    def create_dest_tables(self):
        """Drop and recreate the ``income`` destination table."""
        self._sync_fun.create_dest_tables('income')

    def create_dest_report_tables(self):
        """Drop and recreate the ``income_report`` destination table."""
        self._sync_fun.create_dest_report_tables('income_report')

    def create_columns(self):
        """Register destination columns, source SQL and derived-column
        configuration with the underlying ``SyncFundamentals`` instance.
        """
        columns_list = collections.OrderedDict()
        columns_list["total_operating_revenue"] = "decimal(19,4)"
        columns_list["operating_revenue"] = "decimal(19,4)"
        columns_list["interest_income"] = "decimal(19,4)"
        columns_list["premiums_earned"] = "decimal(19,4)"
        columns_list["commission_income"] = "decimal(19,4)"
        columns_list["total_operating_cost"] = "decimal(19,4)"
        columns_list["operating_cost"] = "decimal(19,4)"
        columns_list["interest_expense"] = "decimal(19,4)"
        columns_list["commission_expense"] = "decimal(19,4)"
        columns_list["refunded_premiums"] = "decimal(19,4)"
        columns_list["net_pay_insurance_claims"] = "decimal(19,4)"
        columns_list["withdraw_insurance_contract_reserve"] = "decimal(19,4)"
        columns_list["policy_dividend_payout"] = "decimal(19,4)"
        columns_list["reinsurance_cost"] = "decimal(19,4)"
        columns_list["operating_tax_surcharges"] = "decimal(19,4)"
        columns_list["sale_expense"] = "decimal(19,4)"
        columns_list["administration_expense"] = "decimal(19,4)"
        columns_list["financial_expense"] = "decimal(19,4)"
        columns_list["asset_impairment_loss"] = "decimal(19,4)"
        columns_list["fair_value_variable_income"] = "decimal(19,4)"
        columns_list["investment_income"] = "decimal(19,4)"
        columns_list["invest_income_associates"] = "decimal(19,4)"
        columns_list["exchange_income"] = "decimal(19,4)"
        columns_list["operating_profit"] = "decimal(19,4)"
        columns_list["non_operating_revenue"] = "decimal(19,4)"
        columns_list["non_operating_expense"] = "decimal(19,4)"
        columns_list["disposal_loss_non_current_liability"] = "decimal(19,4)"
        columns_list["total_profit"] = "decimal(19,4)"
        columns_list["income_tax_expense"] = "decimal(19,4)"
        columns_list["net_profit"] = "decimal(19,4)"
        columns_list["np_parent_company_owners"] = "decimal(19,4)"
        columns_list["minority_profit"] = "decimal(19,4)"
        columns_list["basic_eps"] = "decimal(19,4)"
        columns_list["diluted_eps"] = "decimal(19,4)"
        columns_list["other_composite_income"] = "decimal(19,4)"
        columns_list["total_composite_income"] = "decimal(19,4)"
        columns_list["ci_parent_company_owners"] = "decimal(19,4)"
        columns_list["ci_minority_owners"] = "decimal(19,4)"
        # Sort by column name so the generated schema is deterministic.
        columns_list = collections.OrderedDict(sorted(columns_list.items(), key=lambda t: t[0]))
        time_columns = 'P.ENDDATE'
        # Helper columns pulled by the SQL but dropped before writing rows.
        del_columns = ['code', 'EXCHANGE', 'SType', 'ReportStyle', 'year']
        sub_columns = ['total_operating_revenue', 'operating_revenue', 'interest_income', 'premiums_earned',
                       'commission_income', 'total_operating_cost', 'operating_cost', 'interest_expense',
                       'commission_expense', 'refunded_premiums', 'net_pay_insurance_claims',
                       'withdraw_insurance_contract_reserve', 'policy_dividend_payout', 'reinsurance_cost',
                       'operating_tax_surcharges', 'sale_expense', 'administration_expense', 'financial_expense',
                       'asset_impairment_loss', 'fair_value_variable_income', 'investment_income',
                       'invest_income_associates', 'exchange_income', 'operating_profit', 'non_operating_revenue',
                       'non_operating_expense', 'disposal_loss_non_current_liability', 'total_profit',
                       'income_tax_expense', 'net_profit', 'np_parent_company_owners', 'minority_profit', 'basic_eps',
                       'diluted_eps', 'other_composite_income', 'total_composite_income', 'ci_parent_company_owners',
                       'ci_minority_owners']  # columns extracted to compute single-quarter values
        self._sync_fun.set_columns(columns_list, self.create_sql(), time_columns, del_columns, sub_columns)
        self._sync_fun.set_change_symbol(self.change_symbol)

    def create_sql(self):
        """Build the source query for income-statement rows.

        The statement deliberately ends with a dangling ``AND``: the sync
        engine is expected to append a date filter on ``time_columns``
        (``P.ENDDATE``) at run time — confirm in ``SyncFundamentals``.
        """
        sql = """select S.Symbol AS code,S.Exchange AS EXCHANGE, S.SType, P.PublishDate AS pub_date,
            P.ENDDATE AS report_date,P.REPORTTYPE AS ReportStyle, REPORTYEAR as year,
            P.BIZTOTINCO as total_operating_revenue,
            P.BIZINCO as operating_revenue,
            P.INTEINCO as interest_income,
            P.EARNPREM as premiums_earned,
            P.POUNINCO as commission_income,
            P.BIZTOTCOST as total_operating_cost,
            P.BIZCOST as operating_cost,
            P.INTEEXPE as interest_expense,
            P.POUNEXPE as commission_expense,
            P.SURRGOLD as refunded_premiums,
            P.COMPNETEXPE as net_pay_insurance_claims,
            P.CONTRESS as withdraw_insurance_contract_reserve,
            P.POLIDIVIEXPE as policy_dividend_payout,
            P.REINEXPE as reinsurance_cost,
            P.BIZTAX as operating_tax_surcharges,
            P.SALESEXPE as sale_expense,
            P.MANAEXPE as administration_expense,
            P.FINEXPE as financial_expense,
            P.ASSEIMPALOSS as asset_impairment_loss,
            P.VALUECHGLOSS as fair_value_variable_income,
            P.INVEINCO as investment_income,
            P.ASSOINVEPROF as invest_income_associates,
            P.EXCHGGAIN as exchange_income,
            P.PERPROFIT as operating_profit,
            P.NONOREVE as non_operating_revenue,
            P.NONOEXPE as non_operating_expense,
            P.NONCASSETSDISL as disposal_loss_non_current_liability,
            P.TOTPROFIT as total_profit,
            P.INCOTAXEXPE as income_tax_expense,
            P.NETPROFIT as net_profit,
            P.PARENETP as np_parent_company_owners,
            P.MINYSHARRIGH as minority_profit,
            P.BASICEPS as basic_eps,
            P.DILUTEDEPS as diluted_eps,
            P.OTHERCOMPINCO as other_composite_income,
            P.COMPINCOAMT as total_composite_income,
            P.PARECOMPINCO as ci_parent_company_owners,
            P.MINYSHARINCO as ci_minority_owners
            from QADB.dbo.TQ_FIN_PROINCSTATEMENTNEW AS P JOIN FCDB.dbo.SecurityCode as S on
            S.CompanyCode = P.COMPCODE
            where P.REPORTTYPE={0} AND S.SType='{1}' and S.Enabled=0 and S.Status=0 AND """.format(1,
                                                                                                  'EQA')
        return sql

    def change_symbol(self, trades_date_df):
        """Map raw exchange/code columns to the suffixed symbol convention."""
        return np.where(trades_date_df['EXCHANGE'] == 'CNSESH',
                        trades_date_df['code'] + '.XSHG',
                        trades_date_df['code'] + '.XSHE')

    def update_report(self, start_date, end_date, count):
        """Sync report-period (``income_report``) rows for the date range."""
        self._sync_fun.update_report(start_date, end_date, count)

    def do_update(self, start_date, end_date, count, order='DESC'):
        """Sync trade-date (``income``) rows for the date range."""
        self._sync_fun.do_update(start_date, end_date, count, order)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--start_date', type=int, default=20070101)
parser.add_argument('--end_date', type=int, default=0)
parser.add_argument('--count', type=int, default=2)
parser.add_argument('--rebuild', type=bool, default=False)
parser.add_argument('--update', type=bool, default=False)
parser.add_argument('--report', type=bool, default=False)
parser.add_argument('--rebuild_report', type=bool, default=False)
parser.add_argument('--schedule', type=bool, default=False)
args = parser.parse_args()
if args.end_date == 0:
end_date = int(datetime.now().date().strftime('%Y%m%d'))
else:
end_date = args.end_date
if args.rebuild:
processor = SyncIncome()
processor.create_columns()
processor.create_dest_tables()
processor.do_update(args.start_date, end_date, args.count)
elif args.update:
processor = SyncIncome()
processor.create_columns()
processor.do_update(args.start_date, end_date, args.count)
elif args.rebuild_report:
processor = SyncIncome()
processor.create_columns()
processor.create_dest_report_tables()
processor.update_report(args.start_date, end_date, args.count)
elif args.report:
processor = SyncIncome()
processor.create_columns()
processor.update_report(args.start_date, end_date, args.count)
if args.schedule:
processor = SyncIncome()
processor.create_columns()
start_date = processor._sync_fun.get_start_date(processor._sync_fun._table_name, 'trade_date')
print('running schedule task, start date:', start_date, ';end date:', end_date)
processor.do_update(start_date, end_date, -1, '')
processor.create_columns()
start_date = processor._sync_fun.get_start_date(processor._sync_fun._table_name + '_report', 'report_date')
print('running schedule report task, start date:', start_date, ';end date:', end_date)
processor.update_report(start_date, end_date, -1)
\ No newline at end of file
#!/usr/bin/env python
# coding=utf-8
import collections
import numpy as np
import pdb
from datetime import datetime
import sqlalchemy as sa
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
import argparse
from sync_fundamentals import SyncFundamentals
class SyncIndicator(object):
    """Sync derived financial indicators into the ``indicator`` and
    ``indicator_report`` tables via the generic ``SyncFundamentals`` engine.

    ``create_columns`` must be called before any update method.
    """

    def __init__(self):
        # NOTE(review): read-only DB credentials are hard-coded here —
        # consider moving them into the shared config module.
        self._sync_fun = SyncFundamentals(sa.create_engine("mssql+pymssql://read:read@192.168.100.64:1433/QADB"),
                                          None, 'indicator')

    def create_dest_tables(self):
        """Drop and recreate the ``indicator`` destination table."""
        self._sync_fun.create_dest_tables('indicator')

    def create_dest_report_tables(self):
        """Drop and recreate the ``indicator_report`` destination table."""
        self._sync_fun.create_dest_report_tables('indicator_report')

    def create_columns(self):
        """Register destination columns, source SQL and derived-column
        configuration with the underlying ``SyncFundamentals`` instance.

        NOTE(review): several declared columns (e.g. ``ocf_to_revenue`` and
        the ``*_annual`` ones) have no matching select in ``create_sql`` —
        presumably computed downstream by ``SyncFundamentals``; verify.
        """
        columns_list = collections.OrderedDict()
        columns_list["eps"] = "decimal(19,4)"
        columns_list["adjusted_profit"] = "decimal(19,4)"
        columns_list["operating_profit"] = "decimal(19,4)"
        columns_list["value_change_profit"] = "decimal(19,4)"
        columns_list["roe"] = "decimal(19,4)"
        columns_list["inc_return"] = "decimal(19,4)"
        columns_list["roa"] = "decimal(19,4)"
        columns_list["net_profit_margin"] = "decimal(19,4)"
        columns_list["gross_profit_margin"] = "decimal(19,4)"
        columns_list["expense_to_total_revenue"] = "decimal(19,4)"
        columns_list["operation_profit_to_total_revenue"] = "decimal(19,4)"
        columns_list["net_profit_to_total_revenue"] = "decimal(19,4)"
        columns_list["operating_expense_to_total_revenue"] = "decimal(19,4)"
        columns_list["ga_expense_to_total_revenue"] = "decimal(19,4)"
        columns_list["financing_expense_to_total_revenue"] = "decimal(19,4)"
        columns_list["operating_profit_to_profit"] = "decimal(19,4)"
        columns_list["invesment_profit_to_profit"] = "decimal(19,4)"
        columns_list["adjusted_profit_to_profit"] = "decimal(19,4)"
        columns_list["goods_sale_and_service_to_revenue"] = "decimal(19,4)"
        columns_list["ocf_to_revenue"] = "decimal(19,4)"
        columns_list["ocf_to_operating_profit"] = "decimal(19,4)"
        columns_list["inc_total_revenue_year_on_year"] = "decimal(19,4)"
        columns_list["inc_total_revenue_annual"] = "decimal(19,4)"
        columns_list["inc_revenue_year_on_year"] = "decimal(19,4)"
        columns_list["inc_revenue_annual"] = "decimal(19,4)"
        columns_list["inc_operation_profit_year_on_year"] = "decimal(19,4)"
        columns_list["inc_operation_profit_annual"] = "decimal(19,4)"
        columns_list["inc_net_profit_year_on_year"] = "decimal(19,4)"
        columns_list["inc_net_profit_annual"] = "decimal(19,4)"
        columns_list["inc_net_profit_to_shareholders_year_on_year"] = "decimal(19,4)"
        columns_list["inc_net_profit_to_shareholders_annual"] = "decimal(19,4)"
        # Sort by column name so the generated schema is deterministic.
        columns_list = collections.OrderedDict(sorted(columns_list.items(), key=lambda t: t[0]))
        time_columns = 'a.ENDDATE'
        # Helper columns pulled by the SQL but dropped before writing rows.
        del_columns = ['code', 'EXCHANGE', 'SType', 'year']
        sub_columns = ['eps',
                       'adjusted_profit',
                       'operating_profit',
                       'value_change_profit'
                       ]  # columns extracted to compute single-quarter values
        self._sync_fun.set_columns(columns_list, self.create_sql(), time_columns, del_columns, sub_columns)
        self._sync_fun.set_change_symbol(self.change_symbol)

    def create_sql(self):
        """Build the source query for indicator rows.

        The statement deliberately ends with a dangling ``AND``: the sync
        engine is expected to append a date filter on ``time_columns``
        (``a.ENDDATE``) at run time — confirm in ``SyncFundamentals``.
        """
        sql = """select S.Symbol AS code,S.Exchange AS EXCHANGE, S.SType,a.REPORTYEAR as year,
            a.FIRSTPUBLISHDATE as pub_date,
            a.ENDDATE AS report_date,
            a.EPSBASIC as eps,
            a.NPCUT as adjusted_profit,
            b.NOPI as operating_profit,
            b.NVALCHGIT as value_change_profit,
            a.ROEDILUTED as roe,
            b.ROEDILUTEDCUT as inc_return,
            b.ROAAANNUAL as roa,
            b.SNPMARGINCONMS as net_profit_margin,
            b.SGPMARGIN as gross_profit_margin,
            c.OCOI as expense_to_total_revenue,
            b.OPPRORT as operation_profit_to_total_revenue,
            c.PROFITRATIO as net_profit_to_total_revenue,
            b.OPEXPRT as operating_expense_to_total_revenue,
            b.MGTEXPRT as ga_expense_to_total_revenue,
            b.FINLEXPRT as financing_expense_to_total_revenue,
            b.OPANITOTP as operating_profit_to_profit,
            b.NVALCHGITOTP as invesment_profit_to_profit,
            b.ROEDILUTEDCUT as adjusted_profit_to_profit,
            b.SCASHREVTOOPIRT as goods_sale_and_service_to_revenue,
            b.OPANCFTOOPNI as ocf_to_operating_profit,
            b.TAGRT as inc_total_revenue_year_on_year,
            c.OPERINYOYB as inc_revenue_year_on_year,
            c.OPERPROYOYB as inc_operation_profit_year_on_year,
            c.NETPROFITYOYB as inc_net_profit_year_on_year,
            c.NETINPNRPLYOYB as inc_net_profit_to_shareholders_year_on_year
            from TQ_FIN_PROFINMAININDEX a
            left join TQ_FIN_PROINDICDATA b
            on a.COMPCODE=b.COMPCODE and a.REPORTYEAR=b.REPORTYEAR and b.REPORTTYPE=3 and a.REPORTDATETYPE=b.REPORTDATETYPE
            left join TQ_FIN_PROINDICDATASUB c
            on a.COMPCODE=c.COMPCODE and a.REPORTYEAR=c.REPORTYEAR and c.REPORTTYPE=3 and a.REPORTDATETYPE=c.REPORTDATETYPE
            JOIN FCDB.dbo.SecurityCode as S
            on S.CompanyCode = a.COMPCODE
            where a.REPORTTYPE={0} AND S.SType='{1}' and S.Enabled=0 and S.Status=0 AND """.format(1,
                                                                                                  'EQA')
        return sql

    def change_symbol(self, trades_date_df):
        """Map raw exchange/code columns to the suffixed symbol convention."""
        return np.where(trades_date_df['EXCHANGE'] == 'CNSESH',
                        trades_date_df['code'] + '.XSHG',
                        trades_date_df['code'] + '.XSHE')

    def update_report(self, start_date, end_date, count):
        """Sync report-period (``indicator_report``) rows for the date range."""
        self._sync_fun.update_report(start_date, end_date, count)

    def do_update(self, start_date, end_date, count, order='DESC'):
        """Sync trade-date (``indicator``) rows for the date range."""
        self._sync_fun.do_update(start_date, end_date, count, order)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--start_date', type=int, default=20070101)
parser.add_argument('--end_date', type=int, default=0)
parser.add_argument('--count', type=int, default=2)
parser.add_argument('--rebuild', type=bool, default=False)
parser.add_argument('--update', type=bool, default=False)
parser.add_argument('--report', type=bool, default=False)
parser.add_argument('--rebuild_report', type=bool, default=False)
parser.add_argument('--schedule', type=bool, default=False)
args = parser.parse_args()
if args.end_date == 0:
end_date = int(datetime.now().date().strftime('%Y%m%d'))
else:
end_date = args.end_date
if args.rebuild:
processor = SyncIndicator()
processor.create_columns()
processor.create_dest_tables()
processor.do_update(args.start_date, end_date, args.count)
elif args.update:
processor = SyncIndicator()
processor.create_columns()
processor.do_update(args.start_date, end_date, args.count)
elif args.rebuild_report:
processor = SyncIndicator()
processor.create_columns()
processor.create_dest_report_tables()
processor.update_report(args.start_date, end_date, args.count)
elif args.report:
processor = SyncIndicator()
processor.create_columns()
processor.update_report(args.start_date, end_date, args.count)
if args.schedule:
processor = SyncIndicator()
processor.create_columns()
start_date = processor._sync_fun.get_start_date(processor._sync_fun._table_name, 'trade_date')
print('running schedule task, start date:', start_date, ';end date:', end_date)
processor.do_update(start_date, end_date, -1, '')
processor.create_columns()
start_date = processor._sync_fun.get_start_date(processor._sync_fun._table_name + '_report', 'report_date')
print('running schedule report task, start date:', start_date, ';end date:', end_date)
processor.update_report(start_date, end_date, -1)
This diff is collapsed.
#!/usr/bin/env python
# coding=utf-8
import pdb
import sys
import os
import sqlalchemy as sa
import pandas as pd
import numpy as np
import collections
import argparse
from datetime import datetime, date, time
from sqlalchemy.orm import sessionmaker
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from base_sync import BaseSync
sys.path.append('..')
from utillities.sync_util import SyncUtil
import config
class SyncSkDailyPrice(BaseSync):
    """Synchronise daily stock quotes (plus adjustment factors) from the
    source MSSQL database into the MySQL table ``sk_daily_price``, also
    writing one CSV snapshot per trade date to local disk.
    """

    def __init__(self):
        super(SyncSkDailyPrice, self).__init__('sk_daily_price')
        # Utility used to query the trading calendar from the source DB.
        self.sync_util = SyncUtil()
        # Directory receiving one CSV snapshot per trade date.
        self.dir = config.RECORD_BASE_DIR + self.dest_table + '/'

    def create_dest_tables(self):
        """Drop and recreate the destination table.

        Delegates to ``BaseSync.create_table``, which also appends the
        ``creat_time``/``update_time`` bookkeeping columns.
        """
        create_sql = """create table `{0}`(
                    `id` varchar(32) NOT NULL,
                    `symbol` varchar(32) NOT NULL,
                    `trade_date` date NOT NULL,
                    `name` varchar(50) CHARACTER SET 'utf8' COLLATE 'utf8_general_ci' NOT NULL,
                    `pre_close` decimal(15,6) DEFAULT NULL,
                    `open` decimal(15,6) DEFAULT NULL,
                    `close` decimal(15,6) DEFAULT NULL,
                    `high` decimal(15,6) DEFAULT NULL,
                    `low` decimal(15,6) DEFAULT NULL,
                    `volume` decimal(20,2) DEFAULT NULL,
                    `money` decimal(18,3) DEFAULT NULL,
                    `deals` decimal(10,0) DEFAULT NULL,
                    `change` decimal(9,4) DEFAULT NULL,
                    `change_pct` decimal(8,4) DEFAULT NULL,
                    `tot_mkt_cap` decimal(18,4) DEFAULT NULL,
                    `turn_rate` decimal(9,4) DEFAULT NULL,
                    `factor` decimal(9,4) DEFAULT NULL,
                    `ltd_factor` decimal(9,4) DEFAULT NULL,
                    PRIMARY KEY(`id`,`trade_date`,`symbol`)
                    )ENGINE=InnoDB DEFAULT CHARSET=utf8;
                    """.format(self.dest_table)
        self.create_table(create_sql)

    def get_index_sets(self, trade_date):
        """Fetch all stock quotes for *trade_date* from the source database.

        Quotes come from TQ_QT_SKDAILYPRICE; the adjustment factors
        (``factor``/``ltd_factor``) are joined in from FCDB's DISPARA_NEW.
        (An earlier single-query variant joined DISPARA_NEW directly; it was
        rewritten as a join of two sub-selects.)
        NOTE(review): exchange codes '001002'/'001003' appear to denote the
        Shanghai/Shenzhen exchanges — confirm against the source schema.
        """
        sql = """select a.*,
                b.R as factor,
                b.LTDR as ltd_factor
                from (
                SELECT TRADEDATE as trade_date,
                i.exchange as Exchange,
                s.symbol as code,
                i.SENAME as name,
                LCLOSE as pre_close,
                TOPEN as 'open',
                TCLOSE as 'close',
                THIGH as high,
                TLOW as low,
                VOL as volume,
                AMOUNT as money,
                DEALS as deals,
                CHANGE as change,
                PCHG as change_pct,
                TOTMKTCAP as tot_mkt_cap,
                TURNRATE as turn_rate
                from QADB.dbo.TQ_QT_SKDAILYPRICE i
                left join TQ_OA_STCODE s on i.SECODE = s.secode
                where (i.exchange = '001002' or i.exchange = '001003') and i.ISVALID = 1 and s.ISVALID = 1 and TRADEDATE ='{0}'
                ) a
                left join (select * from FCDB.dbo.DISPARA_NEW n where n.TDATE= '{0}' and n.etl_isvalid=1) b
                on b.symbol = a.code;""".format(trade_date)
        return pd.read_sql(sql, self.source)

    def do_update(self, start_date, end_date, count, order='DESC'):
        """Sync quotes for every trade date between *start_date* and *end_date*.

        For each trade date: fetch the quotes, derive ``symbol``/``id``,
        write a CSV snapshot, then replace that day's rows in the destination
        table, falling back to a row-wise upsert if the bulk insert fails.
        """
        # Read trade dates
        trade_sets = self.sync_util.get_trades_ago('001002', start_date, end_date, count, order)
        trade_list = list(trade_sets['TRADEDATE'])
        for trade_date in trade_list:
            print(trade_date)
            index_sets = self.get_index_sets(trade_date)
            if index_sets.empty:
                continue
            try:
                # Map exchange code to the suffixed symbol convention.
                index_sets['symbol'] = np.where(index_sets['Exchange'] == '001002',
                                                index_sets['code'] + '.XSHG',
                                                index_sets['code'] + '.XSHE')
                index_sets['id'] = index_sets['symbol'] + str(trade_date)
                index_sets.drop(['Exchange', 'code'], axis=1, inplace=True)
                # Save locally
                if not os.path.exists(self.dir):
                    os.makedirs(self.dir)
                file_name = self.dir + str(trade_date) + '.csv'
                if os.path.exists(str(file_name)):
                    os.remove(str(file_name))
                index_sets.to_csv(file_name, encoding='UTF-8')
                # Save to database: delete-then-append keeps the day idempotent.
                try:
                    self.delete_trade_data(trade_date)
                    index_sets.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
                except Exception as sql_err:
                    # Bulk insert failed; fall back to row-by-row insert/update.
                    print(sql_err.orig.msg)
                    self.insert_or_update(index_sets)
            except Exception as e:
                print(e)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--start_date', type=int, default=20070101)
parser.add_argument('--end_date', type=int, default=0)
parser.add_argument('--count', type=int, default=2)
parser.add_argument('--rebuild', type=bool, default=False)
parser.add_argument('--update', type=bool, default=False)
parser.add_argument('--schedule', type=bool, default=False)
args = parser.parse_args()
if args.end_date == 0:
end_date = int(datetime.now().date().strftime('%Y%m%d'))
else:
end_date = args.end_date
if args.rebuild:
processor = SyncSkDailyPrice()
processor.create_dest_tables()
processor.do_update(args.start_date, end_date, args.count)
if args.update:
processor = SyncSkDailyPrice()
processor.do_update(args.start_date, end_date, args.count)
if args.schedule:
processor = SyncSkDailyPrice()
start_date = processor.get_start_date()
print('running schedule task, start date:', start_date, ';end date:', end_date)
processor.do_update(start_date, end_date, -1, '')
#!/usr/bin/env python
# coding=utf-8
import pdb
import os
import sys
import sqlalchemy as sa
import pandas as pd
import numpy as np
import collections
import argparse
from datetime import datetime, date
from sqlalchemy.orm import sessionmaker
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from base_sync import BaseSync
sys.path.append('..')
from utillities.sync_util import SyncUtil
import config
class SyncIndustry(BaseSync):
    """Sync Shenwan (SW) industry index constituents and weights into the
    MySQL table ``sw_industry``, also writing one CSV snapshot per trade
    date to local disk.
    """

    def __init__(self):
        # Utility used to query the trading calendar.
        self.sync_util = SyncUtil()
        super(SyncIndustry, self).__init__('sw_industry')
        # Override the BaseSync source engine with a dedicated FCDB server.
        # NOTE(review): credentials are hard-coded here — consider config.
        self.source = sa.create_engine("mssql+pymssql://HF_read:read@192.168.100.165:1433/FCDB")
        # Directory receiving one CSV snapshot per trade date.
        self.dir = config.RECORD_BASE_DIR + self.dest_table + '/'

    def create_dest_tables(self):
        """Drop and recreate the destination table ``sw_industry``."""
        create_sql = """create table `{0}`(
                    `id` varchar(128) NOT NULL,
                    `isymbol` varchar(24) NOT NULL,
                    `trade_date` date NOT NULL,
                    `iname` varchar(128) CHARACTER SET 'utf8' COLLATE 'utf8_general_ci' NOT NULL,
                    `symbol` varchar(32) NOT NULL,
                    `sname` varchar(128) CHARACTER SET 'utf8' COLLATE 'utf8_general_ci' NOT NULL,
                    `weighing` decimal(8,2) DEFAULT NULL,
                    PRIMARY KEY(`id`,`trade_date`,`isymbol`)
                    )ENGINE=InnoDB DEFAULT CHARSET=utf8;
                    """.format(self.dest_table)
        self.create_table(create_sql)

    def get_sw_industry(self):
        """Return the symbols of all SW level-1/2/3 industry indexes."""
        sql_pe = """(SELECT Symbol from FCDB.dbo.iprofile where
                 Iprofile7 ='申万一级行业指数' or Iprofile7 ='申万二级行业指数'
                 or Iprofile7 ='申万三级行业指数')"""
        return pd.read_sql(sql_pe, self.source)

    def get_index_sets(self, trade_date, industry_sets):
        """Fetch constituents and weights of the given industry indexes on
        *trade_date*.

        *industry_sets* is a Python list of symbol strings; its ``str()``
        form is substituted into the SQL and the square brackets are then
        rewritten to parentheses, turning the list repr into a SQL
        ``IN ('a', 'b', ...)`` clause.
        """
        sql = """select Isymbol as isymbol,Tdate as trade_date,
                Iname as iname, Symbol as code,Exchange,
                Sname as sname, Weighing as weighing from FCDB.dbo.issweight where
                Isymbol in {1} and Tdate = '{0}';""".format(trade_date, industry_sets)
        sql = sql.replace('[', '(')
        sql = sql.replace(']', ')')
        return pd.read_sql(sql, self.source)

    def do_update(self, start_date, end_date, count, order='DESC'):
        """Sync industry weights for every trade date in the range.

        For each trade date: fetch constituents of every SW index, derive
        ``symbol``/``id``, write a CSV snapshot, then replace that day's rows
        in the destination table, falling back to a row-wise upsert if the
        bulk insert fails.
        """
        # Read trade dates
        trade_sets = self.sync_util.get_trades_ago('001002', start_date, end_date, count, order)
        sw_industry = self.get_sw_industry()
        trade_list = list(trade_sets['TRADEDATE'])
        for trade_date in trade_list:
            print(trade_date)
            index_sets = self.get_index_sets(trade_date, list(sw_industry['Symbol'].astype('str')))
            if index_sets.empty:
                continue
            try:
                # Map exchange code to the suffixed symbol convention.
                index_sets['symbol'] = np.where(index_sets['Exchange'] == 'CNSESH',
                                                index_sets['code'] + '.XSHG',
                                                index_sets['code'] + '.XSHE')
                # id also embeds the industry index so (stock, index) is unique.
                index_sets['id'] = index_sets['symbol'] + str(trade_date) + index_sets['isymbol']
                index_sets.drop(['Exchange', 'code'], axis=1, inplace=True)
                # Save locally
                if not os.path.exists(self.dir):
                    os.makedirs(self.dir)
                file_name = self.dir + str(trade_date) + '.csv'
                if os.path.exists(str(file_name)):
                    os.remove(str(file_name))
                index_sets.to_csv(file_name, encoding='UTF-8')
                # Save to database: delete-then-append keeps the day idempotent.
                try:
                    self.delete_trade_data(trade_date)
                    index_sets.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
                except Exception as sql_err:
                    # Bulk insert failed; fall back to row-by-row insert/update.
                    print(sql_err.orig.msg)
                    self.insert_or_update(index_sets)
            except Exception as e:
                print(e)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--start_date', type=int, default=20070101)
parser.add_argument('--end_date', type=int, default=0)
parser.add_argument('--count', type=int, default=2)
parser.add_argument('--rebuild', type=bool, default=False)
parser.add_argument('--update', type=bool, default=False)
parser.add_argument('--schedule', type=bool, default=False)
args = parser.parse_args()
if args.end_date == 0:
end_date = int(datetime.now().date().strftime('%Y%m%d'))
else:
end_date = args.end_date
if args.rebuild:
processor = SyncIndustry()
processor.create_dest_tables()
processor.do_update(args.start_date, end_date, args.count)
if args.update:
processor = SyncIndustry()
processor.do_update(args.start_date, end_date, args.count)
if args.schedule:
processor = SyncIndustry()
start_date = processor.get_start_date()
print('running schedule task, start date:', start_date, ';end date:', end_date)
processor.do_update(start_date, end_date, -1, '')
#!/usr/bin/env python
# coding=utf-8
import datetime
import os
import sys
import pdb
import sqlalchemy as sa
import pandas as pd
import numpy as np
from sqlalchemy.orm import sessionmaker
sys.path.append('..')
import config
class TmImportUtils(object):
    """Helpers for tmstamp-driven incremental imports between a source and a
    destination database, plus CSV export of report-period data.

    The high-water mark of each task is tracked in the destination table
    ``update_log`` keyed by ``dest_table``.
    """

    def __init__(self, source, destination, source_table, dest_table):
        # Source database engine
        self.source = source
        # Destination database engine
        self.destination = destination
        # Session factory bound to the destination database
        self.dest_session = sessionmaker(bind=self.destination, autocommit=False, autoflush=True)
        self.source_table = source_table
        self.dest_table = dest_table
        # Base directory for CSV exports; per-table subdirectories live below it.
        self._dir = config.RECORD_BASE_DIR
        # Optional hook applied to each report DataFrame before saving.
        self._secondary_func = None

    def create_dest_tables(self, create_sql):
        """Drop the destination table if present and recreate it from *create_sql*."""
        drop_sql = """drop table if exists `{0}`;""".format(self.dest_table)
        session = self.dest_session()
        session.execute(drop_sql)
        session.execute(create_sql)
        session.commit()
        session.close()

    def get_max_tm_source(self):
        """Return the largest tmstamp in the source table (0 if empty)."""
        sql = 'select max(cast(tmstamp as bigint))as tm from ' + self.source_table
        trades_sets = pd.read_sql(sql, self.source)
        tm = 0
        if not trades_sets.empty:
            tm = trades_sets['tm'][0]
        return tm

    def get_min_tm_source(self):
        """Return the smallest tmstamp in the source table (0 if empty)."""
        sql = 'select min(cast(tmstamp as bigint))as tm from ' + self.source_table
        trades_sets = pd.read_sql(sql, self.source)
        tm = 0
        if not trades_sets.empty:
            tm = trades_sets['tm'][0]
        return tm

    def get_max_tm_log(self):
        """Return this task's recorded high-water mark from update_log (0 if none)."""
        sql = """select max_tag from update_log where task_name='{0}'""".format(self.dest_table)
        trades_sets = pd.read_sql(sql, self.destination)
        tm = 0
        if not trades_sets.empty:
            tm = trades_sets['max_tag'][0]
        return tm

    def update_update_log(self, tm):
        """Upsert this task's high-water mark *tm* into update_log."""
        session = self.dest_session()
        sql = """insert into update_log (task_name,max_tag) values ('{0}',{1})
            ON DUPLICATE KEY UPDATE task_name='{0}',max_tag={1}""".format(self.dest_table, tm)
        sql = sql.replace('\n', '')
        session.execute(sql)
        session.commit()
        session.close()
        print('更新', self.dest_table, '的max_tag为', tm)

    def create_report_date(self, min_year, max_year):
        """Return sorted quarter-end report dates (int YYYYMMDD) for the
        years (min_year - 1) .. (max_year - 1) inclusive.
        """
        report_date_list = []
        start_date = min_year - 1
        while start_date < max_year:
            report_date_list.append(start_date * 10000 + 331)
            report_date_list.append(start_date * 10000 + 630)
            report_date_list.append(start_date * 10000 + 930)
            report_date_list.append(start_date * 10000 + 1231)
            start_date += 1
        report_date_list.sort()
        return report_date_list

    # Legacy: generates one CSV file per report date of report-period data.
    def update_report(self, count, end_date, sql):
        """Export report-period rows to ``<dir>/<dest_table>/<date>.csv``.

        *sql* must end so that ``' and a.EndDate = {0}'`` can be appended;
        the query is run once per quarter-end date over the last *count*
        years before *end_date* (int YYYYMMDD).
        """
        sql += ' and a.EndDate = {0}'
        max_year = int(end_date / 10000)
        min_year = max_year - count
        report_date_list = self.create_report_date(min_year, max_year)
        # Output goes into a per-table subdirectory under the base dir.
        out_dir = self._dir + self.dest_table + '/'
        for report_date in report_date_list:
            report_fundamentals = pd.read_sql(sql.format(report_date), self.source)
            if report_fundamentals.empty:
                continue
            # Map raw exchange/code columns to the suffixed symbol convention.
            report_fundamentals['symbol'] = np.where(report_fundamentals['Exchange'] == 'CNSESH',
                                                     report_fundamentals['code'] + '.XSHG',
                                                     report_fundamentals['code'] + '.XSHE')
            report_fundamentals.drop(['Exchange', 'code'], axis=1, inplace=True)
            # Secondary processing hook
            if self._secondary_func is not None:
                report_fundamentals = self._secondary_func(report_fundamentals)
            # Save locally. BUGFIX: the original only created self._dir, not
            # the per-table subdirectory the file is actually written into,
            # so the first run failed with FileNotFoundError.
            if not os.path.exists(out_dir):
                os.makedirs(out_dir)
            file_name = out_dir + str(report_date) + '.csv'
            if os.path.exists(str(file_name)):
                os.remove(str(file_name))
            report_fundamentals.to_csv(file_name, encoding='UTF-8')
            print(file_name)
\ No newline at end of file
#!/usr/bin/env python
# coding=utf-8
import os
import sys
import pdb
import argparse
from datetime import datetime
sys.path.append('..')
import config
from utillities.sync_util import SyncUtil
class SyncTradeDate(object):
    """Export the trading calendar to a local CSV file ``trade_date.csv``."""

    def __init__(self):
        # Utility used to query the trading calendar.
        self._unit = SyncUtil()
        # Output directory for the calendar CSV.
        self._dir = config.RECORD_BASE_DIR + 'trade_date/'

    def do_update(self, start_date, end_date, count):
        """Fetch trade dates in [*start_date*, *end_date*] and write them to
        ``<dir>/trade_date.csv``, replacing any previous file.
        """
        trade_sets = self._unit.get_trades_ago('001002', start_date, end_date, count)
        trade_sets.rename(columns={'TRADEDATE': 'trade_date'}, inplace=True)
        # Save locally. BUGFIX: create the output directory if it does not
        # exist yet — to_csv raised FileNotFoundError on a fresh checkout.
        if not os.path.exists(self._dir):
            os.makedirs(self._dir)
        file_name = self._dir + 'trade_date.csv'
        if os.path.exists(str(file_name)):
            os.remove(str(file_name))
        trade_sets.to_csv(file_name, encoding='utf-8')
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--start_date', type=int, default=20070101)
parser.add_argument('--end_date', type=int, default=0)
parser.add_argument('--count', type=int, default=-1)
parser.add_argument('--rebuild', type=bool, default=False)
parser.add_argument('--schedule', type=bool, default=False)
args = parser.parse_args()
if args.rebuild:
if args.end_date == 0:
end_date = int(str(datetime.now().date()).replace('-', ''))
else:
end_date = args.end_date
processor = SyncTradeDate()
# processor.create_dest_tables()
processor.do_update(args.start_date, end_date, args.count)
#!/usr/bin/env python
# coding=utf-8
import pdb
import os
import sys
import sqlalchemy as sa
import pandas as pd
import numpy as np
import collections
import argparse
from base_sync import BaseSync
sys.path.append('..')
from utillities.sync_util import SyncUtil
from datetime import datetime, date
from sqlalchemy.orm import sessionmaker
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
import config
class SyncValuation(BaseSync):
    def __init__(self):
        # Utility used to query the trading calendar from the source DB.
        self.sync_util = SyncUtil()
        super(SyncValuation, self).__init__('valuation')
        # Directory receiving one CSV snapshot per trade date.
        self.dir = config.RECORD_BASE_DIR + self.dest_table + '/'
def create_dest_tables(self):
create_sql = """create table `{0}`(
`id` varchar(32) NOT NULL,
`symbol` varchar(24) NOT NULL,
`trade_date` date NOT NULL,
`market_cap` decimal(19,4) DEFAULT NULL,
`circulating_market_cap` decimal(19,4) DEFAULT NULL,
`turnover_ratio` decimal(9,4) DEFAULT NULL,
`pb` decimal(19,4) DEFAULT NULL,
`pe_lfy` decimal(19,4) DEFAULT NULL,
`pe` decimal(19,4) DEFAULT NULL,
`ps_lfy` decimal(19,4) DEFAULT NULL,
`ps` decimal(19,4) DEFAULT NULL,
`pcf` decimal(19,4) DEFAULT NULL,
`capitalization` decimal(19,4) DEFAULT NULL,
`circulating_cap` decimal(19,4) DEFAULT NULL,
PRIMARY KEY(`id`,`symbol`,`trade_date`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
""".format(self.dest_table)
self.create_table(create_sql)
def get_valutaion(self, trade_date):
sql = """select a.SYMBOL as code,
a.TRADEDATE as trade_date,
a.TOTMKTCAP as market_cap,
a.NEGOTIABLEMV as circulating_market_cap,
a.TURNRATE as turnover_ratio,
a.PETTM as pe,
a.PELFY as pe_lfy,
a.PB as pb,
a.PSTTM as ps,
a.PSLFY as ps_lfy,
a.PCTTM as pcf,
b.TOTALSHARE as capitalization,
b.MKTSHARE as circulating_cap,
c.Exchange
from TQ_SK_FININDIC a left join TQ_SK_DQUOTEINDIC b
on a.SYMBOL=b.SYMBOL and a.TRADEDATE=b.TRADEDATE
left join TQ_OA_STCODE c
on a.SECODE = c.SECODE and a.SYMBOL = c.SYMBOL
where a.TRADEDATE='{0}';""".format(trade_date)
return pd.read_sql(sql, self.source)
def do_update(self, start_date, end_date, count, order='DESC'):
# 读取交易日
trade_sets = self.sync_util.get_trades_ago('001002', start_date, end_date, count, order)
trade_list = list(trade_sets['TRADEDATE'])
for trade_date in trade_list:
print(trade_date)
index_sets = self.get_valutaion(trade_date)
if index_sets.empty:
continue
try:
index_sets['symbol'] = np.where(index_sets['Exchange'] == '001002',
index_sets['code'] + '.XSHG',
index_sets['code'] + '.XSHE')
index_sets['id'] = index_sets['symbol'] + str(trade_date)
index_sets.drop(['Exchange', 'code'], axis=1, inplace=True)
# 本地保存
if not os.path.exists(self.dir):
os.makedirs(self.dir)
file_name = self.dir + str(trade_date) + '.csv'
if os.path.exists(str(file_name)):
os.remove(str(file_name))
index_sets.to_csv(file_name, encoding='UTF-8')
try:
self.delete_trade_data(trade_date)
index_sets.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
except Exception as sql_err:
print(sql_err.orig.msg)
self.insert_or_update(index_sets)
except Exception as e:
print(e)
if __name__ == '__main__':
    def _parse_bool(value):
        """Parse a command-line boolean.

        argparse's ``type=bool`` maps any non-empty string — including
        the literal ``'False'`` — to ``True``; parse the usual spellings
        explicitly instead.
        """
        return str(value).lower() in ('true', '1', 'yes', 'y')

    parser = argparse.ArgumentParser()
    parser.add_argument('--start_date', type=int, default=20070101)
    parser.add_argument('--end_date', type=int, default=0)
    parser.add_argument('--count', type=int, default=2)
    # BUG FIX: type=bool made '--rebuild False' (and friends) truthy.
    parser.add_argument('--rebuild', type=_parse_bool, default=False)
    parser.add_argument('--update', type=_parse_bool, default=False)
    parser.add_argument('--schedule', type=_parse_bool, default=False)
    args = parser.parse_args()
    # Default end date: today as YYYYMMDD.
    if args.end_date == 0:
        end_date = int(datetime.now().date().strftime('%Y%m%d'))
    else:
        end_date = args.end_date
    if args.rebuild:
        processor = SyncValuation()
        processor.create_dest_tables()
        processor.do_update(args.start_date, end_date, args.count)
    if args.update:
        processor = SyncValuation()
        processor.do_update(args.start_date, end_date, args.count)
    if args.schedule:
        processor = SyncValuation()
        # Resume from the latest trade_date already in the destination table.
        start_date = processor.get_start_date()
        print('running schedule task, start date:', start_date, ';end date:', end_date)
        # count=-1 and order='' are forwarded as-is to do_update /
        # get_trades_ago — confirm their semantics against SyncUtil.
        processor.do_update(start_date, end_date, -1, '')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment