Как правильно объединить несколько потоков, чтобы устранение дребезга в одном не влияло на другой?Android

Форум для тех, кто программирует под Android
Ответить
Anonymous
 Как правильно объединить несколько потоков, чтобы устранение дребезга в одном не влияло на другой?

Сообщение Anonymous »

У меня есть два гипотетических потока данных intFlow и stringFlow, которые я хочу объединить и передать в пользовательский интерфейс. stringFlow работает несколько нестабильно, поэтому я использую debounce(2000), чтобы «замедлить его». Однако потоки данных в intFlow передаются неторопливо, поэтому этот вызов debouce не должен влиять на них.
Я пробовал такие функции, как join()
Я пробовал такие функции, как merge() code>, но это не сработало, потому что intFlow ожидает stringFlow, что мне не нужно.
Я также пробовал FlatMapMerge{}, но это тоже не сработало, и, к сожалению, я не понимаю почему, потому что я понимаю эту функцию так, что она асинхронна и передаваемые ей потоки не должны блокировать друг друга.
В итоге я использовал merge(), но поскольку типы данных разные и мне нужно было позже объединить оба данных, я применил хак:

Код: Выделить всё

val intFlow = MutableStateFlow(0)
val stringFlow = MutableStateFlow("")

fun transform(result: suspend (s: String, i: Int) -> Flow): Flow {
return merge(intFlow, stringFlow.debounce(2000))
.map {
if (it is Int) Pair(stringFlow.value, it) else Pair(it as String, intFlow.value)
}
.filter { (s, i) -> s.isNotBlank() && i > 0 }
.distinctUntilChanged()
.flatMapLatest { (s, i) -> result(s, i) }
//    return intFlow.flatMapMerge { i ->
//        stringFlow.debounce(2000).map { s -> Pair(s, i) }
//    }
//        .filter { (s, i) -> s.isNotBlank() && i > 0 }
//        .distinctUntilChanged()
//        .flatMapLatest { (s, i) -> result(s, i) }
}

val combinedFlow = transform { s, i ->
return@transform flow {
emit("$s::$i")
}
}

fun getCurrentTime(): String {
val currentDateTime = LocalDateTime.now()
val formatter = DateTimeFormatter.ofPattern("HH:mm:ss:SSS")
return currentDateTime.format(formatter) + " | ${
currentDateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
}"
}

suspend fun main() {
val mainJob = CoroutineScope(Dispatchers.Default).launch {
println("${getCurrentTime()} - Updating String to 'a'")
stringFlow.update { "a" }

println("${getCurrentTime()} - Updating Int to 1")
intFlow.update { 1 }

println("${getCurrentTime()} - Updating String to 'a'")
stringFlow.update { "a" }

println("${getCurrentTime()} - Updating Int to 2")
intFlow.update { 2 }

println("${getCurrentTime()} - Delaying for 5 seconds")
delay(5000)

println("${getCurrentTime()} - Updating String with 'b'")
stringFlow.update { "b"  }

println("${getCurrentTime()} - Updating Int to 3")
intFlow.update { 3 }
}

val collectJob = CoroutineScope(Dispatchers.Default).launch {
combinedFlow.collect {
println("${getCurrentTime()} - Combined: $it-------- ")
}
}

mainJob.join()
collectJob.join()
}
Это работает, но меня беспокоит взлом:

Код: Выделить всё

.map {
if (it is Int) Pair(stringFlow.value, it) else Pair(it as String, intFlow.value)
}
И если бы я потенциально мог столкнуться с проблемами параллелизма, например, с использованием устаревших данных или чем-то еще.
Используя FlatMapMerge{}, я понимаю это вывод:

Код: Выделить всё

21:14:25:883 | 1713273265883 - Updating String to 'a'
21:14:25:891 | 1713273265891 - Updating Int to 1
21:14:25:892 | 1713273265892 - Updating String to 'a'
21:14:25:892 | 1713273265892 - Updating Int to 2
21:14:25:892 | 1713273265892 - Delaying for 5 seconds
21:14:27:929 | 1713273267929 - Combined: a::2--------
21:14:30:898 | 1713273270898 - Updating String with 'b'
21:14:30:899 | 1713273270899 - Updating Int to 3
21:14:32:910 | 1713273272910 - Combined: b::2--------
21:14:32:910 | 1713273272910 - Combined: b::3--------
И существует задержка около 2 секунд между публикацией int и его получением.
Сравнивая это с merge():

Код: Выделить всё

21:15:54:969 | 1713273354969 - Updating String to 'a'
21:15:54:974 | 1713273354974 - Updating Int to 1
21:15:54:975 | 1713273354975 - Updating String to 'a'
21:15:54:975 | 1713273354975 - Updating Int to 2
21:15:54:975 | 1713273354975 - Delaying for 5 seconds
21:15:55:009 | 1713273355009 - Combined: a::2--------
21:15:59:986 | 1713273359986 - Updating String with 'b'
21:15:59:987 | 1713273359987 - Updating Int to 3
21:15:59:991 | 1713273359991 - Combined: b::3--------
И между публикацией и получением почти нет задержки.

Подробнее здесь: https://stackoverflow.com/questions/783 ... n-one-affe
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Android»