Задача Akka Согласованное завершение работы BeforeServiceUnbind не завершается до завершения на узле молодого кластераJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Задача Akka Согласованное завершение работы BeforeServiceUnbind не завершается до завершения на узле молодого кластера

Сообщение Anonymous »

У меня возникла проблема с Akka ClimatedShutdown с несколькими узлами кластера.
Для тестирования в локальной среде я настроил приложение для создания кластера из двух экземпляров JVM, работающих на фиксированных портах. Это также приложение с пружинной загрузкой, предоставляющее некоторый API.
Приложение состоит из N (настраиваемых) рабочих процессов и 1 корневого актера (того, который создан для инициализации ActorSystem с определенным корневым поведением). ) и SingletonActor, которые организуют работу, выполняемую рабочими актерами, и получают процедуры завершения или ошибки от работника.
Во время инициализации экземпляра исполнитель создается, а затем добавляется в Receptionist, поскольку ими управляют с помощью RoundRobinRouter, затем в службу было отправлено сообщение RegisterWorkers с использованием шаблона запроса, чтобы гарантировать доставку сообщения.
Затем нам нужно управлять тем, когда экземпляр отключается, и для этой области скоординированная функциональность кажется идеальной. Поэтому я использовал
CoordinatedShutdown.get(actorSystem).addTask(CoordinatedShutdown.PhaseBeforeUnbind(), "shutdownHook", () -> {
shutdownHook();
return CompletableFuture.completedStage(Done.done());
});

shutdownHook отменяет регистрацию работников экземпляра у Receptionist, отправляет сообщение DeregisterWorkers в службу, всегда используя шаблон запроса. До этого момента все работает хорошо.
Затем он отправляет работникам директиву о прекращении. У этого сообщения есть два способа:
  • Если актер завершил работу или даже не был запущен, обработчик ответит, что работник выключен после установки внутреннего флаг.
  • Если актер выполняет обработку, он прерывает операцию над самим актером и отправляет в службу сообщение о сбое, позволяя службе считать эту обработку между завершенными.Эта часть проблематична. Хотя это работает, если завершение работы выполняется на том же узле кластера одноэлементного актера, на других узлах возникают две проблемы.
Вот первая проблема. Если рабочие находятся на другом члене кластера и уважают SingletonActor, службу, рабочий отправляет это сообщение в службу, но служба не обрабатывает его в течение короткого времени, а только после того, как все другие рабочие процессы завершат свою обработку или На этом узле инициируется процедура выключения, что может произойти даже через несколько минут после выключения.

Я ожидаю, что эти сообщения будут обработаны в течение короткого времени, максимум несколько секунд, учитывая, что этот субъект не получать много сообщений (1, например, вверх, 1, например, вниз, 1 для обработки работника (обработка, которая может занять несколько минут) и несколько для проверки состояния службы (от контроллера загрузки Spring, срабатывание API)).Другая проблема заключается в том, что в этой части кода Akka когда-нибудь выйдет слишком рано.
В хуке есть цикл, который отправляет актеру команду завершения работы и ожидает ответа. Если возвращаемое значение в порядке (что означает, что работа не обрабатывается), рабочий процесс удаляется из списка рабочих экземпляров, если он возвращает WAIT, он оставляет рабочий процесс для повторной попытки после цикла.
В этой процедуре используется Await. .result для DefaultPromise из scala, чтобы заблокировать выполнение перехватчика до тех пор, пока актер не ответит.
List activeWorkers = new ArrayList();
while(workers.isEmpty()){ //workers is defined globally, assigned at initialization
workers.foreach( worker -> {
Promise.DefaultPromise promise = new Promise.DefaultPromise();
actorSystem.log().error("Shut down actor {}", worker.path().name());
context.ask(Worker.ShutdownStatus.class, worker, Duration.ofSecond(20),
Worker.ShutdownActor::new, (resp, thr) -> {
if(resp == ShutdownStatus.OK){
promise.success(false);
actorSystem.log().error("Actor shut down {}", worker.path().name());
}
else {
promise.success(true);
actorSystem.log().error("Actor still busy {}", worker.path().name());
}
return new Nil(); /*Dummy command with no processing on the calling actor side*/
}
);
boolean res = Await.result(promise, Duration.create(21, TimeUnit.SECONDS)); //For brevity the handling of the exceptions isn't written here. also this Duration is the scala class.
if(res){
activeWorkers.add(worker);
}
});
workers.removeIf( el -> !activeWork.contains(el));
activeWork.clear();
}
actorSystem.log().error("All workers down. End hook");

Однако этот код часто не выполняется полностью. Последнее зарегистрированное сообщение много раз вообще не исполняется, если только все рабочие процессы уже не «Готово».
Я также пытался отправить все ShutdownStatus и только затем ОЖИДАТЬ обещание
Обратите внимание, что здесь нет тайм-аута. Время ожидания для этапа устанавливается вручную на большее время, чем время выхода приложения после инициирования завершения работы.
Сообщение ShutdownActor обрабатывается таким образом в рабочем процессе boolean isShutdown = this.shutdown.getandSet(true);
if(workDone.get()) {
shutdown.resp.tell(ShutdownStatus.OK);
return Behaviors.stopped();
}
if(!isShutdown) {
killswitch.shutdown();
getContext().ask(Service.AckChunk.class, replyTo , Duration.ofSeconds(10), //ReplyTo is the server singleton
res -> Service.Failed(res),
(ret,thr) -> {
workDone.set(true);
return new Nil();
}
}
shutdown.resp.tell(ShutdownStatus.WAIT);
return Behaviors.same();
}


Подробнее здесь: https://stackoverflow.com/questions/790 ... ermination
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»