Go并发编程--分布式并发原语:etcd中的队列、栅栏和STM

发布于 2020-11-27 · 本文总共 7065 字 · 阅读大约需要 21 分钟

队列

分布式队列

可以在一个节点将元素放入队列,在另外一个节点把它取出

创建方法:NewQueue

func NewQueue(client *v3.Client, keyPrefix string) *Queue

两个方法: 出队 入队

// 入队
func (q *Queue) Enqueue(val string) error
//出队
func (q *Queue) Dequeue() (string, error)

分布式队列实现:

package main


import (
    "bufio"
    "flag"
    "fmt"
    "log"
    "os"
    "strings"


    "github.com/coreos/etcd/clientv3"
    recipe "github.com/coreos/etcd/contrib/recipes"
)


var (
    addr      = flag.String("addr", "http://127.0.0.1:2379", "etcd addresses")
    queueName = flag.String("name", "my-test-queue", "queue name")
)


func main() {
    flag.Parse()


    // 解析etcd地址
    endpoints := strings.Split(*addr, ",")


    // 创建etcd的client
    cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()


    // 创建/获取队列
    q := recipe.NewQueue(cli, *queueName)


    // 从命令行读取命令
    consolescanner := bufio.NewScanner(os.Stdin)
    for consolescanner.Scan() {
        action := consolescanner.Text()
        items := strings.Split(action, " ")
        switch items[0] {
        case "push": // 加入队列
            if len(items) != 2 {
                fmt.Println("must set value to push")
                continue
            }
            q.Enqueue(items[1]) // 入队
        case "pop": // 从队列弹出
            v, err := q.Dequeue() // 出队
            if err != nil {
                log.Fatal(err)
            }
            fmt.Println(v) // 输出出队的元素
        case "quit", "exit": //退出
            return
        default:
            fmt.Println("unknown action")
        }
    }
}

优先级队列

优先级队列(PriorityQueue):用法和队列类似,也提供了出队和入队的操作, 不过在入队的时候,除了需要把一个值加入到队列,还需要提供uint16类型的一个整数,作为此值的优先级,优先级高的元素会优先出队;

优先级队列测试:

package main

import (
    "bufio"
    "flag"
    "fmt"
    "log"
    "os"
    "strconv"
    "strings"

    "github.com/coreos/etcd/clientv3"
    recipe "github.com/coreos/etcd/contrib/recipes"
)


var (
    addr      = flag.String("addr", "http://127.0.0.1:2379", "etcd addresses")
    queueName = flag.String("name", "my-test-queue", "queue name")
)


func main() {
    flag.Parse()


    // 解析etcd地址
    endpoints := strings.Split(*addr, ",")


    // 创建etcd的client
    cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()


    // 创建/获取队列
    q := recipe.NewPriorityQueue(cli, *queueName)


    // 从命令行读取命令
    consolescanner := bufio.NewScanner(os.Stdin)
    for consolescanner.Scan() {
        action := consolescanner.Text()
        items := strings.Split(action, " ")
        switch items[0] {
        case "push": // 加入队列
            if len(items) != 3 {
                fmt.Println("must set value and priority to push")
                continue
            }
            pr, err := strconv.Atoi(items[2]) // 读取优先级
            if err != nil {
                fmt.Println("must set uint16 as priority")
                continue
            }
            q.Enqueue(items[1], uint16(pr)) // 入队
        case "pop": // 从队列弹出
            v, err := q.Dequeue() // 出队
            if err != nil {
                log.Fatal(err)
            }
            fmt.Println(v) // 输出出队的元素
        case "quit", "exit": //退出
            return
        default:
            fmt.Println("unknown action")
        }
    }
}

栅栏

在分布式环境中,会遇到这样的场景:一组节点协同工作,共同等待一个信号,在信号未出现前,这些节点会被阻塞住, 而一旦信号出现,这些阻塞的节点就会同时开始继续执行下一步的任务;

Barrier:分布式栅栏

如果持有Barrier的节点释放了它,所有等待这个Barrier的节点就不会被阻塞,而是会继续执行;

创建方法

func NewBarrier(client *v3.Client, key string) *Barrier

三个方法


func (b *Barrier) Hold() error
func (b *Barrier) Release() error
func (b *Barrier) Wait() error

Hold:创建一个Barrier; 如果Barrier已经创建好了,有节点调用Wait方法,就会被阻塞;

Release:释放Barrier; 打开栅栏,如果使用了该方法,所有被阻塞的节点都会被放行,继续执行;

Wait:阻塞当前的调用者,直到这个Barrier被release; 如果这个栅栏不存在,调用者不会被阻塞,而是会继续执行;

测试代码:

package main


import (
    "bufio"
    "flag"
    "fmt"
    "log"
    "os"
    "strings"


    "github.com/coreos/etcd/clientv3"
    recipe "github.com/coreos/etcd/contrib/recipes"
)


var (
    addr        = flag.String("addr", "http://127.0.0.1:2379", "etcd addresses")
    barrierName = flag.String("name", "my-test-queue", "barrier name")
)


func main() {
    flag.Parse()


    // 解析etcd地址
    endpoints := strings.Split(*addr, ",")


    // 创建etcd的client
    cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()


    // 创建/获取栅栏
    b := recipe.NewBarrier(cli, *barrierName)


    // 从命令行读取命令
    consolescanner := bufio.NewScanner(os.Stdin)
    for consolescanner.Scan() {
        action := consolescanner.Text()
        items := strings.Split(action, " ")
        switch items[0] {
        case "hold": // 持有这个barrier
            b.Hold()
            fmt.Println("hold")
        case "release": // 释放这个barrier
            b.Release()
            fmt.Println("released")
        case "wait": // 等待barrier被释放
            b.Wait()
            fmt.Println("after wait")
        case "quit", "exit": //退出
            return
        default:
            fmt.Println("unknown action")
        }
    }
}

DoubleBarrier:计数栅栏

在初始化计数型栅栏的时候,必须提供参与节点的数量,当这些数量的节点都Enter或者Leave的时候,这个栅栏就会放开;

func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarrier

两个方法

Enter:当调用者调用Enter时,会被阻塞,直到一共有count个节点调用了 Enter,这count个被阻塞的节点才能继续执行;

可以利用它编排一组节点,让这些节点在同一个时刻开始执行任务

Leave:让一组节点在同一个时刻完成任务

如果想让一组节点在同一个时刻完成任务,就可以调用Leave方法;

func (b *DoubleBarrier) Enter() error
func (b *DoubleBarrier) Leave() error

测试代码:

package main


import (
    "bufio"
    "flag"
    "fmt"
    "log"
    "os"
    "strings"


    "github.com/coreos/etcd/clientv3"
    "github.com/coreos/etcd/clientv3/concurrency"
    recipe "github.com/coreos/etcd/contrib/recipes"
)


var (
    addr        = flag.String("addr", "http://127.0.0.1:2379", "etcd addresses")
    barrierName = flag.String("name", "my-test-doublebarrier", "barrier name")
    count       = flag.Int("c", 2, "")
)


func main() {
    flag.Parse()


    // 解析etcd地址
    endpoints := strings.Split(*addr, ",")


    // 创建etcd的client
    cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()
    // 创建session
    s1, err := concurrency.NewSession(cli)
    if err != nil {
        log.Fatal(err)
    }
    defer s1.Close()


    // 创建/获取栅栏
    b := recipe.NewDoubleBarrier(s1, *barrierName, *count)


    // 从命令行读取命令
    consolescanner := bufio.NewScanner(os.Stdin)
    for consolescanner.Scan() {
        action := consolescanner.Text()
        items := strings.Split(action, " ")
        switch items[0] {
        case "enter": // 持有这个barrier
            b.Enter()
            fmt.Println("enter")
        case "leave": // 释放这个barrier
            b.Leave()
            fmt.Println("leave")
        case "quit", "exit": //退出
            return
        default:
            fmt.Println("unknown action")
        }
    }
}

分布式栅栏和计数型栅栏控制的是不同节点、不同进程的执行; 当需要协调一组分布式节点在某个时间点同时运行的时候,可以使用etcd提供的这些并发原语;

STM

STM(Software Transactional Memory,软件事务内存)

etcd提供了在一个事务中对多个key的更新功能,这一组key的操作要么全部成功,要么全部失败;

etcd的事务实现方式是基于CAS方式实现,融合了Get、Put和Delete操作;

Txn().If(cond1, cond2, ...).Then(op1, op2, ...,).Else(op1, op2, )

作用

简化多个key的操作,并且提供事务功能

方法

type STM interface {
  Get(key ...string) string
  Put(key, val string, opts ...v3.OpOption)
  Rev(key string) int64
  Del(key string)
}

Get

Put

Receive

Delete

示例代码:

package main


import (
    "context"
    "flag"
    "fmt"
    "log"
    "math/rand"
    "strings"
    "sync"


    "github.com/coreos/etcd/clientv3"
    "github.com/coreos/etcd/clientv3/concurrency"
)


var (
    addr = flag.String("addr", "http://127.0.0.1:2379", "etcd addresses")
)


func main() {
    flag.Parse()


    // 解析etcd地址
    endpoints := strings.Split(*addr, ",")


    cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()


    // 设置5个账户,每个账号都有100元,总共500元
    totalAccounts := 5
    for i := 0; i < totalAccounts; i++ {
        k := fmt.Sprintf("accts/%d", i)
        if _, err = cli.Put(context.TODO(), k, "100"); err != nil {
            log.Fatal(err)
        }
    }


    // STM的应用函数,主要的事务逻辑
    exchange := func(stm concurrency.STM) error {
        // 随机得到两个转账账号
        from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts)
        if from == to {
            // 自己不和自己转账
            return nil
        }
        // 读取账号的值
        fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to)
        fromV, toV := stm.Get(fromK), stm.Get(toK)
        fromInt, toInt := 0, 0
        fmt.Sscanf(fromV, "%d", &fromInt)
        fmt.Sscanf(toV, "%d", &toInt)


        // 把源账号一半的钱转账给目标账号
        xfer := fromInt / 2
        fromInt, toInt = fromInt-xfer, toInt+xfer


        // 把转账后的值写回
        stm.Put(fromK, fmt.Sprintf("%d", fromInt))
        stm.Put(toK, fmt.Sprintf("%d", toInt))
        return nil
    }


    // 启动10个goroutine进行转账操作
    var wg sync.WaitGroup
    wg.Add(10)
    for i := 0; i < 10; i++ {
        go func() {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                if _, serr := concurrency.NewSTM(cli, exchange); serr != nil {
                    log.Fatal(serr)
                }
            }
        }()
    }
    wg.Wait()


    // 检查账号最后的数目
    sum := 0
    accts, err := cli.Get(context.TODO(), "accts/", clientv3.WithPrefix()) // 得到所有账号
    if err != nil {
        log.Fatal(err)
    }
    for _, kv := range accts.Kvs { // 遍历账号的值
        v := 0
        fmt.Sscanf(string(kv.Value), "%d", &v)
        sum += v
        log.Printf("account %s: %d", kv.Key, v)
    }


    log.Println("account sum is", sum) // 总数
}

总结

除了etcd,Zookeeper也提供了类似的并发原语,不过只提供了Java的库,没有提供合适的Go库; Consul并没有开发这些并发原语的计划;

也可以使用Redis实现分布式锁,或者基于MySQL实现;

refs

https://time.geekbang.org/column/article/312590




本博客所有文章采用的授权方式为 自由转载-非商用-非衍生-保持署名 ,转载请务必注明出处,谢谢。
声明:
本博客欢迎转发,但请保留原作者信息!
博客地址:邱文奇(qiuwenqi)的博客;
内容系本人学习、研究和总结,如有雷同,实属荣幸!
阅读次数:

文章评论

comments powered by Disqus


章节列表