Anonymous
Как я отправлял прямые сообщения в Instagram с помощью Python. Я знаю, что Instragram использует протокол mqtt, я пробов
Сообщение
Anonymous » 14 дек 2024, 06:49
импортировать asyncio
импортировать веб-сокеты
импортировать json
импортировать время
импортировать paho.mqtt.client как mqtt
импортировать структуру
класс InstagramWebSocket:
def
init (self, mqtt_sid, client_id, cookie):
self.mqtt_sid = mqtt_sid
self.client_id = client_id
self.cookies = cookie
self.ws = None
self.cursor = None
Код: Выделить всё
async def connect(self):
mqtt_headers = {
"Host": "edge-chat.instagram.com",
"Connection": "Upgrade",
"Pragma": "no-cache",
"Cache-Control": "no-cache",
"Upgrade": "websocket",
"Origin": "https://www.instagram.com",
"Sec-WebSocket-Version": "13",
"sec-websocket-key":"key",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "en",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
}
uri = f"wss://edge-chat.instagram.com/chat?sid=&cid="
try:
self.ws = await websockets.connect(uri, additional_headers=mqtt_headers)
await self.on_open()
except Exception as e:
print(f"Connection error: {e}")
async def on_open(self):
# Connect MQTT
connect_payload = {
"cmd": "connect",
"protocolId": "MQIsdp",
"clientId": "mqttwsclient",
"protocolVersion": 3,
"clean": True,
"keepalive": 10,
"username": json.dumps({
"u": "userid",
"s": "",
"cp": 3,
"ecp": 10,
"chat_on": True,
"fg": False,
"d": self.client_id,
"ct": "cookie_auth",
"mqtt_sid": "",
"aid": 936619743392459,
"st": [],
"pm": [],
"dc": "",
"no_auto_fg": True,
"gas": None,
"pack": [],
"php_override": "",
"p": None,
"a": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36",
"aids": None
})
}
await self.ws.send(mqtt.generate(connect_payload))
# Send app settings
app_settings_payload = {
"cmd": "publish",
"messageId": 1,
"qos": 1,
"topic": "/ls_app_settings",
"payload": json.dumps({
"ls_fdid": "",
"ls_sv": ""
})
}
await self.ws.send(mqtt.generate(app_settings_payload))
cursor = None # Define your initial cursor value
ws = None
async def on_message(ws, data):
print(f"on msg from socket: {data}")
# Convert data to hex for comparison
data_hex = data.hex() if isinstance(data, bytes) else data
if data_hex == "42020001":
# Subscribe to /ls_resp
subscribe_packet = {
"cmd": "subscribe",
"qos": 1,
"subscriptions": [
{
"topic": "/ls_resp",
"qos": 0
}
],
"messageId": 3
}
ws.send(json.dumps(subscribe_packet))
# Publish request to /ls_req
publish_packet = {
"cmd": "publish",
"messageId": 5,
"qos": 1,
"dup": False,
"retain": False,
"topic": "/ls_req",
"payload": json.dumps({
"app_id": "936619743392459",
"payload": json.dumps({
"database": 1,
"epoch_id": int(time.time() * 1000)
Подробнее здесь: [url]https://stackoverflow.com/questions/79280031/how-i-sent-direct-messages-instagram-with-python-i-know-instragram-uses-mqtt-pro[/url]
1734148186
Anonymous
импортировать asyncio импортировать веб-сокеты импортировать json импортировать время импортировать paho.mqtt.client как mqtt импортировать структуру класс InstagramWebSocket: def [b]init[/b](self, mqtt_sid, client_id, cookie): self.mqtt_sid = mqtt_sid self.client_id = client_id self.cookies = cookie self.ws = None self.cursor = None [code]async def connect(self): mqtt_headers = { "Host": "edge-chat.instagram.com", "Connection": "Upgrade", "Pragma": "no-cache", "Cache-Control": "no-cache", "Upgrade": "websocket", "Origin": "https://www.instagram.com", "Sec-WebSocket-Version": "13", "sec-websocket-key":"key", "Accept-Encoding": "gzip, deflate, br", "Accept-Language": "en", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36", } uri = f"wss://edge-chat.instagram.com/chat?sid=&cid=" try: self.ws = await websockets.connect(uri, additional_headers=mqtt_headers) await self.on_open() except Exception as e: print(f"Connection error: {e}") async def on_open(self): # Connect MQTT connect_payload = { "cmd": "connect", "protocolId": "MQIsdp", "clientId": "mqttwsclient", "protocolVersion": 3, "clean": True, "keepalive": 10, "username": json.dumps({ "u": "userid", "s": "", "cp": 3, "ecp": 10, "chat_on": True, "fg": False, "d": self.client_id, "ct": "cookie_auth", "mqtt_sid": "", "aid": 936619743392459, "st": [], "pm": [], "dc": "", "no_auto_fg": True, "gas": None, "pack": [], "php_override": "", "p": None, "a": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36", "aids": None }) } await self.ws.send(mqtt.generate(connect_payload)) # Send app settings app_settings_payload = { "cmd": "publish", "messageId": 1, "qos": 1, "topic": "/ls_app_settings", "payload": json.dumps({ "ls_fdid": "", "ls_sv": "" }) } await self.ws.send(mqtt.generate(app_settings_payload)) cursor = None # Define your initial cursor value ws = None async def on_message(ws, data): print(f"on msg from socket: {data}") # Convert data to hex for comparison data_hex = data.hex() if isinstance(data, bytes) else data if data_hex == "42020001": # Subscribe to /ls_resp subscribe_packet = { "cmd": "subscribe", "qos": 1, "subscriptions": [ { "topic": "/ls_resp", "qos": 0 } ], "messageId": 3 } ws.send(json.dumps(subscribe_packet)) # Publish request to /ls_req publish_packet = { "cmd": "publish", "messageId": 5, "qos": 1, "dup": False, "retain": False, "topic": "/ls_req", "payload": json.dumps({ "app_id": "936619743392459", "payload": json.dumps({ "database": 1, "epoch_id": int(time.time() * 1000) Подробнее здесь: [url]https://stackoverflow.com/questions/79280031/how-i-sent-direct-messages-instagram-with-python-i-know-instragram-uses-mqtt-pro[/url]