Commit 5bb72e6c authored by 子恒's avatar 子恒

[Optimize]优化流程。

parent 96f4c4d5
import pandas as pd
import sqlalchemy as sa
from sqlalchemy.orm import sessionmaker
import numpy as np
import config
......@@ -14,7 +15,7 @@ class FactorDetailDoc(object):
self._destination = sa.create_engine(db_url)
self._destsession = sessionmaker(bind=self._destination, autocommit=False, autoflush=True)
self._factor_detail_columns = ['factor_type', 'factor_name', 'factor_en_name', 'factor_cn_name', 'direction',
'description','latxt_description', 'unit', 'view_dimension','tag']
'description', 'latxt_description', 'unit', 'view_dimension', 'tag']
def _exec_sql(self, sql):
session = self._destsession()
......@@ -29,6 +30,9 @@ class FactorDetailDoc(object):
def update_factor_detail(self, df):
    """Append factor rows to the ``factor_detail`` table.

    Normalizes the ``tag`` column before writing: missing tags are treated
    as the placeholder 0, values are rendered as clean integer strings
    (``1.0`` -> ``'1'``), and the 0 placeholder is stored as NULL.

    :param df: frame containing at least ``self._factor_detail_columns``;
        the caller's frame is left untouched.
    """
    # Explicit .copy(): the original chained assignments on the sliced frame
    # triggered pandas SettingWithCopyWarning; copying makes the
    # no-mutation contract explicit and silences the warning.
    out = df[self._factor_detail_columns].copy()
    out['tag'] = (out['tag'].fillna(0)
                            .astype('int')
                            .astype('str')
                            .replace('0', np.nan))
    out.to_sql(name='factor_detail', con=self._destination,
               if_exists='append', index=False)
def update(self):
......
#!/usr/bin/env python
# coding: utf-8
# In[1]:
import pandas as pd
import numpy as np
import sqlalchemy as sa
......@@ -12,72 +9,60 @@ from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, and_
import config
# In[2]:
# Review workbooks under ./doc that carry factor metadata.
file_name = ['rl_data1.xlsx', 'rl_data2.xlsx']
# In[3]:
# Per-workbook column renames (original Chinese header -> canonical name);
# the two files use different headers for the same fields.
_rename_maps = {
    'rl_data1.xlsx': {'因子名称': 'factor_name', '因子中文': 'factor_cn_name',
                      '类别': 'factor_type', '方向': 'direction'},
    'rl_data2.xlsx': {'因子名': 'factor_name', '因子中文名': 'factor_cn_name',
                      '因子类别': 'factor_type', '方向': 'direction'},
}
_frames = []
for file in file_name:
    print('doc/' + file)
    _mapping = _rename_maps[file]
    # First row is a title row in the workbooks, hence skiprows=1.
    df = pd.read_excel('doc/' + file, skiprows=1)[list(_mapping)].rename(columns=_mapping)
    _frames.append(df)
merge_data = pd.concat(_frames)
# In[4]:
# Build the MySQL connection URL from the deployment settings in the
# local ``config`` module.
DB_URL = '''mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}'''.format(config.rl_db_user,
                                                                 config.rl_db_pwd,
                                                                 config.rl_db_host,
                                                                 config.rl_db_port,
                                                                 config.rl_db_database)
# In[5]:
# Engine + session factory; autoflush so pending changes are visible to
# queries within a session.
_engine = sa.create_engine(DB_URL)
_session = sessionmaker(bind=_engine, autocommit=False, autoflush=True)
# Reflect the existing schema so mapped classes (e.g. ``factor_detail``)
# are available without hand-written models.
_base = automap_base()
_base.prepare(_engine, reflect=True)
# In[6]:
# Reflected mapped class for the live ``factor_detail`` table.
FactorDetail = _base.classes['factor_detail']
query = select([FactorDetail]).where(
    and_(
        FactorDetail.flag == 1
    ))
factor_detail = pd.read_sql(query, _engine)
# The spreadsheet direction is authoritative here; drop the stored one so
# the merge below yields a single 'direction' column.
factor_detail = factor_detail.drop(['direction'], axis=1)
# In[7]:
# Inner-join spreadsheet directions onto live rows, keep the first
# occurrence per factor, and index by name for the update loop.
loads_data = merge_data[['factor_name', 'direction']].merge(
    factor_detail, on=['factor_name']).drop_duplicates(
    subset=['factor_name'], keep='first').set_index('factor_name')
# In[ ]:
for factor_name in loads_data.index.tolist():
    direction = loads_data.loc[factor_name]['direction']
    print(factor_name, direction)
    # pd.isna instead of np.isnan: np.isnan raises TypeError if the
    # spreadsheet cell holds a non-numeric value.
    if pd.isna(direction):
        continue
    session = _session()
    try:
        # Bound parameters instead of string formatting: factor names come
        # from external spreadsheets, so interpolating them into SQL invites
        # injection and breaks on names containing quotes.
        session.execute(
            sa.text('update factor_detail set flag = 1, '
                    'direction = :direction, is_verify = 1 '
                    'where factor_name = :name and flag = 1'),
            {'direction': int(direction), 'name': factor_name})
        session.commit()
    finally:
        # Close even when the update fails so connections are not leaked.
        session.close()
class FactorDirect(object):
    """Sync factor directions from the review spreadsheets into the
    ``factor_detail`` table.

    Each workbook in ``self.file_name`` (under ``doc/``) maps factor names
    to a direction; every matching live row gets ``direction`` updated and
    ``flag = 1`` / ``is_verify = 1`` set.
    """

    def __init__(self):
        # Review workbooks; the two files use different Chinese headers,
        # see the rename maps in _load_spreadsheets().
        self.file_name = ['rl_data1.xlsx', 'rl_data2.xlsx']

    def _load_spreadsheets(self):
        """Read every workbook into one frame with canonical column names."""
        # Original Chinese header -> canonical name, per workbook.
        rename_maps = {
            'rl_data1.xlsx': {'因子名称': 'factor_name', '因子中文': 'factor_cn_name',
                              '类别': 'factor_type', '方向': 'direction'},
            'rl_data2.xlsx': {'因子名': 'factor_name', '因子中文名': 'factor_cn_name',
                              '因子类别': 'factor_type', '方向': 'direction'},
        }
        frames = []
        for file in self.file_name:
            print('doc/' + file)
            mapping = rename_maps[file]
            # First row is a title row in the workbooks, hence skiprows=1.
            frames.append(pd.read_excel('doc/' + file, skiprows=1)[list(mapping)]
                          .rename(columns=mapping))
        return pd.concat(frames)

    def update_direct(self):
        """Merge spreadsheet directions with live rows and persist them."""
        merge_data = self._load_spreadsheets()
        db_url = '''mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}'''.format(
            config.rl_db_user, config.rl_db_pwd, config.rl_db_host,
            config.rl_db_port, config.rl_db_database)
        _engine = sa.create_engine(db_url)
        _session = sessionmaker(bind=_engine, autocommit=False, autoflush=True)
        # Reflect the live schema so no hand-written model is needed.
        _base = automap_base()
        _base.prepare(_engine, reflect=True)
        FactorDetail = _base.classes['factor_detail']
        query = select([FactorDetail]).where(and_(FactorDetail.flag == 1))
        factor_detail = pd.read_sql(query, _engine)
        # The spreadsheet direction is authoritative; drop the stored one so
        # the merge yields a single 'direction' column.
        factor_detail = factor_detail.drop(['direction'], axis=1)
        loads_data = merge_data[['factor_name', 'direction']].merge(
            factor_detail, on=['factor_name']).drop_duplicates(
            subset=['factor_name'], keep='first').set_index('factor_name')
        for factor_name in loads_data.index.tolist():
            direction = loads_data.loc[factor_name]['direction']
            print(factor_name, direction)
            # pd.isna instead of np.isnan: np.isnan raises TypeError on
            # non-numeric spreadsheet cells.
            if pd.isna(direction):
                continue
            session = _session()
            try:
                # Bound parameters instead of string formatting: factor names
                # come from external spreadsheets, so interpolating them into
                # SQL invites injection and breaks on quotes in names.
                session.execute(
                    sa.text('update factor_detail set flag = 1, '
                            'direction = :direction, is_verify = 1 '
                            'where factor_name = :name and flag = 1'),
                    {'direction': int(direction), 'name': factor_name})
                session.commit()
            finally:
                # Close even when the update fails so connections are not leaked.
                session.close()
if __name__ == '__main__':
    # Entry point: pull directions from the spreadsheets and push to MySQL.
    FactorDirect().update_direct()
# Ops runbook: refresh factor metadata, then validate and publish it.
# NOTE(review): presumably run in order as one job; each step aborts the
# chained command on failure (&&) but the next line still runs — confirm
# whether that is intended.

# Pull the latest factor-detail code.
cd /app/factor-detail && git pull
# Rebuild factor documentation rows (factor_detail table).
cd /app/factor-detail && python factor_detail_doc.py
# Sync factor directions from the review spreadsheets.
cd /app/factor-detail && python factor_direct.py
# Check factor_info and factor_detail (default.py --check presumably
# validates the tables — verify against that script).
cd /app/basic-data/sync/data_check && python default.py --table='factor_info' --check=True && python default.py --table='factor_detail' --check=True
# Publish both tables downstream.
cd /app/basic-data/sync/publish && python default.py --source_table='factor_info' --update=True && python default.py --source_table='factor_detail' --update=True
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment