| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- # coding:utf-8
- from datetime import datetime as dt
- import socket
- import pandas as pd
- import numpy as np
- from sqlalchemy import create_engine, text
- from jqdatasdk import *
- import pymysql
- import multiprocessing as mp
- from multiprocessing import freeze_support
- import math
- import talib as ta
- import os
- import traceback
- import random
- import logging
- from myindicator import myind
- import psutil
- from apscheduler.schedulers.blocking import BlockingScheduler
# Print DataFrames in full: no truncation of rows or columns.
for _display_option in ('display.max_rows', 'display.max_columns'):
    pd.set_option(_display_option, None)

# Root logger: INFO and above, prefixed with timestamp and level.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# Module-level SQLAlchemy engines, one per (port, database) pair.
# NOTE(review): the database credentials are hard-coded in this URL.
_MYSQL_URL = 'mysql+pymysql://root:r6kEwqWU9!v3@localhost:{port}/{database}?charset=utf8'
engine = create_engine(_MYSQL_URL.format(port=3307, database='qmt_stocks_whole'))
engine_tech = create_engine(_MYSQL_URL.format(port=3307, database='qmt_stocks_tech'))
engine_tech2 = create_engine(_MYSQL_URL.format(port=3308, database='qmt_stocks_tech'))
def err_call_back(err):
    """Report an exception raised by a pool task.

    Passed as ``error_callback`` to ``Pool.apply_async``; it receives the
    exception instance that the task raised.

    Args:
        err: The exception instance delivered by the pool.
    """
    logging.error(f'进程池出错~ error:{str(err)}')
    # The callback is not invoked from inside an ``except`` block, so
    # ``traceback.print_exc()`` has no active exception to print. Format the
    # exception object that was handed to us instead.
    traceback.print_exception(type(err), err, err.__traceback__)
def tech_anal(stock, fre, hlfx_pool, hlfx_pool_daily, err_list):
    """Compute technical indicators for one stock and store the result.

    Reads table ``{stock}_{fre}`` through ``engine``, runs the ``myind``
    indicator helpers on it, merges in the frame returned by
    ``myind.get_hlfx`` and writes the result to a table of the same name
    through ``engine_tech`` and ``engine_tech2``, replacing any existing table.

    Args:
        stock: Stock code; used as the table-name prefix.
        fre: Frequency suffix of the table name (``ind`` passes ``'1d'``).
        hlfx_pool: Shared list of tracked stocks. The stock is appended when
            ``myind.get_hlfx`` returns signal ``1`` and removed when it
            returns ``2``.
        hlfx_pool_daily: Shared list that receives the stock whenever it is
            newly appended to ``hlfx_pool``.
        err_list: Shared list that receives the stock when reading, computing
            or storing fails, or when there are no rows to process.

    Returns:
        None. Results are reported through the shared lists and the databases.
    """
    t_signals = 0
    table_name = f'{stock}_{fre}'
    try:
        # NOTE(review): these connections are not used by the pandas calls
        # below (pandas is handed the engines). They are kept so that a
        # connection failure is still reported by the outer handler before any
        # work starts; ``with`` guarantees they are released on every path.
        with engine.connect(), engine_tech.connect(), engine_tech2.connect():
            try:
                try:
                    df = pd.read_sql_table(table_name, con=engine)
                    # dropna() returns a new frame; re-bind it so that rows
                    # containing missing values are actually removed.
                    df = df.dropna(axis=0, how='any')
                except Exception:
                    print(f"{stock}读取有问题")
                    traceback.print_exc()
                    err_list.append(stock)
                    return

                if len(df) == 0:
                    err_list.append(stock)
                    print(f'{stock}数据为空')
                    return

                try:
                    myind.get_macd_data(df)
                    df_temp, t_signals = myind.get_hlfx(df)
                    myind.get_ris(df)
                    myind.get_bias(df)
                    myind.get_wilr(df)
                    df = pd.merge(df, df_temp, on='time', how='left')
                    # Assign back instead of a chained in-place fillna, which
                    # may operate on a temporary copy.
                    df['HL'] = df['HL'].fillna(value='-')
                    df = df.reset_index(drop=True)
                except Exception as e:
                    print(f'{stock}计算有问题', e)
                    # Record the failure like the read and store failures do.
                    err_list.append(stock)
                    return

                try:
                    df = df.replace([np.inf, -np.inf], np.nan)
                    # Write under the same name that was read, so a frequency
                    # other than '1d' does not overwrite the daily table.
                    for target in (engine_tech, engine_tech2):
                        df.to_sql(table_name, con=target, index=False, if_exists='replace')
                except Exception as e:
                    print(f'{stock}存储有问题', e)
                    traceback.print_exc()
                    err_list.append(stock)
            finally:
                # Runs on every path above, including the early returns, so a
                # signal produced before a later failure is still applied.
                if stock in hlfx_pool and t_signals == 2:
                    hlfx_pool.remove(stock)
                elif stock not in hlfx_pool and t_signals == 1:
                    hlfx_pool.append(stock)
                    hlfx_pool_daily.append(stock)
    except Exception as e:
        logging.error(f'子进程{os.getpid()}问题在这里~~ error:{str(e)}')
        traceback.print_exc()
        # Discard this process's pooled connections after an unexpected error.
        engine.dispose()
        engine_tech.dispose()
        engine_tech2.dispose()
- # 分割列表
def split_list(lst, num_parts):
    """Split ``lst`` into ``num_parts`` contiguous slices of near-equal length.

    The first ``len(lst) % num_parts`` slices hold one more element than the
    rest. When ``num_parts`` exceeds ``len(lst)`` the trailing slices are empty.

    Args:
        lst: Any sliceable sequence with a length (list, tuple, str, ...).
        num_parts: Number of slices to produce; must be at least 1.

    Returns:
        A list of ``num_parts`` slices of ``lst``, in order.

    Raises:
        ValueError: If ``num_parts`` is smaller than 1. Without this check a
            negative value would silently yield an empty result.
    """
    if num_parts < 1:
        raise ValueError(f'num_parts must be a positive integer, got {num_parts!r}')
    base, extra = divmod(len(lst), num_parts)
    partitions = []
    start = 0
    for i in range(num_parts):
        end = start + base + (1 if i < extra else 0)
        partitions.append(lst[start:end])
        start = end
    return partitions
- # 多进程实现技术指标计算
def _load_stock_codes():
    """Return the stock codes held in the last row of ``hlfx_pool.stocks_list``."""
    # NOTE(review): the database credentials are hard-coded in this URL.
    pool_engine = create_engine(
        'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
    try:
        with pool_engine.connect() as conn:
            frame = pd.read_sql_query(text("select securities from stocks_list"), con=conn)
    finally:
        pool_engine.dispose()
    # The cell is a single comma-separated string of codes.
    return frame.iloc[-1, 0].split(",")


def _save_pool_snapshots(fre, results_list, results_list_daily):
    """Insert both pool strings into ``hlfx_pool`` on ports 3307 and 3308."""
    # One timestamp, so the two rows written by a run carry the same date.
    timestamp = dt.now().strftime('%Y-%m-%d %H:%M:%S')
    # Table names cannot be bound as parameters; ``fre`` is a constant chosen
    # by ind(). The values are bound so the driver quotes and escapes them.
    statements = (
        (f"INSERT INTO {fre} (date,value) VALUES(%s,%s)", (timestamp, results_list)),
        (f"INSERT INTO daily_{fre} (date,value) VALUES(%s,%s)", (timestamp, results_list_daily)),
    )
    connections = []
    try:
        # A connection failure propagates to the caller; whatever was opened
        # before it is still closed by the ``finally`` below.
        for port in (3307, 3308):
            connections.append(pymysql.connect(host='localhost',
                                               user='root',
                                               port=port,
                                               password='r6kEwqWU9!v3',
                                               database='hlfx_pool'))
        try:
            for conn in connections:
                with conn.cursor() as cursor:
                    for sql, params in statements:
                        cursor.execute(sql, params)
            # Commit only after every statement succeeded on both servers.
            for conn in connections:
                conn.commit()
        except Exception as e:
            print('1d存入有问题', e)
    finally:
        for conn in connections:
            conn.close()


def ind():
    """Run ``tech_anal`` for every listed stock in a process pool and save the pools.

    Loads the stock list from the ``hlfx_pool`` database, submits one
    ``tech_anal`` task per stock with frequency ``'1d'``, prints a summary and
    inserts the resulting pool strings into the ``1d`` and ``daily_1d`` tables
    on both MySQL servers.
    """
    start_time = dt.now()
    fre = '1d'
    num_cpus = mp.cpu_count()
    print(f"{socket.gethostname()}共有{num_cpus}个核心\n{start_time.strftime('%Y-%m-%d %H:%M:%S')}开始计算{fre}技术指标")

    stocks = _load_stock_codes()
    print(f'股票列表长度为{len(stocks)}')

    # A single manager process serves all three shared lists and is shut down
    # when the ``with`` block exits.
    with mp.Manager() as manager:
        err_list, hlfx_pool, hlfx_pool_daily = manager.list(), manager.list(), manager.list()

        with mp.Pool(processes=num_cpus) as pool:
            async_results = [
                pool.apply_async(tech_anal, args=(stock, fre, hlfx_pool, hlfx_pool_daily, err_list,),
                                 error_callback=err_call_back)
                for stock in stocks
            ]
            # Wait for all tasks; leaving the ``with`` alone would terminate them.
            pool.close()
            pool.join()

        # Count the tasks that returned None.
        none_count = 0
        failed = 0
        for task in async_results:
            try:
                if task.get() is None:
                    none_count += 1
            except Exception:
                # Already reported through err_call_back; keep going so the
                # results of the other tasks are still saved.
                failed += 1
        if failed:
            logging.warning('%d task(s) raised an exception; see the errors logged above', failed)

        # Copy the shared lists into plain lists before the manager shuts down.
        errors = err_list[:]
        tracked = hlfx_pool[:]
        daily = hlfx_pool_daily[:]

    print(
        f"共计算{none_count}/{len(async_results)},\n当日信号:{len(daily)},\n持续检测为:{len(tracked)}, \n错误列表:{errors}")

    # De-duplicate and flatten each pool into a comma-separated string.
    results_list = ','.join(set(tracked))
    results_list_daily = ','.join(set(daily))
    _save_pool_snapshots(fre, results_list, results_list_daily)

    end_time = dt.now()
    print(f"运行时间:{end_time - start_time}")
if __name__ == '__main__':
    # freeze_support() should be the first statement after the guard so that
    # child processes of a frozen Windows executable are handled correctly.
    freeze_support()
    logger = mp.log_to_stderr()
    # logger.setLevel(logging.DEBUG)
    # Pin the process to the first 24 logical CPUs, or to all of them on
    # smaller machines: cpu_affinity() rejects CPU numbers that do not exist.
    cpu_list = list(range(min(24, os.cpu_count() or 1)))
    pus = psutil.Process()
    pus.cpu_affinity(cpu_list)
    ind()
|