百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

Kotlin 协程五

wptr33 2024-12-14 15:32 31 浏览

上一节主要介绍了协程的同步、异步、上下文、调度。通过前面的介绍,其实主要都是基于挂起函数,然后挂起函数是异步返回单个值,那如何返回多个异步计算的值呢?这就是本节主要要讲述的Kotlin Flows。

Flow之前

在介绍Flow之前,我们看一下还可以通过哪些方式来产生多个值?

集合(collections)

在Kotlin中可以使用集合来表示多个值。例如,我们可以有一个简单的函数,返回一个包含三个数字的列表,然后使用forEach将它们全部打印出来:

fun simple(): List<Int> = listOf(1, 2, 3)
 
fun main() {
    simple().forEach { value -> println(value) } 
}

执行结果:

1
2
3

系列(Sequences)

使用阻塞代码来计算这些数字(每次计算需要100毫秒),可以使用一个序列来表示这些数字:

fun simple(): Sequence<Int> = sequence { 
    for (i in 1..3) {
        Thread.sleep(100)
        yield(i) 
    }
}

fun main() {
    simple().forEach { value -> println(value) } 
}

执行结果:

1
2
3

挂起方法

通过上面这种计算会阻塞正在运行代码的主线程。当这些值由异步代码计算时,我们还可以用挂起修饰符标记简单函数,这样它就可以在不阻塞的情况下执行工作,并以列表的形式返回结果:

suspend fun simple(): List<Int> {
    delay(100)
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) } 
}

执行结果:

1
2
3

Flows

使用List<Int>结果类型,意味着我们只能一次返回所有值。为了表示正在异步计算的值流,我们可以使用Flow<Int>类型,就像上面使用Sequence<Int>类型用于同步计算值一样:

fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100) 
        emit(i) 
    }
}

fun main() = runBlocking<Unit> {
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    simple().collect { value -> println(value) } 
}

执行结果

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

来看一下Flow代码有哪些不同之处:

  1. Flow类型的构建器函数称为flow
  2. flow{…}中的代码可以挂起;
  3. simple函数不再用suspend修饰符标记;
  4. 使用emit函数从流发射值;
  5. 使用collect函数从流中收集值。

冷流

Flow流是类似于序列的冷流——流构建器中的代码直到流被收集后才运行。我们可以通过下面的例子更清晰的看出来:

fun simple(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}

执行结果:

Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

这就是simple函数(返回流)没有使用挂起修饰符标记的关键原因。simple()调用本身快速返回,不等待任何东西。每次收集时,流都会重新开始,这就是为什么每次调用collect时我们都会看到“流已启动”的原因。

构建Flow

通过flow{…} 来构建Flow是最基本的一种方式。接下来,我们看看还可以通过哪些方式可以构建Flow:

  1. flowOf构建器定义了一个流,它发出一组固定的值。
  2. 可以使用. asflow()扩展函数将各种集合和序列转换为流。
(1..3).asFlow().collect { value -> println(value) }

Flow操作

Flow流可以使用操作符进行转换,就像转换集合和序列一样。中间操作符应用于上游流并返回下游流。这些操作符也是冷操作,就像Flow一样。意味着对这样一个操作符的调用本身并不是一个挂起函数,它会快速执行并返回一个新的转换流。

基本操作符,如mapfilter。这些操作符与序列的一个重要区别是,这些操作符中的代码块可以调用挂起函数。

map

suspend fun performRequest(request: Int): String {
    delay(1000) 
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow()
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

执行结果:

response 1
response 2
response 3

transform

它可以用来模仿简单的转换,如map和filter,以及实现更复杂的转换。使用转换操作符,我们可以发射任意次数的任意值。

例如下面这段代码,使用transform我们可以在执行一个长时间运行的异步请求之前发出一个字符串,并在它后面跟着一个响应:

fun main() = runBlocking<Unit> {
  (1..3).asFlow()
      .transform { request ->
          emit("Making request $request") 
          emit(performRequest(request)) 
      }
      .collect { response -> println(response) }
}

执行结果

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

take

take操作符会在达到相应限制时取消流的执行。协程中的取消总是通过抛出异常来执行,这样所有的资源管理函数(比如try{…}finally{…} )在取消的情况下正常运行:

fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // take only the first two
        .collect { value -> println(value) }
}       

执行结果:

1
2
Finally in numbers

通过结果可以看出,只取出了前两个值,然后就取消了协程。

Flow是按顺序执行的

Flow流执行规则如下:

  1. 流的每个单独收集都是按顺序执行的,除非使用了操作多个流的特殊操作符。
  2. 集合直接在调用终止操作符的协程中工作。
  3. 默认情况下不会启动新的协程。
  4. 每个发出的值都由上游到下游的所有中间操作符处理,然后传递给终端操作符。

通过下面的例子,过滤偶数并将它们映射到字符串:

fun main() = runBlocking<Unit> {
	(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0              
    }              
    .map { 
        println("Map $it")
        "string $it"
    }.collect { 
        println("Collect $it")
    }    
}

执行结果:

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

通过上面的结果,我们可以更直白得看出流的顺序执行规则。

Flow上下文

Flow流的collect操作总是发生在调用协程的上下文中。例如,存在一个simple流,并在指定的上下文中运行,而不管simple流的实现细节:

withContext(context) {
    simple().collect { value ->
        println(value) 
    }
}

默认情况下,flow{…}构建器运行在相应流的收集器提供的上下文中。例如:

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") } 
}      

执行结果:

[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

simple().collect是从主线程调用,简单的流体也在主线程中调用。

注意:使用withContext的一个陷阱

我们开发过程,对于长时间运行且消耗cpu代码,可能需要在Dispatchers.Default上下文中执行,然后ui更新代码可能需要在Dispatchers.Main上下文中执行。通常Kotlin协程使用withContext更改代码中的上下文,但是flow{…}构建器必须遵守上下文一致性,不允许从不同的上下文emit。

fun simple(): Flow<Int> = flow {
    // 试图通过Dispatchers.Default改变上下文,以执行耗时操作
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) 
            emit(i)
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) } 
}            

执行结果:

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
		Please refer to 'flow' documentation or use 'flowOn' instead
	at ...

flowOn

上面的异常是指出必须使用flowOn函数来改变流发射上下文。下面的例子显示了更改流上下文的正确方法,它还打印了相应线程的名称,以显示它是如何工作的:

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) 
        log("Emitting $i")
        emit(i) 
    }
}.flowOn(Dispatchers.Default)

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value") 
    } 
}            

执行结果:

[DefaultDispatcher-worker-1] Emitting 1
[main] Collected 1
[DefaultDispatcher-worker-1] Emitting 2
[main] Collected 2
[DefaultDispatcher-worker-1] Emitting 3
[main] Collected 3

通过执行结果可以看出,flow{…}在后台线程中工作,而在主线程中进行收集。

这里要注意,flowOn操作符改变了流的默认顺序性质。现在收集发生在一个协程(“coroutine#1”)中,而发射发生在另一个协程(“coroutine#2”)中,该协程与收集的协程同时运行在另一个线程中。当上游流必须在其上下文中更改CoroutineDispatcher时,flowOn操作符为其创建一个新的协程。

总结

如果我们之前已经熟悉响应式流或响应式框架(如RxJava和Reactor项目)的人来说,Flow的设计可能看起来非常熟悉。事实上,它的设计灵感来自于Reactive Streams及其各种实现。但是Flow的主要目标是拥有尽可能简单的设计,对Kotlin和suspend友好,并使用结构化并发。

Flow是一个响应式流,可以将其转换为响应式,反之亦然。由kotlinx.coroutines提供转换器:

  • kotlinx-coroutines-reactive for Reactive Streams,
  • kotlinx-coroutines-reactor for Reactor
  • kotlinx-coroutines-rx2/kotlinx-coroutines-rx3 for RxJava2/RxJava3

相关推荐

redis的八种使用场景

前言:redis是我们工作开发中,经常要打交道的,下面对redis的使用场景做总结介绍也是对redis举报的功能做梳理。缓存Redis最常见的用途是作为缓存,用于加速应用程序的响应速度。...

基于Redis的3种分布式ID生成策略

在分布式系统设计中,全局唯一ID是一个基础而关键的组件。随着业务规模扩大和系统架构向微服务演进,传统的单机自增ID已无法满足需求。高并发、高可用的分布式ID生成方案成为构建可靠分布式系统的必要条件。R...

基于OpenWrt系统路由器的模式切换与网页设计

摘要:目前商用WiFi路由器已应用到多个领域,商家通过给用户提供一个稳定免费WiFi热点达到吸引客户、提升服务的目标。传统路由器自带的Luci界面提供了工厂模式的Web界面,用户可通过该界面配置路...

这篇文章教你看明白 nginx-ingress 控制器

主机nginx一般nginx做主机反向代理(网关)有以下配置...

如何用redis实现注册中心

一句话总结使用Redis实现注册中心:服务注册...

爱可可老师24小时热门分享(2020.5.10)

No1.看自己以前写的代码是种什么体验?No2.DooM-chip!国外网友SylvainLefebvre自制的无CPU、无操作码、无指令计数器...No3.我认为CS学位可以更好,如...

Apportable:拯救程序员,IOS一秒变安卓

摘要:还在为了跨平台使用cocos2d-x吗,拯救objc程序员的奇葩来了,ApportableSDK:FreeAndroidsupportforcocos2d-iPhone。App...

JAVA实现超买超卖方案汇总,那个最适合你,一篇文章彻底讲透

以下是几种Java实现超买超卖问题的核心解决方案及代码示例,针对高并发场景下的库存扣减问题:方案一:Redis原子操作+Lua脚本(推荐)//使用Redis+Lua保证原子性publicbo...

3月26日更新 快速施法自动施法可独立设置

2016年3月26日DOTA2有一个79.6MB的更新主要是针对自动施法和快速施法的调整本来内容不多不少朋友都有自动施法和快速施法的困扰英文更新日志一些视觉BUG修复就不翻译了主要翻译自动施...

Redis 是如何提供服务的

在刚刚接触Redis的时候,最想要知道的是一个’setnameJhon’命令到达Redis服务器的时候,它是如何返回’OK’的?里面命令处理的流程如何,具体细节怎么样?你一定有问过自己...

lua _G、_VERSION使用

到这里我们已经把lua基础库中的函数介绍完了,除了函数外基础库中还有两个常量,一个是_G,另一个是_VERSION。_G是基础库本身,指向自己,这个变量很有意思,可以无限引用自己,最后得到的还是自己,...

China&#39;s top diplomat to chair third China-Pacific Island countries foreign ministers&#39; meeting

BEIJING,May21(Xinhua)--ChineseForeignMinisterWangYi,alsoamemberofthePoliticalBureau...

移动工作交流工具Lua推出Insights数据分析产品

Lua是一个适用于各种职业人士的移动交流平台,它在今天推出了一项叫做Insights的全新功能。Insights是一个数据平台,客户可以在上面实时看到员工之间的交流情况,并分析这些情况对公司发展的影响...

Redis 7新武器:用Redis Stack实现向量搜索的极限压测

当传统关系型数据库还在为向量相似度搜索的性能挣扎时,Redis7的RedisStack...

Nginx/OpenResty详解,Nginx Lua编程,重定向与内部子请求

重定向与内部子请求Nginx的rewrite指令不仅可以在Nginx内部的server、location之间进行跳转,还可以进行外部链接的重定向。通过ngx_lua模块的Lua函数除了能实现Nginx...