Код: Выделить всё
@Service
class KafkaProfileService {
Map fetchQueueMap = new HashMap();
ReentrantLock fetchLock = new ReentrantLock();
/**
Etc
**/
class MappingFetchThread implements Runnable {
public void run() {
final int DELAY = 5000;
Thread.currentThread().setName("mapping-fetch-thread");
Long targetTime = System.currentTimeMillis() + DELAY;
while(true) {
if(System.currentTimeMillis() < targetTime) {
try {
Thread.sleep(5000);
}
catch(InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
else {
fetchQueuedRoutingValues();
targetTime = System.currentTimeMillis() + DELAY;
}
}
}
}
private void fetchQueuedRoutingValues() {
HashMap toBeFetched = new HashMap();
fetchLock.lock();
try {
for(String profileName : fetchQueueMap.keySet()) {
/*etc, fetch logic is not in question here */
}
}
finally {
fetchLock.unlock();
}
}
private void addToFetchQueue(String profileName, List routingValues) {
fetchLock.lock();
try {
if(fetchQueueMap.get(profileName) == null) {
fetchQueueMap.put(profileName, new ArrayList());
}
fetchQueueMap.get(profileName).addAll(routingValues);
}
finally {
fetchLock.unlock();
}
}
}
Подробнее здесь: https://stackoverflow.com/questions/798 ... y-in-junit
Мобильная версия