Goroutine/Channel优雅的可视化,看看如何工作的

本文摘自https://divan.dev/posts/go_concurrency_visualize/,我耐心纠正翻译,致敬作者20分钟

你好,并发世界

最基本的代码 :单channel,单goroutine,一次写入,一次读取。

package main

func main() {
    // create new channel of type int
    ch := make(chan int)

    // start new anonymous goroutine
    go func() {
        // send 42 to channel
        ch <- 42
    }()
    // read from channel
    <-ch
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
result

转到交互式WebGL动画

蓝线代表了时间流逝的goroutines。连接maingo#19的细蓝线是开始和停止goroutine的标记,显示父子关系,最后,红色箭头向我们显示send recv动作。虽然它实际上是两个单独的动作,但我尝试动画为单个事件“从A发送到B”。goroutine名称中的#19是实际的goroutine内部ID,获取ID的方式如下:

func getGID() uint64 {
   b := make([]byte, 64)
   b = b[:runtime.Stack(b, false)]
   b = bytes.TrimPrefix(b, []byte("goroutine "))
   b = b[:bytes.IndexByte(b, ' ')]
   n, _ := strconv.ParseUint(string(b), 10, 64)
   return n
}
1
2
3
4
5
6
7
8

计时器

实际上,您可以使用这种方法构建一个简单的计时器:创建一个channel,启动goroutine,在给定的持续时间后写入此channel并将此channel返回给您的func的调用者。然后呼叫者在准确的时间内阻止从channel读取。我们运行这样的计时器24次并尝试将其可视化。

package main

import "time"

func timer(d time.Duration) <-chan int {
    c := make(chan int)
    go func() {
        time.Sleep(d)
        c <- 1
    }()
    return c
}

func main() {
    for i := 0; i < 24; i++ {
        c := timer(1 * time.Second)
        <-c
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
result

转到交互式WebGL动画

挺简洁吧?我们继续。

乒乓

这个很好的并发性例子是在谷歌Sameer Ajmani的“高级Go并发模式”的精彩演讲中找到的。当然,这种模式不是很先进,但对于那些只熟悉Go并发的人来说,它看起来可能非常新鲜有趣。

在这里,我们有一个channel作为乒乓球比赛的桌子。球是一个整数变量,两个``goroutines-players`“击中”球,增加其值(击中计数器)。

package main

import "time"

func main() {
    var Ball int
    table := make(chan int)
    go player(table)
    go player(table)

    table <- Ball
    time.Sleep(1 * time.Second)
    <-table
}

func player(table chan int) {
    for {
        ball := <-table
        ball++
        time.Sleep(100 * time.Millisecond)
        table <- ball
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
result

转到交互式WebGL动画

此时我建议您单击交互式WebGL动画,并以交互方式进行游戏。您可以减慢动画,加速并以不同角度查看它。

现在,我们运行三名player而不是两名player

    go player(table)
    go player(table)
    go player(table)
1
2
3
result

转到交互式WebGL动画

我们可以在这里看到每个player顺序轮流,你可能想知道为什么会如此。为什么我们在接受球的goroutines中看到这种严格的命令?

答案是因为Go运行时为接收器保持等待FIFO队列(准备好在特定channel接收goroutine),并且在我们的情况下,每个玩家在他将球传到桌子上之后就准备好了。我们用更复杂的例子来检查它并行运行100名乒乓球players

for i := 0; i < 100; i++ {
    go player(table)
}
1
2
3
result

转到交互式WebGL动画

FIFO命令现在很明显,不是吗?我们可以产生一百万个goroutines(它们很廉价),但是对于我们的目标而言是过度的。我们看看不同的东西。例如,常见的消息传递模式。

Fan-In

并发世界中流行的模式之一是所谓的Fan-in模式。这与Fan-out模式相反,我们将在稍后介绍。简而言之,Fan-in是从多个输入读取并将所有多路复用到单个channel的功能。

例如:

package main

import (
    "fmt"
    "time"
)

func producer(ch chan int, d time.Duration) {
    var i int
    for {
        ch <- i
        i++
        time.Sleep(d)
    }
}

func reader(out chan int) {
    for x := range out {
        fmt.Println(x)
    }
}

func main() {
    ch := make(chan int)
    out := make(chan int)
    go producer(ch, 100*time.Millisecond)
    go producer(ch, 250*time.Millisecond)
    go reader(out)
    for i := range ch {
        out <- i
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
result

进入互动WebGL的动画

正如我们所看到的,第一个producer产生的值每100毫秒,第二个producer每250毫秒,但reader从立即producer接收值。实际上,多路复用发生在main的范围循环中。

Workers

Fan-In的相反模式是Fan-OutWorkers模式。多个goroutine可以从单个channel读取,在CPU内核之间分配大量worker。在Go中,这种模式很容易实现,只需启动一些带有channel作为参数的goroutine,然后只向该channel发送值,分配和多路复用将由Go运行时自动完成:)

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(tasksCh <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        task, ok := <-tasksCh
        if !ok {
            return
        }
        d := time.Duration(task) * time.Millisecond
        time.Sleep(d)
        fmt.Println("processing task", task)
    }
}

func pool(wg *sync.WaitGroup, workers, tasks int) {
    tasksCh := make(chan int)

    for i := 0; i < workers; i++ {
        go worker(tasksCh, wg)
    }

    for i := 0; i < tasks; i++ {
        tasksCh <- i
    }

    close(tasksCh)
}

func main() {
    var wg sync.WaitGroup
    wg.Add(36)
    go pool(&wg, 36, 50)
    wg.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
result

转到交互式WebGL动画

有一点值得注意:并行性。正如你所看到的,所有goroutines并行运行,等待channel给他们worker。鉴于上面的动画,很容易发现goroutines几乎立即接受他们的工作。不幸的是,这个动画并没有显示goroutine真正起作用的颜色或只是等待输入,但这个精确的动画是用GOMAXPROCS = 4记录的,所以只有4个goroutine有效并行运行。我们很快就会谈到这个问题。

现在,我们做一些更复杂的事情,启动workers和它们的subworkers

package main

import (
    "fmt"
    "sync"
    "time"
)

const (
    WORKERS    = 5
    SUBWORKERS = 3
    TASKS      = 20
    SUBTASKS   = 10
)

func subworker(subtasks chan int) {
    for {
        task, ok := <-subtasks
        if !ok {
            return
        }
        time.Sleep(time.Duration(task) * time.Millisecond)
        fmt.Println(task)
    }
}

func worker(tasks <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        task, ok := <-tasks
        if !ok {
            return
        }

        subtasks := make(chan int)
        for i := 0; i < SUBWORKERS; i++ {
            go subworker(subtasks)
        }
        for i := 0; i < SUBTASKS; i++ {
            task1 := task * i
            subtasks <- task1
        }
        close(subtasks)
    }
}

func main() {
    var wg sync.WaitGroup
    wg.Add(WORKERS)
    tasks := make(chan int)

    for i := 0; i < WORKERS; i++ {
        go worker(tasks, &wg)
    }

    for i := 0; i < TASKS; i++ {
        tasks <- i
    }

    close(tasks)
    wg.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
result

转到交互式WebGL动画

Nice。当然,我们可以将workerssubworkers的数量设置得更高,但我试图使动画清晰易懂。

确实存在更酷的Fan-Out模式,例如动态数量的workerssubworkers,通过channel发送channel,但现在应该把Fan-Out的思路清空

Servers

下一个常见的模式类似于Fan-Out,但是在很短的时间内产生了goroutines,只是为了完成一些任务。通常这样实现Servers : 创建一个监听器,在循环中运行accept()并为每个接受的连接启动goroutine。它非常具有表现力,并且允许尽可能简单地实现Servers处理程序。看看这个简单的例子:

package main

import "net"

func handler(c net.Conn) {
    c.Write([]byte("ok"))
    c.Close()
}

func main() {
    l, err := net.Listen("tcp", ":5000")
    if err != nil {
        panic(err)
    }
    for {
        c, err := l.Accept()
        if err != nil {
            continue
        }
        go handler(c)
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
result

转到交互式WebGL动画

它并不是很有趣,似乎并发性没有任何变化。当然,后台很复杂,故意向我们隐藏。“简单即复杂”

但是我们回到并发并为我们的Servers添加一些交互。假设每个处理程序都想要异步写入logger。在我们的示例中,logger本身是一个独立的goroutine,可以完成这项工作。

package main

import (
    "fmt"
    "net"
    "time"
)

func handler(c net.Conn, ch chan string) {
    ch <- c.RemoteAddr().String()
    c.Write([]byte("ok"))
    c.Close()
}

func logger(ch chan string) {
    for {
        fmt.Println(<-ch)
    }
}

func server(l net.Listener, ch chan string) {
    for {
        c, err := l.Accept()
        if err != nil {
            continue
        }
        go handler(c, ch)
    }
}

func main() {
    l, err := net.Listen("tcp", ":5000")
    if err != nil {
        panic(err)
    }
    ch := make(chan string)
    go logger(ch)
    go server(l, ch)
    time.Sleep(10 * time.Second)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
result

转到交互式WebGL动画

很有说服力,不是吗?但是很容易看出,如果请求数量增加并且记录操作需要一些时间(例如准备和编码数据),我们的*logger* goroutine很快就会成为瓶颈。我们可以使用已知的Fan-Out模式。我们开始做吧。

Server+Worker

带有worker示例的Serverslogger的一个高级版本。它不仅可以完成一些工作,还可以使用results channel将其工作结果发送回池中。没什么大不了的,但它将我们的logger示例扩展更加实际。

我们看看代码和动画:

package main

import (
    "net"
    "time"
)

func handler(c net.Conn, ch chan string) {
    addr := c.RemoteAddr().String()
    ch <- addr
    time.Sleep(100 * time.Millisecond)
    c.Write([]byte("ok"))
    c.Close()
}

func logger(wch chan int, results chan int) {
    for {
        data := <-wch
        data++
        results <- data
    }
}

func parse(results chan int) {
    for {
        <-results
    }
}

func pool(ch chan string, n int) {
    wch := make(chan int)
    results := make(chan int)
    for i := 0; i < n; i++ {
        go logger(wch, results)
    }
    go parse(results)
    for {
        addr := <-ch
        l := len(addr)
        wch <- l
    }
}

func server(l net.Listener, ch chan string) {
    for {
        c, err := l.Accept()
        if err != nil {
            continue
        }
        go handler(c, ch)
    }
}

func main() {
    l, err := net.Listen("tcp", ":5000")
    if err != nil {
        panic(err)
    }
    ch := make(chan string)
    go pool(ch, 4)
    go server(l, ch)
    time.Sleep(10 * time.Second)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
result

转到交互式WebGL动画

我们在4个goroutine之间分配工作,有效地提高了logger的吞吐量,但是从这个动画中,我们可以看到logger仍然可能是问题的根源。在分发之前,成千上万的连接汇聚在一个channel中,这可能导致logger再次成为瓶颈。但是,当然,它会发生在更高的负载上。

并发素数筛选

足够的Fan-In/Fan-Out乐趣。我们看看更复杂的并发算法。我最喜欢的一个例子是Concurrent Prime Sieve,可以在“Go Concurrency Patterns”中找到。Prime Sieve或Eratosthenes的Sieve是一种古老的算法,用于查找达到给定限制的素数。它通过以顺序方式消除所有素数的倍数来工作。普通的算法效率不高,特别是在多核机器上。

该算法的并发变体使用goroutines来过滤数字 - 每发现一个主要数据一个goroutine,以及从生成器向过滤器发送数字的channel。当找到prime时,它将通过channel发送到*main*以进行输出。当然,这个算法也不是很有效,特别是如果你想找到大素数并寻找最低的Big O复杂度,但我发现它非常优雅。

// A concurrent prime sieve
package main

import "fmt"

// Send the sequence 2, 3, 4, ... to channel 'ch'.
func Generate(ch chan<- int) {
    for i := 2; ; i++ {
        ch <- i // Send 'i' to channel 'ch'.
    }
}

// Copy the values from channel 'in' to channel 'out',
// removing those divisible by 'prime'.
func Filter(in <-chan int, out chan<- int, prime int) {
    for {
        i := <-in // Receive value from 'in'.
        if i%prime != 0 {
            out <- i // Send 'i' to 'out'.
        }
    }
}

// The prime sieve: Daisy-chain Filter processes.
func main() {
    ch := make(chan int) // Create a new channel.
    go Generate(ch)      // Launch Generate goroutine.
    for i := 0; i < 10; i++ {
        prime := <-ch
        fmt.Println(prime)
        ch1 := make(chan int)
        go Filter(ch, ch1, prime)
        ch = ch1
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
result

转到交互式WebGL动画

随意在交互模式下播放此动画。它真的可以帮助更好地理解这个算法。goroutine 发出所有自然数,从2开始,并且每个新的goroutine过滤特定质倍数 2,3,5,7 ...,把发现的第一个素数发给main函数。如果将其旋转以从顶部看,您将看到从goroutines发送到main的所有数字都是素数。漂亮的算法,尤其在3D下看。

上次更新: 2019-8-19 10:01:45