import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, explode, to_date, first, last, max, min, avg, sum, lit
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, DoubleType, LongType, IntegerType
import time
# Cấu hình logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Các thông số kết nối
kafka_topic_name = "finnhub_data"
kafka_bootstrap_servers = "localhost:9092"
mysql_host_name = "localhost"
mysql_port_no = "3306"
mysql_database_name = "finnhub_processed"
mysql_driver_class = "com.mysql.cj.jdbc.Driver" # Cập nhật driver class
mysql_table_name = "processed_trade_data"
mysql_user_name = "tuanle"
mysql_password = "123456"
mysql_jdbc_url = f"jdbc:mysql://{mysql_host_name}:{mysql_port_no}/{mysql_database_name}"
cassandra_host_name = "localhost"
cassandra_port_no = "9042"
cassandra_keyspace_name = "finnhub_data"
cassandra_table_name = "raw_trade_data"
def save_to_cassandra(df, epoch_id):
logger.info(f"Saving batch {epoch_id} to Cassandra")
df_to_save = df.select(
col("symbol"),
(col("trade_time") / 1000).cast("timestamp").alias("trade_time"),
col("price"),
col("volume"),
col("conditions"),
col("company")
)
df_to_save.write \
.format("org.apache.spark.sql.cassandra") \
.mode("append") \
.options(table=cassandra_table_name, keyspace=cassandra_keyspace_name) \
.save()
logger.info(f"Batch {epoch_id} saved to Cassandra successfully")
def process_and_save_to_mysql(spark):
logger.info("Starting to process data from Cassandra and save to MySQL")
df_cassandra = spark.read \
.format("org.apache.spark.sql.cassandra") \
.options(table=cassandra_table_name, keyspace=cassandra_keyspace_name) \
.load()
df_processed = df_cassandra \
.withColumn("trade_date", to_date(col("trade_time"))) \
.groupBy("symbol", "trade_date") \
.agg(
first("price").alias("open_price"),
last("price").alias("close_price"),
max("price").alias("high_price"),
min("price").alias("low_price"),
avg("price").alias("avg_price"),
sum("volume").alias("total_volume")
) \
.withColumn("processed_time", lit(time.strftime("%Y-%m-%d %H:%M:%S")))
df_processed.write \
.jdbc(url=mysql_jdbc_url,
table=mysql_table_name,
mode="append",
properties={
"user": mysql_user_name,
"password": mysql_password,
"driver": mysql_driver_class
})
logger.info("Data processed and saved to MySQL successfully")
if __name__ == "__main__":
logger.info("Data Processing Application Started ...")
# Khởi tạo Spark Session và định nghĩa cấu hình cần thiết
spark = SparkSession.builder \
.appName("PySpark Structured Streaming with Kafka, Cassandra, and MySQL") \
.config("spark.streaming.stopGracefullyOnShutdown", True) \
.config("spark.jars", f"file:///home/tuanle/areaapache/software/spark-3.5.2-bin-hadoop3/jars/jsr305-3.0.0.jar,file:///home/tuanle/areaapache/software/spark-3.5.2-bin-hadoop3/jars/spark-cassandra-connector_2.12-3.5.1.jar,file:///home/tuanle/areaapache/software/spark-3.5.2-bin-hadoop3/jars/spark-sql-kafka-0-10_2.12-3.5.2.jar,file:///usr/share/java/mysql-connector-java.jar,file:///home/tuanle/areaapache/software/spark-3.5.2-bin-hadoop3/jars/kafka-clients-3.8.0.jar") \
.config("spark.sql.shuffle.partitions", 4) \
.config("spark.cassandra.connection.host", cassandra_host_name) \
.config("spark.cassandra.connection.port", cassandra_port_no) \
.config("spark.sql.mysql.host", mysql_host_name) \
.config("spark.sql.mysql.port", mysql_port_no) \
.config("spark.cassandra.connection.keep_alive_ms", "60000") \
.master("local[4]") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
logger.info("Spark Session initialized successfully")
finnhub_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("subscribe", kafka_topic_name) \
.option("startingOffsets", "latest") \
.load()
logger.info("Printing Schema of finnhub_df:")
finnhub_df.printSchema()
finnhub_schema = StructType([
StructField("data", ArrayType(StructType([
StructField("c", ArrayType(StringType())),
StructField("p", DoubleType()),
StructField("s", StringType()),
StructField("t", LongType()),
StructField("v", IntegerType()),
StructField("company", StringType())
]))),
StructField("type", StringType())
])
finnhub_df1 = finnhub_df.selectExpr("CAST(value AS STRING)", "timestamp")
finnhub_df2 = finnhub_df1.select(from_json(col("value"), finnhub_schema).alias("finnhub"), "timestamp")
finnhub_df3 = finnhub_df2.select("finnhub.data", "timestamp")
finnhub_df4 = finnhub_df3.select(explode("data").alias("trade_data"), "timestamp")
finnhub_df5 = finnhub_df4.select(
col("trade_data.s").alias("symbol"),
col("trade_data.t").alias("trade_time"),
col("trade_data.p").alias("price"),
col("trade_data.v").alias("volume"),
col("trade_data.c").alias("conditions"),
col("trade_data.company").alias("company"),
col("timestamp")
)
query_cassandra = finnhub_df5 \
.writeStream \
.trigger(processingTime='15 seconds') \
.outputMode("append") \
.foreachBatch(save_to_cassandra) \
.start()
try:
while True:
time.sleep(300) # Wait for 5 minutes
process_and_save_to_mysql(spark)
except KeyboardInterrupt:
logger.info("Application interrupted. Stopping streams...")
query_cassandra.stop()
spark.stop()
logger.info("Application stopped successfully")
except Exception as e:
logger.error(f"An error occurred: {str(e)}")
query_cassandra.stop()
spark.stop()
logger.info("Application stopped due to an error")
Ошибка, возникшая после запуска spark_streaming:
Вот краткий обзор ошибок, обнаруженных в журнале:
Отсутствуют файлы JAR: несколько файлов JAR не найдены в указанном каталоге Spark, в том числе:
jsr305-3.0.0.jar
spark-cassandra-connector_2.13-3.5.1.jar< /p>
spark-sql-kafka-0-10_2.13-3.5.2.jar
spark-streaming-kafka-0-10_2.13-3.5.2.jar
kafka -clients-3.8.0.jar
Ошибка, связанная с Kafka : java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater Эта ошибка предполагает, что зависимости, связанные с Kafka, отсутствуют или загружены неправильно.
Ошибка, связанная с Scala: java.lang.NoClassDefFoundError: scala/$less$colon$less Эта ошибка указывает на проблему с зависимостями Scala, возможно, из-за несовместимости версий.
Ошибка, связанная с Cassandra: произошла ошибка при попытке загрузить данные из Cassandra: java.lang.NoClassDefFoundError: scala/$less$colon$less
01.10.2024 10:11:54,931 - ИНФОРМАЦИЯ - Приложение остановлено из-за ошибки
2024-10 -01 10:11:54,931 - ИНФОРМАЦИЯ - Закрытие соединения клиент-сервер Я пытаюсь загрузить весь jar: jsr305-3.0.0.jar, spark-cassandra- Connector_2.12-3.5.1.jar, spark-sql-kafka-0-10_2.12-3.5.2.jar, kafka-clients-3.8.0.jar Затем переместите файлы JAR в папку jars Spark и обновите ПУТЬ К КЛАССУ: Я запускаю команду: spark-shell --version и вот результат: SPARK версии 3.5.2. Использование Scala версии 2.12.18, 64-битной серверной виртуальной машины OpenJDK, 1.8.0_422. Настройки, которые я скачал. : Cassandra 4.1.6, MySQL, Hadoop-3.4.0, mysql-connector-j-9.0.0, kafka_ 2.12-3.8.0, sbt-1.10.2, spark-3.5.2-bin-hadoop3, scala-2.13. 14
Но при запуске Spark я все равно получаю сообщение об ошибке. Как это исправить. Пожалуйста, помогите мне подробно. Всем спасибо
Если можно, что можно переустановить? Пожалуйста, дайте мне совет, чтобы не удалять все и не скачивать заново. Пожалуйста, помогите мне
try: while True: time.sleep(300) # Wait for 5 minutes process_and_save_to_mysql(spark) except KeyboardInterrupt: logger.info("Application interrupted. Stopping streams...") query_cassandra.stop() spark.stop() logger.info("Application stopped successfully") except Exception as e: logger.error(f"An error occurred: {str(e)}") query_cassandra.stop() spark.stop() logger.info("Application stopped due to an error") [/code] [b]Ошибка, возникшая после запуска spark_streaming:[/b] Вот краткий обзор ошибок, обнаруженных в журнале: [list] [*]Отсутствуют файлы JAR: несколько файлов JAR не найдены в указанном каталоге Spark, в том числе: [list] jsr305-3.0.0.jar
[*]spark-cassandra-connector_2.13-3.5.1.jar< /p>
[*]spark-sql-kafka-0-10_2.13-3.5.2.jar
[*]spark-streaming-kafka-0-10_2.13-3.5.2.jar
[*]kafka -clients-3.8.0.jar
[/list]
[*]Ошибка, связанная с Kafka : java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater Эта ошибка предполагает, что зависимости, связанные с Kafka, отсутствуют или загружены неправильно.
[*]Ошибка, связанная с Scala: java.lang.NoClassDefFoundError: scala/$less$colon$less Эта ошибка указывает на проблему с зависимостями Scala, возможно, из-за несовместимости версий.
Ошибка, связанная с Cassandra: произошла ошибка при попытке загрузить данные из Cassandra: java.lang.NoClassDefFoundError: scala/$less$colon$less
[/list] 01.10.2024 10:11:54,931 - ИНФОРМАЦИЯ - Приложение остановлено из-за ошибки 2024-10 -01 10:11:54,931 - ИНФОРМАЦИЯ - Закрытие соединения клиент-сервер [b]Я пытаюсь загрузить весь jar:[/b] jsr305-3.0.0.jar, spark-cassandra- Connector_2.12-3.5.1.jar, spark-sql-kafka-0-10_2.12-3.5.2.jar, kafka-clients-3.8.0.jar [b]Затем переместите файлы JAR в папку jars Spark и обновите ПУТЬ К КЛАССУ:[/b] [b]Я запускаю команду:[/b] spark-shell --version [b] и вот результат:[/b] SPARK версии 3.5.2. Использование Scala версии 2.12.18, 64-битной серверной виртуальной машины OpenJDK, 1.8.0_422. [b]Настройки, которые я скачал. :[/b] Cassandra [b]4.1.6[/b], MySQL, Hadoop-[b]3.4.0,[/b] mysql-connector-j-[b]9.0.0,[/b] kafka_ [b]2.12-3.8.0,[/b] sbt-[b]1.10.2,[/b] spark-[b]3.5.2[/b]-bin-hadoop3, scala-[b]2.13. 14[/b] Но при запуске Spark я все равно получаю сообщение об ошибке. Как это исправить. Пожалуйста, помогите мне подробно. Всем спасибо Если можно, что можно переустановить? Пожалуйста, дайте мне совет, чтобы не удалять все и не скачивать заново. Пожалуйста, помогите мне
ERROR SparkContext: Failed to add home/areaapache/software/spark-3.5.2-bin-hadoop3/jars/spark-streaming-kafka-0-10_2.13-3.5.2.jar to Spark environment s
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import...
ERROR SparkContext: Failed to add home/areaapache/software/spark-3.5.2-bin-hadoop3/jars/spark-streaming-kafka-0-10_2.13-3.5.2.jar \
to Spark environment
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import...
Я пытаюсь запустить сеанс Spark в Jupyter Notebook на компьютере EC2 Linux с помощью кода Visual Studio. Мой код выглядит следующим образом:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName( spark_app ).getOrCreate()...
Я сталкиваюсь с вышеуказанным исключением, когда пытаюсь применить метод (ComputeDwt) к входным данным RDD[(Int,ArrayBuffer )].
Я даже использую расширения Параметр сериализации для сериализации объектов в Spark. Вот фрагмент кода....