Commit d42da92f authored by 李煜's avatar 李煜

first commit

parent 086c1f72
Pipeline #57 failed with stages
# Created by .ignore support plugin (hsz.mobi)
### Python template
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.idea/
/ultron/
This diff is collapsed.
import pdb
from alphamind.api import *
from PyFin.api import *
from PyFin.api import makeSchedule
from sqlalchemy import create_engine, select, and_, or_
from sqlalchemy.pool import NullPool
from factors.models import Alpha191
import pandas as pd
import time
import datetime
import json
import sys
from factors import analysis
from ultron.cluster.invoke.cache_data import cache_data
from ultron.utilities.short_uuid import unique_machine,decode
def fetch_factor(engine191, factor_names, start_date, end_date):
    """Read the requested Alpha191 factor columns for [start_date, end_date]."""
    # Always select the key columns, then the requested factor columns.
    columns = [Alpha191.trade_date, Alpha191.code]
    columns.extend(Alpha191.__dict__[name] for name in factor_names)
    query = select(columns).where(
        and_(Alpha191.trade_date >= start_date, Alpha191.trade_date <= end_date, ))
    return pd.read_sql(query, engine191)
def factor_combination(engine, factors, universe_name_list, start_date, end_date, freq):
    """Merge raw factors with risk exposures, market cap, forward returns and
    industry classification over the rebalance schedule; drops rows with NaNs."""
    # Union of the named universes.
    universe = None
    for name in universe_name_list:
        if universe is None:
            universe = Universe(name)
            continue
        universe += Universe(name)
    dates = makeSchedule(start_date, end_date, freq, calendar='china.sse')
    neg_mkt = engine.fetch_factor_range(universe, "negMarketValue", dates=dates)
    risk_cov, risk_exposures = engine.fetch_risk_model_range(universe, dates=dates)
    returns = engine.fetch_dx_return_range(universe, dates=dates, horizon=map_freq(freq))
    industry = engine.fetch_industry_range(universe, dates=dates)
    # Inner-join everything on (trade_date, code).
    merged = factors
    for frame in (risk_exposures, neg_mkt, returns):
        merged = pd.merge(merged, frame, on=['trade_date', 'code'])
    merged = pd.merge(merged, industry, on=['trade_date', 'code']).dropna()
    merged.dropna(inplace=True)
    return merged
def fetch_factor_sets(**kwargs):
    """Fetch raw Alpha191 factors and merge them with risk/return/industry data.

    Required keyword arguments: db_info, factor_names, start_date, end_date,
    universe_name, benchmark_code, freq. Missing keys raise KeyError.
    """
    db_info = kwargs["db_info"]
    benchmark_code = kwargs['benchmark_code']  # required key, not used here
    # Two handles over the same DSN: alpha-mind's SqlEngine plus a raw
    # SQLAlchemy engine (NullPool) for the Alpha191 table reads.
    alpha_engine = SqlEngine(db_info)
    raw_engine = create_engine(db_info, poolclass=NullPool)
    factors = fetch_factor(raw_engine, kwargs["factor_names"],
                           kwargs['start_date'], kwargs['end_date'])
    return factor_combination(alpha_engine, factors, kwargs['universe_name'],
                              kwargs['start_date'], kwargs['end_date'],
                              kwargs['freq'])
# Unique session id for this run (microsecond-resolution timestamp).
session = str(int(time.time() * 1000000 + datetime.datetime.now().microsecond))

# Factor columns to analyse: alpha_31 only (widen the range for more).
alpha_list = ['alpha_' + str(i) for i in range(31, 32)]

db_info = 'postgresql+psycopg2://alpha:alpha@180.166.26.82:8889/alpha'
total_data = fetch_factor_sets(db_info=db_info,
                               factor_names=alpha_list, risk_styles=["SIZE"],
                               start_date='2010-01-01', end_date='2018-12-31',
                               universe_name=['zz500', 'hs300', 'ashare'],
                               benchmark_code=905,
                               freq='3b')

# Columns shared by every per-factor frame (keys, risk, returns, industry).
# BUGFIX: this was wrapped in a bare `except:` that dropped into pdb,
# swallowing every error; let failures propagate normally instead.
diff_sets = set(total_data.columns) - set(alpha_list)

for alpha_name in alpha_list:
    print(alpha_name, session)
    # One frame per factor: the shared columns plus this factor's column.
    factors_list = list(diff_sets)
    factors_list.append(alpha_name)
    factors_sets = total_data[factors_list]
    cache_data.set_cache(session, alpha_name, factors_sets.to_json(orient='records'))
    analysis.factor_analysis(factor_name=alpha_name, risk_styles=['SIZE'],
                             benchmark_code=905,
                             session=session)
This diff is collapsed.
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: ??
@author: li
@file: __init__.py.py
@time: 2019-06-30 19:04
"""
from ultron.cluster.invoke.app_engine import create_app

# Register the cluster application 'factor' with its two task modules.
app = create_app('factor', ['factor.factor_growth', 'factor.historical_value'])
#!/usr/bin/env python
# coding=utf-8
# Root directory for locally archived record files (per-date CSV snapshots).
RECORD_BASE_DIR = '/home/vision/data/vision/'
# RECORD_BASE_DIR = 'C://Users/zzh/git/rongliang/basic-data/file_data/'

# Source database (MSSQL) connection settings.
# NOTE(review): credentials are stored in plain text here — consider moving
# them to environment variables or a secrets store.
source_db_host = '192.168.100.151'
source_db_port = '1433'
source_db_database = 'QADB'
source_db_user = 'read'
source_db_pwd = 'read'

# Destination database (MySQL) connection settings.
destination_db_host = '10.15.97.128'
destination_db_port = '3306'
destination_db_database = 'vision'
destination_db_user = 'root'
destination_db_pwd = '1234'
# destination_db_host = 'db1.irongliang.com'
# destination_db_port = '3306'
# destination_db_database = 'vision'
# destination_db_user = 'rl_sync'
# destination_db_pwd = 'rl_sync_2019'
\ No newline at end of file
#!/usr/bin/env python
# coding=utf-8
import os
import sys
import numpy as np
import pandas as pd
import sqlalchemy as sa
from sqlalchemy.orm import sessionmaker
sys.path.append('..')
from factor.utillities.trade_date import TradeDate
from factor import config
class FactorBase(object):
    """Base class for factor writers.

    Owns the destination MySQL engine and provides helpers to (re)create the
    factor table and to store one trade date of data idempotently (CSV
    snapshot + delete-then-append, falling back to row-by-row upsert).
    """

    def __init__(self, name):
        # Destination (MySQL) connection string assembled from config.
        destination_db = '''mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}'''.format(
            config.destination_db_user,
            config.destination_db_pwd,
            config.destination_db_host,
            config.destination_db_port,
            config.destination_db_database)
        self._name = name  # destination table name
        self._destination = sa.create_engine(destination_db)
        self._dest_session = sessionmaker(bind=self._destination, autocommit=False, autoflush=True)
        self._trade_date = TradeDate()
        # Local directory where per-trade-date CSV snapshots are kept.
        self._dir = config.RECORD_BASE_DIR + 'factor/' + str(self._name)

    def _create_index(self):
        """Create the (trade_date, symbol) composite index on the factor table."""
        session = self._dest_session()
        try:
            indexs = [
                '''CREATE INDEX {0}_trade_date_symbol_index ON `{0}` (trade_date, symbol);'''.format(self._name)
            ]
            for sql in indexs:
                session.execute(sql)
            session.commit()
        finally:
            session.close()

    def _create_tables(self, create_sql, drop_sql):
        """(Re)create the destination table, then build its index."""
        session = self._dest_session()
        try:
            if drop_sql is not None:
                session.execute(drop_sql)
            session.execute(create_sql)
            session.commit()
        finally:
            session.close()
        self._create_index()

    def _storage_data(self, data_flow, trade_date):
        """Snapshot data_flow to CSV, then write it to MySQL for trade_date.

        Writes replace-by-date (delete existing rows, bulk append); on any
        bulk-insert failure falls back to the row-by-row upsert.
        """
        data_flow = data_flow.where(pd.notnull(data_flow), None)
        data_flow = data_flow.replace([-np.inf, np.inf], 0).fillna(value=0)
        # Local CSV snapshot (one file per trade date, overwritten if present).
        if not os.path.exists(self._dir):
            os.makedirs(self._dir)
        file_name = self._dir + '/' + str(trade_date) + '.csv'
        if os.path.exists(str(file_name)):
            os.remove(str(file_name))
        data_flow.to_csv(file_name, encoding='UTF-8')
        try:
            self.delete_trade_data(trade_date)
            data_flow.to_sql(name=self._name, con=self._destination, if_exists='append', index=False)
        except Exception as e:
            # BUGFIX: `e.orig.msg` only exists on SQLAlchemy DBAPI errors;
            # a plain `print(e.orig.msg)` raised AttributeError on anything else.
            print(getattr(getattr(e, 'orig', None), 'msg', str(e)))
            self.insert_or_update(data_flow)

    def delete_trade_data(self, trade_date):
        """Delete all rows for trade_date.

        trade_date is trusted internal data — the SQL is built by string
        formatting, not parameterized.
        """
        session = self._dest_session()
        try:
            session.execute('''delete from `{0}` where trade_date={1}'''.format(self._name, trade_date))
            session.commit()
        finally:
            # BUGFIX: the session was never closed here (connection leak),
            # unlike the sibling methods which all close theirs.
            session.close()

    def insert_or_update(self, datas):
        """Upsert each row of `datas` via INSERT ... ON DUPLICATE KEY UPDATE."""
        session = self._dest_session()
        try:
            for i in range(datas.shape[0]):
                data = datas.iloc[i]
                values = ''
                update = ''
                title = ''
                for j in range(len(data)):
                    index = data.index[j]
                    # .iloc avoids pandas' deprecated integer-label fallback
                    # (data[j] is a label lookup on a Series).
                    value = str(data.iloc[j]).replace("'", "\\'")
                    title += """`{0}`,""".format(index)
                    values += """'{0}',""".format(value)
                    update += """`{0}`='{1}',""".format(index, value)
                sql = '''insert into {0} ({1}) values({2}) ON DUPLICATE KEY UPDATE {3}'''.format(
                    self._name, title[0:-1], values[0:-1], update[0:-1])
                # Stringified NaN/None become SQL NULLs.
                sql = sql.replace("'nan'", 'Null').replace("'None'", 'Null')
                session.execute(sql)
            session.commit()
        finally:
            session.close()
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
# -*- coding: utf-8 -*-
from sqlalchemy import BigInteger, Column, DateTime, Float, Index, Integer, String, Text, Boolean, text, JSON,TIMESTAMP
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
metadata = Base.metadata

class Growth(Base):
    """Declarative model for the 'growth' table: per-stock growth-style
    factor values keyed by (trade_date, code)."""
    __tablename__ = 'growth'

    # Composite primary key: one row per stock per trade date.
    trade_date = Column(DateTime, primary_key=True, nullable=False)
    code = Column(Integer, primary_key=True, nullable=False)
    # Growth factor columns, double precision (Float(53)).
    net_asset_grow_rate_latest = Column(Float(53))
    total_asset_grow_rate_latest = Column(Float(53))
    operating_revenue_grow_rate_ttm = Column(Float(53))
    operating_profit_grow_rate_ttm = Column(Float(53))
    total_profit_grow_rate_ttm = Column(Float(53))
    net_profit_grow_rate_ttm = Column(Float(53))
    np_parent_company_grow_rate = Column(Float(53))
    net_profit_grow_rate_3y_ttm = Column(Float(53))
    net_profit_grow_rate_5y_ttm = Column(Float(53))
    operating_revenue_grow_rate_3y_ttm = Column(Float(53))
    operating_revenue_grow_rate_5y_ttm = Column(Float(53))
    net_cash_flow_grow_rate_ttm = Column(Float(53))
    np_parent_company_cut_yoy_ttm = Column(Float(53))
    growth_egro_ttm = Column(Float(53))
    growth_sue_ttm = Column(Float(53))
    growth_suoi_ttm = Column(Float(53))
    financing_cash_grow_rate_ttm = Column(Float(53))
    invest_cash_grow_rate_ttm = Column(Float(53))
    oper_cash_grow_rate_ttm = Column(Float(53))
    growth_sgro_ttm = Column(Float(53))
import sys
from tm_import_utils import TmImportUtils
sys.path.append('..')
import config
import sqlalchemy as sa
import pandas as pd
from sqlalchemy.orm import sessionmaker
class BaseSync(object):
    """Base class for source→destination table sync jobs.

    Builds the MSSQL source engine and the MySQL destination engine from
    config, and provides watermarking, table creation and upsert helpers.
    """

    def __init__(self, dest_table):
        source_db = '''mssql+pymssql://{0}:{1}@{2}:{3}/{4}'''.format(config.source_db_user, config.source_db_pwd,
                                                                     config.source_db_host, config.source_db_port,
                                                                     config.source_db_database)
        destination_db = '''mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}'''.format(config.destination_db_user,
                                                                                 config.destination_db_pwd,
                                                                                 config.destination_db_host,
                                                                                 config.destination_db_port,
                                                                                 config.destination_db_database)
        # Source database (MSSQL).
        self.source = sa.create_engine(source_db)
        # Destination database (MySQL).
        self.destination = sa.create_engine(destination_db)
        # Destination session factory.
        self.dest_session = sessionmaker(bind=self.destination, autocommit=False, autoflush=True)
        self.dest_table = dest_table

    def get_start_date(self):
        """Return the latest synced trade_date as 'YYYYMMDD', or 20070101
        when the destination table is empty."""
        sql = """select max(trade_date) as trade_date from `{0}`;""".format(self.dest_table)
        trades_sets = pd.read_sql(sql, self.destination)
        td = 20070101
        # BUGFIX: `select max(...)` always yields exactly one row, so the
        # frame is never empty; when the table has no rows that value is
        # NULL and the old code returned the string 'None'. Require a
        # non-null value before using it.
        if not trades_sets.empty and pd.notnull(trades_sets['trade_date'][0]):
            td = trades_sets['trade_date'][0]
            td = str(td).replace('-', '')
        return td

    def delete_trade_data(self, trade_date):
        """Delete all destination rows for trade_date (trusted internal value)."""
        session = self.dest_session()
        try:
            session.execute('''delete from `{0}` where trade_date={1}'''.format(self.dest_table, trade_date))
            session.commit()
        finally:
            # BUGFIX: the session was never closed here (connection leak).
            session.close()

    def create_table(self, create_sql):
        """Drop and recreate the destination table, adding audit timestamps."""
        drop_sql = """drop table if exists `{0}`;""".format(self.dest_table)
        session = self.dest_session()
        try:
            session.execute(drop_sql)
            session.execute(create_sql)
            session.execute(
                '''alter table `{0}` add `creat_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP;'''.format(self.dest_table))
            session.execute(
                '''alter table `{0}` add `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;'''.format(
                    self.dest_table))
            session.commit()
        finally:
            session.close()

    def insert_or_update(self, datas):
        """Upsert each row of `datas` via INSERT ... ON DUPLICATE KEY UPDATE."""
        session = self.dest_session()
        try:
            for i in range(datas.shape[0]):
                data = datas.iloc[i]
                values = ''
                update = ''
                title = ''
                for j in range(len(data)):
                    index = data.index[j]
                    # .iloc avoids pandas' deprecated integer-label fallback;
                    # escape quotes and % for the MySQL driver.
                    value = str(data.iloc[j]).replace("'", "\\'").replace("%", "\\%")
                    title += """`{0}`,""".format(index)
                    values += """'{0}',""".format(value)
                    update += """`{0}`='{1}',""".format(index, value)
                sql = '''insert into {0} ({1}) values({2}) ON DUPLICATE KEY UPDATE {3}'''.format(
                    self.dest_table, title[0:-1], values[0:-1], update[0:-1])
                # Stringified NaN/None become SQL NULLs.
                sql = sql.replace("'nan'", 'Null').replace("'None'", 'Null')
                session.execute(sql)
            session.commit()
        finally:
            session.close()
This diff is collapsed.
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
import config
class SyncStkCapitalChange(BaseSync):
    """Sync TQ_SK_SHARESTRUCHG (A-share capital/share-structure changes)
    from the MSSQL source into the MySQL table stk_capital_change."""

    def __init__(self, source=None, destination=None):
        # source/destination parameters are accepted but unused; the engines
        # are built by BaseSync from module-level config.
        self.source_table = 'TQ_SK_SHARESTRUCHG'
        self.dest_table = 'stk_capital_change'
        super(SyncStkCapitalChange, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    # Create the destination table (full rebuild).
    def create_dest_tables(self):
        # Reset the incremental-sync watermark to zero first.
        self.utils.update_update_log(0)
        create_sql = """create table {0}
            (
            id INT,
            symbol VARCHAR(20),
            company_id VARCHAR(10),
            pub_date Date,
            begin_date Date,
            end_date Date,
            total_shares NUMERIC(15,6),
            floating_shares NUMERIC(15,6),
            floating_ashares NUMERIC(15,6),
            floating_bshares NUMERIC(15,6),
            floating_hshares NUMERIC(15,6),
            other_floating_shares NUMERIC(15,6),
            restrict_floating_shares NUMERIC(15,6),
            restrict_floating_ashares NUMERIC(15,6),
            non_floating_ashares NUMERIC(15,6),
            free_floating_shares NUMERIC(15,6),
            b_shares NUMERIC(15,6),
            exdividend_date Date,
            `explain` VARCHAR(100),
            change_type VARCHAR(2),
            change_reason VARCHAR(1000),
            is_valid INT,
            entry_date DATE,
            entry_time VARCHAR(8),
            tmstamp bigint not null,
            PRIMARY KEY(`symbol`,`begin_date`,`exdividend_date`)
            )
            ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        create_sql = create_sql.replace('\n', '')
        self.create_table(create_sql)

    def get_sql(self, type):
        """Build the extraction SQL.

        type='report' returns a fully-formatted full-extract query;
        type='update' returns a template with {0}=row limit, {1}=source
        table and {2}=tmstamp watermark, formatted later by get_datas.
        """
        sql = """select {0}
            a.ID as id,
            a.COMPCODE as company_id,
            a.PUBLISHDATE as pub_date,
            a.BEGINDATE as begin_date,
            a.ENDDATE as end_date,
            a.TOTALSHARE as total_shares,
            a.CIRCSKAMT as floating_shares,
            a.CIRCAAMT as floating_ashares,
            a.CIRCBAMT as floating_bshares,
            a.CIRCHAMT as floating_hshares,
            a.OTHERCIRCAMT as other_floating_shares,
            a.LIMSKAMT as restrict_floating_shares,
            a.RECIRCAAMT as restrict_floating_ashares,
            a.NCIRCAMT as non_floating_ashares,
            a.FCIRCAAMT as free_floating_shares,
            a.BSK as b_shares,
            a.EXRIGHTDATE as exdividend_date,
            a.EXRIGHTEXP as explain,
            a.SKCHGTYPE as change_type,
            a.SHCHGRSN as change_reason,
            a.ISVALID as is_valid,
            a.ENTRYDATE as entry_date,
            a.ENTRYTIME as entry_time,
            cast(a.tmstamp as bigint) as tmstamp,
            b.Symbol as code,
            b.Exchange
            from {1} a
            left join FCDB.dbo.SecurityCode as b
            on b.CompanyCode = a.COMPCODE
            where b.SType ='EQA' and b.Enabled=0 and b.Status=0 """
        if type == 'report':
            sql = sql.format('', self.source_table)
            return sql
        elif type == 'update':
            sql += 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
            return sql

    def get_datas(self, tm):
        """Fetch the next batch (up to 2000 rows) with tmstamp > tm."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 2000', self.source_table, tm).replace('\n', '')
        trades_sets = pd.read_sql(sql, self.source)
        return trades_sets

    def update_table_data(self, tm):
        """Incrementally pull batches with tmstamp > tm until exhausted,
        advancing the watermark after each batch."""
        while True:
            result_list = self.get_datas(tm)
            if not result_list.empty:
                # Map exchange code to a suffixed symbol (SH→XSHG, else XSHE).
                result_list['symbol'] = np.where(result_list['Exchange'] == 'CNSESH',
                                                 result_list['code'] + '.XSHG',
                                                 result_list['code'] + '.XSHE')
                result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
                try:
                    result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
                except Exception as e:
                    # NOTE(review): e.orig.msg assumes a DBAPI error; a
                    # non-database exception here would raise AttributeError.
                    print(e.orig.msg)
                    self.insert_or_update(result_list)
                # Last row's tmstamp; rows are ordered by tmstamp ascending.
                # NOTE(review): label-based lookup — relies on the default
                # RangeIndex produced by read_sql.
                max_tm = result_list['tmstamp'][result_list['tmstamp'].size - 1]
                self.utils.update_update_log(max_tm)
                tm = max_tm
            else:
                break

    def do_update(self):
        """Run an incremental sync if the source is ahead of the log watermark."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Delegate full-extract reporting to the shared utils helper."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--rebuild', type=bool, default=False)
parser.add_argument('--update', type=bool, default=False)
args = parser.parse_args()
if args.rebuild:
processor = SyncStkCapitalChange()
processor.create_dest_tables()
processor.do_update()
elif args.update:
processor = SyncStkCapitalChange()
processor.do_update()
This diff is collapsed.
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from sync.base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncCompanyInfo(BaseSync):
    """Sync TQ_COMP_INFO (listed-company master data) from the MSSQL source
    into the MySQL table stk_company_info."""

    def __init__(self, source=None, destination=None):
        # source/destination parameters are accepted but unused; the engines
        # are built by BaseSync from module-level config.
        self.source_table = 'TQ_COMP_INFO'
        self.dest_table = 'stk_company_info'
        super(SyncCompanyInfo, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    # Create the destination table (full rebuild).
    def create_dest_tables(self):
        # Reset the incremental-sync watermark to zero first.
        self.utils.update_update_log(0)
        create_sql = """create table {0}
            (
            id INT NOT NULL,
            symbol VARCHAR(15) NOT NULL,
            pub_date Date DEFAULT NULL,
            company_id VARCHAR(10) NOT NULL,
            full_name VARCHAR(200) DEFAULT NULL,
            short_name VARCHAR(100) DEFAULT NULL,
            english_name_full VARCHAR(300) DEFAULT NULL,
            english_name VARCHAR(300) DEFAULT NULL,
            type1 VARCHAR(10) DEFAULT NULL,
            type2 VARCHAR(10) DEFAULT NULL,
            islist INT DEFAULT NULL,
            isbranche INT DEFAULT NULL,
            establish_date Date DEFAULT NULL,
            type VARCHAR(10) DEFAULT NULL,
            reg_capital NUMERIC(19,2) DEFAULT NULL,
            auth_share NUMERIC(19,0) DEFAULT NULL,
            currency VARCHAR(10) DEFAULT NULL,
            org_code VARCHAR(20) DEFAULT NULL,
            region VARCHAR(10) DEFAULT NULL,
            country VARCHAR(10) DEFAULT NULL,
            chairman VARCHAR(100) DEFAULT NULL,
            ceo VARCHAR(100) DEFAULT NULL,
            leger VARCHAR(100) DEFAULT NULL,
            secretary VARCHAR(50) DEFAULT NULL,
            secretary_phone VARCHAR(100) DEFAULT NULL,
            secretary_email VARCHAR(100) DEFAULT NULL,
            security_representative VARCHAR(50) DEFAULT NULL,
            lawfirm VARCHAR(100) DEFAULT NULL,
            cpafirm VARCHAR(100) DEFAULT NULL,
            business_scale VARCHAR(10) DEFAULT NULL,
            register_location VARCHAR(200) DEFAULT NULL,
            zipcode VARCHAR(20) DEFAULT NULL,
            office VARCHAR(200) DEFAULT NULL,
            telephone VARCHAR(100) DEFAULT NULL,
            fax VARCHAR(100) DEFAULT NULL,
            email VARCHAR(100) DEFAULT NULL,
            website VARCHAR(100) DEFAULT NULL,
            pub_url VARCHAR(100) DEFAULT NULL,
            description TEXT DEFAULT NULL,
            business_scope TEXT DEFAULT NULL,
            main_business TEXT DEFAULT NULL,
            license_number VARCHAR(50) DEFAULT NULL,
            live_status VARCHAR(10) DEFAULT NULL,
            live_begindate Date DEFAULT NULL,
            live_enddate Date DEFAULT NULL,
            is_valid INT NOT NULL,
            entry_date DATE NOT NULL,
            entry_time VARCHAR(8) NOT NULL,
            total_employees INT DEFAULT NULL,
            tmstamp bigint not null,
            PRIMARY KEY(`symbol`)
            )
            ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        create_sql = create_sql.replace('\n', '')
        self.create_table(create_sql)

    def get_sql(self, type):
        """Build the extraction SQL.

        type='report' returns a fully-formatted full-extract query;
        type='update' returns a template with {0}=row limit, {1}=source
        table and {2}=tmstamp watermark, formatted later by get_datas.
        """
        sql = """select {0}
            a.ID as id,
            b.Symbol as code,
            b.Exchange,
            a.PUBLISHDATE as pub_date,
            a.COMPCODE as company_id,
            a.COMPNAME as full_name,
            a.COMPSNAME as short_name,
            a.ENGNAME as english_name_full,
            a.COMPSNAME as english_name,
            a.COMPTYPE1 as type1,
            a.COMPTYPE2 as type2,
            a.ISLIST as islist,
            a.ISBRANCH as isbranche,
            a.FOUNDDATE as establish_date,
            a.ORGTYPE as type,
            a.REGCAPITAL as reg_capital,
            a.AUTHCAPSK as auth_share,
            a.CUR as currency,
            a.ORGCODE as org_code,
            a.REGION as region,
            a.COUNTRY as country,
            a.CHAIRMAN as chairman,
            a.MANAGER as ceo,
            a.LEGREP as leger,
            a.BSECRETARY as secretary,
            a.BSECRETARYTEL as secretary_phone,
            a.BSECRETARYMAIL as secretary_email,
            a.SEAFFREPR as security_representative,
            a.LECONSTANT as lawfirm,
            a.ACCFIRM as cpafirm,
            a.BIZSCALE as business_scale,
            a.REGADDR as register_location,
            a.REGPTCODE as zipcode,
            a.OFFICEADDR as office,
            a.COMPTEL as telephone,
            a.COMPFAX as fax,
            a.COMPEMAIL as email,
            a.COMPURL as website,
            a.DISURL as pub_url,
            a.COMPINTRO as description,
            a.BIZSCOPE as business_scope,
            a.MAJORBIZ as main_business,
            a.BIZLICENSENO as license_number,
            a.COMPSTATUS as live_status,
            a.EXISTBEGDATE as live_begindate,
            a.EXISTENDDATE as live_enddate,
            a.ISVALID as is_valid,
            a.ENTRYDATE as entry_date,
            a.ENTRYTIME as entry_time,
            a.WORKFORCE as total_employees,
            cast(a.tmstamp as bigint) as tmstamp
            from {1} a
            left join FCDB.dbo.SecurityCode as b
            on b.CompanyCode = a.COMPCODE
            where b.SType ='EQA' and b.Enabled=0 and b.status=0 """
        if type == 'report':
            sql = sql.format('', self.source_table)
            return sql
        elif type == 'update':
            sql += 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
            return sql

    def get_datas(self, tm):
        """Fetch the next batch (up to 10000 rows) with tmstamp > tm."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 10000', self.source_table, tm).replace('\n', '')
        trades_sets = pd.read_sql(sql, self.source)
        return trades_sets

    def update_table_data(self, tm):
        """Incrementally pull batches with tmstamp > tm until exhausted,
        advancing the watermark after each batch."""
        while True:
            result_list = self.get_datas(tm)
            if not result_list.empty:
                # Map exchange code to a suffixed symbol (SH→XSHG, else XSHE).
                result_list['symbol'] = np.where(result_list['Exchange'] == 'CNSESH',
                                                 result_list['code'] + '.XSHG',
                                                 result_list['code'] + '.XSHE')
                result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
                try:
                    result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
                except Exception as e:
                    # NOTE(review): e.orig.msg assumes a DBAPI error; a
                    # non-database exception here would raise AttributeError.
                    print(e.orig.msg)
                    self.insert_or_update(result_list)
                # Last row's tmstamp; rows are ordered by tmstamp ascending.
                # NOTE(review): label-based lookup — relies on the default
                # RangeIndex produced by read_sql.
                max_tm = result_list['tmstamp'][result_list['tmstamp'].size - 1]
                self.utils.update_update_log(max_tm)
                tm = max_tm
            else:
                break

    def do_update(self):
        """Run an incremental sync if the source is ahead of the log watermark."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Delegate full-extract reporting to the shared utils helper."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--rebuild', type=bool, default=False)
parser.add_argument('--update', type=bool, default=False)
args = parser.parse_args()
if args.rebuild:
processor = SyncCompanyInfo()
processor.create_dest_tables()
processor.do_update()
elif args.update:
processor = SyncCompanyInfo()
processor.do_update()
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from sync.base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncStkEmployeeInfo(BaseSync):
    """Sync TQ_COMP_EMPLOYEE (company employee statistics) from the MSSQL
    source into the MySQL table stk_employee_info."""

    def __init__(self, source=None, destination=None):
        # source/destination parameters are accepted but unused; the engines
        # are built by BaseSync from module-level config.
        self.source_table = 'TQ_COMP_EMPLOYEE'
        self.dest_table = 'stk_employee_info'
        super(SyncStkEmployeeInfo, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    # Create the destination table (full rebuild).
    def create_dest_tables(self):
        # Reset the incremental-sync watermark to zero first.
        self.utils.update_update_log(0)
        create_sql = """create table {0}
            (
            id   int NOT NULL,
            symbol VARCHAR(20) NOT NULL,
            company_name   VARCHAR(200) NOT NULL,
            company_id   varchar(10) NOT NULL,
            end_date   Date NOT NULL,
            pub_date   Date DEFAULT NULL,
            work_dorce   int DEFAULT NULL,
            product_num   int DEFAULT NULL,
            sales_num   int DEFAULT NULL,
            financial_num   int DEFAULT NULL,
            tech_num   int DEFAULT NULL,
            reasearch_num   int DEFAULT NULL,
            admin_nunm   int DEFAULT NULL,
            retire_num   int DEFAULT NULL,
            other_num   int DEFAULT NULL,
            doctor_num   int DEFAULT NULL,
            posrg_num   int DEFAULT NULL,
            tmstamp bigint not null,
            PRIMARY KEY(`symbol`,`end_date`)
            )
            ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        create_sql = create_sql.replace('\n', '')
        # Normalize all runs of whitespace to single spaces.
        create_sql = " ".join(create_sql.split())
        self.create_table(create_sql)

    def get_sql(self, type):
        """Build the extraction SQL.

        type='report' returns a fully-formatted full-extract query;
        type='update' returns a template with {0}=row limit, {1}=source
        table and {2}=tmstamp watermark, formatted later by get_datas.
        """
        sql = """select {0}
            a.ID as id,
            a.COMPNAME as company_name,
            a.COMPCODE as company_id,
            a.ENDDATE as end_date,
            a.DECLAREDATE as pub_date,
            a.WORKFORCE as work_dorce,
            a.PRODUCTIONSTAFF as product_num,
            a.SALESPERSONS as sales_num,
            a.FINANCIALSTAFF as financial_num,
            a.TECHNICALSTAFF as tech_num,
            a.RESEARCHSTAFF as reasearch_num,
            a.ADMTRATIVESTAFF as admin_nunm,
            a.RETIREESTAFF as retire_num,
            a.OTHERSTAFF as other_num,
            a.DRNUM as doctor_num,
            a.POSTGRAD as posrg_num,
            cast(a.tmstamp as bigint) as tmstamp,
            b.Symbol as code,
            b.Exchange
            from {1} a
            left join FCDB.dbo.SecurityCode as b
            on b.CompanyCode = a.COMPCODE
            where b.SType ='EQA' and b.Enabled=0 and b.Status=0 """
        if type == 'report':
            sql = sql.format('', self.source_table)
            return sql
        elif type == 'update':
            sql += 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
            return sql

    def get_datas(self, tm):
        """Fetch the next batch (up to 10000 rows) with tmstamp > tm."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 10000', self.source_table, tm).replace('\n', '')
        trades_sets = pd.read_sql(sql, self.source)
        return trades_sets

    def update_table_data(self, tm):
        """Incrementally pull batches with tmstamp > tm until exhausted,
        advancing the watermark after each batch."""
        while True:
            result_list = self.get_datas(tm)
            if not result_list.empty:
                # Map exchange code to a suffixed symbol (SH→XSHG, else XSHE).
                result_list['symbol'] = np.where(result_list['Exchange'] == 'CNSESH',
                                                 result_list['code'] + '.XSHG',
                                                 result_list['code'] + '.XSHE')
                result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
                try:
                    result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
                except Exception as e:
                    # NOTE(review): e.orig.msg assumes a DBAPI error; a
                    # non-database exception here would raise AttributeError.
                    print(e.orig.msg)
                    self.insert_or_update(result_list)
                # Last row's tmstamp; rows are ordered by tmstamp ascending.
                # NOTE(review): label-based lookup — relies on the default
                # RangeIndex produced by read_sql.
                max_tm = result_list['tmstamp'][result_list['tmstamp'].size - 1]
                self.utils.update_update_log(max_tm)
                tm = max_tm
            else:
                break

    def do_update(self):
        """Run an incremental sync if the source is ahead of the log watermark."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Delegate full-extract reporting to the shared utils helper."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--rebuild', type=bool, default=False)
parser.add_argument('--update', type=bool, default=False)
args = parser.parse_args()
if args.rebuild:
processor = SyncStkEmployeeInfo()
processor.create_dest_tables()
processor.do_update()
elif args.update:
processor = SyncStkEmployeeInfo()
processor.do_update()
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncStkExd(BaseSync):
    """Sync TQ_SK_XDRY (ex-dividend adjustment factors) from the MSSQL
    source into the MySQL table stk_exd."""

    def __init__(self, source=None, destination=None):
        # source/destination parameters are accepted but unused; the engines
        # are built by BaseSync from module-level config.
        self.dest_table = 'stk_exd'
        self.source_table = 'TQ_SK_XDRY'
        super(SyncStkExd, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    # Create the destination table (full rebuild).
    def create_dest_tables(self):
        # Reset the incremental-sync watermark to zero first.
        self.utils.update_update_log(0)
        create_sql = """create table {0}
            (
            id INT NOT NULL,
            symbol VARCHAR(20) NOT NULL,
            begin_date DATE NOT NULL,
            end_date DATE NOT NULL,
            exd_factor NUMERIC(32,19) DEFAULT NULL,
            back_exd NUMERIC(29,16) DEFAULT NULL,
            direct_exd NUMERIC(29,16) DEFAULT NULL,
            is_valid INT NOT NULL,
            tmstamp bigint not null,
            PRIMARY KEY(`symbol`,`begin_date`)
            )
            ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        create_sql = create_sql.replace('\n', '')
        # Normalize all runs of whitespace to single spaces.
        create_sql = " ".join(create_sql.split())
        self.create_table(create_sql)

    def get_sql(self, type):
        """Build the extraction SQL.

        type='report' returns a fully-formatted full-extract query;
        type='update' returns a template with {0}=row limit, {1}=source
        table and {2}=tmstamp watermark, formatted later by get_datas.
        """
        # CONSISTENCY FIX: use the {1} placeholder for the source table like
        # every sibling sync job, instead of hard-coding TQ_SK_XDRY (output
        # is identical since source_table == 'TQ_SK_XDRY').
        sql = """select {0}
            a.ID as id,
            a.BEGINDATE as begin_date,
            a.ENDDATE as end_date,
            a.XDY as exd_factor,
            a.LTDXDY as back_exd,
            a.THELTDXDY as direct_exd,
            a.ISVALID as is_valid,
            cast(a.tmstamp as bigint) as tmstamp,
            b.Exchange,
            b.SYMBOL as code
            from {1} a
            left join TQ_OA_STCODE as b
            on b.SECODE = a.SECODE
            where b.ISVALID=1 and b.LISTSTATUS=1 """
        if type == 'report':
            sql = sql.format('', self.source_table)
            return sql
        elif type == 'update':
            sql += 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
            return sql

    def get_datas(self, tm):
        """Fetch the next batch (up to 10000 rows) with tmstamp > tm."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 10000', self.source_table, tm).replace('\n', '')
        trades_sets = pd.read_sql(sql, self.source)
        return trades_sets

    def update_table_data(self, tm):
        """Incrementally pull batches with tmstamp > tm until exhausted,
        advancing the watermark after each batch."""
        while True:
            result_list = self.get_datas(tm)
            if result_list.empty:
                break
            # Map exchange code to a suffixed symbol (SH→XSHG, else XSHE).
            result_list['symbol'] = np.where(result_list['Exchange'] == 'CNSESH',
                                             result_list['code'] + '.XSHG',
                                             result_list['code'] + '.XSHE')
            result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
            try:
                result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
            except Exception as e:
                # BUGFIX: e.orig.msg only exists on DBAPI errors; fall back
                # to str(e) instead of raising AttributeError.
                print(getattr(getattr(e, 'orig', None), 'msg', str(e)))
                self.insert_or_update(result_list)
            # Last row's tmstamp (rows ordered ascending); .iloc is positional
            # and does not depend on the frame's index labels.
            max_tm = result_list['tmstamp'].iloc[-1]
            self.utils.update_update_log(max_tm)
            tm = max_tm

    def do_update(self):
        """Run an incremental sync if the source is ahead of the log watermark."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Delegate full-extract reporting to the shared utils helper."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--rebuild', type=bool, default=False)
parser.add_argument('--update', type=bool, default=False)
args = parser.parse_args()
if args.rebuild:
processor = SyncStkExd()
processor.create_dest_tables()
processor.do_update()
elif args.update:
processor = SyncStkExd()
processor.do_update()
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from sync.base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncStkFinForcast(BaseSync):
def __init__(self, source=None, destination=None):
self.source_table = 'TQ_SK_EXPTPERFORMANCE'
self.dest_table = 'stk_fin_forcast'
super(SyncStkFinForcast, self).__init__(self.dest_table)
self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)
# 创建目标表
def create_dest_tables(self):
self.utils.update_update_log(0)
create_sql = """create table {0}
(
id INT NOT NULL,
symbol VARCHAR(20) NOT NULL,
company_id VARCHAR(10) NOT NULL,
pub_date DATE NOT NULL,
source VARCHAR(10) NOT NULL,
begin_date DATE NOT NULL,
end_date DATE NOT NULL,
base_begin_date DATE NOT NULL,
base_end_date DATE NOT NULL,
operating_income_estimate NUMERIC(18,6) DEFAULT NULL,
operating_income_increas_estimate NUMERIC(15,6) DEFAULT NULL,
operating_income_text VARCHAR(400) DEFAULT NULL,
operating_income_mark VARCHAR(10) DEFAULT NULL,
operating_profit_estimate NUMERIC(18,6) DEFAULT NULL,
operating_profit_increase_estimate NUMERIC(15,6) DEFAULT NULL,
operating_profit_text VARCHAR(400) DEFAULT NULL,
operating_profit_mark VARCHAR(10) DEFAULT NULL,
net_profit_top NUMERIC(18,6) DEFAULT NULL,
net_profit_bottom NUMERIC(18,6) DEFAULT NULL,
net_profit_increas_top NUMERIC(18,6) DEFAULT NULL,
net_profit_increas_bottom NUMERIC(18,6) DEFAULT NULL,
net_profit_estimate_top VARCHAR(10) DEFAULT NULL,
net_profit_estimate_bottom VARCHAR(10) DEFAULT NULL,
net_profit_estimate_text VARCHAR(400) DEFAULT NULL,
eps_top NUMERIC(15,6) DEFAULT NULL,
eps_bottom NUMERIC(15,6) DEFAULT NULL,
eps_estimate_top VARCHAR(10) DEFAULT NULL,
eps_estimate_bottom VARCHAR(10) DEFAULT NULL,
isvalid INT DEFAULT NULL,
entry_date DATE NOT NULL,
entry_time VARCHAR(8) NOT NULL,
currency VARCHAR(10) DEFAULT NULL,
eps_increase_estimate_top NUMERIC(15,6) DEFAULT NULL,
eps_increase_estimate_bottom NUMERIC(15,6) DEFAULT NULL,
exp_year VARCHAR(4) DEFAULT NULL,
exp_type VARCHAR(10) DEFAULT NULL,
report_type VARCHAR(10) DEFAULT NULL,
estimate_origin_text TEXT DEFAULT NULL,
tmstamp bigint not null,
PRIMARY KEY(`symbol`,`pub_date`,`source`,`begin_date`,`end_date`,`base_begin_date`,`base_end_date`)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
create_sql = create_sql.replace('\n', '')
self.create_table(create_sql)
def get_sql(self, type):
    """Return the SELECT statement used to pull forecast rows from the source DB.

    :param type: 'report' -> statement with the table name substituted, for
                 full report pulls; 'update' -> raw template string whose
                 placeholders ({0} row limit, {1} table, {2} tmstamp
                 watermark) are filled in later by the caller.
    :raises ValueError: if ``type`` is neither 'report' nor 'update'.

    NOTE: the parameter shadows the builtin ``type``; the name is kept for
    backward compatibility with existing callers.
    """
    sql = """select {0}
a.ID as id,
a.COMPCODE as company_id,
a.PUBLISHDATE as pub_date,
a.DATASOURCE as source,
a.SESSIONBEGDATE as begin_date,
a.SESSIONENDDATE as end_date,
a.BASESSIONBEGDATE as base_begin_date,
a.BASESSIONENDDATE as base_end_date,
a.OPERMINCOME as operating_income_estimate,
a.OPERMINCOMEINC as operating_income_increas_estimate,
a.OPERMINCOMEDES as operating_income_text,
a.OPERMINCOMEMK as operating_income_mark,
a.OPERMPROFIT as operating_profit_estimate,
a.OPERMPROFITINC as operating_profit_increase_estimate,
a.OPERMPROFITDES as operating_profit_text,
a.OPERMPROFITMK as operating_profit_mark,
a.RETAMAXPROFITS as net_profit_top,
a.RETAMINPROFITS as net_profit_bottom,
a.RETAMAXPROFITSINC as net_profit_increas_top,
a.RETAMINPROFITSINC as net_profit_increas_bottom,
a.RETAMAXPROFITSMK as net_profit_estimate_top,
a.RETAMINPROFITSMK as net_profit_estimate_bottom,
a.RETAPROFITSDES as net_profit_estimate_text,
a.EPSMAXFORE as eps_top,
a.EPSMINFORE as eps_bottom,
a.EPSMAXFOREMK as eps_estimate_top,
a.EPSMINFOREMK as eps_estimate_bottom,
a.ISVALID as isvalid,
a.ENTRYDATE as entry_date,
a.ENTRYTIME as entry_time,
a.CUR as currency,
a.EPSMAXFOREINC as eps_increase_estimate_top,
a.EPSMINFOREINC as eps_increase_estimate_bottom,
a.EXPTYEAR as exp_year,
a.EXPTTYPE as exp_type,
a.GLOBALEXPTMOD as report_type,
a.EXPTORIGTEXT as estimate_origin_text,
cast(a.tmstamp as bigint) as tmstamp,
b.Symbol as code,
b.Exchange
from {1} a
left join FCDB.dbo.SecurityCode as b
on b.CompanyCode = a.COMPCODE
where b.SType ='EQA' and b.Enabled=0 and b.Status=0 """
    if type == 'report':
        return sql.format('', self.source_table)
    elif type == 'update':
        # Watermark filter; the caller fills {0}/{1}/{2} via str.format.
        return sql + 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
    # Previously this fell through and returned None, which crashed later
    # at .format(); fail fast with a clear error instead.
    raise ValueError('unsupported sql type: {0!r}'.format(type))
def get_datas(self, tm):
    """Fetch up to 10000 source rows whose tmstamp is greater than ``tm``."""
    print('正在查询', self.source_table, '表大于', tm, '的数据')
    query = self.get_sql('update').format('top 10000', self.source_table, tm)
    return pd.read_sql(query.replace('\n', ''), self.source)
def update_table_data(self, tm):
    """Incrementally copy source rows with tmstamp > ``tm`` into the destination.

    Runs in batches (get_datas returns at most 10000 rows per call, ordered
    by tmstamp), advancing the watermark after each batch until no rows
    remain.
    """
    while True:
        result_list = self.get_datas(tm)
        if result_list.empty:
            break
        # Map the source exchange code to the internal symbol suffix.
        result_list['symbol'] = np.where(result_list['Exchange'] == 'CNSESH',
                                         result_list['code'] + '.XSHG',
                                         result_list['code'] + '.XSHE')
        result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
        try:
            result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
        except Exception as e:
            # Bulk append failed (typically duplicate keys); report and fall
            # back to a row-wise upsert. Not every exception carries the
            # DBAPI .orig/.msg attributes, so degrade gracefully instead of
            # raising AttributeError here.
            print(getattr(getattr(e, 'orig', e), 'msg', e))
            self.insert_or_update(result_list)
        # Rows are ordered by tmstamp; take the last one positionally
        # instead of assuming a 0..n-1 RangeIndex label.
        max_tm = result_list['tmstamp'].iloc[-1]
        self.utils.update_update_log(max_tm)
        tm = max_tm
def do_update(self):
    """Run an incremental sync when the source holds rows newer than the log."""
    source_tm = self.utils.get_max_tm_source()
    synced_tm = self.utils.get_max_tm_log()
    if source_tm > synced_tm:
        self.update_table_data(synced_tm)
def update_report(self, count, end_date):
    """Delegate a report-mode refresh to the import utils with the full query."""
    self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
    def _str2bool(value):
        # argparse's type=bool treats ANY non-empty string (even "False")
        # as True; parse common spellings explicitly instead.
        return str(value).strip().lower() in ('1', 'true', 't', 'yes', 'y')

    parser = argparse.ArgumentParser()
    # nargs='?' + const=True also allows a bare `--rebuild` flag.
    parser.add_argument('--rebuild', type=_str2bool, nargs='?', const=True, default=False)
    parser.add_argument('--update', type=_str2bool, nargs='?', const=True, default=False)
    args = parser.parse_args()
    if args.rebuild:
        processor = SyncStkFinForcast()
        processor.create_dest_tables()
        processor.do_update()
    elif args.update:
        processor = SyncStkFinForcast()
        processor.do_update()
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from sync.base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncStkHolderNum(BaseSync):
    """Sync shareholder-count records from TQ_SK_SHAREHOLDERNUM to stk_holder_num.

    Incremental syncing is driven by the source table's tmstamp column; the
    last-synced value is tracked through TmImportUtils' update log.
    """

    def __init__(self, source=None, destination=None):
        self.source_table = 'TQ_SK_SHAREHOLDERNUM'
        self.dest_table = 'stk_holder_num'
        super(SyncStkHolderNum, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    # Create the destination table
    def create_dest_tables(self):
        """Reset the import watermark to 0 and create the destination table."""
        self.utils.update_update_log(0)
        create_sql = """create table {0}
(
id INT NOT NULL,
symbol VARCHAR(15) NOT NULL,
company_id VARCHAR(10) NOT NULL,
end_date DATE NOT NULL,
pub_date DATE DEFAULT NULL,
share_holders NUMERIC(10,0) DEFAULT NULL,
total_share NUMERIC(19,0) DEFAULT NULL,
share_holders_ave NUMERIC(19,4) DEFAULT NULL,
share_holders_ave_ratio NUMERIC(10,6) DEFAULT NULL,
a_share_holders NUMERIC(10,0) DEFAULT NULL,
a_share NUMERIC(19,0) DEFAULT NULL,
a_share_holders_ave NUMERIC(19,4) DEFAULT NULL,
a_share_holders_ave_ratio NUMERIC(10,6) DEFAULT NULL,
b_share_holders NUMERIC(10,0) DEFAULT NULL,
h_share_holders NUMERIC(10,0) DEFAULT NULL,
memo VARCHAR(400) DEFAULT NULL,
is_valid INT DEFAULT NULL,
entry_date DATE NOT NULL,
entry_time VARCHAR(8) NOT NULL,
tmstamp bigint not null,
PRIMARY KEY(`symbol`,`end_date`)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        # Flatten the DDL onto a single line before executing.
        create_sql = create_sql.replace('\n', '')
        self.create_table(create_sql)

    def get_sql(self, type):
        """Return the source SELECT for 'report' (formatted) or 'update' (template).

        For 'update' the returned template keeps placeholders {0} (row
        limit), {1} (table) and {2} (tmstamp watermark) for the caller.
        :raises ValueError: on any other ``type`` (previously returned None
            silently, which crashed later at .format()).
        """
        sql = """select {0}
a.ID as id,
b.Symbol as code,
b.Exchange,
a.COMPCODE as company_id,
a.ENDDATE as end_date,
a.PUBLISHDATE as pub_date,
a.TOTALSHAMT as share_holders,
a.TOTALSHARE as total_share,
a.KAVGSH as share_holders_ave,
a.HOLDPROPORTIONPACC as share_holders_ave_ratio,
a.ASKSHAMT as a_share_holders,
a.ASK as a_share,
a.ASKAVGSH as a_share_holders_ave,
a.AHOLDPROPORTIONPACC as a_share_holders_ave_ratio,
a.BSKSHAMT as b_share_holders,
a.HSKSHAMT as h_share_holders,
a.MEMO as memo,
a.ISVALID as is_valid,
a.ENTRYDATE as entry_date,
a.ENTRYTIME as entry_time,
cast(a.tmstamp as bigint) as tmstamp
from {1} a
left join FCDB.dbo.SecurityCode as b
on b.CompanyCode = a.COMPCODE
where b.SType ='EQA' and b.Enabled=0 and b.Status=0 """
        if type == 'report':
            return sql.format('', self.source_table)
        elif type == 'update':
            return sql + 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
        raise ValueError('unsupported sql type: {0!r}'.format(type))

    def get_datas(self, tm):
        """Fetch up to 10000 source rows whose tmstamp is greater than ``tm``."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 10000', self.source_table, tm).replace('\n', '')
        return pd.read_sql(sql, self.source)

    def update_table_data(self, tm):
        """Batch-copy source rows with tmstamp > ``tm``, advancing the watermark."""
        while True:
            result_list = self.get_datas(tm)
            if result_list.empty:
                break
            # Map the source exchange code to the internal symbol suffix.
            result_list['symbol'] = np.where(result_list['Exchange'] == 'CNSESH',
                                             result_list['code'] + '.XSHG',
                                             result_list['code'] + '.XSHE')
            result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
            try:
                result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
            except Exception as e:
                # Bulk append failed (typically duplicate keys); report and
                # fall back to a row-wise upsert. Not every exception has the
                # DBAPI .orig/.msg attributes, so degrade gracefully.
                print(getattr(getattr(e, 'orig', e), 'msg', e))
                self.insert_or_update(result_list)
            # Rows are ordered by tmstamp; take the last one positionally
            # instead of assuming a 0..n-1 RangeIndex label.
            max_tm = result_list['tmstamp'].iloc[-1]
            self.utils.update_update_log(max_tm)
            tm = max_tm

    def do_update(self):
        """Run an incremental sync when the source holds rows newer than the log."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Delegate a report-mode refresh to the import utils with the full query."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
    def _str2bool(value):
        # argparse's type=bool treats ANY non-empty string (even "False")
        # as True; parse common spellings explicitly instead.
        return str(value).strip().lower() in ('1', 'true', 't', 'yes', 'y')

    parser = argparse.ArgumentParser()
    # nargs='?' + const=True also allows a bare `--rebuild` flag.
    parser.add_argument('--rebuild', type=_str2bool, nargs='?', const=True, default=False)
    parser.add_argument('--update', type=_str2bool, nargs='?', const=True, default=False)
    args = parser.parse_args()
    if args.rebuild:
        processor = SyncStkHolderNum()
        processor.create_dest_tables()
        processor.do_update()
    elif args.update:
        processor = SyncStkHolderNum()
        processor.do_update()
This diff is collapsed.
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from sync.base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncStkManagementInfo(BaseSync):
    """Sync company-management records from TQ_COMP_MANAGER to stk_management_info.

    Incremental syncing is driven by the source table's tmstamp column; the
    last-synced value is tracked through TmImportUtils' update log.
    """

    def __init__(self, source=None, destination=None):
        self.source_table = 'TQ_COMP_MANAGER'
        self.dest_table = 'stk_management_info'
        super(SyncStkManagementInfo, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    # Create the destination table
    def create_dest_tables(self):
        """Reset the import watermark to 0 and create the destination table."""
        self.utils.update_update_log(0)
        create_sql = """create table {0}
(
id INT NOT NULL,
symbol VARCHAR(15) NOT NULL,
update_date DATE NOT NULL,
company_code VARCHAR(20) NOT NULL,
Job_attribute VARCHAR(10) NOT NULL,
job_code VARCHAR(10) NOT NULL,
job_mode VARCHAR(10) NOT NULL,
job_name VARCHAR(100) NOT NULL,
person_code VARCHAR(20) NOT NULL,
name VARCHAR(100) NOT NULL,
board_session INT DEFAULT NULL,
employment_session INT DEFAULT NULL,
status VARCHAR(10) DEFAULT NULL,
begin_date DATE NOT NULL,
end_date DATE NOT NULL,
dimission_reason VARCHAR(10) DEFAULT NULL,
is_dimission INT DEFAULT NULL,
memo VARCHAR(1000) DEFAULT NULL,
is_valid INT NOT NULL,
entry_date DATE NOT NULL,
entry_time VARCHAR(8) NOT NULL,
tmstamp bigint not null,
PRIMARY KEY(`symbol`,`person_code`,`job_name`,`begin_date`)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        # Flatten the DDL onto a single line before executing.
        create_sql = create_sql.replace('\n', '')
        self.create_table(create_sql)

    def get_sql(self, type):
        """Return the source SELECT for 'report' (formatted) or 'update' (template).

        For 'update' the returned template keeps placeholders {0} (row
        limit), {1} (table) and {2} (tmstamp watermark) for the caller.
        :raises ValueError: on any other ``type`` (previously returned None
            silently, which crashed later at .format()).
        """
        sql = """select {0}
a.ID as id,
a.UPDATEDATE as update_date,
a.COMPCODE as company_code,
a.POSTYPE as Job_attribute,
a.DUTYCODE as job_code,
a.DUTYMOD as job_mode,
a.ACTDUTYNAME as job_name,
a.PERSONALCODE as person_code,
a.CNAME as name,
a.MGENTRYS as board_session,
a.DENTRYS as employment_session,
a.NOWSTATUS as status,
a.BEGINDATE as begin_date,
a.ENDDATE as end_date,
a.DIMREASON as dimission_reason,
a.ISRELDIM as is_dimission,
a.MEMO as memo,
a.ISVALID as is_valid,
a.ENTRYDATE as entry_date,
a.ENTRYTIME as entry_time,
cast(a.tmstamp as bigint) as tmstamp,
b.Symbol as code,
b.Exchange
from {1} a
left join FCDB.dbo.SecurityCode as b
on b.CompanyCode = a.COMPCODE
where b.SType ='EQA' and b.Enabled=0 and b.Status=0 """
        if type == 'report':
            return sql.format('', self.source_table)
        elif type == 'update':
            return sql + 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
        raise ValueError('unsupported sql type: {0!r}'.format(type))

    def get_datas(self, tm):
        """Fetch up to 10000 source rows whose tmstamp is greater than ``tm``."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 10000', self.source_table, tm).replace('\n', '')
        return pd.read_sql(sql, self.source)

    def update_table_data(self, tm):
        """Batch-copy source rows with tmstamp > ``tm``, advancing the watermark."""
        while True:
            result_list = self.get_datas(tm)
            if result_list.empty:
                break
            # Map the source exchange code to the internal symbol suffix.
            result_list['symbol'] = np.where(result_list['Exchange'] == 'CNSESH',
                                             result_list['code'] + '.XSHG',
                                             result_list['code'] + '.XSHE')
            result_list.drop(['Exchange', 'code'], axis=1, inplace=True)
            try:
                result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
            except Exception as e:
                # Bulk append failed (typically duplicate keys); report and
                # fall back to a row-wise upsert. Not every exception has the
                # DBAPI .orig/.msg attributes, so degrade gracefully.
                print(getattr(getattr(e, 'orig', e), 'msg', e))
                self.insert_or_update(result_list)
            # Rows are ordered by tmstamp; take the last one positionally
            # instead of assuming a 0..n-1 RangeIndex label.
            max_tm = result_list['tmstamp'].iloc[-1]
            self.utils.update_update_log(max_tm)
            tm = max_tm

    def do_update(self):
        """Run an incremental sync when the source holds rows newer than the log."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Delegate a report-mode refresh to the import utils with the full query."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
    def _str2bool(value):
        # argparse's type=bool treats ANY non-empty string (even "False")
        # as True; parse common spellings explicitly instead.
        return str(value).strip().lower() in ('1', 'true', 't', 'yes', 'y')

    parser = argparse.ArgumentParser()
    # nargs='?' + const=True also allows a bare `--rebuild` flag.
    parser.add_argument('--rebuild', type=_str2bool, nargs='?', const=True, default=False)
    parser.add_argument('--update', type=_str2bool, nargs='?', const=True, default=False)
    args = parser.parse_args()
    if args.rebuild:
        processor = SyncStkManagementInfo()
        processor.create_dest_tables()
        processor.do_update()
    elif args.update:
        processor = SyncStkManagementInfo()
        processor.do_update()
#!/usr/bin/env python
# coding=utf-8
import argparse
from datetime import datetime
import pdb
import sqlalchemy as sa
import numpy as np
import pandas as pd
from sqlalchemy.orm import sessionmaker
import sys
sys.path.append('..')
sys.path.append('../..')
from sync.base_sync import BaseSync
from sync.tm_import_utils import TmImportUtils
class SyncSecurityInfo(BaseSync):
    """Sync listed-security master data from TQ_SK_BASICINFO to stk_security_info.

    Only listed A-shares on the two mainland exchanges (codes 001002 /
    001003) are pulled. Incremental syncing is driven by the source table's
    tmstamp column via TmImportUtils' update log.
    """

    def __init__(self, source=None, destination=None):
        self.source_table = 'TQ_SK_BASICINFO'
        self.dest_table = 'stk_security_info'
        super(SyncSecurityInfo, self).__init__(self.dest_table)
        self.utils = TmImportUtils(self.source, self.destination, self.source_table, self.dest_table)

    # Create the destination table
    def create_dest_tables(self):
        """Reset the import watermark to 0 and create the destination table."""
        self.utils.update_update_log(0)
        create_sql = """create table {0}
(
id INT NOT NULL,
company_id VARCHAR(20) NOT NULL,
symbol VARCHAR(20) NOT NULL,
exchange VARCHAR(10) NOT NULL,
security_type VARCHAR(10) NOT NULL,
short_name VARCHAR(100) NOT NULL,
english_name VARCHAR(100) DEFAULT NULL,
decnum INT DEFAULT NULL,
currency VARCHAR(10) NOT NULL,
isin_code VARCHAR(20) DEFAULT NULL,
sedol_code VARCHAR(20) DEFAULT NULL,
pairvalue NUMERIC(19,2) DEFAULT NULL,
total_shares NUMERIC(15,6) DEFAULT NULL,
lists_tatus VARCHAR(10) NOT NULL,
list_date DATE NOT NULL,
ipo_price NUMERIC(9,4) DEFAULT NULL,
delist_date DATE NOT NULL,
delist_price NUMERIC(9,4) DEFAULT NULL,
sfc_industry1_code VARCHAR(10) DEFAULT NULL,
sfc_industry1_name VARCHAR(100) DEFAULT NULL,
sfc_industry2_code VARCHAR(10) DEFAULT NULL,
sfc_industry2_name VARCHAR(100) DEFAULT NULL,
gics_industry1_code VARCHAR(10) DEFAULT NULL,
gics_industry1_name VARCHAR(100) DEFAULT NULL,
gics_industry2_code VARCHAR(10) DEFAULT NULL,
gics_industry2_name VARCHAR(100) DEFAULT NULL,
sw_industry1_code VARCHAR(10) DEFAULT NULL,
sw_industry1_name VARCHAR(100) DEFAULT NULL,
sw_industry2_code VARCHAR(10) DEFAULT NULL,
sw_industry2_name VARCHAR(100) DEFAULT NULL,
csi_industry1_code VARCHAR(10) DEFAULT NULL,
csi_industry1_name VARCHAR(100) DEFAULT NULL,
csi_industry2_code VARCHAR(10) DEFAULT NULL,
csi_industry2_name VARCHAR(100) DEFAULT NULL,
province_code VARCHAR(10) DEFAULT NULL,
province_name VARCHAR(100) DEFAULT NULL,
city_code VARCHAR(10) DEFAULT NULL,
city_name VARCHAR(100) DEFAULT NULL,
isvalid INT DEFAULT NULL,
entry_date DATE NOT NULL,
entry_time VARCHAR(8) DEFAULT NULL,
value_currency VARCHAR(10) DEFAULT NULL,
tmstamp bigint not null,
PRIMARY KEY(`symbol`)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8;""".format(self.dest_table)
        # Flatten the DDL onto a single line before executing.
        create_sql = create_sql.replace('\n', '')
        self.create_table(create_sql)

    def get_sql(self, type):
        """Return the source SELECT for 'report' (formatted) or 'update' (template).

        For 'update' the returned template keeps placeholders {0} (row
        limit), {1} (table) and {2} (tmstamp watermark) for the caller.
        :raises ValueError: on any other ``type`` (previously returned None
            silently, which crashed later at .format()).
        """
        sql = """select {0}
a.ID as id,
a.COMPCODE as company_id,
a.SYMBOL as code,
a.EXCHANGE as exchange,
a.SETYPE as security_type,
a.SESNAME as short_name,
a.SEENGNAME as english_name,
a.DECNUM as decnum,
a.CUR as currency,
a.ISINCODE as isin_code,
a.SEDOLCODE as sedol_code,
a.PARVALUE as pairvalue,
a.TOTALSHARE as total_shares,
a.LISTSTATUS as lists_tatus,
a.LISTDATE as list_date,
a.LISTOPRICE as ipo_price,
a.DELISTDATE as delist_date,
a.DELISTCPRICE as delist_price,
a.CSRCLEVEL1CODE as sfc_industry1_code,
a.CSRCLEVEL1NAME as sfc_industry1_name,
a.CSRCLEVEL2CODE as sfc_industry2_code,
a.CSRCLEVEL2NAME as sfc_industry2_name,
a.GICSLEVEL1CODE as gics_industry1_code,
a.GICSLEVEL1NAME as gics_industry1_name,
a.GICSLEVEL2CODE as gics_industry2_code,
a.GICSLEVEL2NAME as gics_industry2_name,
a.SWLEVEL1CODE as sw_industry1_code,
a.SWLEVEL1NAME as sw_industry1_name,
a.SWLEVEL2CODE as sw_industry2_code,
a.SWLEVEL2NAME as sw_industry2_name,
a.CSILEVEL1CODE as csi_industry1_code,
a.CSILEVEL1NAME as csi_industry1_name,
a.CSILEVEL2CODE as csi_industry2_code,
a.CSILEVEL2NAME as csi_industry2_name,
a.PROVINCECODE as province_code,
a.PROVINCENAME as province_name,
a.CITYCODE as city_code,
a.CITYNAME as city_name,
a.ISVALID as isvalid,
a.ENTRYDATE as entry_date,
a.ENTRYTIME as entry_time,
a.VALUECUR as value_currency,
cast(a.tmstamp as bigint) as tmstamp
from {1} a
where a.LISTSTATUS=1 and (a.EXCHANGE='001002' or a.EXCHANGE='001003') """
        if type == 'report':
            return sql.format('', self.source_table)
        elif type == 'update':
            return sql + 'and cast(a.tmstamp as bigint) > {2} order by a.tmstamp'
        raise ValueError('unsupported sql type: {0!r}'.format(type))

    def get_datas(self, tm):
        """Fetch up to 10000 source rows whose tmstamp is greater than ``tm``."""
        print('正在查询', self.source_table, '表大于', tm, '的数据')
        sql = self.get_sql('update')
        sql = sql.format('top 10000', self.source_table, tm).replace('\n', '')
        return pd.read_sql(sql, self.source)

    def update_table_data(self, tm):
        """Batch-copy source rows with tmstamp > ``tm``, advancing the watermark."""
        while True:
            result_list = self.get_datas(tm)
            if result_list.empty:
                break
            # Map the exchange code (001002 = Shanghai) to the symbol suffix;
            # unlike the other sync jobs the exchange column is kept.
            result_list['symbol'] = np.where(result_list['exchange'] == '001002',
                                             result_list['code'] + '.XSHG',
                                             result_list['code'] + '.XSHE')
            result_list.drop(['code'], axis=1, inplace=True)
            try:
                result_list.to_sql(name=self.dest_table, con=self.destination, if_exists='append', index=False)
            except Exception as e:
                # Bulk append failed (typically duplicate keys); report and
                # fall back to a row-wise upsert. Not every exception has the
                # DBAPI .orig/.msg attributes, so degrade gracefully.
                print(getattr(getattr(e, 'orig', e), 'msg', e))
                self.insert_or_update(result_list)
            # Rows are ordered by tmstamp; take the last one positionally
            # instead of assuming a 0..n-1 RangeIndex label.
            max_tm = result_list['tmstamp'].iloc[-1]
            self.utils.update_update_log(max_tm)
            tm = max_tm

    def do_update(self):
        """Run an incremental sync when the source holds rows newer than the log."""
        max_tm = self.utils.get_max_tm_source()
        log_tm = self.utils.get_max_tm_log()
        if max_tm > log_tm:
            self.update_table_data(log_tm)

    def update_report(self, count, end_date):
        """Delegate a report-mode refresh to the import utils with the full query."""
        self.utils.update_report(count, end_date, self.get_sql('report'))
if __name__ == '__main__':
    def _str2bool(value):
        # argparse's type=bool treats ANY non-empty string (even "False")
        # as True; parse common spellings explicitly instead.
        return str(value).strip().lower() in ('1', 'true', 't', 'yes', 'y')

    parser = argparse.ArgumentParser()
    # nargs='?' + const=True also allows a bare `--rebuild` flag.
    parser.add_argument('--rebuild', type=_str2bool, nargs='?', const=True, default=False)
    parser.add_argument('--update', type=_str2bool, nargs='?', const=True, default=False)
    args = parser.parse_args()
    if args.rebuild:
        processor = SyncSecurityInfo()
        processor.create_dest_tables()
        processor.do_update()
    elif args.update:
        processor = SyncSecurityInfo()
        processor.do_update()
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: ??
@author: li
@file: __init__.py.py
@time: 2019-07-02 16:09
"""
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
#!/usr/bin/env python
# coding=utf-8
import os
import sys
import pdb
import argparse
from datetime import datetime
sys.path.append('..')
import config
from utillities.sync_util import SyncUtil
class SyncTradeDate(object):
    """Dump the exchange trading calendar to a local CSV snapshot."""

    def __init__(self):
        self._unit = SyncUtil()
        self._dir = config.RECORD_BASE_DIR + 'trade_date/'

    def do_update(self, start_date, end_date, count):
        """Fetch trade dates for exchange 001002 and overwrite the local CSV.

        :param start_date: window start as a YYYYMMDD int.
        :param end_date: window end as a YYYYMMDD int.
        :param count: row-count argument passed through to SyncUtil.

        NOTE(review): assumes self._dir already exists — TODO confirm, or
        create it before writing.
        """
        trade_sets = self._unit.get_trades_ago('001002', start_date, end_date, count)
        trade_sets.rename(columns={'TRADEDATE': 'trade_date'}, inplace=True)
        # Persist locally; to_csv truncates/overwrites an existing file, so
        # the previous exists()/remove() dance was redundant.
        trade_sets.to_csv(self._dir + 'trade_date.csv', encoding='utf-8')
if __name__ == '__main__':
    def _str2bool(value):
        # argparse's type=bool treats ANY non-empty string (even "False")
        # as True; parse common spellings explicitly instead.
        return str(value).strip().lower() in ('1', 'true', 't', 'yes', 'y')

    parser = argparse.ArgumentParser()
    parser.add_argument('--start_date', type=int, default=20070101)
    parser.add_argument('--end_date', type=int, default=0)
    parser.add_argument('--count', type=int, default=-1)
    # nargs='?' + const=True also allows bare `--rebuild` / `--schedule`.
    parser.add_argument('--rebuild', type=_str2bool, nargs='?', const=True, default=False)
    # NOTE(review): --schedule is parsed but never used below — confirm intent.
    parser.add_argument('--schedule', type=_str2bool, nargs='?', const=True, default=False)
    args = parser.parse_args()
    if args.rebuild:
        # end_date == 0 means "through today" (YYYYMMDD).
        if args.end_date == 0:
            end_date = int(str(datetime.now().date()).replace('-', ''))
        else:
            end_date = args.end_date
        processor = SyncTradeDate()
        processor.do_update(args.start_date, end_date, args.count)
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
#!/usr/bin/env python
# coding=utf-8
import time
class TimeCommon(object):
    """Small time helpers shared by the sync scripts."""

    @classmethod
    def get_end_time(cls, end_date, end_time):
        """Combine a YYYYMMDD date and an HH:MM:SS time into an int Unix
        timestamp, interpreted in the local timezone (time.mktime)."""
        stamp_text = '{0} {1}'.format(end_date, end_time)
        parsed = time.strptime(stamp_text, '%Y%m%d %H:%M:%S')
        return int(time.mktime(parsed))
This diff is collapsed.
# One-shot script: submit the 'factor' packet to the ultron task cluster
# under machine id 'ly100002'. Runs its side effect at import time.
from ultron.cluster.invoke.submit_tasks import submit_task
submit_task.submit_packet('ly100002', 'factor')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment