У нас есть такой код, который трижды повторяет вызов API в рамках механизма опроса. Если он не завершится, он создаст дело, вызвав другой API. Происходит следующее: в журналах я вижу два потока — основной и пул потоков, создающие новые дела. Таким образом, идентификатор отслеживания из пула потоков thead принимает значение null и, таким образом, создает дублирующиеся потоки.
Вот соответствующая часть кода. LLM предлагает мне добавить тайм-аут ниже вызова отмены, однако с этим сталкивается другой API, в котором нет тайм-аута. Основной поток
if (entity != null && entity.getId() != null) {
Future future = null;
try {
log.info("Waiting for polling result for id : {} trackingId: {}", entity.getId(), trackingId);
OrderRequest finalEntity = entity;
String finalProductOrderId = productOrderId;
Callable task = () -> checkGetStatusAPIForCeaseLine(customerId, finalProductOrderId, routeEnv, sc, trackingId, finalEntity, req);
future = executor.submit(task);
CeaseLineResponse result = future.get(futureTimeoutSecondsForCeaseLine, TimeUnit.SECONDS);
return result;
} catch (TimeoutException te) {
log.warn("Polling did not complete in time for id: {} trackingId: {}", entity.getId(), trackingId);
future.cancel(true);
return handleAmaCaseForCeaseLineTimeout(customerId, entity, productOrderId, req, httpHeaders, trackingId);
} catch (Exception e) {
log.error("Error while waiting for poll result", e);
return CeaseLineResponse.builder()
.httpCode(500)
.message("FAILED")
.orderNo(entity.getBillerOrderNo())
.orderActionId(entity.getBillerOrderActionNo())
.build();
}
}
entity.setRejectReason("Order created but still processing - case Number: " + caseNumber);
entity.setStatus(OrderStatus.SUBMITTED);
final OrderRequest save = ceaseOrderRepo.save(entity);
log.info("SUBMITTED (polling finished) for cease line. Updated row: {} trackingId: {}", save, entity.getTrackingId());
return buildAcceptedOrderResponse202(entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), "Order created but still processing", caseNumber);
}
Это то, что происходит в запросе и создании нового дела с использованием API
u_correlation_id\":\"null:9001962483:9001962484
Вот что LLM предложила мне добавить после вызова отмены:
// Add a small delay to ensure the polling thread has stopped
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
и прерванная проверка здесь"
for (int i = 0; i < maxAttemptForCeaseLine; i++) {
// Check if thread has been interrupted/cancelled
if (Thread.currentThread().isInterrupted()) {
log.info("Polling cancelled for trackingId: {}", trackingId);
return null;
}
Что здесь не так? Есть мысли?
Изменить: такое поведение наблюдается только при попыткеCount=3, а не для 0,1 и 2. Изменить 2: добавление минимального примера. Однако это не может воспроизвести состояние гонки
package com.order.service.concurrent;
@RunWith(MockitoJUnitRunner.class)
public class DoubleCaseCreationTest {
private static final Logger log = LoggerFactory.getLogger(DoubleCaseCreationTest.class);
// Test configuration
private static final int TIMEOUT_SECONDS = 1;
private static final int POLLING_ATTEMPTS = 3;
private static final int[] POLLING_INTERVALS = {1,1,1}; // 1 second each = 5 seconds total
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
executor = Executors.newFixedThreadPool(5);
amaCaseCallCount = new AtomicInteger(0);
// Mock repository save
when(ceaseOrderRepo.save(any(com..matrix.order.service.entity.OrderRequest.class)))
.thenAnswer(invocation -> invocation.getArgument(0));
// Mock to return IN_PROGRESS (never completes)
OrderStatusResponse inProgressResponse = new OrderStatusResponse();
Message message = new Message();
message.setStatus("IN_PROGRESS");
inProgressResponse.setMessage(message);
/**
* TEST 1: Demonstrates the BUG - Two AMA cases are created
*
* Timeline:
* T=0s: Main thread submits polling task
* T=0-2s: Executor thread polls (attempts 1-2)
* T=2s: Main thread times out
* T=2s: Main thread creates AMA case #1
* T=2-5s: Executor thread continues polling (attempts 3-5)
* T=5s: Executor thread exhausts retries, creates AMA case #2
*
* Result: 2 cases created
*/
@Test
public void testBuggyVersion_CreatesTwoCases() throws Exception {
log.info("\n========================================");
log.info("TEST: BUGGY VERSION - Expects 2 Cases");
log.info("========================================\n");
// Reset counter
amaCaseCallCount.set(0);
// Track AMA case creation calls
AtomicInteger mainThreadCalls = new AtomicInteger(0);
AtomicInteger executorThreadCalls = new AtomicInteger(0);
ceaseOrderService.setAmaCaseCreationCallback((threadName) -> {
int count = amaCaseCallCount.incrementAndGet();
log.warn("AMA CASE #{} CREATED by thread: {}", count, threadName);
if (threadName.contains("main") || threadName.contains("Test worker")) {
mainThreadCalls.incrementAndGet();
} else if (threadName.contains("pool")) {
executorThreadCalls.incrementAndGet();
}
});
// Create test data
com..matrix.order.service.entity.OrderRequest entity = createTestOrder();
CreateAndSubmitSuspendResumeCeaseOrderRequest request = new CreateAndSubmitSuspendResumeCeaseOrderRequest();
// Execute BUGGY version
long startTime = System.currentTimeMillis();
CeaseLineResponse response = ceaseOrderService.createCeaseOrderBuggy(
"customer123",
entity,
"order123",
request,
"demo",
"sc123",
"track123"
);
long endTime = System.currentTimeMillis();
// Wait for executor thread to finish
log.info("\nWaiting for executor thread to complete...");
//Thread.sleep(4000); // Wait for remaining polling attempts
// Results
int totalCases = amaCaseCallCount.get();
log.info("\n========================================");
log.info("BUGGY VERSION RESULTS:");
log.info("Total AMA cases created: {}", totalCases);
log.info("Cases by main thread: {}", mainThreadCalls.get());
log.info("Cases by executor thread: {}", executorThreadCalls.get());
log.info("========================================\n");
// Assertions - BUGGY version creates 2 cases
assertEquals("BUGGY: Should create 2 cases (demonstrating the bug)", 2, totalCases);
assertEquals("Main thread should create 1 case", 1, mainThreadCalls.get());
assertEquals("Executor thread should create 1 case", 1, executorThreadCalls.get());
assertEquals("Should return 202 ACCEPTED", 202, response.getHttpCode());
}
/**
* Service class that implements both BUGGY and FIXED versions
*/
class CeaseOrderService {
private static final Logger log = LoggerFactory.getLogger(CeaseOrderService.class);
private final Service Service;
private final CeaseOrderRepository ceaseOrderRepo;
private final ExecutorService executor;
private final int timeoutSeconds;
private final int maxPollingAttempts;
private final int[] pollingIntervals;
// Callback for testing
private java.util.function.Consumer amaCaseCreationCallback;
future = executor.submit(task);
CeaseLineResponse result = future.get(timeoutSeconds, TimeUnit.SECONDS);
return result;
} catch (TimeoutException te) {
log.warn("[BUGGY] Timeout occurred! Main thread creating case - trackingId: {}", trackingId);
future.cancel(true);
// BUG: No future cancellation!
// Executor thread continues running and will create its own case
for (int i = 0; i < maxPollingAttempts; i++) {
// FIX: Check for interruption at start of loop
if (Thread.currentThread().isInterrupted()) {
log.info("[FIXED POLL] Detected interruption at loop start, exiting cleanly");
return null;
}
entity.setRejectReason("Order created but still processing - case: " + caseNumber);
entity.setStatus(com..matrix.order.service.domain.OrderStatus.SUBMITTED);
ceaseOrderRepo.save(entity);
У нас есть такой код, который трижды повторяет вызов API в рамках механизма опроса. Если он не завершится, он создаст дело, вызвав другой API. Происходит следующее: в журналах я вижу два потока — основной и пул потоков, создающие новые дела. Таким образом, идентификатор отслеживания из пула потоков thead принимает значение null и, таким образом, создает дублирующиеся потоки. Вот соответствующая часть кода. LLM предлагает мне добавить тайм-аут ниже вызова отмены, однако с этим сталкивается другой API, в котором нет тайм-аута. [b]Основной поток[/b] if (entity != null && entity.getId() != null) { Future future = null; try { log.info("Waiting for polling result for id : {} trackingId: {}", entity.getId(), trackingId); OrderRequest finalEntity = entity; String finalProductOrderId = productOrderId; Callable task = () -> checkGetStatusAPIForCeaseLine(customerId, finalProductOrderId, routeEnv, sc, trackingId, finalEntity, req); future = executor.submit(task); CeaseLineResponse result = future.get(futureTimeoutSecondsForCeaseLine, TimeUnit.SECONDS); return result; } catch (TimeoutException te) { log.warn("Polling did not complete in time for id: {} trackingId: {}", entity.getId(), trackingId); future.cancel(true); return handleAmaCaseForCeaseLineTimeout(customerId, entity, productOrderId, req, httpHeaders, trackingId); } catch (Exception e) { log.error("Error while waiting for poll result", e); return CeaseLineResponse.builder() .httpCode(500) .message("FAILED") .orderNo(entity.getBillerOrderNo()) .orderActionId(entity.getBillerOrderActionNo()) .build(); } }
entity.setRejectReason("Order created but still processing - case Number: " + caseNumber); entity.setStatus(OrderStatus.SUBMITTED); final OrderRequest save = ceaseOrderRepo.save(entity); log.info("SUBMITTED (polling finished) for cease line. Updated row: {} trackingId: {}", save, entity.getTrackingId());
return buildAcceptedOrderResponse202(entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), "Order created but still processing", caseNumber); }
Это то, что происходит в запросе и создании нового дела с использованием API u_correlation_id\":\"null:9001962483:9001962484 Вот что LLM предложила мне добавить после вызова отмены: // Add a small delay to ensure the polling thread has stopped try { Thread.sleep(100); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
и прерванная проверка здесь" for (int i = 0; i < maxAttemptForCeaseLine; i++) { // Check if thread has been interrupted/cancelled if (Thread.currentThread().isInterrupted()) { log.info("Polling cancelled for trackingId: {}", trackingId); return null; }
Что здесь не так? Есть мысли? Изменить: такое поведение наблюдается только при попыткеCount=3, а не для 0,1 и 2. [b]Изменить 2[/b]: добавление минимального примера. Однако это не может воспроизвести состояние гонки package com.order.service.concurrent;
@RunWith(MockitoJUnitRunner.class) public class DoubleCaseCreationTest { private static final Logger log = LoggerFactory.getLogger(DoubleCaseCreationTest.class);
// Test configuration private static final int TIMEOUT_SECONDS = 1; private static final int POLLING_ATTEMPTS = 3; private static final int[] POLLING_INTERVALS = {1,1,1}; // 1 second each = 5 seconds total
@Before public void setUp() { MockitoAnnotations.initMocks(this); executor = Executors.newFixedThreadPool(5); amaCaseCallCount = new AtomicInteger(0);
// Mock repository save when(ceaseOrderRepo.save(any(com..matrix.order.service.entity.OrderRequest.class))) .thenAnswer(invocation -> invocation.getArgument(0));
// Mock to return IN_PROGRESS (never completes) OrderStatusResponse inProgressResponse = new OrderStatusResponse(); Message message = new Message(); message.setStatus("IN_PROGRESS"); inProgressResponse.setMessage(message);
/** * TEST 1: Demonstrates the BUG - Two AMA cases are created * * Timeline: * T=0s: Main thread submits polling task * T=0-2s: Executor thread polls (attempts 1-2) * T=2s: Main thread times out * T=2s: Main thread creates AMA case #1 * T=2-5s: Executor thread continues polling (attempts 3-5) * T=5s: Executor thread exhausts retries, creates AMA case #2 * * Result: 2 cases created */ @Test public void testBuggyVersion_CreatesTwoCases() throws Exception { log.info("\n========================================"); log.info("TEST: BUGGY VERSION - Expects 2 Cases"); log.info("========================================\n");
// Reset counter amaCaseCallCount.set(0);
// Track AMA case creation calls AtomicInteger mainThreadCalls = new AtomicInteger(0); AtomicInteger executorThreadCalls = new AtomicInteger(0);
ceaseOrderService.setAmaCaseCreationCallback((threadName) -> { int count = amaCaseCallCount.incrementAndGet(); log.warn("AMA CASE #{} CREATED by thread: {}", count, threadName);
if (threadName.contains("main") || threadName.contains("Test worker")) { mainThreadCalls.incrementAndGet(); } else if (threadName.contains("pool")) { executorThreadCalls.incrementAndGet(); } });
// Create test data com..matrix.order.service.entity.OrderRequest entity = createTestOrder(); CreateAndSubmitSuspendResumeCeaseOrderRequest request = new CreateAndSubmitSuspendResumeCeaseOrderRequest();
// Execute BUGGY version long startTime = System.currentTimeMillis(); CeaseLineResponse response = ceaseOrderService.createCeaseOrderBuggy( "customer123", entity, "order123", request, "demo", "sc123", "track123" ); long endTime = System.currentTimeMillis();
// Wait for executor thread to finish log.info("\nWaiting for executor thread to complete..."); //Thread.sleep(4000); // Wait for remaining polling attempts
// Results int totalCases = amaCaseCallCount.get(); log.info("\n========================================"); log.info("BUGGY VERSION RESULTS:"); log.info("Total AMA cases created: {}", totalCases); log.info("Cases by main thread: {}", mainThreadCalls.get()); log.info("Cases by executor thread: {}", executorThreadCalls.get()); log.info("========================================\n");
// Assertions - BUGGY version creates 2 cases assertEquals("BUGGY: Should create 2 cases (demonstrating the bug)", 2, totalCases); assertEquals("Main thread should create 1 case", 1, mainThreadCalls.get()); assertEquals("Executor thread should create 1 case", 1, executorThreadCalls.get()); assertEquals("Should return 202 ACCEPTED", 202, response.getHttpCode()); }
/** * Service class that implements both BUGGY and FIXED versions */ class CeaseOrderService { private static final Logger log = LoggerFactory.getLogger(CeaseOrderService.class);
private final Service Service; private final CeaseOrderRepository ceaseOrderRepo; private final ExecutorService executor; private final int timeoutSeconds; private final int maxPollingAttempts; private final int[] pollingIntervals;
// Callback for testing private java.util.function.Consumer amaCaseCreationCallback;
future = executor.submit(task); CeaseLineResponse result = future.get(timeoutSeconds, TimeUnit.SECONDS); return result;
} catch (TimeoutException te) { log.warn("[BUGGY] Timeout occurred! Main thread creating case - trackingId: {}", trackingId); future.cancel(true); // BUG: No future cancellation! // Executor thread continues running and will create its own case
for (int i = 0; i < maxPollingAttempts; i++) { // FIX: Check for interruption at start of loop if (Thread.currentThread().isInterrupted()) { log.info("[FIXED POLL] Detected interruption at loop start, exiting cleanly"); return null; }
entity.setRejectReason("Order created but still processing - case: " + caseNumber); entity.setStatus(com..matrix.order.service.domain.OrderStatus.SUBMITTED); ceaseOrderRepo.save(entity);