百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

使用 go 协程+Channel,让你的代码执行快到起飞

wptr33 2025-03-25 18:09 10 浏览

作者: horryhuang,腾讯 PCG 后台开发工程师

传统的串行代码执行,逻辑比较简单,当数据量比较大时,执行效率低下,既然我们使用 go,那就利用 go 相对与其他语言的优势,轻量化的协程以及 channel,接下来让我们使用 go 协程+chan,让我们的代码速度来个大的飞跃吧~

背景:最近做了一个需求,是产品小姐姐提的对于流失用户的召回,精简一下流程,首先从表 1 中取出符合多少天未登陆条件的用户 uid,然后利用这个用户 uid 在表 2 中进行比较(如果用户曾经被召回过,会在表 2 中留下一条记录,之后就不再召回),如果表 2 中有该用户的记录,就不做任何操作,如果没有,则触发用户召回的服务。当然实际业务比这个要复杂,但只从这个精简的业务中,也能找到很多优化我们代码的地方,从而提高效率。

第一次尝试 demo:

在接到这个需求的时候,心情非常开心,这不就是我 sql boy 发挥作用的时候了吗?于是,很快我就撸出了代码。大致的 demo 如下(实际业务中不要写魔法数字):

var uidTargetList []int
var uidList []int
var id int
for {
 // 每次从表1中取出100个用户,这里id用户遍历,每次取出数据后,返回最后一个用户记录对应的id,然后使用这个id作为读表的比较条件,防止取出重复用户
 if uidList, id, err := lastLoginTimeStore.GetUnloginUserByPage(id,
  100, startTime, endTime); err != nil {
  rlog.Error("get unlogin user by page err", rlog.Err(err))
 }
 if len(uidList) == 0 {
  break
 }

 for index := range uidList {
  var hasSent bool
  // 判断用户是否被召回过,如果没有,则加入uidTargetList,以便后续触发召回服务
  if hasSent, err := callbackStore.HasSent(uidList[index]); err != nil {
   rlog.Error("get user record error", rlog.Int("uid", uidList[index]), rlog.Err(err))
  }
  if !hasSent {
   uidTargetList = append(uidTargetList, uidList[index])
  }
 }
}

然后我就和产品说,我写好了,服务可以跑了,当天产品就要我先灰度发送,我就发了 16w 用户,正当我服务跑起来准备刷刷 km 时,我发现这个速度也太慢了,大概每分钟居然只能处理 600 个用户,照着这个速度,还不得发到明天,产品可能要把我打死,于是我马上终止了服务,马上进行优化。

第一次优化:

马上我就仔细分析这个服务的瓶颈在哪,这个服务中有 2 次与数据库的交互,这种操作一般就是效率低的缘由。这里的第一次 io 操作从表 1 中取出用户数据,每次取出 100 条记录,如果增加每次取出的数据,可能会带来超时的风险,同时这样的效率提升也比较小,没有量级的提升,很明显,这个 io 操作不是我优化的主要目标。于是我将目标放到了第二个 io 操作,每次只能比较一个用户,这样的效率比较低,所以,我应该优化这个地方,如果我能和第一次 io 操作一样,能够每次比较 100 个用户,这样的提升就是量级了,想到这里,我瞬间又重新寻回了新手程序员的蜜汁自信。

那怎样才能一下比较很多个用户数据,马上,我就想起了可以使用协程啊,有一个用户的数据,就 go 一个协程去比较,这样的效率不就得到了极大的提升,然后我有一次撸起了袖子,又开始干了。这次代码的 demo 感觉就比第一版高端了许多。主要是利用 uidChan 和 uidTargetChan 在多协程中传递数据,uidChan 传递从表 1 中查询出的数据,然后在表 2 中比较,如果符合条件,则将其存入 uidTargetChan,最后再利用 uidTargetList 这个切片,存放所有符合条件的用户 uid。

// uidHandler 创建一个结构体,包括一个等待队列,然后uidChan 用于在多个协程中传递用户uid
type uidHandler struct {
 wg sync.WaitGroup
 uidChan chan int
}
// uidTargetHandler 同样的这个结构体包括一个等待队列,然后uidTargetChan 用于在多个协程中传递符合条件的用户uid
type uidTargetHandler struct {
 wg sync.WaitGroup
 uidTargetChan chan int
}

func test1()  {
 uh := uidHandler{
  wg:      sync.WaitGroup{},
  uidChan: make(chan int, 100),
 }
 uth := uidTargetHandler{
  wg:            sync.WaitGroup{},
  uidTargetChan: make(chan int, 100),
 }
 // 利用协程启动获取targetUid的服务
 go func() {
  getTargetUid(uh, uth)
 }()

  // 记录下这些targetUid,uidTargetList就是最后保存所有符合条件的uid
 var uidTargetList []int
 go func() {
  RecordTargetUid(uth, &uidTargetList)
 }()

 var uidList []int
 var id int
 for {
  // 每次从表1中取出100个用户,这里id用户遍历,每次取出数据后,返回最后一个用户记录对应的id,然后使用这个id作为读表的比较条件,防止取出重复用户
  if uidList, id, err = lastLoginTimeStore.GetUnloginUserByPage(id,
   100, startTime, endTime); err != nil {
   rlog.Error("get unlogin user by page err", rlog.Err(err))
  }
  if len(uidList) == 0 {
   break
  }
    // 将取出的uid直接放入uh.uidChan
  for index := range uidList {
   uh.uidChan <- uidList[index]
   uh.wg.Add(1)
  }
 }
 uh.wg.Wait()
 uth.wg.Wait()
  // 当走到这一步时,所有的目标用户的uid全部保存在 uidTargetList 中了
}

然后我们来看看 getTargetUid 和 RecordTargetUid 的代码:

// getTargetUid 获取目标uid,即可以发送通知的用户
func getTargetUid(uh uidHandler, uth uidTargetHandler)  {
 for {
  uid := <- uh.uidChan
  uh.wg.Done()
    // 对于用户的uid,直接并发去比较,如果符合条件,就放入uth.uidTargetChan
  go func(userUid int) {
   var hasSent bool
   var err error
   if hasSent, err = callbackStore.HasSent(userUid); err != nil {
    rlog.Error("get user record error", rlog.Int("uid", userUid), rlog.Err(err))
   }
   if !hasSent {
    uth.uidTargetChan <- userUid
    uth.wg.Add(1)
   }
  }(uid)
 }
}

// RecordTargetUid 记录下可以发送用户的uid,实际业务中应该是直接利用这些uid去启动后续服务
func RecordTargetUid(uth uidTargetHandler, uidTargetList *[]int)  {
 for {
  uid := <- uth.uidTargetChan
  *uidTargetList = append(*uidTargetList, uid)
  uth.wg.Done()
 }
}

至此,我们就能将所有符合条件的用户 uid 放在 uidTargetList。然后我想着,这样性能就有了量的提升,产品小姐姐待会要夸我真快,真给力。然后我就重启了服务。但。。。,猝不及防的事情又发生了,报了这个“use of closed network connection”错误,经过分析,可能是我协程开了太多了,一下子并发了太多协程去和数据库交互,然后导致出错,进而连接被关闭,最终报了这个错。于是,想着能不能不要并发那么多协程,对同时跑的协程数量进行一个限制。所以又想到了线程池,可以仿造这个概念弄个协程池,但是谷歌了一下,线程池主要就是节省线程的创建和销毁的时间,但是对于协程而言,它的创建和销毁本来就消耗不大,go 的协程本来就是非常轻量的,go 开发中一般也不建议使用线程池。然后我又陷入了深思,代码好难,人生也好难。

第二次优化:

自己的脑瓜不够转了,只能去求助外援。然后我只能去请教了 dayo 大哥,然后 dayo 传授了我一个江湖典藏小诀窍,专治这个毛病。即利用 for 循环,只开启固定的协程去处理这些用户 uid,在服务器可以承载的范围,这样就不会有特别多的协程同时与数据库交互了。利用这个诀窍,我对 getTargetUid 函数进行了小小的修改,就解决了这个问题,getTargetUid 修改后的代码如下:

// getTargetUid 获取目标uid,即可以发送通知的用户
func getTargetUid(uh uidHandler, uth uidTargetHandler)  {
  // 只并发100个协程,然后这些协程循环去从chan中读取并进行相应的处理
 for i := 0; i < 100; i++ {
  go func() {
   for {
    uid := <- uh.uidChan
    uh.wg.Done()
    var hasSent bool
    var err error
    if hasSent, err = callbackStore.HasSent(uid); err != nil {
     rlog.Error("get user record error", rlog.Int("uid", uid), rlog.Err(err))
    }
    if !hasSent {
     uth.uidTargetChan <- uid
     uth.wg.Add(1)
    }
   }
  }()
 }
}

这次,服务又跑起来了,大概每分钟 8000 个用户,速度大大提升,产品小姐姐知道了我的壮举后,对我赞不绝口,菜鸡程序员的快乐又有了,这就是我利用 go 协程提升了服务的效率,总的来说,go 的 chan 非常好用,很方便在多协程间传递数据,chan+协程简直就是利器,还在用线程池的 java 同学听到了都羡慕哭了。

结语

当然这只是优化的一部分,比如你的表中用户记录一共有 2 亿条,这样依次遍历效率仍然太低了,可以将用户数据分段,比如每 100 万个数据分为一段,每一段 go 一个协程去处理,这样读取的效率也有了极大的提升,还可以增加多台服务器等等,这些都可以提升速度,但这些就不是本文的重点啦,大家可以自己试着用多协程+chan 去优化一下自己的代码,提升代码的运行速度吧~

相关推荐

每天一个编程技巧!掌握这7个神技,代码效率飙升200%

“同事6点下班,你却为改BUG加班到凌晨?不是你不努力,而是没掌握‘偷懒’的艺术!本文揭秘谷歌工程师私藏的7个编程神技,每天1分钟,让你的代码从‘能用’变‘逆天’。文末附《Python高效代码模板》,...

Git重置到某个历史节点(Sourcetree工具)

前言Sourcetree回滚提交和重置当前分支到此次提交的区别?回滚提交是指将改动的代码提交到本地仓库,但未推送到远端仓库的时候。...

git工作区、暂存区、本地仓库、远程仓库的区别和联系

很多程序员天天写代码,提交代码,拉取代码,对git操作非常熟练,但是对git的原理并不甚了解,借助豆包AI,写个文章总结一下。Git的四个核心区域(工作区、暂存区、本地仓库、远程仓库)是版本控制的核...

解锁人生新剧本的密钥:学会让往事退场

开篇:敦煌莫高窟的千年启示在莫高窟321窟的《降魔变》壁画前,讲解员指着斑驳色彩说:"画师刻意保留了历代修补痕迹,因为真正的传承不是定格,而是流动。"就像我们的人生剧本,精彩章节永远...

Reset local repository branch to be just like remote repository HEAD

技术背景在使用Git进行版本控制时,有时会遇到本地分支与远程分支不一致的情况。可能是因为误操作、多人协作时远程分支被更新等原因。这时就需要将本地分支重置为与远程分支的...

Git恢复至之前版本(git恢复到pull之前的版本)

让程序回到提交前的样子:两种解决方法:回退(reset)、反做(revert)方法一:gitreset...

如何将文件重置或回退到特定版本(怎么让文件回到初始状态)

技术背景在使用Git进行版本控制时,经常会遇到需要将文件回退到特定版本的情况。可能是因为当前版本出现了错误,或者想要恢复到之前某个稳定的版本。Git提供了多种方式来实现这一需求。...

git如何正确回滚代码(git命令回滚代码)

方法一,删除远程分支再提交①首先两步保证当前工作区是干净的,并且和远程分支代码一致$gitcocurrentBranch$gitpullorigincurrentBranch$gi...

[git]撤销的相关命令:reset、revert、checkout

基本概念如果不清晰上面的四个概念,请查看廖老师的git教程这里我多说几句:最开始我使用git的时候,我并不明白我为什么写完代码要用git的一些列指令把我的修改存起来。后来用多了,也就明白了为什么。gi...

利用shell脚本将Mysql错误日志保存到数据库中

说明:利用shell脚本将MYSQL的错误日志提取并保存到数据库中步骤:1)创建数据库,创建表CreatedatabaseMysqlCenter;UseMysqlCenter;CREATET...

MySQL 9.3 引入增强的JavaScript支持

MySQL,这一广泛采用的开源关系型数据库管理系统(RDBMS),发布了其9.x系列的第三个更新版本——9.3版,带来了多项新功能。...

python 连接 mysql 数据库(python连接MySQL数据库案例)

用PyMySQL包来连接Python和MySQL。在使用前需要先通过pip来安装PyMySQL包:在windows系统中打开cmd,输入pipinstallPyMySQL ...

mysql导入导出命令(mysql 导入命令)

mysql导入导出命令mysqldump命令的输入是在bin目录下.1.导出整个数据库  mysqldump-u用户名-p数据库名>导出的文件名  mysqldump-uw...

MySQL-SQL介绍(mysql sqlyog)

介绍结构化查询语言是高级的非过程化编程语言,允许用户在高层数据结构上工作。它不要求用户指定对数据的存放方法,也不需要用户了解具体的数据存放方式,所以具有完全不同底层结构的不同数据库系统,可以使用相同...

MySQL 误删除数据恢复全攻略:基于 Binlog 的实战指南

在MySQL的世界里,二进制日志(Binlog)就是我们的"时光机"。它默默记录着数据库的每一个重要变更,就像一位忠实的史官,为我们在数据灾难中提供最后的救命稻草。本文将带您深入掌握如...