Проблемы со сбором всех тиковых данных с помощью Concurrent.futures в PythonPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Проблемы со сбором всех тиковых данных с помощью Concurrent.futures в Python

Сообщение Anonymous »

Я разрабатываю программу для вставки данных на уровне тиков для более чем 300 акций, полученных в формате словаря веб-потока от API брокера. У меня успешно работает версия программы без concurrent.futures, но она пропускает некоторые тики из-за высокой скорости поступления данных.
В новой версии, где я включил concurrent.futures , я столкнулся с проблемой, когда различные тики пропускаются и не фиксируются в моей базе данных MySQL. Тики поступают с очень высокой скоростью, и размер очереди быстро заполняется, что приводит к пропущенным тактам.
Вот мой подход к решению этой проблемы с помощью многопроцессорной обработки с использованием библиотеки concurrent.futures. Я новичок в многопроцессорной обработке данных и стремлюсь захватывать как минимум 1 тик в секунду на акцию для более чем 500 акций.
Текущие проблемы:
Размер очереди заполняется быстро, и тики пропускаются.
Пропускается как минимум 5-10 тиков за 10 секунд для каждой акции, что делает данные неполными для приличных аналитических графиков.
Ожидание:
Захват (в таблице MySQL) не менее 1 тика в секунду на акцию для более чем 500 акций.
Полная программа:

Код: Выделить всё

from kiteconnect import KiteTicker, KiteConnect
import datetime
import sys
import pandas as pd
import os
from os import cpu_count
import mysql.connector
import time as tm
from concurrent.futures import ProcessPoolExecutor, as_completed
import queue
import threading

# Change working directory
cwd = os.chdir(r"D:\Trading\Algo Trading\ZERODHA ALGO\account_auth")

# Generate trading session
access_token = open("access_token.txt", 'r').read()
key_secret = open("api_key.txt", 'r').read().split()
kite = KiteConnect(api_key=key_secret[0])
kite.set_access_token(access_token)

# Read MySQL password
mysql_password = open("mysql_pwd.txt", 'r').read()

# MySQL database credentials
db_config = {
'host': 'localhost',
'user': 'root',
'password': mysql_password,
'database': 'ZerodhaAlgoTest'
}

def create_tables(tokens):
db = mysql.connector.connect(**db_config)
c = db.cursor(buffered=True)
for token in tokens:
c.execute(f"""
CREATE TABLE IF NOT EXISTS TOKEN{token} (
ts DATETIME PRIMARY KEY,
price DECIMAL(20, 5),
volume BIGINT,
total_buy_quantity BIGINT DEFAULT 0,
total_sell_quantity BIGINT DEFAULT 0,
volume_consecutive BIGINT DEFAULT 0
)
""")
columns = {
'total_buy_quantity': 'BIGINT DEFAULT 0',
'total_sell_quantity': 'BIGINT DEFAULT 0',
'volume_consecutive': 'BIGINT DEFAULT 0'
}
for col_name, col_type in columns.items():
c.execute(f"""
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'TOKEN{token}' AND column_name = '{col_name}'
""")
if not c.fetchone():
c.execute(f"ALTER TABLE TOKEN{token} ADD COLUMN {col_name} {col_type}")

db.commit()
db.close()

def tokenLookup(instrument_df, symbol_list):
token_list = []
for symbol in symbol_list:
try:
token_list.append(int(instrument_df[instrument_df.tradingsymbol == symbol].instrument_token.values[0]))
except:
print(f'Ticker - {symbol} not found in the instrument list of NSE')
return token_list

def symbolLookup(instrument_df, token_list):
symbol_list = []
for token in token_list:
try:
symbol_list.append(instrument_df[instrument_df.instrument_token == token].tradingsymbol.values[0])
except:
print(f'Token - {token} not found in the instrument list of NSE')
return symbol_list

# Load the instrument dataframe globally
instrument_df = pd.read_excel(r"D:\Trading\Algo Trading\ZERODHA ALGO\account_auth\Total_instrument_df.xlsx")
instrument_df = instrument_df[instrument_df['exchange'].isin(['NSE', 'NFO'])]

tickersDf = pd.read_excel(r'D:\Trading\Algo Trading\ZERODHA ALGO\ticks_instruments_list.xlsx')
print('tickers read from file...')
tickers = tickersDf['SYMBOL'].tolist()

def insert_ticks(tick):
db = mysql.connector.connect(**db_config)
c = db.cursor()
if 'exchange_timestamp' not in tick:
print(f"Skipping tick with missing 'exchange_timestamp': {tick['instrument_token']}")
return

tok = "TOKEN"  + str(tick['instrument_token'])
symbol = symbolLookup(instrument_df, [tick['instrument_token']])[0]
vals = [
tick['exchange_timestamp'],
tick['last_price'],
tick['last_traded_quantity'],
tick.get('total_buy_quantity', 0),
tick.get('total_sell_quantity', 0),
tick.get('volume_traded', 0),
symbol
]
token_query = f"""
INSERT INTO {tok} (ts, price, volume, total_buy_quantity, total_sell_quantity, volume_consecutive)
VALUES (%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
price = VALUES(price),
volume = VALUES(volume),
total_buy_quantity = VALUES(total_buy_quantity),
total_sell_quantity = VALUES(total_sell_quantity),
volume_consecutive = VALUES(volume_consecutive)
"""
try:
c.execute(token_query, vals[:-1])
print(f'Tick inserted into - {symbol}')
except Exception as e:
print(f'Tick data error for - {tok}: {e}')
db.commit()
db.close()

def is_time_between(start_hour, start_minute, end_hour, end_minute):
now = datetime.datetime.now()
start_time = now.replace(hour=start_hour, minute=start_minute, second=0, microsecond=0)
end_time = now.replace(hour=end_hour, minute=end_minute, second=0, microsecond=0)
return start_time 

Подробнее здесь: [url]https://stackoverflow.com/questions/78743090/trouble-capturing-all-tick-data-with-concurrent-futures-in-python[/url]
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Как распараллелить доступ к общему массиву в Python с помощью concurrent.futures?
    Anonymous » » в форуме Python
    0 Ответы
    41 Просмотры
    Последнее сообщение Anonymous
  • Отмена всех задач в случае сбоя с помощью concurrent.futures в Python
    Anonymous » » в форуме Python
    0 Ответы
    13 Просмотры
    Последнее сообщение Anonymous
  • Отмена всех задач в случае сбоя с помощью concurrent.futures в Python
    Anonymous » » в форуме Python
    0 Ответы
    19 Просмотры
    Последнее сообщение Anonymous
  • Отмена всех задач в случае сбоя с помощью concurrent.futures в Python
    Anonymous » » в форуме Python
    0 Ответы
    28 Просмотры
    Последнее сообщение Anonymous
  • Как использовать concurrent.futures в кадре данных Pandas с функцией Apply?
    Anonymous » » в форуме Python
    0 Ответы
    24 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»