IBApi / Трудности с запросом информации о дате публикации доходов через reqWshEventData.Python

Программы на Python
Ответить
Anonymous
 IBApi / Трудности с запросом информации о дате публикации доходов через reqWshEventData.

Сообщение Anonymous »

Цель моего скрипта Python — получить информацию о дате публикации доходов из TWS с помощью функции WSH reqWshEventData, предлагаемой библиотекой Python IBApi.
Сценарий работает без проблем ошибки до конца, но он не может получить данные о событии. Казалось бы, он может получать метаданные, но на следующем этапе, где он должен применить фильтр JSON и получить данные о событиях, ему это не удается.
Кто-нибудь видит, что с ним не так? ?
import json
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import threading
import time

class IBApiApp(EWrapper, EClient):
def __init__(self):
EClient.__init__(self, self)
self.contract_id = None
self.metadata_received = False
self.event_data_received = False

def nextValidId(self, orderId: int):
"""Called when connection is established."""
print("Connected to IBKR. Requesting contract details...")
self.request_contract_details()

def error(self, reqId: int, errorCode: int, errorString: str, advancedOrderRejectJson=""):
print(f"Error. ReqId: {reqId}, Code: {errorCode}, Message: {errorString}")

def contractDetails(self, reqId, contractDetails):
"""Callback to receive contract details."""
self.contract_id = contractDetails.contract.conId
print(f"Retrieved contract ID (conId): {self.contract_id}")

def contractDetailsEnd(self, reqId):
"""Called when contract details request is completed."""
if self.contract_id:
print("Successfully retrieved contract ID. Requesting WSH Metadata...")
self.request_wsh_metadata()
else:
print("No contract ID retrieved. Check API settings and permissions.")
self.disconnect()

def wshMetaData(self, reqId: int, data: str):
"""Callback to receive WSH metadata."""
print(f"WSH Metadata received for request {reqId}: {data}")
self.metadata_received = True
# Proceed with event data request if metadata was received
# Uncomment one of the methods below to test each approach
# self.request_wsh_event_data_with_conid()
# self.request_wsh_event_data_with_filter()

def wshEventData(self, reqId: int, dataJson: str):
"""Callback to receive WSH event data."""
print(f"WSH Event Data received for request {reqId}: {dataJson}")
self.event_data_received = True

def request_contract_details(self):
contract = Contract()
contract.symbol = "TSLA" # Change as needed
contract.secType = "STK"
contract.exchange = "SMART"
contract.currency = "USD"

self.reqContractDetails(1, contract)

def request_wsh_metadata(self):
"""Request WSH metadata for the contract."""
if self.contract_id:
print("Requesting WSH Metadata...")
self.reqWshMetaData(1)
else:
print("Contract ID not set. Cannot request WSH metadata.")

def request_wsh_event_data_with_conid(self):
"""Request WSH event data using only the conId, without a JSON filter."""
print(f"Requesting WSH Event Data using only conId: {self.contract_id}")
try:
# Requesting with only the conId as per NB in documentation
self.reqWshEventData(2, self.contract_id, "")
except Exception as e:
print(f"Error in requesting WSH Event Data with conId only: {e}")

def request_wsh_event_data_with_filter(self):
"""Request WSH event data using a JSON filter, without conId."""
# JSON filter structure as specified in IBKR documentation
filter_json = json.dumps({
"wsh_ed": "true", # Request earnings data
"country": "ALL", # Specify all countries
"limit_region": "10",
"event_type": {"wshe_eps": "true"} # Earnings events
})

print("Requesting WSH Event Data with JSON filter only:", filter_json)
try:
# Requesting with JSON filter only as per NB in documentation
self.reqWshEventData(2, 0, filter_json)
except Exception as e:
print(f"Error in requesting WSH Event Data with JSON filter only: {e}")

# Function to run the app in a separate thread
def run_loop(app):
app.run()

# Initialize and start the app
app = IBApiApp()
app.connect("127.0.0.1", 4001, clientId=1)

# Run the IB API in a separate thread to avoid blocking
api_thread = threading.Thread(target=run_loop, args=(app,))
api_thread.start()

# Timeout management for metadata and event data retrieval
metadata_timeout = 30 # seconds
event_data_timeout = 30 # seconds
start_time = time.time()

# Wait for metadata to be received
while not app.metadata_received and time.time() - start_time < metadata_timeout:
time.sleep(1)

if app.metadata_received:
# Reset start time for event data retrieval
start_time = time.time()
# Wait for event data to be received
while not app.event_data_received and time.time() - start_time < event_data_timeout:
time.sleep(1)
else:
print("WSH Metadata not received within timeout period.")

# Check if event data was received or not
if app.event_data_received:
print("WSH Event Data successfully retrieved.")
else:
print("WSH Event Data not found or timed out.")

# Disconnect and clean up
app.disconnect()
api_thread.join()


Подробнее здесь: https://stackoverflow.com/questions/791 ... reqwsheven
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»