Ошибка синтаксического анализатора ruamel.yaml при использовании Pyflink через DockerPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Ошибка синтаксического анализатора ruamel.yaml при использовании Pyflink через Docker

Сообщение Anonymous »

Я пытаюсь запустить задание flink, которое потребляет и передает данные из/в источник Kafka. Чтобы отправить задание, я использую библиотеку pyflink.
Я использую:
Python 3.8.0
Flink 1.19.0
Файл Docker для этого проекта:

Код: Выделить всё

version: "3.0"

services:

zookeeper:
image: confluentinc/cp-zookeeper:7.0.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
- uwb_streaming_net

broker:
image: confluentinc/cp-kafka:7.0.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
expose:
- '29092'
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
networks:
- uwb_streaming_net

jobmanager:
image: pyflink:latest
ports:
- "8081:8081"
expose:
- "6123"
command:  jobmanager # cp opt/flink/conf/config.yaml opt/flink && jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
volumes:
- ./opt/flink:/flink
- ./kafka_test:/kafka_test
networks:
- uwb_streaming_net

taskmanager:
image: pyflink:latest
depends_on:
- jobmanager
expose:
- "6121"
- "6122"
command: taskmanager
links:
- "jobmanager:jobmanager"
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
networks:
- uwb_streaming_net

app:
# /flink/bin/flink run -py /taskscripts/app.py --jobmanager jobmanager:8081 --target local
image: flink-app
build:
context: .
dockerfile: ./App.DockerFile
depends_on:
- broker
entrypoint:
- tail
command:
- -f
- /dev/null
volumes:
- ./opt/flink:/opt/flink
- ./kafka_test:/kafka_test
networks:
- uwb_streaming_net

networks:
uwb_streaming_net:
external: false
name: uwb_streaming_net
Для задания и диспетчера задач flink мне пришлось использовать собственное изображение вместо официального изображения flink. Это изображение добавляет Python в эти контейнеры

Код: Выделить всё

FROM flink:latest

# install python3: it has updated Python to 3.9 in Debian 11 and so install Python 3.7 from source
# it currently only supports Python 3.6, 3.7 and 3.8 in PyFlink officially.

RUN apt-get update -y && \
apt-get install -y build-essential libssl-dev liblzma-dev zlib1g-dev libbz2-dev libffi-dev && \
wget https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
tar -xvf Python-3.8.0.tgz && \
cd Python-3.8.0 && \
./configure --without-tests --enable-shared && \
make -j6 && \
make install && \
ldconfig /usr/local/lib && \
cd ..  && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
ln -s /usr/local/bin/python3 /usr/local/bin/python && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

# install PyFlink

# COPY apache-flink*.tar.gz /
RUN pip3 install --upgrade pip

RUN wget https://files.pythonhosted.org/packages/6e/08/b36307d608aa76f14bfc972a0b1ff15920cdee54414864c9dca4cff5d065/apache-flink-libraries-1.19.0.tar.gz && \
pip3 install apache-flink-libraries*.tar.gz && \
rm -rf apache-flink-libraries*.tar.gz

RUN wget https://files.pythonhosted.org/packages/4f/d7/633fefca40de6522bb704e9daaabecff27c15c521cede6a0450cedba8b9a/apache-flink-1.19.0.tar.gz && \
pip3 install apache-flink*.tar.gz &&  \
rm -rf apache-flink*.tar.gz
Питон для отправки задания завершается с ошибкой при выполнении команды tbl_env.execute_sql(src_ddl) для любого значения, присвоенного переменной src_ddl:

Код: Выделить всё

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
import os
from pyflink.java_gateway import get_gateway

def main():

jvm = get_gateway().jvm
jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()

# Initialize the StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///opt/flink/flink-sql-connector-kafka_2.11-1.13.0.jar")

settings = EnvironmentSettings.new_instance()\
.in_streaming_mode()\
.build()

# create table environment
tbl_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)

src_ddl = """
CREATE TABLE incomming (
msg VARCHAR,
) WITH (
'connector' = 'kafka',
'topic' = 'test',
'properties.bootstrap.servers' = 'broker:29092,localhost:9092',
'properties.group.id' = 'mygroup',
'format'= 'json'
);
"""
tbl_env.execute_sql(src_ddl)

tbl=tbl_env.from_path('incomming')
tbl.print_schema()

sql = """
SELECT
msg,
FROM incomming;
"""

message = tbl_env.sql_query(sql)
message.print_schema()

snk_ddl = """
CREATE TABLE outcome (
msg VARCHAR,
) WITH (
'connector' = 'kafka',
'topic' = 'test_from_kafka',
'properties.bootstrap.servers' = 'broker:29092,localhost:9092',
'properties.group.id' = 'mygroup',
'format'= 'json'
);
"""

tbl_env.execute_sql(snk_ddl)

message.execute_insert('outcome').wait()

# Execute the job
env.execute("Kafka Streaming Job")

if __name__ == '__main__':
main()
Вот что я вижу в журнале исключений:

Код: Выделить всё

Traceback (most recent call last):
File "kafka_test/flink.py", line 76, in 
main()
File "kafka_test/flink.py", line 42, in main
tbl_env.execute_sql(src_ddl)
File "/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 836, in execute_sql
File "/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1613, in _before_execute
File "/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1548, in _add_jars_to_j_env_config
File "/flink/opt/python/pyflink.zip/pyflink/common/configuration.py", line 83, in parse_jars_value
File "/usr/local/lib/python3.8/site-packages/ruamel/yaml/main.py", line 451, in load
return constructor.get_single_data()
File "/usr/local/lib/python3.8/site-packages/ruamel/yaml/constructor.py", line 112, in get_single_data
node = self.composer.get_single_node()
File "_ruamel_yaml.pyx", line 707, in ruamel.yaml.clib._ruamel_yaml.CParser.get_single_node
File "_ruamel_yaml.pyx", line 904, in ruamel.yaml.clib._ruamel_yaml.CParser._parse_next_event
ruamel.yaml.parser.ParserError: did not find expected 
in "", line 1, column 44
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code:  1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
... 14 more
Примеры, предоставленные Apache, работают нормально, но ни один из них не использует команду Execute_sql()
Я пытался обновить ruamel.yaml, проверьте разные значения src_ddl. Не знаю, что еще попробовать.

Подробнее здесь: https://stackoverflow.com/questions/783 ... ver-docker
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»