Невозможно воспроизвести закрепление виртуального потока (jdk 21), но MySQL не так параллелен, как должен быть.JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Невозможно воспроизвести закрепление виртуального потока (jdk 21), но MySQL не так параллелен, как должен быть.

Сообщение Anonymous »

Документ jdk 21 о виртуальных потоках (далее «VT»)
(https://docs.oracle.com/en/java/javase/ ... reads.html)
довольно ясно, что синхронизированный блок вызывает закрепление потока, или собственные методы, или...
Все началось с другого вопроса SO о MySQL и виртуальных потоках (Can Virtual Threads улучшать запрос к базе данных на Java?), и это показало, что VT не распараллеливает операторы mysql должным образом (в драйверах до 9.0.0, например 8.4). Хотя я наверняка смогу воспроизвести это в ката.... Давайте попробуем!
Я создал 12 задач по 1 потоку на задачу, каждая задача выполняет 3 блокирующая операция 1000 мс внутри синхронизированного блока. На каждую задачу должно уйти 3x1000 мс = 3 секунды. По крайней мере, в ветках платформы. И эти потоки платформы всегда делают это предсказуемо.
Я пишу 4 варианта операций блокировки, как Object.wait() внутри s, синхронизированный, как простой Thread.sleep(), как условие ReentrantLock. .await() и в качестве входного потока сокета read(). Мои деньги были в сети прочитаны, так как это сделал драйвер mysql. Кроме того, из исходного кода jdk мы видим, что object.wait() и thread.sleep() стали дружественными к VT. Я даже попробовал, но удалил настройку Pipe Input/Output Stream, потому что все они основаны на одной и той же базовой синхронизации.
В соответствии с документом, в VT я должен ожидать синхронизированные и собственные методы, но не повторные блокировки, чтобы занять больше времени, но это будет зависеть от того, сколько потоков несущей существует... Они не говорят.
Если я предполагаю, что существует столько же потоков несущей, сколько существует являются ядрами, то у меня есть 4 перевозчика. Если бы я действительно закрепил VT, я должен был бы ожидать, что мои задачи будут выполняться только по 4 одновременно, и все задачи завершатся к отметке в 9 секунд.
Я хотел это доказать. Поэтому я написал этот сложный тест ниже.
И мне не удалось закрепить ни одного несущего потока; 12 задач заканчиваются почти одновременно на отметке 3 секунды.
Что еще интересно, я запустил 12 потоков, но в какой-то момент было использовано 14 носителей! Я лол. Отличный вариант оптимизации, которая должна сократить число потоков!
Чтобы увидеть идентификатор переносимого потока, я использовал JNA для извлечения ядра 32 GetCurrentThreadId().
Зависимость Maven:

Код: Выделить всё

    
net.java.dev.jna
jna
5.15.0


net.java.dev.jna
jna-platform
5.15.0

Вот небольшой служебный класс K32 с мини-main() для проверки его работоспособности. Это уже интересно...:

Код: Выделить всё

package vttests;

import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ThreadFactory;

import com.sun.jna.Library;
import com.sun.jna.Native;

public class K32 {
record TD(Integer pid, Integer tid) {}

public interface MyKernel32 extends Library  {
int GetCurrentProcessId();
int GetCurrentThreadId();
}

static MyKernel32 K32;
static {
try {
K32 = Native.load("kernel32", MyKernel32.class);
} catch (Throwable t) {
t.printStackTrace(System.out);
}
}

static Integer pid() {
return K32 != null ? K32.GetCurrentProcessId() : null;
}

static Integer tid() {
return K32 != null ? K32.GetCurrentThreadId() : null;
}

//  static ThreadLocal thrDetailsTL = ThreadLocal.withInitial(() -> new TD(pid(), tid()));

static void p(Object msg) {
//      TD td = thrDetailsTL.get(); //it is WRONG to cache this info, because the carrier thread (tid) can change!!!!!!!!

System.out.println("["+ (Thread.currentThread().isVirtual() ? "V" : "P")+ "/"
+ Thread.currentThread().getName()
+ "("+ pid()//+ td.pid
+ "."+ tid()//+ td.tid
+ ")]: "+ msg);
}

public static void main(String[] args) throws InterruptedException {
Set tids = Collections.synchronizedSet(new TreeSet());
Runnable r = () -> {
for(int i=0; i< 10; i++) {
int tid = tid();
tids.add(tid);
p("Hello "+i+" ("+tid+")");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
p("  "+i+": "+e);
}
}
};

ThreadFactory tf = Thread.ofVirtual().factory();
Thread t1 = tf.newThread(r);
Thread t2 = tf.newThread(r);
t1.start();
t2.start();
//t1.join();
//t2.join();
Thread.sleep(6000);

p("thread ids seen: "+tids); //I usually see more tids than I started! something is wrong with VTs.

}

}
А вот большой файл:

Код: Выделить всё

package vttests;

import static vttests.K32.p;

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.IntStream;

public class TestCustomBlockageWithVTs {
static final int PORT = 42000;

static void testVirtualThreads(String title, boolean useVT, Supplier taskFactory) throws InterruptedException, ExecutionException {
int cores = Runtime.getRuntime().availableProcessors();
int N = 3 * cores;

p("\n\n=========== "+title+" "+(useVT?"using VT":"using platform")+" threads ===============\n\n");
p("Processors count  = " + cores);
p("Threads to create = " + N);
System.out.println();

long t0;
long d;

ExecutorService es = useVT ? Executors.newVirtualThreadPerTaskExecutor() : Executors.newThreadPerTaskExecutor(Thread.ofPlatform().factory());
try (es) {
t0 = System.nanoTime();
List> futures = IntStream
.range(0, N)
.mapToObj(i ->  taskFactory.get())
.map(es::submit)
.toList();

//--------

p("waiting for all futures...");
for(Future future: futures) {
future.get();
}
d = System.nanoTime()-t0;
p("waiting futures took "+1e-9*d+" sec");

//--------

p("shutting down es and await termination...");
t0 = System.nanoTime();
es.shutdown();
es.awaitTermination(1, TimeUnit.SECONDS);
d = System.nanoTime()-t0;
p("shut down took "+1e-9*d+" sec");

//--------

p("closing executor...");
t0 = System.nanoTime();
}

d = System.nanoTime() - t0;
p("closing executor took " + 1e-9 * d + " sec");

p("The end.");
}

public static void main(String[] args) throws Exception {
int[] ia = {0};
int delayms = 1000;
Function taskFactory;

taskFactory = rec -> new SyncWaitTask(ia[0]++, delayms, rec);
ia[0] = 0;
runTest("synchronized waits", true, taskFactory);
ia[0] = 0;
runTest("synchronized waits", false, taskFactory);

taskFactory = rec -> new RLWaitTask(ia[0]++, delayms, rec);
ia[0] = 0;
runTest("reentrant lock waits", true, taskFactory);
ia[0] = 0;
runTest("reentrant lock waits", false, taskFactory);

taskFactory = rec -> new SleepTask(ia[0]++, delayms, rec);
ia[0] = 0;
runTest("sleep", true, taskFactory);
ia[0] = 0;
runTest("sleep", false, taskFactory);

try(Closeable startServer = startServer()) {
taskFactory = rec -> new NetTask(ia[0]++, delayms, rec);
ia[0] = 0;
runTest("net read", true, taskFactory);
ia[0] = 0;
runTest("net read", false, taskFactory);
}
}

static void runTest(String title, boolean useVT, Function taskFactory) throws InterruptedException, ExecutionException {
Map tid2id = new ConcurrentHashMap();
Map id2tid = new ConcurrentHashMap();
BiConsumer recorder2 = (tid,taskId) -> {
tid2id.computeIfAbsent(tid, _tid -> new TreeSet()).add(taskId);
id2tid.computeIfAbsent(taskId, _id -> new TreeSet()).add(tid);
};

testVirtualThreads(title, useVT, () -> taskFactory.apply(recorder2));

System.out.println("----- thread-taskid mappings ("+tid2id.size()+") ----");
tid2id.forEach( (tid,ids) -> System.out.println("Thread "+tid+" was shared among tasks ids "+ids));

System.out.println("----- task-thread mappings ("+id2tid.size()+") ----");
id2tid.forEach( (id,tids) -> System.out.println("Task "+id+" was carried with threads ids "+tids));
}

static abstract class AbstractBlockingTask implements Runnable {
int taskId;
int delayms;
String indent;
BiConsumer recorder;

AbstractBlockingTask(int taskId, int delayms, BiConsumer recorder2) {
this.taskId = taskId;
this.delayms = delayms;
this.recorder = recorder2;
indent = "\t".repeat(taskId) +"#";
}

@Override
public void run() {
p(indent + "task " + taskId + " started.");
work();
p(indent + "task " + taskId + " ended.");
}

void work() {
for(int i=1; i

Подробнее здесь: [url]https://stackoverflow.com/questions/79283583/cant-reproduce-virtual-thread-pinning-jdk-21-yet-mysql-not-as-parallel-as-sh[/url]
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»