Go并发编程--分布式并发原语:etcd中的队列、栅栏和STM
队列
分布式队列
可以在一个节点将元素放入队列,在另外一个节点把它取出
创建方法:NewQueue
func NewQueue(client *v3.Client, keyPrefix string) *Queue
两个方法: 出队 入队
// 入队
func (q *Queue) Enqueue(val string) error
//出队
func (q *Queue) Dequeue() (string, error)
分布式队列实现:
package main
import (
"bufio"
"flag"
"fmt"
"log"
"os"
"strings"
"github.com/coreos/etcd/clientv3"
recipe "github.com/coreos/etcd/contrib/recipes"
)
var (
addr = flag.String("addr", "http://127.0.0.1:2379", "etcd addresses")
queueName = flag.String("name", "my-test-queue", "queue name")
)
func main() {
flag.Parse()
// 解析etcd地址
endpoints := strings.Split(*addr, ",")
// 创建etcd的client
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 创建/获取队列
q := recipe.NewQueue(cli, *queueName)
// 从命令行读取命令
consolescanner := bufio.NewScanner(os.Stdin)
for consolescanner.Scan() {
action := consolescanner.Text()
items := strings.Split(action, " ")
switch items[0] {
case "push": // 加入队列
if len(items) != 2 {
fmt.Println("must set value to push")
continue
}
q.Enqueue(items[1]) // 入队
case "pop": // 从队列弹出
v, err := q.Dequeue() // 出队
if err != nil {
log.Fatal(err)
}
fmt.Println(v) // 输出出队的元素
case "quit", "exit": //退出
return
default:
fmt.Println("unknown action")
}
}
}
优先级队列
优先级队列(PriorityQueue):用法和队列类似,也提供了出队和入队的操作, 不过在入队的时候,除了需要把一个值加入到队列,还需要提供uint16类型的一个整数,作为此值的优先级,优先级高的元素会优先出队;
优先级队列测试:
package main
import (
"bufio"
"flag"
"fmt"
"log"
"os"
"strconv"
"strings"
"github.com/coreos/etcd/clientv3"
recipe "github.com/coreos/etcd/contrib/recipes"
)
var (
addr = flag.String("addr", "http://127.0.0.1:2379", "etcd addresses")
queueName = flag.String("name", "my-test-queue", "queue name")
)
func main() {
flag.Parse()
// 解析etcd地址
endpoints := strings.Split(*addr, ",")
// 创建etcd的client
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 创建/获取队列
q := recipe.NewPriorityQueue(cli, *queueName)
// 从命令行读取命令
consolescanner := bufio.NewScanner(os.Stdin)
for consolescanner.Scan() {
action := consolescanner.Text()
items := strings.Split(action, " ")
switch items[0] {
case "push": // 加入队列
if len(items) != 3 {
fmt.Println("must set value and priority to push")
continue
}
pr, err := strconv.Atoi(items[2]) // 读取优先级
if err != nil {
fmt.Println("must set uint16 as priority")
continue
}
q.Enqueue(items[1], uint16(pr)) // 入队
case "pop": // 从队列弹出
v, err := q.Dequeue() // 出队
if err != nil {
log.Fatal(err)
}
fmt.Println(v) // 输出出队的元素
case "quit", "exit": //退出
return
default:
fmt.Println("unknown action")
}
}
}
栅栏
在分布式环境中,会遇到这样的场景:一组节点协同工作,共同等待一个信号,在信号未出现前,这些节点会被阻塞住, 而一旦信号出现,这些阻塞的节点就会同时开始继续执行下一步的任务;
Barrier:分布式栅栏
如果持有Barrier的节点释放了它,所有等待这个Barrier的节点就不会被阻塞,而是会继续执行;
创建方法
func NewBarrier(client *v3.Client, key string) *Barrier
三个方法
func (b *Barrier) Hold() error
func (b *Barrier) Release() error
func (b *Barrier) Wait() error
Hold:创建一个Barrier; 如果Barrier已经创建好了,有节点调用Wait方法,就会被阻塞;
Release:释放Barrier; 打开栅栏,如果使用了该方法,所有被阻塞的节点都会被放行,继续执行;
Wait:阻塞当前的调用者,直到这个Barrier被release; 如果这个栅栏不存在,调用者不会被阻塞,而是会继续执行;
测试代码:
package main
import (
"bufio"
"flag"
"fmt"
"log"
"os"
"strings"
"github.com/coreos/etcd/clientv3"
recipe "github.com/coreos/etcd/contrib/recipes"
)
var (
addr = flag.String("addr", "http://127.0.0.1:2379", "etcd addresses")
barrierName = flag.String("name", "my-test-queue", "barrier name")
)
func main() {
flag.Parse()
// 解析etcd地址
endpoints := strings.Split(*addr, ",")
// 创建etcd的client
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 创建/获取栅栏
b := recipe.NewBarrier(cli, *barrierName)
// 从命令行读取命令
consolescanner := bufio.NewScanner(os.Stdin)
for consolescanner.Scan() {
action := consolescanner.Text()
items := strings.Split(action, " ")
switch items[0] {
case "hold": // 持有这个barrier
b.Hold()
fmt.Println("hold")
case "release": // 释放这个barrier
b.Release()
fmt.Println("released")
case "wait": // 等待barrier被释放
b.Wait()
fmt.Println("after wait")
case "quit", "exit": //退出
return
default:
fmt.Println("unknown action")
}
}
}
DoubleBarrier:计数栅栏
在初始化计数型栅栏的时候,必须提供参与节点的数量,当这些数量的节点都Enter或者Leave的时候,这个栅栏就会放开;
func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarrier
两个方法
Enter:当调用者调用Enter时,会被阻塞,直到一共有count个节点调用了 Enter,这count个被阻塞的节点才能继续执行;
可以利用它编排一组节点,让这些节点在同一个时刻开始执行任务
Leave:让一组节点在同一个时刻完成任务
如果想让一组节点在同一个时刻完成任务,就可以调用Leave方法;
func (b *DoubleBarrier) Enter() error
func (b *DoubleBarrier) Leave() error
测试代码:
package main
import (
"bufio"
"flag"
"fmt"
"log"
"os"
"strings"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
recipe "github.com/coreos/etcd/contrib/recipes"
)
var (
addr = flag.String("addr", "http://127.0.0.1:2379", "etcd addresses")
barrierName = flag.String("name", "my-test-doublebarrier", "barrier name")
count = flag.Int("c", 2, "")
)
func main() {
flag.Parse()
// 解析etcd地址
endpoints := strings.Split(*addr, ",")
// 创建etcd的client
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 创建session
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
// 创建/获取栅栏
b := recipe.NewDoubleBarrier(s1, *barrierName, *count)
// 从命令行读取命令
consolescanner := bufio.NewScanner(os.Stdin)
for consolescanner.Scan() {
action := consolescanner.Text()
items := strings.Split(action, " ")
switch items[0] {
case "enter": // 持有这个barrier
b.Enter()
fmt.Println("enter")
case "leave": // 释放这个barrier
b.Leave()
fmt.Println("leave")
case "quit", "exit": //退出
return
default:
fmt.Println("unknown action")
}
}
}
分布式栅栏和计数型栅栏控制的是不同节点、不同进程的执行; 当需要协调一组分布式节点在某个时间点同时运行的时候,可以使用etcd提供的这些并发原语;
STM
STM(Software Transactional Memory,软件事务内存)
etcd提供了在一个事务中对多个key的更新功能,这一组key的操作要么全部成功,要么全部失败;
etcd的事务实现方式是基于CAS方式实现,融合了Get、Put和Delete操作;
Txn().If(cond1, cond2, ...).Then(op1, op2, ...,).Else(op1’, op2’, …)
作用
简化多个key的操作,并且提供事务功能
方法
type STM interface {
Get(key ...string) string
Put(key, val string, opts ...v3.OpOption)
Rev(key string) int64
Del(key string)
}
Get
Put
Receive
Delete
示例代码:
package main
import (
"context"
"flag"
"fmt"
"log"
"math/rand"
"strings"
"sync"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
)
var (
addr = flag.String("addr", "http://127.0.0.1:2379", "etcd addresses")
)
func main() {
flag.Parse()
// 解析etcd地址
endpoints := strings.Split(*addr, ",")
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 设置5个账户,每个账号都有100元,总共500元
totalAccounts := 5
for i := 0; i < totalAccounts; i++ {
k := fmt.Sprintf("accts/%d", i)
if _, err = cli.Put(context.TODO(), k, "100"); err != nil {
log.Fatal(err)
}
}
// STM的应用函数,主要的事务逻辑
exchange := func(stm concurrency.STM) error {
// 随机得到两个转账账号
from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts)
if from == to {
// 自己不和自己转账
return nil
}
// 读取账号的值
fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to)
fromV, toV := stm.Get(fromK), stm.Get(toK)
fromInt, toInt := 0, 0
fmt.Sscanf(fromV, "%d", &fromInt)
fmt.Sscanf(toV, "%d", &toInt)
// 把源账号一半的钱转账给目标账号
xfer := fromInt / 2
fromInt, toInt = fromInt-xfer, toInt+xfer
// 把转账后的值写回
stm.Put(fromK, fmt.Sprintf("%d", fromInt))
stm.Put(toK, fmt.Sprintf("%d", toInt))
return nil
}
// 启动10个goroutine进行转账操作
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < 100; j++ {
if _, serr := concurrency.NewSTM(cli, exchange); serr != nil {
log.Fatal(serr)
}
}
}()
}
wg.Wait()
// 检查账号最后的数目
sum := 0
accts, err := cli.Get(context.TODO(), "accts/", clientv3.WithPrefix()) // 得到所有账号
if err != nil {
log.Fatal(err)
}
for _, kv := range accts.Kvs { // 遍历账号的值
v := 0
fmt.Sscanf(string(kv.Value), "%d", &v)
sum += v
log.Printf("account %s: %d", kv.Key, v)
}
log.Println("account sum is", sum) // 总数
}
总结
除了etcd,Zookeeper也提供了类似的并发原语,不过只提供了Java的库,没有提供合适的Go库; Consul并没有开发这些并发原语的计划;
也可以使用Redis实现分布式锁,或者基于MySQL实现;
refs
https://time.geekbang.org/column/article/312590