Настройка пула TCP-соединений с правильной настройкой повторного подключения для соединений, повторно подключающихся к тPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Настройка пула TCP-соединений с правильной настройкой повторного подключения для соединений, повторно подключающихся к т

Сообщение Anonymous »

Поэтому я пытаюсь настроить пул соединений, чтобы можно было протестировать свое приложение с гораздо большим количеством запросов, не нарушая файловый дескриптор и другие ограничения на моем компьютере (M1 Macbook air).
Теперь пул соединений, который я хотел реализовать, закрывает соединение при возврате обратно в пул после использования для запроса, поскольку эта реализация пула соединений используется тестовым клиентом, который взаимодействует с сервером, а сервер по умолчанию закрывает соединение на нем. заканчиваться после каждого запроса, как и должно быть. Ну, это спорно, но на моем сервере приложений не нужно держать соединение открытым дольше, чем нужно. Поэтому мы также закрываем соединение при его возврате, что должно освободить дескриптор файла, а затем мы создаем новое соединение и привязываем его к тому же исходному порту, что и предыдущее.
Вот код моего текущего класса пула соединений, о комментариях я расскажу позже.

Код: Выделить всё

import socket
from queue import Queue, Empty
import threading
import time
import struct
class ConnectionPool:
def __init__(self, ip, port, pool_size=10,source_port_start=30000,source_port_end=40000):
self.ip = ip
self.port = port
self.pool_size = pool_size
self.source_port_start = source_port_start
self.next_source_port = source_port_start
self.source_port_end = source_port_end
self.pool = Queue(maxsize=pool_size)
self.lock = threading.Lock()
self._create_pool()

def _create_pool(self):
for _ in range(self.pool_size):
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Attempt to bind to a unique source port within range
with self.lock:
for port in range(self.next_source_port, self.source_port_end):
try:
conn.bind(('', port))
# print(f"Bound to source port: {port}")
self.next_source_port = port + 1
break
except OSError as e:
print(f"Port {port} is in use, trying next port.")
continue
else:
print("No available ports in range.")
conn.close()
return None  # No available port to bind

try:
conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# conn.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 10)
# Set TCP keep-alive parameters
# This might require platform-specific configuration
# Setting the keep-alive intervals

# Keep idle time before sending the first keepalive probe (in seconds)

# Interval between keepalive probes (in seconds)
# conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10)

# Maximum number of keepalive probes before declaring the connection dead
# conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 10)
conn.connect((self.ip, self.port))
self.pool.put((conn, port))
except socket.error as e:
print(f"Connection failed: {e}")
conn.close()

def get_connection(self):
try:
return self.pool.get_nowait()
except Empty:
return None

def return_connection(self, conn_tuple):
# Close the socket forcefully
conn_tuple[0].setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0))
conn_tuple[0].close()
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# conn.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 10) check if source port
conn.bind(('', conn_tuple[1]))
conn.connect((self.ip, self.port))
# if not self.pool.full():
#     self.pool.put((conn,conn_tuple[1]))
self.pool.put((conn,conn_tuple[1]))

def close_all(self):
while not self.pool.empty():
conn_tuple = self.pool.get_nowait()
conn_tuple[0].close()
как вы можете видеть, метод обратного соединения закрывает соединение, снова открывает соединение и привязывает его к тому же исходному порту, что и раньше.
Проблема в том, что , когда я использую эту реализацию, она работает до тех пор, пока не будет выполнено 2000 запросов одновременно, но когда я увеличиваю количество запросов до 4000 и выше, в какой-то момент она зависает, и мне приходится выйти с сигналом sig int.
Я более склонен полагать, что это связано с тем, как мы закрываем и снова открываем соединение, возможно, нужна какая-то дополнительная настройка сокетов, о которой я здесь не знаю.
Каждый раз Я выхожу с помощью sig int, журнал ошибок показывает, что что-то не так с вызовом conn.recv, и я не думаю, что проблема в моей тестовой логике, я думаю, что что-то не так с самим соединением, которое, очевидно, было установлено либо при создании соединения, либо при создании соединения. метод обратного соединения.
Теперь о том, что я пробовал до сих пор, сначала

Код: Выделить всё

conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
без этой опции он даже не смог обработать до 200 одновременных запросов, которые, я думаю, можно назвать соединением, хотя для него было вызвано закрытие, это все равно может занять пару дополнительных циклы процессора, чтобы он действительно закрылся из-за состояния TIME_WAIT, чтобы предотвратить доставку задержанных пакетов в новое соединение, и если в это время откроется другой, эта опция позволит нам привязаться к порту, который все еще находится в состоянии TIME_WAIT.
Как видите, была также предпринята попытка сделать соединения долгоживущими с помощью Keep Alive и зондов, но это оказалось неудачей, поскольку тесты начали тормозить гораздо раньше. Я не знаю, как это объяснить, но я думаю, это к лучшему, поскольку мы хотим закрыть соединение сразу после использования и повторно подключиться к новому соединению, а сделать эти соединения долгоживущими противоречило бы тому, что мы делаем. пытаясь сделать , я могу ошибаться.
И самым важным изменением, которое сделало возможным первый тест одновременных запросов в 2000 году, было использование

Код: Выделить всё

conn_tuple[0].setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0))
этот параметр перед фактическим вызовом close.
Это заставляет сокет немедленно закрыться, отбрасывая все неотправленные данные.
Без этого тесты чаще зависали на полпути и даже не могли правильно протестировать до 100 одновременных запросов с размером пула 10.
после этих изменений тест смог успешно выполниться для 2000 одновременных запросов с размером пула 10.
Также пробовал

Код: Выделить всё

 if conn_tuple[0].fileno() != -1:
try:
# Try setting SO_LINGER to close the socket gracefully
conn_tuple[0].setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('hh', 1, 0))
except OSError as e:
print(f"Warning: SO_LINGER failed with error {e}")
Здесь я пытался проверить, были ли какие-либо ошибки при установке этой опции, и иногда возникали сообщения в журнале: «Предупреждение: ошибка SO_LINGER с ошибкой, неверный аргумент», не удалось не могу понять, что это было.
Но это не работает для 4000 и выше, так как нам исправить и улучшить его, чтобы он мог использовать пул для теоретического разговора даже до уровня миллион.

Подробнее здесь: https://stackoverflow.com/questions/791 ... ctions-rec
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»