download_data_whole.py

from xtquant import xtdata
from datetime import datetime as dt
import pandas as pd
import math
from sqlalchemy import create_engine, text
import multiprocessing as mp
import os
from apscheduler.schedulers.blocking import BlockingScheduler
import traceback
import psutil
import pymysql
pd.set_option('display.max_columns', None)  # show all columns when printing DataFrames
path = 'C:\\qmt\\userdata_mini'
field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
cpu_count = mp.cpu_count()
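
# Note: `path` points at the local miniQMT data directory ('userdata_mini') but is not
# referenced again in this script; `field` lists the daily-bar columns requested from
# xtdata.get_market_data below.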

def err_call_back(err):
    # error_callback for pool.apply_async: report the worker's exception
    print(f'Worker error: {str(err)}')
    traceback.print_exc()

def to_sql(stock_list):
    print(f'{dt.now()} start writing to the database. MyPid is {os.getpid()}')
    m = 0
    for stock in stock_list:
        eng_w = create_engine('mysql+pymysql://root:r6kEwqWU9!v3@localhost:3308/qmt_stocks_whole?charset=utf8',
                              pool_recycle=3600, pool_pre_ping=True, pool_size=1)
        # back-adjusted daily bars (dividend_type='back')
        data_back = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='back')
        df_back = pd.concat([data_back[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
                                                                 'amount']], axis=1)
        df_back.columns = ['time', 'open_back', 'high_back', 'low_back', 'close_back', 'volume_back', 'amount_back']
        df_back['time'] = df_back['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
        df_back.reset_index(drop=True, inplace=True)
        # forward-adjusted daily bars (dividend_type='front')
        data_front = xtdata.get_market_data(field, [stock], '1d', end_time='', count=-1, dividend_type='front')
        df_front = pd.concat([data_front[i].loc[stock].T for i in ['time', 'open', 'high', 'low', 'close', 'volume',
                                                                   'amount']], axis=1)
        df_front.columns = ['time', 'open_front', 'high_front', 'low_front', 'close_front', 'volume_front',
                            'amount_front']
        df_front['time'] = df_front['time'].apply(lambda x: dt.fromtimestamp(x / 1000.0))
        # align the back- and forward-adjusted series on the timestamp column
        df = pd.merge_asof(df_back, df_front, on='time')
        # print(df)
        try:
            # eng_w.connect().execute(text("truncate table `%s_1d`" % stock))
            df.to_sql('%s_1d' % stock, con=eng_w, index=False, if_exists='replace', chunksize=20000)
        except BaseException as e:
            print(stock, e)
        else:
            m += 1
        eng_w.dispose()
    print(f'Pid:{os.getpid()} finished. Expected {len(stock_list)} stocks, wrote {m}.')
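
# Example of a single-process call, assuming daily bars for these codes have already
# been downloaded into the local QMT data store (the two tickers are illustrative):
#   to_sql(['000001.SZ', '600000.SH'])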

def download_data():
    stock_list = xtdata.get_stock_list_in_sector('沪深A股')  # all Shanghai/Shenzhen A-share codes
    stock_list.sort()
    results_list = ','.join(set(stock_list))
    print(type(results_list))
    db_pool = pymysql.connect(host='localhost',
                              user='root',
                              port=3308,
                              password='r6kEwqWU9!v3',
                              database='hlfx_pool')
    cursor_pool = db_pool.cursor()
    # record today's stock list in hlfx_pool.stocks_list
    sql = "INSERT INTO %s (date,securities) VALUES('%s','%s')" % (
        'stocks_list', dt.now().strftime('%Y-%m-%d %H:%M:%S'), results_list)
    cursor_pool.execute(sql)
    db_pool.commit()
    print(dt.now(), 'download started!')
    # xtdata.download_history_data2(stock_list=stock_list, period='1d', start_time='', end_time='')
    print(dt.now(), 'download finished, writing to the database!')
    # split the stock list into one slice per CPU core and write each slice in its own process
    step = math.ceil(len(stock_list) / mp.cpu_count())
    pool = mp.Pool(processes=mp.cpu_count())
    # pool = mp.Pool(processes=8)
    # step = math.ceil(len(stock_list) / 8)
    for i in range(0, len(stock_list), step):
        pool.apply_async(func=to_sql, args=(stock_list[i:i + step],), error_callback=err_call_back)
    pool.close()
    pool.join()
    print(f"Today's data download finished {dt.now()}")
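
# Note on the fan-out above: with ~5000 A-share codes on a 16-core machine (figures
# are illustrative), step = ceil(5000 / 16) = 313, so each of the 16 worker processes
# writes roughly 313 per-stock tables.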

if __name__ == '__main__':
    field = ['time', 'open', 'close', 'high', 'low', 'volume', 'amount']
    cpu_count = mp.cpu_count()
    pus = psutil.Process()
    # pus.cpu_affinity([12, 13, 14, 15, 16, 17, 18, 19])
    download_data()
    # scheduler = BlockingScheduler()
    # scheduler.add_job(func=download_data, trigger='cron', day_of_week='0-4', hour='20', minute='05',
    #                   timezone="Asia/Shanghai", max_instances=10)
    # try:
    #     scheduler.start()
    # except (KeyboardInterrupt, SystemExit):
    #     pass
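
# To run this on a schedule instead of once, uncomment the BlockingScheduler block
# above: as written it would trigger download_data at 20:05 Asia/Shanghai on
# weekdays (day_of_week='0-4').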