Неблокирующая очередь с использованием JCToolsJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Неблокирующая очередь с использованием JCTools

Сообщение Anonymous »

Я реализовал очередь Multi-Producer Multi-Consumer (MPMC) без блокировок с семантикой фиксации на Java. Очередь предназначена для сценариев, в которых объекты должны быть обработаны только один раз, с поддержкой повторной постановки в очередь в случае сбоя.
Основные особенности
Без блокировок: для безопасности потоков используются операции CAS вместо блокировок.
Семантика фиксации: потребитель должен явно зафиксировать после обработки объекта.
Дедупликация объекта: один и тот же объект (по идентификатору) может быть добавлен несколько раз, но существует в очереди только один раз.
Запросить Поддержка: Неудачная обработка может вызвать автоматическую повторную постановку в очередь FIFO: Поддерживает порядок для сценариев с одним производителем
Сценарий использования Эта очередь полезна, когда у вас:
Несколько производителей добавляют рабочие элементы, идентифицированные уникальными идентификаторами
Несколько потребителей, обрабатывающих эти элементы
Необходимо убедиться, что элемент не обрабатывается дважды одновременно
Хотите запросить элементы, которые не удалось обработать.
Есть ли способ улучшить функцию добавления, например, с точки зрения производительности?
Реализация без блокировки:
import org.jctools.queues.MpmcUnboundedXaddArrayQueue;
import org.jctools.maps.NonBlockingHashMapLong;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

/**
* PRODUCTION-READY: Lock-free MPMC queue with commit semantics
*
* FIXES:
* 1. Added ADDING state to prevent visibility issues
* 2. Proper cleanup on CAS failures
* 3. No more "Failed to transition to QUEUED" errors
* 4. Maintains FIFO ordering
* 5. No lost wake-ups
* 6. Thread-safe for multiple producers and consumers
*
* @param entity type that must have a stable identity (implement getId())
*/
public class QueueLockFree implements QueueWithCommit {

private final MpmcUnboundedXaddArrayQueue readyQueue;
private final NonBlockingHashMapLong entityStore;
private final NonBlockingHashMapLong stateMap;
private final QueueMonitoring monitoring;
private final AtomicLong waitingConsumers = new AtomicLong(0);
private final Set consumerThreads = ConcurrentHashMap.newKeySet();

// ✅ FIXED: Added ADDING state
private enum State {
ADDING, // Being added (not visible to contains() or takeable yet)
QUEUED, // In ready queue (visible and takeable)
PROCESSING, // Taken by a consumer
PENDING_REQUEUE // Waiting to be requeued after commit
}

@SuppressWarnings("unchecked")
public QueueLockFree() {
this((QueueMonitoring) QueueMonitoring.VOID);
}

public QueueLockFree(QueueMonitoring monitoring) {
this.readyQueue = new MpmcUnboundedXaddArrayQueue(1024);
this.entityStore = new NonBlockingHashMapLong();
this.stateMap = new NonBlockingHashMapLong();
this.monitoring = monitoring;
}

/**
* Add entity to queue (non-blocking, thread-safe for multiple producers)
* ✅ PRODUCTION FIX: Uses ADDING state to make the operation atomic
*/
@Override
public void add(T entity) {
if (entity == null) {
throw new NullPointerException("Entity cannot be null");
}

long id = entity.getId();
int spinCount = 0;

while (true) {
EntityState currentState = stateMap.get(id);

if (currentState == null) {
// ✅ NEW ENTITY - Use 4-step atomic addition

// Step 1: Reserve the ID with ADDING state (not visible yet)
if (stateMap.putIfAbsent(id, EntityState.ADDING) != null) {
// Another thread reserved it first, retry
if (++spinCount < 10) {
Thread.onSpinWait();
} else if (spinCount < 20) {
Thread.yield();
} else {
LockSupport.parkNanos(1_000);
spinCount = 0;
}
continue;
}

// Step 2: Store the entity (we own the ID now) - allocate wrapper only after reserving
entityStore.put(id, new EntityWrapper(entity));

// Step 3: Add to ready queue
readyQueue.offer(id);

// Step 4: Atomically transition ADDING → QUEUED (now visible and takeable)
if (!stateMap.replace(id, EntityState.ADDING, EntityState.QUEUED)) {
// This should NEVER happen - we own this ID
// But handle it defensively
entityStore.remove(id);
stateMap.remove(id);
throw new IllegalStateException("Concurrent modification during add() for entity " + id);
}

monitoring.add(entity);
wakeupConsumers();
return;

} else if (currentState.state == State.ADDING) {
// ✅ Another thread is currently adding this entity
// Wait for them to finish
if (++spinCount < 10) {
Thread.onSpinWait();
} else if (spinCount < 20) {
Thread.yield();
} else {
LockSupport.parkNanos(1_000);
spinCount = 0;
}
continue;

} else if (currentState.state == State.PROCESSING) {
// Entity is being processed, mark for requeue
if (stateMap.replace(id, currentState, EntityState.PENDING_REQUEUE)) {
// Update entity if replacement strategy says so
EntityWrapper existing = entityStore.get(id);
if (existing != null && replace(existing.entity, entity)) {
existing.entity = entity;
}
return;
}
Thread.onSpinWait();

} else if (currentState.state == State.QUEUED) {
// Already queued, potentially replace
EntityWrapper existing = entityStore.get(id);
if (existing != null && replace(existing.entity, entity)) {
existing.entity = entity;
}
return;

} else if (currentState.state == State.PENDING_REQUEUE) {
// Already pending, potentially replace
EntityWrapper existing = entityStore.get(id);
if (existing != null && replace(existing.entity, entity)) {
existing.entity = entity;
}
return;
}
}
}

@Override
public void addAll(Collection

Подробнее здесь: https://stackoverflow.com/questions/798 ... ng-jctools
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»