Пользовательский Spark JDBCDialect не используется в режиме кластераJAVA

Программисты JAVA общаются здесь
Anonymous
Пользовательский Spark JDBCDialect не используется в режиме кластера

Сообщение Anonymous »

У меня есть пользовательский jdbcdialect , и я пытаюсь зарегистрировать его в моем кластере Spark (v3.5.1).
Диалект работает совершенно нормально в локальном режиме. Кроме того, метод CanHandle реализован для всегда возврата true на данный момент. Вот как я исключил URL -адрес DB и диалект из уравнения. Вот как я убедился, что кластер здоров.
Я подготовил минимальный воспроизводимый пример.

Код: Выделить всё

import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.MySQLDialect;
import scala.Option;

public class MemSQL5Dialect extends JdbcDialect {

private static class SQLBuilder extends MySQLDialect.MySQLSQLBuilder {
// My customizations
}

@Override
public Option compileExpression(Expression expr) {
try {
return Option.apply(new SQLBuilder().build(expr));
} catch (Exception e) {
return Option.empty();
}
}

@Override
public boolean canHandle(String url) {
return true;
}
}
реализация приложения:

Код: Выделить всё

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.jdbc.JdbcDialects;

import java.net.Inet4Address;
import java.util.Properties;

import static org.apache.spark.sql.functions.col;

public class Application {

public static void main(String[] args) throws Exception {
// Cluster is running using Docker, so I use IPv4.
var hostAddress = Inet4Address.getLocalHost().getHostAddress();

// JAR with dialect is copied to the folder inside Docker container, where Bitnami Spark
// keeps all other JARs. Double-checked JAR is there and JAR includes the dialect!
var applicationJarWithDialect = "/opt/bitnami/spark/jars/spark-playground-SNAPSHOT.jar";

var sparkSession = SparkSession.builder()
.appName("Spark Playground")
// When set to "local[*]" works perfectly fine!
.master("spark://localhost:7077")
.config("spark.driver.host", hostAddress)
// I tried also these 2 properties below - did not help.
// .config("spark.driver.extraClassPath", applicationJarWithDialect)
// .config("spark.executor.extraClassPath", applicationJarWithDialect)
.getOrCreate();

// Trying to register the dialect on driver.
JdbcDialects.registerDialect(new MemSQL5Dialect());

var jdbcUrl = String.format("jdbc:mariadb://%s:3306/db", hostAddress);
var jdbcUsername = "root";
var jdbcPassword = "root";
var tableName = "phonebook";

var properties = new Properties();
properties.put("user", jdbcUsername);
properties.put("password", jdbcPassword);
properties.put("driver", "org.mariadb.jdbc.Driver");

try {
sparkSession.read().jdbc(jdbcUrl, tableName, properties)
// If I change filter to something that doesn't require
// dialect changes -- works fine as well!
.filter(col("first_name").startsWith("J"))
.collectAsList()
.forEach(System.out::println);
} finally {
sparkSession.close();
}
}
}
Пожалуйста, помогите понять, чего мне не хватает.


Подробнее здесь: https://stackoverflow.com/questions/795 ... uster-mode

Вернуться в «JAVA»