编程小白,在写一个多线程目录文件遍历的时候,出现了阻塞问题,求教各位大佬~

通过增大 var taskChan = make(chan string, 1000),chan 缓冲区为 100 万的时候程序不会阻塞

但是我通过打印日志发现 taskChan 占用很小,只有十几,而且存在通道写入失败的情况

taskChan 的缓冲区为 1000 时,阻塞的日志如下:

```shell
[DEBUG]:增加目录,增加 wg, [1802], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1843], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1844], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1801], taskChan = [5]
[DEBUG]:任务完成,减小 wg, [1798], taskChan = [1]
[DEBUG]:任务完成,减小 wg, [1797], taskChan = [17]
[DEBUG]:任务完成,减小 wg, [1816], taskChan = [4]
[DEBUG]:增加目录,增加 wg, [1803], taskChan = [1]
[DEBUG]:任务完成,减小 wg, [1843], taskChan = [1]
[DEBUG]:任务完成,减小 wg, [1797], taskChan = [21]
[DEBUG]:任务完成,减小 wg, [1841], taskChan = [24]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1840], taskChan = [6]
[DEBUG]:任务完成,减小 wg, [1798], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1840], taskChan = [13]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1840], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [2]
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1843], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1844], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1845], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1846], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1847], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1848], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1849], taskChan = [0]
```

如果把 taskChan 的缓冲区为 100 万的时候,程序可以正常退出,日志如下:

```shell
[DEBUG]:增加目录,增加 wg, [2], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [3], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [4], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [4], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [2], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [2], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [2], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [0], taskChan = [0]
[INFO]:目录扫描完毕
[DEBUG]:func GetAllFilePath end
[DEBUG]:func StartScan end
[DEBUG]:func btnStartScanOnclick end
```



代码如下:

```go
package core

import (
        "DopliGo/logs"
        "github.com/panjf2000/ants/v2"
        "os"
        "path/filepath"
        "sync"
        "sync/atomic"
)

func GetAllFilePath(rootPath string) {
        //logs.IsLogDebug = false
        logs.Debug("func GetAllFilePath start")
        // 创建任务通道和结果通道
        var taskChan = make(chan string, 1000000)
        var resultChan = make(chan string, 1000000)
        var wg sync.WaitGroup
        var counter int64 = 0

        // 创建生产者 goroutine 池
        producerPool, _ := ants.NewPoolWithFunc(16, func(i interface{}) {
                produceTasks(i.(string), taskChan, resultChan, &counter, &wg)
        })

        logs.Debug("cap:%d", producerPool.Cap())
        defer producerPool.Release()

        taskChan <- rootPath
        wg.Add(1) // 这里增加计数器
        atomic.AddInt64(&counter, 1)
        logs.Debug("任务开始,增加 wg, [%d], taskChan = [%d]", atomic.LoadInt64(&counter), len(resultChan))

        // 启动生产者
        go func() {
                //defer logs.Debug("生产者退出")
                for task := range taskChan {
                        err := producerPool.Invoke(task)
                        if err != nil {
                                logs.Error("failed to producerPool Invoke, err: %s", err)
                                return
                        }
                }
        }()

        // 启动结果处理 goroutine
        go func() {
                //defer logs.Debug("消费者退出")
                for result := range resultChan {
                        _ = result
                }
        }()

        // 等待所有任务完成
        wg.Wait()
        close(resultChan)
        close(taskChan)
        logs.Info("目录扫描完毕")
        logs.Debug("func GetAllFilePath end")
}

func produceTasks(rootPath string, taskChan chan string, resultChan chan string, counter *int64, wg *sync.WaitGroup) {
        defer wg.Done() // 确保每次 produceTasks 完成时,调用 Done
        //        logs.Debug("func produceTasks start")

        entries, err := os.ReadDir(rootPath)
        if err != nil {
                logs.Error("failed to read dir: %s , err: %s", rootPath, err)
                return
        }

        for _, entry := range entries {
                path := filepath.Join(rootPath, entry.Name())
                if entry.IsDir() {
                        wg.Add(1)
                        atomic.AddInt64(counter, 1)
                        select {
                        case taskChan <- path:
                                // 发送成功
                        default:
                                // 发送失败,通道已满
                                logs.Error("写入通道失败...")
                        }
                        logs.Debug("增加目录,增加 wg, [%d], taskChan = [%d]", atomic.LoadInt64(counter), len(resultChan))
                } else {
                        resultChan <- path
                }
        }

        atomic.AddInt64(counter, -1)
        logs.Debug("任务完成,减小 wg, [%d], taskChan = [%d]", atomic.LoadInt64(counter), len(resultChan))
        //logs.Debug("func produceTasks end")
}

```
举报· 117 次点击
登录 注册 站外分享
7 条回复  
zpfhbyx 小成 2024-8-22 19:19:40
select 的时候 写入到 chan 不阻塞  chan 满的时候会直接执行 default
josexy 小成 2024-8-22 19:30:35
```go
                if entry.IsDir() {
                        atomic.AddInt64(counter, 1)
                        select {
                        case taskChan <- path:
                                // 发送成功
                               wg.Add(1)
                        default:
                                // 发送失败,通道已满
                                logs.Error("写入通道失败...")
                        }

```
你把 wg.Add(1) 放到里面,然后 channel 容量设置大点就可以了,这样只有发送成功才处理
matytan 小成 2024-8-22 19:39:11
produceTasks 中 for 循环 wg.add(1)多次,但是只 done 了一次(函数结束)为什么?而且你这个 wg 用的好奇怪
DefoliationM 小成 2024-8-22 19:40:46
default 删了,如果你想控制退出,把 default 换成 context 。
yianing 小成 2024-8-22 22:46:24
@deavorwei #7 #7 taskChan = [%d]", len(resultChan) 打印错变量了
pxllong 小成 2024-8-23 11:39:03
用  runtime/pprof 。
yann123 小成 2024-8-23 14:15:07
default:
wg.Done()
logs.Error("Failed to write to channel...")


写入通过失败了你没有减少锁,所以一直卡住了。
返回顶部