# coding:utf-8
"""Compute daily technical indicators for every stock and store the results.

For each stock code in the ``stocks_list`` table, the ``<stock>_<fre>`` table
is read from ``qmt_stocks_whole``, the ``myind`` indicator helpers are applied,
and the result is written to ``qmt_stocks_tech`` on both MySQL instances
(ports 3307 and 3308).  The codes collected while processing are then inserted
into the ``hlfx_pool`` database on both instances.
"""
# NOTE: the import order below is intentionally left as it was.  The wildcard
# import can rebind names imported before it, and names imported after it can
# rebind what it exported, so regrouping the imports could change behaviour.
from datetime import datetime as dt
import socket
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text
from jqdatasdk import *
import pymysql
import multiprocessing as mp
from multiprocessing import freeze_support
import math
import talib as ta
import os
import traceback
import random
import logging
from myindicator import myind
import psutil
from apscheduler.schedulers.blocking import BlockingScheduler
from urllib.parse import quote_plus

# Show every row and column when printing DataFrames.
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

# Logging configuration.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# --- Database settings (single source of truth for every connection) --------
DB_HOST = 'localhost'
DB_USER = 'root'
# NOTE(review): this credential is committed to source control.  It is kept as
# the default so existing deployments keep working, but it should be supplied
# through the QMT_DB_PASSWORD environment variable and rotated.
DB_PASSWORD = os.environ.get('QMT_DB_PASSWORD', 'r6kEwqWU9!v3')
# Two MySQL instances receive identical writes; presumably the second one is a
# mirror/backup of the first -- TODO confirm.
PRIMARY_DB_PORT = 3307
SECONDARY_DB_PORT = 3308
# Upper bound on the number of logical CPUs the main process is pinned to.
MAX_AFFINITY_CPUS = 24


def _mysql_url(port, database):
    """Return the SQLAlchemy URL for *database* on the instance at *port*."""
    # The password is URL-quoted so characters such as '@' or '/' in an
    # overridden password cannot corrupt the URL.
    return (f'mysql+pymysql://{DB_USER}:{quote_plus(DB_PASSWORD)}'
            f'@{DB_HOST}:{port}/{database}?charset=utf8')


# Engines (each owns a connection pool).
engine = create_engine(_mysql_url(PRIMARY_DB_PORT, 'qmt_stocks_whole'))
engine_tech = create_engine(_mysql_url(PRIMARY_DB_PORT, 'qmt_stocks_tech'))
engine_tech2 = create_engine(_mysql_url(SECONDARY_DB_PORT, 'qmt_stocks_tech'))


def err_call_back(err):
    """Log an exception raised by a pool task.

    Used as the ``error_callback`` of ``Pool.apply_async``.

    Args:
        err: The exception instance delivered by the pool.
    """
    logging.error(f'进程池出错~ error:{str(err)}')
    # This callback is not invoked from inside an ``except`` block, so
    # ``traceback.print_exc()`` would have nothing to print; print the
    # traceback attached to the exception object instead.
    traceback.print_exception(type(err), err, err.__traceback__)


def tech_anal(stock, fre, hlfx_pool, hlfx_pool_daily, err_list):
    """Compute and store the technical indicators of one stock.

    Reads table ``<stock>_<fre>`` through ``engine``, applies the ``myind``
    helpers, and replaces the table of the same name on ``engine_tech`` and
    ``engine_tech2``.  Failures are printed and recorded in *err_list*; they
    are never raised to the caller.

    Args:
        stock: Stock code, used as the table-name prefix.
        fre: Frequency suffix of the table name (``ind`` passes ``'1d'``).
        hlfx_pool: Shared list.  The stock is appended when the signal
            returned by ``myind.get_hlfx`` is 1 and the stock is absent, and
            removed when the signal is 2 and the stock is present.
        hlfx_pool_daily: Shared list that receives the stock whenever it is
            appended to *hlfx_pool*.
        err_list: Shared list that receives the stock when reading, computing
            or storing fails, or when the source table has no rows.

    Returns:
        None.
    """
    t_signals = 0
    table_name = f'{stock}_{fre}'
    try:
        try:
            # Load the source table into a DataFrame.
            df = pd.read_sql_table(table_name, con=engine)
            # ``dropna`` returns a new frame; the result must be assigned or
            # the incomplete rows are silently kept.
            df = df.dropna(axis=0, how='any')
        except Exception:
            print(f"{stock}读取有问题")
            traceback.print_exc()
            err_list.append(stock)
        else:
            if len(df) != 0:
                # Compute the technical indicators.
                try:
                    # The return values of these helpers are not used, so they
                    # presumably add their columns to ``df`` in place --
                    # TODO confirm against myindicator.
                    myind.get_macd_data(df)
                    df_temp, t_signals = myind.get_hlfx(df)
                    myind.get_ris(df)
                    myind.get_bias(df)
                    myind.get_wilr(df)
                    df = pd.merge(df, df_temp, on='time', how='left')
                    # Assign instead of ``fillna(inplace=True)`` on a column
                    # selection, which does not reliably update ``df``.
                    df['HL'] = df['HL'].fillna('-')
                    df = df.reset_index(drop=True)
                except Exception as e:
                    print(f'{stock}计算有问题', e)
                    # Record the failure like the read/store failures do.
                    err_list.append(stock)
                else:
                    try:
                        # Replace +/-inf with NaN before writing.
                        df = df.replace([np.inf, -np.inf], np.nan)
                        df.to_sql(table_name, con=engine_tech, index=False,
                                  if_exists='replace')
                        df.to_sql(table_name, con=engine_tech2, index=False,
                                  if_exists='replace')
                    except Exception as e:
                        print(f'{stock}存储有问题', e)
                        traceback.print_exc()
                        err_list.append(stock)
            else:
                err_list.append(stock)
                print(f'{stock}数据为空')
        finally:
            # NOTE(review): ``ind`` creates ``hlfx_pool`` empty on every run
            # and each stock is processed once, so the removal branch cannot
            # fire as the script stands -- confirm whether the pool was meant
            # to be seeded from the previous run.
            if stock in hlfx_pool and t_signals == 2:
                hlfx_pool.remove(stock)
            elif stock not in hlfx_pool and t_signals == 1:
                hlfx_pool.append(stock)
                hlfx_pool_daily.append(stock)
    except Exception as e:
        logging.error(f'子进程{os.getpid()}问题在这里~~ error:{str(e)}')
        traceback.print_exc()
    finally:
        # Release the pooled connections held by this process after each task.
        engine.dispose()
        engine_tech.dispose()
        engine_tech2.dispose()


# Split a list into chunks.
def split_list(lst, num_parts):
    """Split *lst* into *num_parts* consecutive slices of near-equal length.

    The first ``len(lst) % num_parts`` slices hold one extra element.

    Args:
        lst: Sequence to split (anything that supports ``len`` and slicing).
        num_parts: Number of slices to produce.

    Returns:
        A list of ``num_parts`` slices of *lst*, in order.

    Raises:
        ZeroDivisionError: If *num_parts* is 0.
    """
    avg, rem = divmod(len(lst), num_parts)
    partitions = []
    start = 0
    for i in range(num_parts):
        end = start + avg + (1 if i < rem else 0)
        partitions.append(lst[start:end])
        start = end
    return partitions


def _load_stock_list():
    """Return the stock codes stored in the last row of ``stocks_list``."""
    pool_engine = create_engine(_mysql_url(PRIMARY_DB_PORT, 'hlfx_pool'))
    try:
        with pool_engine.connect() as connection:
            frame = pd.read_sql_query(
                text("select securities from stocks_list"), con=connection)
    finally:
        # The engine is only needed for this one query.
        pool_engine.dispose()
    # The codes are stored as one comma-separated string.
    return frame.iloc[-1, 0].split(",")


def _save_pools(fre, results_list, results_list_daily):
    """Insert the pool strings into ``hlfx_pool`` on both MySQL instances.

    One row goes into table ``<fre>`` and one into ``daily_<fre>``.  Both
    instances are committed only after every insert succeeded.  A failure
    while inserting or committing is printed, not raised; a failure to
    connect is raised.
    """
    timestamp = dt.now().strftime('%Y-%m-%d %H:%M:%S')
    # Table names cannot be bound as parameters; ``fre`` is a constant chosen
    # by ``ind``.  The values are passed as parameters so the driver escapes
    # them.
    sql = f"INSERT INTO {fre} (date,value) VALUES(%s,%s)"
    sql2 = f"INSERT INTO daily_{fre} (date,value) VALUES(%s,%s)"
    connections = []
    try:
        for port in (PRIMARY_DB_PORT, SECONDARY_DB_PORT):
            connections.append(pymysql.connect(
                host=DB_HOST, user=DB_USER, port=port,
                password=DB_PASSWORD, database='hlfx_pool'))
        try:
            for connection in connections:
                with connection.cursor() as cursor:
                    cursor.execute(sql, (timestamp, results_list))
                    cursor.execute(sql2, (timestamp, results_list_daily))
            for connection in connections:
                connection.commit()
        except Exception as e:
            print('1d存入有问题', e)
    finally:
        # Always close whatever was opened, including when a later connect
        # or an insert failed.
        for connection in connections:
            connection.close()


# Compute the technical indicators with a process pool.
def ind():
    """Run ``tech_anal`` for every stock in parallel and store the pools.

    Loads the stock list, processes every stock in a pool with one worker per
    CPU, prints a summary, and inserts the collected codes into the
    ``hlfx_pool`` database on both MySQL instances.
    """
    # Record the start time.
    start_time = dt.now()
    fre = '1d'
    num_cpus = mp.cpu_count()
    print(f"{socket.gethostname()}共有{num_cpus}个核心\n{start_time.strftime('%Y-%m-%d %H:%M:%S')}开始计算{fre}技术指标")

    # Fetch the stock list from the database.
    stocks = _load_stock_list()
    print(f'股票列表长度为{len(stocks)}')

    # One manager serves all three shared lists and is shut down on exit.
    with mp.Manager() as manager:
        err_list = manager.list()
        hlfx_pool = manager.list()
        hlfx_pool_daily = manager.list()

        with mp.Pool(processes=num_cpus) as pool:
            async_results = [
                pool.apply_async(
                    tech_anal,
                    args=(stock, fre, hlfx_pool, hlfx_pool_daily, err_list),
                    error_callback=err_call_back)
                for stock in stocks
            ]
            pool.close()
            pool.join()

        # Count the tasks that returned None.  ``tech_anal`` has no return
        # statement, so every task that did not raise is counted.
        none_count = 0
        for result_async in async_results:
            try:
                result = result_async.get()
            except Exception:
                # Already reported through ``err_call_back``; one failed task
                # must not prevent the results from being saved.
                continue
            if result is None:
                none_count += 1

        # Copy the shared lists before the manager goes away.
        errors = list(err_list)
        pool_codes = list(hlfx_pool)
        daily_codes = list(hlfx_pool_daily)

    total = len(async_results)
    print(
        f"共计算{none_count}/{total},\n当日信号:{len(daily_codes)},\n持续检测为:{len(pool_codes)}, \n错误列表:{errors}")

    # Join the de-duplicated codes; sorting makes the stored string
    # deterministic.
    results_list = ','.join(sorted(set(pool_codes)))
    results_list_daily = ','.join(sorted(set(daily_codes)))
    _save_pools(fre, results_list, results_list_daily)

    # Record the end time.
    end_time = dt.now()
    print(f"运行时间:{end_time - start_time}")


if __name__ == '__main__':
    # Must be the first statement of the main guard.
    freeze_support()
    logger = mp.log_to_stderr()
    # logger.setLevel(logging.DEBUG)
    # Pin this process to the first CPUs (at most MAX_AFFINITY_CPUS); a
    # fixed range(24) fails on machines with fewer logical CPUs.
    cpu_list = list(range(min(MAX_AFFINITY_CPUS, os.cpu_count() or 1)))
    pus = psutil.Process()
    if hasattr(pus, 'cpu_affinity'):  # not provided on every platform
        pus.cpu_affinity(cpu_list)
    ind()