Я разрабатываю программу для вставки данных на уровне тиков для более чем 300 акций, полученных в формате словаря веб-потока от API брокера. У меня успешно работает версия программы без concurrent.futures, но она пропускает некоторые тики из-за высокой скорости поступления данных.
В новой версии, где я включил concurrent.futures , я столкнулся с проблемой, когда различные тики пропускаются и не фиксируются в моей базе данных MySQL. Тики поступают с очень высокой скоростью, и размер очереди быстро заполняется, что приводит к пропущенным тактам.
Вот мой подход к решению этой проблемы с помощью многопроцессорной обработки с использованием библиотеки concurrent.futures. Я новичок в многопроцессорной обработке данных и стремлюсь захватывать как минимум 1 тик в секунду на акцию для более чем 500 акций.
Текущие проблемы:
Размер очереди заполняется быстро, и тики пропускаются.
Пропускается как минимум 5-10 тиков за 10 секунд для каждой акции, что делает данные неполными для приличных аналитических графиков.
Ожидание:
Захват (в таблице MySQL) не менее 1 тика в секунду на акцию для более чем 500 акций.
Полная программа:
Я разрабатываю программу для вставки данных на уровне тиков для более чем 300 акций, полученных в формате словаря веб-потока от API брокера. У меня успешно работает версия программы без concurrent.futures, но она пропускает некоторые тики из-за высокой скорости поступления данных. В новой версии, где я включил concurrent.futures , я столкнулся с проблемой, когда различные тики пропускаются и не фиксируются в моей базе данных MySQL. Тики поступают с очень высокой скоростью, и размер очереди быстро заполняется, что приводит к пропущенным тактам. Вот мой подход к решению этой проблемы с помощью многопроцессорной обработки с использованием библиотеки concurrent.futures. Я новичок в многопроцессорной обработке данных и стремлюсь захватывать как минимум 1 тик в секунду на акцию для более чем 500 акций. Текущие проблемы: Размер очереди заполняется быстро, и тики пропускаются. Пропускается как минимум 5-10 тиков за 10 секунд для каждой акции, что делает данные неполными для приличных аналитических графиков. Ожидание: Захват (в таблице MySQL) не менее 1 тика в секунду на акцию для более чем 500 акций. Полная программа: [code]from kiteconnect import KiteTicker, KiteConnect import datetime import sys import pandas as pd import os from os import cpu_count import mysql.connector import time as tm from concurrent.futures import ProcessPoolExecutor, as_completed import queue import threading
# Change working directory cwd = os.chdir(r"D:\Trading\Algo Trading\ZERODHA ALGO\account_auth")
def create_tables(tokens): db = mysql.connector.connect(**db_config) c = db.cursor(buffered=True) for token in tokens: c.execute(f""" CREATE TABLE IF NOT EXISTS TOKEN{token} ( ts DATETIME PRIMARY KEY, price DECIMAL(20, 5), volume BIGINT, total_buy_quantity BIGINT DEFAULT 0, total_sell_quantity BIGINT DEFAULT 0, volume_consecutive BIGINT DEFAULT 0 ) """) columns = { 'total_buy_quantity': 'BIGINT DEFAULT 0', 'total_sell_quantity': 'BIGINT DEFAULT 0', 'volume_consecutive': 'BIGINT DEFAULT 0' } for col_name, col_type in columns.items(): c.execute(f""" SELECT column_name FROM information_schema.columns WHERE table_name = 'TOKEN{token}' AND column_name = '{col_name}' """) if not c.fetchone(): c.execute(f"ALTER TABLE TOKEN{token} ADD COLUMN {col_name} {col_type}")
db.commit() db.close()
def tokenLookup(instrument_df, symbol_list): token_list = [] for symbol in symbol_list: try: token_list.append(int(instrument_df[instrument_df.tradingsymbol == symbol].instrument_token.values[0])) except: print(f'Ticker - {symbol} not found in the instrument list of NSE') return token_list
def symbolLookup(instrument_df, token_list): symbol_list = [] for token in token_list: try: symbol_list.append(instrument_df[instrument_df.instrument_token == token].tradingsymbol.values[0]) except: print(f'Token - {token} not found in the instrument list of NSE') return symbol_list
def insert_ticks(tick): db = mysql.connector.connect(**db_config) c = db.cursor() if 'exchange_timestamp' not in tick: print(f"Skipping tick with missing 'exchange_timestamp': {tick['instrument_token']}") return
tok = "TOKEN" + str(tick['instrument_token']) symbol = symbolLookup(instrument_df, [tick['instrument_token']])[0] vals = [ tick['exchange_timestamp'], tick['last_price'], tick['last_traded_quantity'], tick.get('total_buy_quantity', 0), tick.get('total_sell_quantity', 0), tick.get('volume_traded', 0), symbol ] token_query = f""" INSERT INTO {tok} (ts, price, volume, total_buy_quantity, total_sell_quantity, volume_consecutive) VALUES (%s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE price = VALUES(price), volume = VALUES(volume), total_buy_quantity = VALUES(total_buy_quantity), total_sell_quantity = VALUES(total_sell_quantity), volume_consecutive = VALUES(volume_consecutive) """ try: c.execute(token_query, vals[:-1]) print(f'Tick inserted into - {symbol}') except Exception as e: print(f'Tick data error for - {tok}: {e}') db.commit() db.close()
У меня есть следующий фрагмент кода, иллюстрирующий мою проблему:
Каждый поток вычисляет локальные значения, а затем обновляет массив результатов. Предположим, что это обновлять (result += mask ) — очень медленная операция, как ее распараллелить,...
Я использую библиотеку Python concurrent.futures с ThreadPoolExecutor и ProcessPoolExecutor. Я хочу реализовать механизм отмены всех запущенных или невыполненных задач в случае сбоя какой-либо из задач. В частности, я хочу:
Я использую библиотеку Python concurrent.futures с ThreadPoolExecutor и ProcessPoolExecutor. Я хочу реализовать механизм отмены всех запущенных или невыполненных задач в случае сбоя какой-либо из задач. В частности, я хочу:
Я использую библиотеку Python concurrent.futures с ThreadPoolExecutor и ProcessPoolExecutor. Я хочу реализовать механизм отмены всех запущенных или невыполненных задач в случае сбоя какой-либо из задач. В частности, я хочу:
Я пытался распараллелить свой код и потратил некоторое время на изучение Dask, Pandarallel, но по той или иной причине они не сработали, поэтому сейчас я рассматриваю возможность реализации concurrent.futures.
Вот как выглядит моя формулировка...