У меня проблемы с вставкой данных, которые я получаю из Telegram, используя Telethon и PostgreSQL. ⇐ Python
У меня проблемы с вставкой данных, которые я получаю из Telegram, используя Telethon и PostgreSQL.
Я написал небольшой класс для обработки сообщений и извлечения из него некоторого контента. Сейчас я пытаюсь отследить эти данные с помощью телемарафона и вставить их в свою базу данных postgresql. Я также пытался вставить их с помощью метода iter_messages, с которым у меня не возникло проблем, судя по журналам, которые я проверил, телемарафон работает нормально с живым извлечением. Но я не смог понять, что не так со вставками в базу данных.
client = TelegramClient('test', api_id, api_hash) # Создаем очередь сообщений message_queue = очередь.Очередь() # Получение сообщений и организация очереди @client.on(events.NewMessage(чаты=группы)) асинхронная защита handle_new_message (событие): сообщение = событие.сообщение \# Распечатать сообщение в том виде, в каком оно пришло print(f"Новое сообщение: {message.text}") \# Добавляем сообщение в очередь на обработку message_queue.put(сообщение) асинхронная защита print_asterisk_ periodically(): пока правда: Распечатать('*-*-\*') ожидайте asyncio.sleep(150)
# Рабочий обработки сообщений защита message_processing_worker(): пока правда: сообщение = message_queue.get() пытаться: Cdata.insert_replymentions(сообщение) # Проверяем, есть ли у сообщения контрактный адрес результат = Cdata.contract_message(сообщение) если результат не Нет: print("***** Я работаю *****") conn = psycopg2.connect(**db_params) # Здесь обрабатываем результат (сообщение и адрес контракта) сообщение, адрес_контракта = результат Cdata.insert_contract_address(contract_address,conn) Cdata.insert_telegram_messages(сообщение,contract_address,conn) конн.закрыть() кроме исключения как e: print(f"Ошибка обработки вставки базы данных: {str(e)}") окончательно: message_queue.task_done() число_работников = 2 рабочие = \[\] для \_ в диапазоне (num_workers): рабочий = threading.Thread(target=message_processing_worker) рабочий.start() рабочие.append(работник) асинхронное определение main(): ожидайте client.start() ожидайте asyncio.gather(client.run_until_disconnected(),print_asterisk_ periodically() ) asyncio.run(main()) \#Используемые функции @staticmethod защита Contract_message (сообщение): адрес_контракта = Нет cmes = Нет # Проверьте, имеет ли сообщение атрибуты «сообщение», «чат», «заголовок» и «id». if hasattr(message, 'message') и hasattr(message, 'chat') и hasattr(message.chat, 'title') и hasattr(message, 'id'): # Непосредственный поиск шаблона адреса контракта Ethereum match = re.search(r"0x[a-fA-F0-9]{40}", str(message.message)) если совпадение и https://www.pinksale.finance/launchpad/ отсутствует в str(message.message): адрес контракта = match.group(0) if ( token_address := Cdata.return_base_token(contract_address)) не имеет значения None: cmes=сообщение вернуть cmes, адрес_токена @staticmethod защита Insert_replymentions (сообщение): conn = psycopg2.connect(**db_params) курсор = conn.cursor() курсор.execute("ВЫБРАТЬ символ, имя ИЗ static_fundamental_data") монеты = курсор.fetchall() # Выходим из функции, если запрос монет возвращает None если монет нет: конн.закрыть() возвращаться # Отфильтровать значения None для символов и названий монет coin_symbols = [coin[0] для монеты в монетах, если coin[0] не равно None] coin_names = [coin[1] для монеты в монетах, если coin[1] не равно None] # Обработка случая, когда сообщение не имеет текста message_text = message.text, если hasattr(message, 'text') else '' если не текст_сообщения: конн.закрыть() возвращаться курсор.execute("ВЫБРАТЬ символ, имя ИЗ static_fundamental_data") монеты = курсор.fetchall() coin_symbols = [coin[0] для монеты в монетах] coin_names = [coin[1] для монеты в монетах] message_text = message.text, если hasattr(message, 'text') else '' # Проверьте, упоминается ли монета в тексте сообщения is_coin_упомянутый = любой(re.search(rf'\b{re.escape(symbol)}\b', message_text, re.I) для символа в coin_symbols) или любой(re.search(rf'\b{re.escape) (имя)}\b', message_text, re.I) для имени в coin_names) #Проверяем, является ли сообщение ответом на другое сообщение is_reply = bool(message.reply_to_msg_id) адрес_контракта = Нет если is_reply: Answer_message_id = message.reply_to_msg_id Answer_mes_id = f"{message.chat.title}:-:{replied_message_id}" # Проверяем, существует ли идентификатор ответного сообщения в базе данных курсор.execute("SELECT COUNT(*) FROM telegram_messages WHERE telethon_id = %s", (replied_mes_id,)) счетчик = курсор.fetchone()[0] is_reply_in_db = количество > 0 # Если ответное сообщение в базе данных если is_reply_in_db: курсор.execute("ВЫБРАТЬ telegram_messages.telethon_id ИЗ telegram_messages INNER JOIN static_fundamental_data ON telegram_messages.contract_address_address = static_fundamental_data.contract_address_address WHERE telegram_messages.telethon_id = %s", (replied_mes_id,)) db_message = курсор.fetchone() если db_message: cmes,contract_address = Cdata.contract_message(db_message) если is_coin_упоминается, а не Contract_address: # Найдите адрес контракта, соответствующий символу монеты или имени, указанному в сообщении. упомянутый_символ_или_имя = next((символ для символа в coin_symbols if re.search(rf'\b{re.escape(symbol)}\b', message_text, re.I)), None) или \ next((имя для имени в coin_names, если re.search(rf'\b{re.escape(name)}\b', message_text, re.I)), None) #Если название или символ контракта есть в тексте сообщения, получите адрес контракта если упомянутый_символ_или_имя: курсор.execute("ВЫБЕРИТЕ адрес контракта_адрес ИЗ статических_фундаментальных_данных ГДЕ символ = %s ИЛИ имя = %s", (упомянутый_символ_или_имя, упомянутый_символ_или_имя)) адрес контракта = курсор.fetchone()[0] # Если монета упоминается в таблице сообщений или это ответ на сообщение о продвижении монеты, которое находится в базе данных, вставьте их в базу данных со ссылкой на монету. если is_coin_упоминается или (is_reply и is_reply_in_db): Cdata.insert_telegram_messages(сообщение, адрес контракта,conn) Cdata.insert_first_view_data(сообщение,подключение) Cdata.insert_message_data(сообщение,подключение) конн.коммит() конн.закрыть() @staticmethod Защиту Insert_telegram_messages (сообщение, адрес_контракта, соединение): telethon_id = Cdata.message_to_id(сообщение)
курсор = conn.cursor() курсор.execute(""" ВСТАВИТЬ В telegram_messages ( Сообщения, телетон_ид, контракт_адрес_адрес ) ЗНАЧЕНИЯ (%s,%s,%s) НА КОНФЛИКТЕ (telethon_id) НИЧЕГО НЕ ДЕЛАТЬ """, (текст сообщения, телетон_ид, адрес_контракта )) конн.коммит() @staticmethod Защиту Insert_contract_address(contract_address,conn):
курсор = conn.cursor() курсор.execute(""" INSERT INTO контракт_адрес (адрес) ЗНАЧЕНИЯ (%s) О КОНФЛИКТЕ (адрес) НИЧЕГО НЕ ДЕЛАТЬ """, (адрес_контракта,)) конн.коммит() Я ожидал, что при вставке в базу данных не возникнет проблем, поскольку я заранее попробовал сделать это с помощью метода iter_messages. Лог-файлы мне тоже не помогли.
Я написал небольшой класс для обработки сообщений и извлечения из него некоторого контента. Сейчас я пытаюсь отследить эти данные с помощью телемарафона и вставить их в свою базу данных postgresql. Я также пытался вставить их с помощью метода iter_messages, с которым у меня не возникло проблем, судя по журналам, которые я проверил, телемарафон работает нормально с живым извлечением. Но я не смог понять, что не так со вставками в базу данных.
client = TelegramClient('test', api_id, api_hash) # Создаем очередь сообщений message_queue = очередь.Очередь() # Получение сообщений и организация очереди @client.on(events.NewMessage(чаты=группы)) асинхронная защита handle_new_message (событие): сообщение = событие.сообщение \# Распечатать сообщение в том виде, в каком оно пришло print(f"Новое сообщение: {message.text}") \# Добавляем сообщение в очередь на обработку message_queue.put(сообщение) асинхронная защита print_asterisk_ periodically(): пока правда: Распечатать('*-*-\*') ожидайте asyncio.sleep(150)
# Рабочий обработки сообщений защита message_processing_worker(): пока правда: сообщение = message_queue.get() пытаться: Cdata.insert_replymentions(сообщение) # Проверяем, есть ли у сообщения контрактный адрес результат = Cdata.contract_message(сообщение) если результат не Нет: print("***** Я работаю *****") conn = psycopg2.connect(**db_params) # Здесь обрабатываем результат (сообщение и адрес контракта) сообщение, адрес_контракта = результат Cdata.insert_contract_address(contract_address,conn) Cdata.insert_telegram_messages(сообщение,contract_address,conn) конн.закрыть() кроме исключения как e: print(f"Ошибка обработки вставки базы данных: {str(e)}") окончательно: message_queue.task_done() число_работников = 2 рабочие = \[\] для \_ в диапазоне (num_workers): рабочий = threading.Thread(target=message_processing_worker) рабочий.start() рабочие.append(работник) асинхронное определение main(): ожидайте client.start() ожидайте asyncio.gather(client.run_until_disconnected(),print_asterisk_ periodically() ) asyncio.run(main()) \#Используемые функции @staticmethod защита Contract_message (сообщение): адрес_контракта = Нет cmes = Нет # Проверьте, имеет ли сообщение атрибуты «сообщение», «чат», «заголовок» и «id». if hasattr(message, 'message') и hasattr(message, 'chat') и hasattr(message.chat, 'title') и hasattr(message, 'id'): # Непосредственный поиск шаблона адреса контракта Ethereum match = re.search(r"0x[a-fA-F0-9]{40}", str(message.message)) если совпадение и https://www.pinksale.finance/launchpad/ отсутствует в str(message.message): адрес контракта = match.group(0) if ( token_address := Cdata.return_base_token(contract_address)) не имеет значения None: cmes=сообщение вернуть cmes, адрес_токена @staticmethod защита Insert_replymentions (сообщение): conn = psycopg2.connect(**db_params) курсор = conn.cursor() курсор.execute("ВЫБРАТЬ символ, имя ИЗ static_fundamental_data") монеты = курсор.fetchall() # Выходим из функции, если запрос монет возвращает None если монет нет: конн.закрыть() возвращаться # Отфильтровать значения None для символов и названий монет coin_symbols = [coin[0] для монеты в монетах, если coin[0] не равно None] coin_names = [coin[1] для монеты в монетах, если coin[1] не равно None] # Обработка случая, когда сообщение не имеет текста message_text = message.text, если hasattr(message, 'text') else '' если не текст_сообщения: конн.закрыть() возвращаться курсор.execute("ВЫБРАТЬ символ, имя ИЗ static_fundamental_data") монеты = курсор.fetchall() coin_symbols = [coin[0] для монеты в монетах] coin_names = [coin[1] для монеты в монетах] message_text = message.text, если hasattr(message, 'text') else '' # Проверьте, упоминается ли монета в тексте сообщения is_coin_упомянутый = любой(re.search(rf'\b{re.escape(symbol)}\b', message_text, re.I) для символа в coin_symbols) или любой(re.search(rf'\b{re.escape) (имя)}\b', message_text, re.I) для имени в coin_names) #Проверяем, является ли сообщение ответом на другое сообщение is_reply = bool(message.reply_to_msg_id) адрес_контракта = Нет если is_reply: Answer_message_id = message.reply_to_msg_id Answer_mes_id = f"{message.chat.title}:-:{replied_message_id}" # Проверяем, существует ли идентификатор ответного сообщения в базе данных курсор.execute("SELECT COUNT(*) FROM telegram_messages WHERE telethon_id = %s", (replied_mes_id,)) счетчик = курсор.fetchone()[0] is_reply_in_db = количество > 0 # Если ответное сообщение в базе данных если is_reply_in_db: курсор.execute("ВЫБРАТЬ telegram_messages.telethon_id ИЗ telegram_messages INNER JOIN static_fundamental_data ON telegram_messages.contract_address_address = static_fundamental_data.contract_address_address WHERE telegram_messages.telethon_id = %s", (replied_mes_id,)) db_message = курсор.fetchone() если db_message: cmes,contract_address = Cdata.contract_message(db_message) если is_coin_упоминается, а не Contract_address: # Найдите адрес контракта, соответствующий символу монеты или имени, указанному в сообщении. упомянутый_символ_или_имя = next((символ для символа в coin_symbols if re.search(rf'\b{re.escape(symbol)}\b', message_text, re.I)), None) или \ next((имя для имени в coin_names, если re.search(rf'\b{re.escape(name)}\b', message_text, re.I)), None) #Если название или символ контракта есть в тексте сообщения, получите адрес контракта если упомянутый_символ_или_имя: курсор.execute("ВЫБЕРИТЕ адрес контракта_адрес ИЗ статических_фундаментальных_данных ГДЕ символ = %s ИЛИ имя = %s", (упомянутый_символ_или_имя, упомянутый_символ_или_имя)) адрес контракта = курсор.fetchone()[0] # Если монета упоминается в таблице сообщений или это ответ на сообщение о продвижении монеты, которое находится в базе данных, вставьте их в базу данных со ссылкой на монету. если is_coin_упоминается или (is_reply и is_reply_in_db): Cdata.insert_telegram_messages(сообщение, адрес контракта,conn) Cdata.insert_first_view_data(сообщение,подключение) Cdata.insert_message_data(сообщение,подключение) конн.коммит() конн.закрыть() @staticmethod Защиту Insert_telegram_messages (сообщение, адрес_контракта, соединение): telethon_id = Cdata.message_to_id(сообщение)
курсор = conn.cursor() курсор.execute(""" ВСТАВИТЬ В telegram_messages ( Сообщения, телетон_ид, контракт_адрес_адрес ) ЗНАЧЕНИЯ (%s,%s,%s) НА КОНФЛИКТЕ (telethon_id) НИЧЕГО НЕ ДЕЛАТЬ """, (текст сообщения, телетон_ид, адрес_контракта )) конн.коммит() @staticmethod Защиту Insert_contract_address(contract_address,conn):
курсор = conn.cursor() курсор.execute(""" INSERT INTO контракт_адрес (адрес) ЗНАЧЕНИЯ (%s) О КОНФЛИКТЕ (адрес) НИЧЕГО НЕ ДЕЛАТЬ """, (адрес_контракта,)) конн.коммит() Я ожидал, что при вставке в базу данных не возникнет проблем, поскольку я заранее попробовал сделать это с помощью метода iter_messages. Лог-файлы мне тоже не помогли.
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Как отправлять сообщения пользователям из группы Telegram с помощью Telethon?
Anonymous » » в форуме Python - 0 Ответы
- 30 Просмотры
-
Последнее сообщение Anonymous
-