Для тестирования в локальной среде я настроил приложение для создания кластера из двух экземпляров JVM, работающих на фиксированных портах. Это также приложение с пружинной загрузкой, предоставляющее некоторый API.
Приложение состоит из N (настраиваемых) рабочих процессов и 1 корневого актера (того, который создан для инициализации ActorSystem с определенным корневым поведением). ) и SingletonActor, которые организуют работу, выполняемую рабочими актерами, и получают процедуры завершения или ошибки от работника.
Во время инициализации экземпляра исполнитель создается, а затем добавляется в Receptionist, поскольку ими управляют с помощью RoundRobinRouter, затем в службу было отправлено сообщение RegisterWorkers с использованием шаблона запроса, чтобы гарантировать доставку сообщения.
Затем нам нужно управлять тем, когда экземпляр отключается, и для этой области скоординированная функциональность кажется идеальной. Поэтому я использовал
CoordinatedShutdown.get(actorSystem).addTask(CoordinatedShutdown.PhaseBeforeUnbind(), "shutdownHook", () -> {
shutdownHook();
return CompletableFuture.completedStage(Done.done());
});
shutdownHook отменяет регистрацию работников экземпляра у Receptionist, отправляет сообщение DeregisterWorkers в службу, всегда используя шаблон запроса. До этого момента все работает хорошо.
Затем он отправляет работникам директиву о прекращении. У этого сообщения есть два способа:
- Если актер завершил работу или даже не был запущен, обработчик ответит, что работник выключен после установки внутреннего флаг.
- Если актер выполняет обработку, он прерывает операцию над самим актером и отправляет в службу сообщение о сбое, позволяя службе считать эту обработку между завершенными.Эта часть проблематична. Хотя это работает, если завершение работы выполняется на том же узле кластера одноэлементного актера, на других узлах возникают две проблемы.
Я ожидаю, что эти сообщения будут обработаны в течение короткого времени, максимум несколько секунд, учитывая, что этот субъект не получать много сообщений (1, например, вверх, 1, например, вниз, 1 для обработки работника (обработка, которая может занять несколько минут) и несколько для проверки состояния службы (от контроллера загрузки Spring, срабатывание API)).Другая проблема заключается в том, что в этой части кода Akka когда-нибудь выйдет слишком рано.
В хуке есть цикл, который отправляет актеру команду завершения работы и ожидает ответа. Если возвращаемое значение в порядке (что означает, что работа не обрабатывается), рабочий процесс удаляется из списка рабочих экземпляров, если он возвращает WAIT, он оставляет рабочий процесс для повторной попытки после цикла.
В этой процедуре используется Await. .result для DefaultPromise из scala, чтобы заблокировать выполнение перехватчика до тех пор, пока актер не ответит.
List activeWorkers = new ArrayList();
while(workers.isEmpty()){ //workers is defined globally, assigned at initialization
workers.foreach( worker -> {
Promise.DefaultPromise promise = new Promise.DefaultPromise();
actorSystem.log().error("Shut down actor {}", worker.path().name());
context.ask(Worker.ShutdownStatus.class, worker, Duration.ofSecond(20),
Worker.ShutdownActor::new, (resp, thr) -> {
if(resp == ShutdownStatus.OK){
promise.success(false);
actorSystem.log().error("Actor shut down {}", worker.path().name());
}
else {
promise.success(true);
actorSystem.log().error("Actor still busy {}", worker.path().name());
}
return new Nil(); /*Dummy command with no processing on the calling actor side*/
}
);
boolean res = Await.result(promise, Duration.create(21, TimeUnit.SECONDS)); //For brevity the handling of the exceptions isn't written here. also this Duration is the scala class.
if(res){
activeWorkers.add(worker);
}
});
workers.removeIf( el -> !activeWork.contains(el));
activeWork.clear();
}
actorSystem.log().error("All workers down. End hook");
Однако этот код часто не выполняется полностью. Последнее зарегистрированное сообщение много раз вообще не исполняется, если только все рабочие процессы уже не «Готово».
Я также пытался отправить все ShutdownStatus и только затем ОЖИДАТЬ обещание
Обратите внимание, что здесь нет тайм-аута. Время ожидания для этапа устанавливается вручную на большее время, чем время выхода приложения после инициирования завершения работы.
Сообщение ShutdownActor обрабатывается таким образом в рабочем процессе boolean isShutdown = this.shutdown.getandSet(true);
if(workDone.get()) {
shutdown.resp.tell(ShutdownStatus.OK);
return Behaviors.stopped();
}
if(!isShutdown) {
killswitch.shutdown();
getContext().ask(Service.AckChunk.class, replyTo , Duration.ofSeconds(10), //ReplyTo is the server singleton
res -> Service.Failed(res),
(ret,thr) -> {
workDone.set(true);
return new Nil();
}
}
shutdown.resp.tell(ShutdownStatus.WAIT);
return Behaviors.same();
}
Подробнее здесь: https://stackoverflow.com/questions/790 ... ermination