Ниже мы делимся нашим пониманием Selector и тем, как он работает. Мы поняли, что, хотя некоторые аспекты Selector являются потокобезопасными, реальное действие, когда поток получает готовые события и обрабатывает их, сегодня может быть только в потоке. вступают в действие в любой момент времени, что не позволяет нам обеспечить параллелизм для получения и обработки этих событий готовности одновременно в нескольких потоках.
Пожалуйста, помогите понять, как добиться вышеописанного.
Селектор можно использовать для мониторинга выбираемых каналов Java NIO, когда канал[ы] готов к событию[ям], которые могут быть Чтение, Запись, Принятие , Connect (в соответствии с зарегистрированным интересом), Selector уведомит нас.
Мы регистрируем наш Socket в Selector и указываем операции, которые нас интересуют (Чтение/Запись/Подключение/Принятие) в чтобы получить событие Ready с помощью API-интерфейса Register.
Вызов Register возвращает нам ключ выбора, который представляет связь между сокетом и селектором.
'Ключ выбора ' содержит информацию о селекторе, канале, интересующем наборе (операциях, в которых заинтересован канал) и готовом наборе (наборе операций, которые готовы и могут быть выполнены без блокировки, подмножестве интересующего набора).
"Селектор" поддерживает три набора ключей.
Отмененный набор ключей:
a. Набор ключей выбора, регистрация которых в селекторе отменена.
b. Отмененный набор ключей не является потокобезопасным.
Зарегистрированный набор ключей:
a. Набор ключей выбора, зарегистрированных в селекторе.
b. Зарегистрированный набор ключей является потокобезопасным.
Выбранный набор ключей:
a. Набор ключей выбора, у которых есть хотя бы одна интересующая операция в состоянии готовности.
b. Выбранный набор ключей не является потокобезопасным.
Мы блокируем наш поток (NETWORK_IO_IN) на артефакте селектора Вызов select(), который разблокируется, когда хотя бы один из зарегистрированных каналов будет готов выполнить интересующую операцию.
Вызов select является потокобезопасным, он внутренне включает три шага:
Сначала он перебирает отмененный набор ключей и освобождает ресурсы относительно канала/сокета, регистрация которых была отменена из селектора.
На втором этапе он опрашивает ОС, все ли каналы готовы выполнить ввод-вывод, на основе их заинтересованного набора.
На третьем этапе он заполняет выбранный ключ set.
Примечание. Все эти шаги внутренне синхронизируются для селектора, затем для отмены и выбранного набора клавиш.
< ol start="8">
[*]Чтобы получить набор выбранных ключей, который содержит ключ выбора (содержащий информацию о канале), готовый к выполнению ввода-вывода, мы используем selectedKeys(), который возвращает нам ссылку из
выбранного набора ключей, который является закрытым элементом селектора.
Мы выполняем итерацию по выбранному ключу установите, определите, какая интересующая операция готова (чтение/запись/подключение/принятие) и выполните этот ввод-вывод.
Добавление потока кода ниже, где у нас есть мы сами не выполняли никакой синхронизации.
Здесь 10 потоков заблокированы при вызове выбора артефакта селектора.
Мы постоянно отправить данные на порт IP:Port, на котором канал/сокет прослушивается и зарегистрирован в селекторе.
Мы получаем события Read всякий раз, когда канал/сокет готов читать данные из ОС буфер.
Наблюдение:
Все потоки блокируются вызов select и выходили один за другим.
Поток, который выходит из вызова select, скажем, T1 получает выбранный набор ключей и работает над ним.
Рассмотрение другого потока T2, который заблокирован при вызове выбора и находится на шаге, упомянутом в 7.c, также является работает с одним и тем же выбранным набором ключей.
Затем мы получаем исключение одновременного изменения, поскольку оба потока пытаются одновременно выполнить изменение выбранного набор ключей.
общественное развлечение AsynIOArtefactLoop () {
while (true) {
println ("Thread " + Thread.currentThread().id + "blocked on select")
// Threads will get blocked on the select calls until there is at least one channel /socket which is ready.
selector.select ()
println ("Thread " + Thread.currentThread().id + "comes out of select")
// Returns the reference of the selected key set.
val selected_key_set = selector.selectedKeys()
val iterator = selected_key_set.iterator()
println ("Thread " + Thread.currentThread().id + “operating on the selected key set”)
while (iterator.hasNext()) {
// Extract the key (selection key object ) and increment the iterator.
val key = iterator.next()
// Remove the Seleciton Key object from the selected key set.
iterator.remove()
println (" Event receive corrsponding to descriptor id " + key.attachment() as Int)
when {
// Channel/Socket is ready for Receive.
key.isReadable -> {
…
// Buffer in which to copy the receive data.
var buffer = ByteBuffer.allocate(1024)
// Get the channel from Selection key object and call the receive.
(key.channel() as DatagramChannel).receive(buffer)
…
// Print the Receive data .
println ("Message Received: ${String(receivedData)}")
}
key.isWritable -> println("Write Ready")
}
}
}
Теперь добавляем поток кода, где мы выполнили синхронизацию сами.
Здесь также блокируются 10 потоков при вызове select артефакта селектора.
Мы постоянно отправляем данные к IP:порту, на котором канал/сокет прослушивается и зарегистрирован в селекторе.
Мы получаем события чтения всякий раз, когда канал/сокет готов читать данные из буфера ОС.< /li>
Наблюдение:
Все потоки блокируются при вызове select и выходят наружу один за другим.
Поток, который выходит из вызова выбора, скажем, T1, получает блокировку selected_key_set, ссылка на который возвращается вызовом selectedKeys().
Учитывая другой поток T2, который заблокирован при вызове выбора и находится на шаге, упомянутом в 7.c, если один из потоков уже заблокировал выбранный_key_set, он не сможет работать над ним, что синхронизируется. весь процесс итерации с вызовом select ().
while (true) {
println ("Thread " + Thread.currentThread().id + "blocked on select")
selector.select ()
println ("Thread " + Thread.currentThread().id + "comes out of select")
val selected_key_set = selector.selectedKeys()
// Here we have synchronize on the selected_key_set which ensures that thread which are blocked on the select call will not be able to use it if another thread is already operating over it.
synchronized (selected_key_set) {
println ("Thread " + Thread.currentThread().id + "blocked on selected key set")
val iterator = selected_key_set.iterator()
while (iterator.hasNext()) {
val key = iterator.next()
println (" Event receive corrsponding to descriptor id " + key.attachment() as Int)
iterator.remove()
when {
key.isReadable -> {
counter++;
var buffer = ByteBuffer.allocate(1024)
(key.channel() as DatagramChannel).receive(buffer)
buffer.flip()
val receivedData = ByteArray(buffer.remaining())
buffer.get(receivedData)
println ("Recv Event Number: "+ counter)
println ("Message Received: ${String(receivedData)}")
}
key.isWritable -> println("Write Ready")
}
}
}
println ("Thread " + Thread.currentThread().id + "comes out of selected key set")
}
}
Здесь мы выполняем синхронизацию выбранного набора ключей, когда мы выполняем обработку событий готовности (чтение/запись/принятие/подключение). ), что позволяет одновременно работать только одному потоку и блокирует другие потоки.
Если у меня многоядерный процессор, я не смогу воспользоваться преимуществом производительности, потому что только один моего потока будет выполнять обработку одновременно, а другие будут ждать этого.
Ниже мы делимся нашим пониманием Selector и тем, как он работает. Мы поняли, что, хотя некоторые аспекты Selector являются потокобезопасными, реальное действие, когда поток получает готовые события и обрабатывает их, сегодня может быть только в потоке. вступают в действие в любой момент времени, что не позволяет нам обеспечить параллелизм для получения и обработки этих событий готовности одновременно в нескольких потоках. Пожалуйста, помогите понять, как добиться вышеописанного. [list] [*]Селектор можно использовать для мониторинга выбираемых каналов Java NIO, когда канал[ы] готов к событию[ям], которые могут быть Чтение, Запись, Принятие , Connect (в соответствии с зарегистрированным интересом), Selector уведомит нас. [*]Мы регистрируем наш Socket в Selector и указываем операции, которые нас интересуют (Чтение/Запись/Подключение/Принятие) в чтобы получить событие Ready с помощью API-интерфейса Register. [*]Вызов Register возвращает нам ключ выбора, который представляет связь между сокетом и селектором. [*]'Ключ выбора ' содержит информацию о селекторе, канале, интересующем наборе (операциях, в которых заинтересован канал) и готовом наборе (наборе операций, которые готовы и могут быть выполнены без блокировки, подмножестве интересующего набора). [*]"Селектор" поддерживает три набора ключей. [/list] [list] [*]Отмененный набор ключей: a. Набор ключей выбора, регистрация которых в селекторе отменена. b. Отмененный набор ключей не является потокобезопасным. [*]Зарегистрированный набор ключей:
a. Набор ключей выбора, зарегистрированных в селекторе. b. Зарегистрированный набор ключей является потокобезопасным. [*]Выбранный набор ключей: a. Набор ключей выбора, у которых есть хотя бы одна интересующая операция в состоянии готовности. b. Выбранный набор ключей не является потокобезопасным. [/list] [list] [*]Мы блокируем наш поток (NETWORK_IO_IN) на артефакте селектора Вызов select(), который разблокируется, когда хотя бы один из зарегистрированных каналов будет готов выполнить интересующую операцию. [*]Вызов select является потокобезопасным, он внутренне включает три шага: [/list] [list] Сначала он перебирает отмененный набор ключей и освобождает ресурсы относительно канала/сокета, регистрация которых была отменена из селектора.[*]На втором этапе он опрашивает ОС, все ли каналы готовы выполнить ввод-вывод, на основе их заинтересованного набора. [*]На третьем этапе он заполняет выбранный ключ set. [*]Примечание. Все эти шаги внутренне синхронизируются для селектора, затем для отмены и выбранного набора клавиш. [/list] < ol start="8"> [*]Чтобы получить набор выбранных ключей, который содержит ключ выбора (содержащий информацию о канале), готовый к выполнению ввода-вывода, мы используем selectedKeys(), который возвращает нам ссылку из выбранного набора ключей, который является закрытым элементом селектора.
[list] [*]Мы выполняем итерацию по выбранному ключу установите, определите, какая интересующая операция готова (чтение/запись/подключение/принятие) и выполните этот ввод-вывод. [/list] Добавление потока кода ниже, где у нас есть мы сами не выполняли никакой синхронизации. [list] [*]Здесь 10 потоков заблокированы при вызове выбора артефакта селектора. [*]Мы постоянно отправить данные на порт IP:Port, на котором канал/сокет прослушивается и зарегистрирован в селекторе. [*]Мы получаем события Read всякий раз, когда канал/сокет готов читать данные из ОС буфер. [*]Наблюдение: [/list] [list] [*]Все потоки блокируются вызов select и выходили один за другим.
[*]Поток, который выходит из вызова select, скажем, T1 получает выбранный набор ключей и работает над ним.
[*]Рассмотрение другого потока T2, который заблокирован при вызове выбора и находится на шаге, упомянутом в 7.c, также является работает с одним и тем же выбранным набором ключей.
[*]Затем мы получаем исключение одновременного изменения, поскольку оба потока пытаются одновременно выполнить изменение выбранного набор ключей. общественное развлечение AsynIOArtefactLoop () { [code]while (true) {
println ("Thread " + Thread.currentThread().id + "blocked on select")
// Threads will get blocked on the select calls until there is at least one channel /socket which is ready.
selector.select ()
println ("Thread " + Thread.currentThread().id + "comes out of select") // Returns the reference of the selected key set.
val selected_key_set = selector.selectedKeys()
val iterator = selected_key_set.iterator()
println ("Thread " + Thread.currentThread().id + “operating on the selected key set”)
while (iterator.hasNext()) {
// Extract the key (selection key object ) and increment the iterator.
val key = iterator.next()
// Remove the Seleciton Key object from the selected key set.
iterator.remove()
println (" Event receive corrsponding to descriptor id " + key.attachment() as Int)
when {
// Channel/Socket is ready for Receive.
key.isReadable -> {
…
// Buffer in which to copy the receive data.
var buffer = ByteBuffer.allocate(1024)
// Get the channel from Selection key object and call the receive.
(key.channel() as DatagramChannel).receive(buffer)
[/list] Теперь добавляем поток кода, где мы выполнили синхронизацию сами. [list] [*]Здесь также блокируются 10 потоков при вызове select артефакта селектора. [*]Мы постоянно отправляем данные к IP:порту, на котором канал/сокет прослушивается и зарегистрирован в селекторе. [*]Мы получаем события чтения всякий раз, когда канал/сокет готов читать данные из буфера ОС.< /li> Наблюдение: [/list] [list] [*]Все потоки блокируются при вызове select и выходят наружу один за другим. [*]Поток, который выходит из вызова выбора, скажем, T1, получает блокировку selected_key_set, ссылка на который возвращается вызовом selectedKeys().[*]Учитывая другой поток T2, который заблокирован при вызове выбора и находится на шаге, упомянутом в 7.c, если один из потоков уже заблокировал выбранный_key_set, он не сможет работать над ним, что синхронизируется. весь процесс итерации с вызовом select (). [/list] public fun select () { [code] while (true) {
println ("Thread " + Thread.currentThread().id + "blocked on select")
selector.select ()
println ("Thread " + Thread.currentThread().id + "comes out of select")
val selected_key_set = selector.selectedKeys() // Here we have synchronize on the selected_key_set which ensures that thread which are blocked on the select call will not be able to use it if another thread is already operating over it.
println (" Event receive corrsponding to descriptor id " + key.attachment() as Int)
iterator.remove() when {
key.isReadable -> { counter++; var buffer = ByteBuffer.allocate(1024)
(key.channel() as DatagramChannel).receive(buffer) buffer.flip() val receivedData = ByteArray(buffer.remaining()) buffer.get(receivedData) println ("Recv Event Number: "+ counter) println ("Message Received: ${String(receivedData)}")
}
key.isWritable -> println("Write Ready") } } } println ("Thread " + Thread.currentThread().id + "comes out of selected key set")
} [/code] } Здесь мы выполняем синхронизацию выбранного набора ключей, когда мы выполняем обработку событий готовности (чтение/запись/принятие/подключение). ), что позволяет одновременно работать только одному потоку и блокирует другие потоки. Если у меня многоядерный процессор, я не смогу воспользоваться преимуществом производительности, потому что только один моего потока будет выполнять обработку одновременно, а другие будут ждать этого.
Мы делимся нашим пониманием селектора и о том, как он работает ниже, то, что мы поняли, что, хотя некоторые аспекты селектора безопасны потока, реальное действие, в котором поток получает готовые события и обрабатывает их, сегодня только на потоке...
В настоящее время я использую API асинхронного обратного вызова gRPC, в основном для потокового RPC. Как я могу установить количество потоков ввода-вывода? Кажется, я не вижу никаких связанных конфигураций.
Я работаю над новым проектом (игровой двигатель для самоуправления) и пытаюсь создать систему журнала. Я хочу, чтобы журнал помогал как можно больше отладки, поэтому я планирую использовать его для записи в файл журнала. Единственная проблема...
Мне нужно обработать один большой репозиторий ( репозиторий исходного кода ) и создать/обновить (или оставить нетронутыми) около 2 миллионов файлов в другом репозитории ( репозиторий метаданных ).
Хотя файлов около 2 млн, большинство из них...
У меня развернуто приложение Flink в AWS Managed Flink, но при попытке реализовать асинхронный ввод-вывод с помощью Redisson (клиент Redis) для подключения к AWS MemoryDB (кластер Redis) я получил ошибку (см. ниже). Мне неясно, как Redisson создает...