Трансляция Spark и состояние гонкиJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Трансляция Spark и состояние гонки

Сообщение Anonymous »

В моем приложении потоковой передачи Spark я управляю переменной в узле драйвера и передаю эту переменную всем исполнителям перед выполнением readStream(…)
Но мне нужно обновить широковещательная переменная, когда событие происходит в узле драйвера. Чтобы добиться этого, я выполняю unpersist() для старого широковещательного значения и затем передаю новое значение.
Итак, мой вопрос: мне интересно, это приведет к состоянию гонки, когда функция «func» ниже (выполняемая в исполнителе) извлекает несохраняемую широковещательную переменную? И если да, то как я могу это исправить? Спасибо!
Выглядит примерно так
// Lives on the Driver Node
public class BroadcastWrapper {
Broadcast broadcastVar;
List ls;
JavaSparkContext jsc;

public BroadcastWrapper(JavaSparkContext jsc) {
this.jsc = jsc;
this.ls = new ArrayList();
this.ls.add(“hello”);
this.broadcastVar = jsc.broadcast(ls);
}

public void addAndupdate(String s) {
this.ls.add(s);
this.broadcastVar.unpersist();
this.broadcastVar = this.jsc.broadcast(this.ls);
}
}

И в моем классе Driver я буду делать что-то вроде этого
// JavaSparkContext initialised before
BroadcastWrapper broadcastWrapper = new BroadcastWrapper(javaSparkContext);
Dataset ds = spark.readStream(…);

// func internally calls broadcastWrapper.broadcastVar.value to retrieve the value or list
ds.map(data -> func(broadcastWrapper));


Подробнее здесь: https://stackoverflow.com/questions/790 ... -condition
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»