230715_get_indicators.py 7.5 KB

# coding:utf-8
from datetime import datetime as dt
import socket
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text
from jqdatasdk import *
import pymysql
import multiprocessing as mp
from multiprocessing import freeze_support
import math
import talib as ta
import os
import traceback
import random
import logging
from myindicator import myind
import psutil
from apscheduler.schedulers.blocking import BlockingScheduler

# Show all rows and columns when printing DataFrames
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Create the database connection pools (SQLAlchemy engines)
engine = create_engine(
    'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_whole?charset=utf8')
engine_tech = create_engine(
    'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/qmt_stocks_tech?charset=utf8')
engine_tech2 = create_engine(
    'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3308/qmt_stocks_tech?charset=utf8')


def err_call_back(err):
    logging.error(f'Process pool error: {str(err)}')
    traceback.print_exc()


def tech_anal(stock, fre, hlfx_pool, hlfx_pool_daily, err_list):
    t_signals = 0
    global engine
    global engine_tech
    global engine_tech2
    try:
        con_engine = engine.connect()
        con_engine_tech = engine_tech.connect()
        con_engine_tech2 = engine_tech2.connect()
        try:
            table_name = f'{stock}_{fre}'
            # Read the table table_name from engine into df
            df = pd.read_sql_table(table_name, con=engine)
            df = df.dropna(axis=0, how='any')  # dropna returns a new frame, so assign it back
        except BaseException as e:
            print(f"{stock} could not be read")
            traceback.print_exc()
            err_list.append(stock)
        else:
            if len(df) != 0:
                # Compute technical indicators
                try:
                    myind.get_macd_data(df)
                    df_temp, t_signals = myind.get_hlfx(df)
                    myind.get_ris(df)
                    myind.get_bias(df)
                    myind.get_wilr(df)
                    df = pd.merge(df, df_temp, on='time', how='left')
                    df['HL'].fillna(value='-', inplace=True)
                    df = df.reset_index(drop=True)
                except BaseException as e:
                    print(f'{stock} indicator calculation failed', e)
                else:
                    try:
                        df = df.replace([np.inf, -np.inf], np.nan)
                        df.to_sql('%s_1d' % stock, con=engine_tech, index=False, if_exists='replace')
                        df.to_sql('%s_1d' % stock, con=engine_tech2, index=False, if_exists='replace')
                    except BaseException as e:
                        print(f'{stock} could not be stored', e)
                        traceback.print_exc()
                        err_list.append(stock)
            else:
                err_list.append(stock)
                print(f'{stock} data is empty')
        finally:
            # Maintain the signal pools: 2 drops the stock, 1 adds it
            if stock in hlfx_pool and t_signals == 2:
                hlfx_pool.remove(stock)
            elif stock not in hlfx_pool and t_signals == 1:
                hlfx_pool.append(stock)
                hlfx_pool_daily.append(stock)
            con_engine.close()
            con_engine_tech.close()
            con_engine_tech2.close()
            # print(f"{stock}, {t_signals}, '\n', {df_temp.head(20)}")
            # print(f'{stock} done!')
    except Exception as e:
        logging.error(f'Subprocess {os.getpid()} hit a problem here, error: {str(e)}')
        traceback.print_exc()
    engine.dispose()
    engine_tech.dispose()
    engine_tech2.dispose()


# Split a list into num_parts roughly equal partitions
def split_list(lst, num_parts):
    avg = len(lst) // num_parts
    rem = len(lst) % num_parts
    partitions = []
    start = 0
    for i in range(num_parts):
        end = start + avg + (1 if i < rem else 0)
        partitions.append(lst[start:end])
        start = end
    return partitions
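
# Example (for illustration): split_list spreads the remainder over the first
# `rem` partitions, e.g. split_list(list(range(7)), 3) -> [[0, 1, 2], [3, 4], [5, 6]].
# The helper is defined here but not called below; it could be used to hand each
# worker a slice of the stock list instead of submitting one stock per task.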


# Compute technical indicators with multiprocessing
def ind():
    # Record the start time
    start_time = dt.now()
    fre = '1d'
    num_cpus = mp.cpu_count()
    print(f"{socket.gethostname()} has {num_cpus} cores\n"
          f"{start_time.strftime('%Y-%m-%d %H:%M:%S')} starting {fre} indicator calculation")
    # Connect to the database and fetch the stock list
    conn_engine_hlfx_pool = create_engine(
        'mysql+pymysql://root:r6kEwqWU9!v3@localhost:3307/hlfx_pool?charset=utf8')
    con_engine_hlfx_pool = conn_engine_hlfx_pool.connect()
    # stocks = xtdata.get_stock_list_in_sector('沪深A股')
    stocks = pd.read_sql_query(
        text("select securities from %s" % 'stocks_list'),
        con=con_engine_hlfx_pool).iloc[-1, 0].split(",")
    con_engine_hlfx_pool.close()
    print(f'Stock list length: {len(stocks)}')
    # Shared lists visible to all worker processes
    err_list, hlfx_pool, hlfx_pool_daily = mp.Manager().list(), mp.Manager().list(), mp.Manager().list()
    # Run tech_anal for every stock in a process pool
    pool = mp.Pool(processes=num_cpus)
    # Keep the AsyncResult objects
    async_results = []
    for stock in stocks:
        async_result = pool.apply_async(tech_anal, args=(stock, fre, hlfx_pool, hlfx_pool_daily, err_list,),
                                        error_callback=err_call_back)
        async_results.append(async_result)
    pool.close()
    pool.join()
    # Count how many tasks returned None
    none_count = 0
    for i, result_async in enumerate(async_results):
        result = result_async.get()  # Fetch the task's result
        # print(f"The result of task {i} is: {result}")
        if result is None:
            none_count += 1
    print(
        f"Computed {none_count}/{i + 1},\nSignals today: {len(hlfx_pool_daily)},\n"
        f"Still tracked: {len(hlfx_pool)},\nError list: {err_list}")
    '''
    # Join the lists into comma-separated strings
    results_list = ','.join(set(hlfx_pool))
    results_list_daily = ','.join(set(hlfx_pool_daily))
    # Open database connections
    db_pool = pymysql.connect(host='localhost',
                              user='root',
                              port=3307,
                              password='r6kEwqWU9!v3',
                              database='hlfx_pool')
    db_pool2 = pymysql.connect(host='localhost',
                               user='root',
                               port=3308,
                               password='r6kEwqWU9!v3',
                               database='hlfx_pool')
    # Insert the lists into the database
    cursor = db_pool.cursor()
    cursor2 = db_pool2.cursor()
    sql = "INSERT INTO %s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
    sql2 = "INSERT INTO daily_%s (date,value) VALUES('%s','%s')" % (fre, dt.now().strftime('%Y-%m-%d %H:%M:%S'),
                                                                    results_list_daily)
    try:
        cursor.execute(sql)
        cursor.execute(sql2)
        cursor2.execute(sql)
        cursor2.execute(sql2)
        db_pool.commit()
        db_pool2.commit()
    except Exception as e:
        print('1d insert failed', e)
        # db_pool.rollback()
    finally:
        cursor.close()
        db_pool.close()
        cursor2.close()
        db_pool2.close()
    '''
    # Record the end time
    end_time = dt.now()
    print(f"Elapsed time: {end_time - start_time}")


if __name__ == '__main__':
    logger = mp.log_to_stderr()
    # logger.setLevel(logging.DEBUG)
    freeze_support()
    # Build a 0-23 list and pin this process's CPU affinity to those cores
    cpu_list = list(range(24))
    pus = psutil.Process()
    pus.cpu_affinity(cpu_list)
    ind()
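
# A minimal sketch of how the imported (but currently unused) BlockingScheduler
# could run ind() automatically after the market close; the cron trigger and the
# 15:30 weekday run time are assumptions, not part of the original script.
#
# scheduler = BlockingScheduler()
# scheduler.add_job(ind, 'cron', day_of_week='mon-fri', hour=15, minute=30)
# scheduler.start()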