Код: Выделить всё
import threading
import socket
from strategies import detect_significant_changes
from websocket_manager import WebSocketManager
# Binance API credentials -- placeholders, substitute your own values.
api_key = "your_api_key"
api_secret = "your_api_secret"

# Symbols the bot subscribes to; edit this list to change the watchlist.
watchlist = [
    "BTCUSDT",
    "ETHUSDT",
    "BNBUSDT",
    "ADAUSDT",
    "XRPUSDT",
]

# Shared manager through which every stream is connected and disconnected.
ws_manager = WebSocketManager(api_key, api_secret)

# Command state flags: "start" sets start_bot and clears stop_bot,
# "pause" does the opposite, "break" clears start_bot.
start_bot = threading.Event()
stop_bot = threading.Event()

# Address the command socket server binds to.
SERVER_HOST = "127.0.0.1"
SERVER_PORT = 65432
def handle_websocket_data(msg):
    """
    Process one incoming WebSocket message and apply the strategy.

    A kline event carrying a symbol ('s') and a closing price ('k' -> 'c') is
    forwarded to ``detect_significant_changes``. Anything else is logged and
    dropped. A malformed payload never raises, because this function runs as
    the stream callback.

    :param msg: WebSocket message data; a dict, or JSON text as str/bytes
    :return: None
    """
    import json  # local import: only needed when the payload arrives as text

    if not msg:
        print("[ERROR] Received empty WebSocket message.")
        return
    print(f"[DEBUG] Raw WebSocket message: {msg}")  # Print the entire message structure

    # Accept JSON text frames as well as already-decoded dicts.
    if isinstance(msg, (str, bytes, bytearray)):
        try:
            msg = json.loads(msg)
        except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
            print(f"[ERROR] Unexpected message format received: {msg}")
            return

    # Ensure the message has the required fields.
    if not (isinstance(msg, dict) and 's' in msg and 'k' in msg):
        print(f"[ERROR] Unexpected message format received: {msg}")
        return

    symbol = msg['s']
    kline = msg['k']
    # 'k' must itself be a mapping; a None or string value would otherwise
    # raise on the membership test or the subscript below.
    if not isinstance(kline, dict) or 'c' not in kline:
        print(f"[ERROR] 'c' (closing price) not found in the kline data for symbol {symbol}. Full message: {msg}")
        return

    try:
        close_price = float(kline['c'])
    except (TypeError, ValueError):
        print(f"[ERROR] Invalid closing price {kline['c']!r} for symbol {symbol}. Full message: {msg}")
        return

    print(f"[DEBUG] Symbol: {symbol}, Close Price: {close_price}")  # Print to confirm values
    detect_significant_changes(symbol, close_price)  # Execute strategy
def handle_client_connection(client_socket):
    """
    Handle one command sent by the control script, then close the connection.

    Recognised commands are "start", "pause" and "break"; surrounding
    whitespace (such as a trailing newline) is ignored. The socket is closed
    on every path, including when a handler step raises.

    :param client_socket: Socket connection from control script
    :raises SystemExit: after a "break" command, to end the calling thread
    """
    try:
        raw = client_socket.recv(1024)
        # errors="replace" turns undecodable bytes into an unknown command
        # instead of raising UnicodeDecodeError.
        command = raw.decode(errors="replace").strip()
        print(f"Received command: {command}")

        if command == "start":
            if start_bot.is_set():
                # Already running: treat it as a restart. No explicit
                # disconnect is needed here because reconnect_streams()
                # disconnects everything before reconnecting.
                print("Bot is already running. Restarting...")
            print("Start command received. Starting the bot...")
            start_bot.set()
            stop_bot.clear()
            print("[DEBUG] Setting up callback for WebSocketManager.")
            ws_manager.set_callback(handle_websocket_data)  # Set callback for WebSocketManager
            print("[DEBUG] Callback set successfully.")
            ws_manager.reconnect_streams(watchlist)
        elif command == "pause":
            print("Pause command received. Pausing the bot...")
            ws_manager.disconnect_all_streams()
            stop_bot.set()
            start_bot.clear()
        elif command == "break":
            print("Break command received. Stopping the bot...")
            ws_manager.disconnect_all_streams()
            start_bot.clear()
            print("Exiting...")
            # Raised directly instead of calling exit(), which is only
            # defined when the site module is loaded.
            raise SystemExit(0)
        else:
            print("Unknown command. Please enter 'start', 'pause', or 'break'.")
    finally:
        client_socket.close()
def start_server():
    """
    Start the socket server and handle control commands one at a time.

    Binds to SERVER_HOST:SERVER_PORT and loops forever accepting connections.
    An error while handling one client is logged and the loop continues.
    SystemExit (raised for the "break" command) is not caught, so it ends the
    loop; the listening socket is closed on the way out.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.bind((SERVER_HOST, SERVER_PORT))
        server_socket.listen(5)
        print(f"Server started, waiting for commands at {SERVER_HOST}:{SERVER_PORT}...")
        while True:
            client_socket, addr = server_socket.accept()
            print(f"Connection established with {addr}")
            try:
                handle_client_connection(client_socket)
            except Exception as exc:
                # One bad command must not take the command server down.
                print(f"[ERROR] Failed to handle command from {addr}: {exc}")
def main():
    """
    Start the command server, auto-start the streams, and wait.

    The command server runs in a daemon thread. The bot connects to every
    symbol in ``watchlist`` immediately, then this function blocks until the
    server thread ends or the user interrupts with Ctrl+C.
    """
    print("Initializing bot...")
    print("Bot initialized and waiting for commands (start, pause, break).")

    # Start the server in a separate thread.
    server_thread = threading.Thread(target=start_server, daemon=True)
    server_thread.start()

    # Automatically start the bot: main() runs once per process, so no
    # "first run" flag is needed.
    print("[DEBUG] Setting up callback for WebSocketManager.")
    ws_manager.set_callback(handle_websocket_data)
    print(f"[DEBUG] Callback function set: {ws_manager.callback}")
    print("[AUTO-START] First run detected. Automatically starting the bot...")
    start_bot.set()
    stop_bot.clear()
    ws_manager.reconnect_streams(watchlist)  # Automatically connect to the streams

    # Keep the process alive while commands are handled in the background.
    try:
        server_thread.join()
    except KeyboardInterrupt:
        # Ctrl+C: stop the streams before the process exits.
        print("Interrupted. Stopping the bot...")
        ws_manager.disconnect_all_streams()
        start_bot.clear()


# Run the bot only when this file is executed as a script.
if __name__ == "__main__":
    main()
Код: Выделить всё
from binance.websocket.um_futures.websocket_client import UMFuturesWebsocketClient
import threading
class WebSocketManager:
    """
    Manage kline WebSocket subscriptions on one shared client.

    All symbols are subscribed on a single UMFuturesWebsocketClient. The only
    teardown call this class makes is ``stop()``, which ends the whole client,
    so a stopped client is always discarded and the next connect creates a
    fresh one.
    """

    def __init__(self, api_key, api_secret):
        """
        :param api_key: API key; stored on the instance only
        :param api_secret: API secret; stored on the instance only
        """
        self.api_key = api_key
        self.api_secret = api_secret
        self.ws_client = None  # Created lazily by initialize_client()
        self.active_streams = {}  # symbol -> True for each connected stream
        self.callback = None  # Strategy callback function to handle data
        self.stream_lock = threading.Lock()  # Guards ws_client / active_streams changes

    def initialize_client(self):
        """
        Ensure that the WebSocket client is initialized.
        """
        if self.ws_client is None:
            self.ws_client = UMFuturesWebsocketClient()
            print("[DEBUG] UMFuturesWebsocketClient initialized.")

    def set_callback(self, callback_function):
        """
        Set the callback function to be used for handling WebSocket data.

        :param callback_function: Function to process the incoming data
        """
        self.callback = callback_function
        print(f"[DEBUG] Callback function set: {self.callback}")

    def _stop_client(self):
        """
        Stop and discard the current client, if any.

        Must be called with ``stream_lock`` held.

        :return: True if a client was stopped, False if there was none
        """
        client = self.ws_client
        if client is None:
            return False
        # Drop the reference first so a failing stop() cannot leave a dead
        # client behind for the next connect to reuse.
        self.ws_client = None
        client.stop()
        return True

    def connect_stream(self, symbol):
        """
        Connect a 1-minute kline WebSocket stream for a given symbol.

        A connection failure is logged, not raised; the symbol is recorded in
        ``active_streams`` only on success.

        :param symbol: Symbol to connect (e.g., "BTCUSDT")
        :raises ValueError: if no callback has been set
        """
        if self.callback is None:
            raise ValueError("Callback function is not set. Use `set_callback()` before connecting streams.")

        # Define the internal callback for handling data.
        def internal_callback(msg):
            print(f"[DEBUG] Raw message received for {symbol}: {msg}")  # Log every raw message received
            if not msg:
                print(f"[ERROR] No data received from WebSocket stream for {symbol}")
                return
            print(f"[DEBUG] Valid message received for {symbol}")
            # Looked up at call time so a later set_callback() takes effect.
            if self.callback:
                print(f"[DEBUG] Passing message for {symbol} to the main handler.")
                self.callback(msg)  # Pass the message to the strategy handler
            else:
                print(f"[ERROR] No callback defined to handle data for {symbol}.")

        # Add stream for the given symbol.
        with self.stream_lock:
            # Initialised under the lock so a concurrent disconnect cannot
            # discard the client between creation and use.
            self.initialize_client()
            print(f"[DEBUG] Attempting to connect to WebSocket for {symbol}...")
            try:
                # NOTE(review): the kline() signature and the way callbacks
                # are registered appear to differ between releases of the
                # Binance futures connector -- confirm that the installed
                # version honours `callback=` and needs no explicit start().
                self.ws_client.kline(
                    symbol=symbol.lower(),
                    interval='1m',
                    callback=internal_callback
                )
                print(f"[DEBUG] WebSocket connection established for {symbol}.")
                self.active_streams[symbol] = True
            except Exception as e:
                print(f"[ERROR] Failed to connect WebSocket for {symbol}: {e}")

    def disconnect_stream(self, symbol):
        """
        Disconnect a WebSocket stream for a given symbol.

        Stopping the shared client ends every stream, so the remaining
        symbols are reconnected on a fresh client afterwards. Unknown symbols
        are ignored.

        :param symbol: Symbol to disconnect (e.g., "BTCUSDT")
        """
        with self.stream_lock:
            if symbol not in self.active_streams:
                return
            print(f"[DEBUG] Disconnecting WebSocket for {symbol}...")
            del self.active_streams[symbol]
            remaining = list(self.active_streams)
            self._stop_client()
            self.active_streams = {}
            print(f"[DEBUG] WebSocket for {symbol} disconnected.")
        # Reconnect outside the lock: connect_stream() acquires it itself and
        # threading.Lock is not reentrant.
        for other in remaining:
            self.connect_stream(other)

    def disconnect_all_streams(self):
        """
        Disconnect all active WebSocket streams and discard the client.
        """
        with self.stream_lock:
            if self._stop_client():
                print("[DEBUG] All WebSocket connections stopped.")
            else:
                print("[DEBUG] No active WebSocket client to disconnect.")
            self.active_streams = {}

    def reconnect_streams(self, symbols):
        """
        Reconnect all WebSocket streams for a given list of symbols.

        :param symbols: List of symbols to reconnect (e.g., ["BTCUSDT", "ETHUSDT"])
        :raises ValueError: if no callback has been set and symbols is non-empty
        """
        print("[DEBUG] Reconnecting WebSocket streams for all symbols...")
        self.disconnect_all_streams()
        self.initialize_client()  # Ensure a client exists even when symbols is empty
        for symbol in symbols:
            self.connect_stream(symbol)

    def close(self):
        """
        Safely close the WebSocket client and disconnect all streams.
        """
        print("[DEBUG] Closing WebSocketManager...")
        # disconnect_all_streams() already stops and discards the client, so
        # no second stop() is issued here.
        self.disconnect_all_streams()
        print("[DEBUG] All WebSocket connections closed.")
Код: Выделить всё
# strategies.py
# Strategy functions to handle trading logic
# Parameters for strategies (easy to configure)
lookback_period = 60 # Maximum number of price samples kept per symbol (compared against the history length, not elapsed seconds)
increase_threshold = 0.01 # Rise, in percent (the change is already multiplied by 100), at or above which a notification is printed
decrease_threshold = -0.01 # Threshold for a fall; NOTE(review): its use is cut off in this view -- presumably percent, like increase_threshold
tracked_prices = {} # Maps each symbol to its list of recent prices, oldest first
def detect_significant_changes(symbol, price):
print(f"[DEBUG] detect_significant_changes called for {symbol} at price {price}")
if symbol not in tracked_prices:
tracked_prices[symbol] = [price]
else:
tracked_prices[symbol].append(price)
if len(tracked_prices[symbol]) > lookback_period:
tracked_prices[symbol].pop(0)
initial_price = tracked_prices[symbol][0]
percent_change = ((price - initial_price) / initial_price) * 100
print(f"[DEBUG] {symbol}: Initial Price = {initial_price}, Current Price = {price}, Change = {percent_change:.2f}%")
if percent_change >= increase_threshold:
print(f"[INFO] {symbol} has increased by {percent_change:.2f}% over the lookback period.")
elif percent_change
Подробнее здесь: [url]https://stackoverflow.com/questions/79061774/binance-futures-websocket-doesnt-push-informations[/url]