晚上看了会 Medium 上的博客,发现了一篇关于使用 Go 来实现并发的获取数据。

我在作者原有的基础上改造了一下,限制了最大的数量,不知道有没有什么疏漏,请大家指教指教。

另外我也在思考关于 GC 的问题,如果代码中在某一次循环中走了<-ctx.Done()的分支,直接返回了结果,productsChan 会被怎么回收?我应该如何关闭相关的 channel ?还是让程序自己处理?

Medium 原文地址

我的改造:

package main

import "sync"
import "runtime"
import "fmt"

var LIST_PRODUCT_TYPE = [100000]string{"food", "electronics", "clothing","...more"} // ......非常多的数据需要查询

type GetListProductResponse struct {
    Data []ProductListResponse `json:"data"`
}

type ProductListResponse struct {
    Code         string `json:"code"`
    Name         string `json:"name"`
    Price        string `json:"price"`
    Status       bool   `json:"status"`
}

func getProducts(ctx context.Context, req *GetProductListRequest) (*GetListProductResponse, error) {
    // calling endpoint 3rd party
    // parse to response
    // and return the data
    return &productList, nil
}

func main() {
    ctx, cancelFunc := context.WithCancel(ctx)
	defer cancelFunc()
    wg := sync.WaitGroup{}
    doneChan := make(chan struct{}, 1)
    productsChan := make(chan *GetListProductResponse)
    errChan := make(chan error)
    // LIST_PRODUCT_TYPE 数量非常大,需要限制最大的并发数量
    maxConcurrency := 5
    semaphore := make(chan struct{}, maxConcurrency)
    wg.Add(len(LIST_PRODUCT_TYPE))
    for key := range LIST_PRODUCT_TYPE {
        req := &GetProductListRequest{
            ProductType: LIST_PRODUCT_TYPE[key],
        }
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case semaphore<-struct{}{}:
            go func() {
                defer wg.Done()
                defer func() { <-semaphore }()
                products, err := getProductList(ctx, req)
                if err != nil {
                    errChan <- err
                    return
                }
                productsChan <- products
            }()
        }
     }

     go func() {
         wg.Wait()
         doneChan <- struct{}{}
     }()

     var (
         catalogues GetListProductResponse
         data       []ProductListResponse
     )
 
     for {
         select {
         case <-ctx.Done():
             return nil, ctx.Err()
         case err := <-errChan:
             return nil, err
         case products := <-productsChan:
             data = append(data, products.Data...)
             catalogues.Data = data
         case <-doneChan:
         return &catalogues, nil
     }
   }
}
举报· 310 次点击
登录 注册 站外分享
2 条回复  
dcalsky 小成 4 天前
新手自己写容易出问题,推荐: https://github.com/sourcegraph/conc
supuwoerc 楼主 小成 4 天前
@dcalsky 感谢,我学习下这个库的实现
返回顶部