(https://docs.oracle.com/en/java/javase/ ... reads.html)
довольно ясно, что синхронизированный блок вызывает закрепление потока, или собственные методы, или...
Все началось с другого вопроса SO о MySQL и виртуальных потоках, и это показало что VT не распараллеливает операторы mysql должным образом (в драйверах до 9.0.0, например 8.4). Хотя я наверняка смогу воспроизвести это в ката.... Давайте попробуем!
Я создал 12 задач по 1 потоку на задачу, каждая задача выполняет 3 блокирующая операция 1000 мс внутри синхронизированного блока. На каждую задачу должно уйти 3x1000 мс = 3 секунды. По крайней мере, в ветках платформы. И эти потоки платформы всегда делают это предсказуемо.
Я пишу 4 варианта операций блокировки, как Object.wait() внутри ssynced, как простой Thread.sleep(), как условие ReentrantLock. .await() и в качестве входного потока сокета read(). Мои деньги были в сети прочитаны, так как это сделал драйвер mysql. Кроме того, из исходного кода jdk мы видим, что object.wait() и thread.sleep() стали дружественными к VT. Я даже попробовал, но удалил настройку Pipe Input/Output Stream, потому что все они основаны на одной и той же базовой синхронизации.
В соответствии с документом, в VT я должен ожидать синхронизированные и собственные методы, но не повторные блокировки, чтобы занять больше времени, но это будет зависеть от того, сколько потоков несущей существует... Они не говорят.
Если я предполагаю, что существует столько же потоков несущей, сколько существует являются ядрами, то у меня есть 4 перевозчика. Если бы я действительно закрепил VT, я должен был бы ожидать, что мои задачи будут выполняться только по 4 одновременно, и все задачи завершатся к отметке в 9 секунд.
Я хотел это доказать. Поэтому я написал этот сложный тест ниже.
И мне не удалось закрепить ни одного несущего потока; 12 задач заканчиваются почти одновременно на отметке 3 секунды.
Что еще интересно, я запустил 12 потоков, но в какой-то момент было использовано 14 носителей! Я лол. Отличный вариант оптимизации, которая должна сократить число потоков!
Чтобы увидеть идентификатор переносимого потока, я использовал JNA для извлечения ядра 32 GetCurrentThreadId().
Зависимость Maven:
Код: Выделить всё
net.java.dev.jna
jna
5.15.0
net.java.dev.jna
jna-platform
5.15.0
Код: Выделить всё
package vttests;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ThreadFactory;
import com.sun.jna.Library;
import com.sun.jna.Native;
public class K32 {
record TD(Integer pid, Integer tid) {}
public interface MyKernel32 extends Library {
int GetCurrentProcessId();
int GetCurrentThreadId();
}
static MyKernel32 K32;
static {
try {
K32 = Native.load("kernel32", MyKernel32.class);
} catch (Throwable t) {
t.printStackTrace(System.out);
}
}
static Integer pid() {
return K32 != null ? K32.GetCurrentProcessId() : null;
}
static Integer tid() {
return K32 != null ? K32.GetCurrentThreadId() : null;
}
// static ThreadLocal thrDetailsTL = ThreadLocal.withInitial(() -> new TD(pid(), tid()));
static void p(Object msg) {
// TD td = thrDetailsTL.get(); //it is WRONG to cache this info, because the carrier thread (tid) can change!!!!!!!!
System.out.println("["+ (Thread.currentThread().isVirtual() ? "V" : "P")+ "/"
+ Thread.currentThread().getName()
+ "("+ pid()//+ td.pid
+ "."+ tid()//+ td.tid
+ ")]: "+ msg);
}
public static void main(String[] args) throws InterruptedException {
Set tids = Collections.synchronizedSet(new TreeSet());
Runnable r = () -> {
for(int i=0; i< 10; i++) {
int tid = tid();
tids.add(tid);
p("Hello "+i+" ("+tid+")");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
p(" "+i+": "+e);
}
}
};
ThreadFactory tf = Thread.ofVirtual().factory();
Thread t1 = tf.newThread(r);
Thread t2 = tf.newThread(r);
t1.start();
t2.start();
//t1.join();
//t2.join();
Thread.sleep(6000);
p("thread ids seen: "+tids); //I usually see more tids than I started! something is wrong with VTs.
}
}
Код: Выделить всё
package vttests;
import static vttests.K32.p;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.IntStream;
public class TestCustomBlockageWithVTs {
static final int PORT = 42000;
static void testVirtualThreads(String title, boolean useVT, Supplier taskFactory) throws InterruptedException, ExecutionException {
int cores = Runtime.getRuntime().availableProcessors();
int N = 3 * cores;
p("\n\n=========== "+title+" "+(useVT?"using VT":"using platform")+" threads ===============\n\n");
p("Processors count = " + cores);
p("Threads to create = " + N);
System.out.println();
long t0;
long d;
ExecutorService es = useVT ? Executors.newVirtualThreadPerTaskExecutor() : Executors.newThreadPerTaskExecutor(Thread.ofPlatform().factory());
try (es) {
t0 = System.nanoTime();
List> futures = IntStream
.range(0, N)
.mapToObj(i -> taskFactory.get())
.map(es::submit)
.toList();
//--------
p("waiting for all futures...");
for(Future future: futures) {
future.get();
}
d = System.nanoTime()-t0;
p("waiting futures took "+1e-9*d+" sec");
//--------
p("shutting down es and await termination...");
t0 = System.nanoTime();
es.shutdown();
es.awaitTermination(1, TimeUnit.SECONDS);
d = System.nanoTime()-t0;
p("shut down took "+1e-9*d+" sec");
//--------
p("closing executor...");
t0 = System.nanoTime();
}
d = System.nanoTime() - t0;
p("closing executor took " + 1e-9 * d + " sec");
p("The end.");
}
public static void main(String[] args) throws Exception {
int[] ia = {0};
int delayms = 1000;
Function taskFactory;
taskFactory = rec -> new SyncWaitTask(ia[0]++, delayms, rec);
ia[0] = 0;
runTest("synchronized waits", true, taskFactory);
ia[0] = 0;
runTest("synchronized waits", false, taskFactory);
taskFactory = rec -> new RLWaitTask(ia[0]++, delayms, rec);
ia[0] = 0;
runTest("reentrant lock waits", true, taskFactory);
ia[0] = 0;
runTest("reentrant lock waits", false, taskFactory);
taskFactory = rec -> new SleepTask(ia[0]++, delayms, rec);
ia[0] = 0;
runTest("sleep", true, taskFactory);
ia[0] = 0;
runTest("sleep", false, taskFactory);
ExecutorService srv = startServer();
taskFactory = rec -> new NetTask(ia[0]++, delayms, rec);
ia[0] = 0;
runTest("sleep", true, taskFactory);
ia[0] = 0;
runTest("sleep", false, taskFactory);
srv.shutdownNow();
}
static void runTest(String title, boolean useVT, Function taskFactory) throws InterruptedException, ExecutionException {
Map tid2id = new ConcurrentHashMap();
Map id2tid = new ConcurrentHashMap();
BiConsumer recorder2 = (tid,taskId) -> {
tid2id.computeIfAbsent(tid, _tid -> new TreeSet()).add(taskId);
id2tid.computeIfAbsent(taskId, _id -> new TreeSet()).add(tid);
};
testVirtualThreads(title, useVT, () -> taskFactory.apply(recorder2));
System.out.println("----- thread-taskid mappings ("+tid2id.size()+") ----");
tid2id.forEach( (tid,ids) -> System.out.println("Thread "+tid+" was shared among tasks ids "+ids));
System.out.println("----- task-thread mappings ("+id2tid.size()+") ----");
id2tid.forEach( (id,tids) -> System.out.println("Task "+id+" was carried with threads ids "+tids));
}
static abstract class AbstractBlockingTask implements Runnable {
int taskId;
int delayms;
String indent;
BiConsumer recorder;
AbstractBlockingTask(int taskId, int delayms, BiConsumer recorder2) {
this.taskId = taskId;
this.delayms = delayms;
this.recorder = recorder2;
indent = "\t".repeat(taskId) +"#";
}
@Override
public void run() {
p(indent + "task " + taskId + " started.");
work();
p(indent + "task " + taskId + " ended.");
}
void work() {
for(int i=1; i {
try (ServerSocket ss = new ServerSocket(PORT)) {
while(!Thread.interrupted()) {
Socket s = ss.accept();
es.submit(() -> serviceOne(s));
}
} catch (IOException e) {
e.printStackTrace();
}
});
return es;
}
static void serviceOne(Socket s) {
try {
try(s) {
InputStream is = s.getInputStream();
DataInputStream dis = new DataInputStream(is);
int delayms = dis.readInt();
OutputStream os = s.getOutputStream();
Thread.sleep(delayms);
os.write(42);
os.flush();
os.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (IOException e) {
e.printStackTrace();
}
}
static void connectAndRead(int port, int delayms) {
try (Socket s = new Socket("localhost", port)) {
OutputStream os = s.getOutputStream();
DataOutputStream dos = new DataOutputStream(os);
dos.writeInt(delayms);
dos.flush();
InputStream is = s.getInputStream();
is.read();//should be delayed
} catch (IOException e) {
e.printStackTrace();
}
}
}
Подробнее здесь: https://stackoverflow.com/questions/792 ... llel-as-sh
Мобильная версия