Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
472 lines
9.2 KiB
472 lines
9.2 KiB
package zkhelper |
|
|
|
import ( |
|
"errors" |
|
"fmt" |
|
"path" |
|
"strings" |
|
"sync" |
|
"time" |
|
|
|
etcderr "github.com/coreos/etcd/error" |
|
"github.com/coreos/go-etcd/etcd" |
|
zk "github.com/ngaut/go-zookeeper/zk" |
|
"github.com/ngaut/log" |
|
"github.com/ngaut/pools" |
|
) |
|
|
|
var ( |
|
singleInstanceLock sync.Mutex |
|
etcdInstance *etcdImpl |
|
) |
|
|
|
type PooledEtcdClient struct { |
|
c *etcd.Client |
|
} |
|
|
|
func (c *PooledEtcdClient) Close() { |
|
|
|
} |
|
|
|
func (e *etcdImpl) Seq2Str(seq int64) string { |
|
return fmt.Sprintf("%d", seq) |
|
} |
|
|
|
type etcdImpl struct { |
|
sync.Mutex |
|
cluster string |
|
pool *pools.ResourcePool |
|
indexMap map[string]uint64 |
|
} |
|
|
|
func convertToZkError(err error) error { |
|
//todo: convert other errors |
|
if ec, ok := err.(*etcd.EtcdError); ok { |
|
switch ec.ErrorCode { |
|
case etcderr.EcodeKeyNotFound: |
|
return zk.ErrNoNode |
|
case etcderr.EcodeNotFile: |
|
case etcderr.EcodeNotDir: |
|
case etcderr.EcodeNodeExist: |
|
return zk.ErrNodeExists |
|
case etcderr.EcodeDirNotEmpty: |
|
return zk.ErrNotEmpty |
|
} |
|
} |
|
|
|
return err |
|
} |
|
|
|
func convertToZkEvent(watchPath string, resp *etcd.Response, err error) zk.Event { |
|
//log.Infof("convert event from path:%s, %+v, %+v", watchPath, resp, resp.Node.Key) |
|
var e zk.Event |
|
|
|
if err != nil { |
|
e.Err = convertToZkError(err) |
|
e.State = zk.StateDisconnected |
|
return e |
|
} |
|
|
|
e.State = zk.StateConnected |
|
|
|
e.Path = resp.Node.Key |
|
if len(resp.Node.Key) > len(watchPath) { |
|
e.Type = zk.EventNodeChildrenChanged |
|
return e |
|
} |
|
|
|
switch resp.Action { |
|
case "set": |
|
e.Type = zk.EventNodeDataChanged |
|
case "delete": |
|
e.Type = zk.EventNodeDeleted |
|
case "update": |
|
e.Type = zk.EventNodeDataChanged |
|
case "create": |
|
e.Type = zk.EventNodeCreated |
|
case "expire": |
|
e.Type = zk.EventNotWatching |
|
} |
|
|
|
return e |
|
} |
|
|
|
func NewEtcdConn(zkAddr string) (Conn, error) { |
|
singleInstanceLock.Lock() |
|
defer singleInstanceLock.Unlock() |
|
if etcdInstance != nil { |
|
return etcdInstance, nil |
|
} |
|
|
|
p := pools.NewResourcePool(func() (pools.Resource, error) { |
|
cluster := strings.Split(zkAddr, ",") |
|
for i, addr := range cluster { |
|
if !strings.HasPrefix(addr, "http://") { |
|
cluster[i] = "http://" + addr |
|
} |
|
} |
|
newClient := etcd.NewClient(cluster) |
|
newClient.SetConsistency(etcd.STRONG_CONSISTENCY) |
|
return &PooledEtcdClient{c: newClient}, nil |
|
}, 10, 10, 0) |
|
|
|
etcdInstance = &etcdImpl{ |
|
cluster: zkAddr, |
|
pool: p, |
|
indexMap: make(map[string]uint64), |
|
} |
|
|
|
log.Infof("new etcd %s", zkAddr) |
|
if etcdInstance == nil { |
|
return nil, errors.New("unknown error") |
|
} |
|
|
|
return etcdInstance, nil |
|
} |
|
|
|
func (e *etcdImpl) Get(key string) (data []byte, stat zk.Stat, err error) { |
|
conn, err := e.pool.Get() |
|
if err != nil { |
|
return nil, nil, err |
|
} |
|
|
|
defer e.pool.Put(conn) |
|
c := conn.(*PooledEtcdClient).c |
|
|
|
resp, err := c.Get(key, true, false) |
|
if resp == nil { |
|
return nil, nil, convertToZkError(err) |
|
} |
|
|
|
return []byte(resp.Node.Value), nil, nil |
|
} |
|
|
|
func (e *etcdImpl) setIndex(key string, index uint64) { |
|
e.Lock() |
|
defer e.Unlock() |
|
|
|
e.indexMap[key] = index |
|
} |
|
|
|
func (e *etcdImpl) getIndex(key string) uint64 { |
|
e.Lock() |
|
defer e.Unlock() |
|
|
|
index := e.indexMap[key] |
|
|
|
return index |
|
} |
|
|
|
func (e *etcdImpl) watch(key string, children bool) (resp *etcd.Response, stat zk.Stat, watch <-chan zk.Event, err error) { |
|
conn, err := e.pool.Get() |
|
if err != nil { |
|
return nil, nil, nil, err |
|
} |
|
|
|
defer e.pool.Put(conn) |
|
c := conn.(*PooledEtcdClient).c |
|
index := e.getIndex(key) |
|
resp, err = c.Get(key, true, true) |
|
if resp == nil { |
|
return nil, nil, nil, convertToZkError(err) |
|
} |
|
|
|
if index < resp.Node.ModifiedIndex { |
|
index = resp.Node.ModifiedIndex |
|
} |
|
|
|
for _, n := range resp.Node.Nodes { |
|
if n.ModifiedIndex > index { |
|
index = n.ModifiedIndex |
|
} |
|
} |
|
|
|
log.Info("try watch", key) |
|
ch := make(chan zk.Event, 100) |
|
originVal := resp.Node.Value |
|
|
|
go func() { |
|
defer func() { |
|
e.setIndex(key, index) |
|
}() |
|
|
|
for { |
|
conn, err := e.pool.Get() |
|
if err != nil { |
|
log.Error(err) |
|
return |
|
} |
|
|
|
c := conn.(*PooledEtcdClient).c |
|
|
|
resp, err := c.Watch(key, index, children, nil, nil) |
|
e.pool.Put(conn) |
|
|
|
if err != nil { |
|
if ec, ok := err.(*etcd.EtcdError); ok { |
|
if ec.ErrorCode == etcderr.EcodeEventIndexCleared { |
|
index++ |
|
continue |
|
} |
|
} |
|
|
|
log.Warning("watch", err) |
|
ch <- convertToZkEvent(key, resp, err) |
|
return |
|
} |
|
|
|
if key == resp.Node.Key && originVal == string(resp.Node.Value) { //keep alive event |
|
index++ |
|
continue |
|
} |
|
|
|
ch <- convertToZkEvent(key, resp, err) |
|
//update index |
|
if index <= resp.Node.ModifiedIndex { |
|
index = resp.Node.ModifiedIndex + 1 |
|
} else { |
|
index++ |
|
} |
|
return |
|
} |
|
}() |
|
|
|
return resp, nil, ch, nil |
|
} |
|
|
|
func (e *etcdImpl) GetW(key string) (data []byte, stat zk.Stat, watch <-chan zk.Event, err error) { |
|
resp, stat, watch, err := e.watch(key, false) |
|
if err != nil { |
|
return |
|
} |
|
|
|
return []byte(resp.Node.Value), stat, watch, nil |
|
} |
|
|
|
func (e *etcdImpl) Children(key string) (children []string, stat zk.Stat, err error) { |
|
conn, err := e.pool.Get() |
|
if err != nil { |
|
return nil, nil, err |
|
} |
|
|
|
defer e.pool.Put(conn) |
|
c := conn.(*PooledEtcdClient).c |
|
|
|
resp, err := c.Get(key, true, false) |
|
if resp == nil { |
|
return nil, nil, convertToZkError(err) |
|
} |
|
|
|
for _, c := range resp.Node.Nodes { |
|
children = append(children, path.Base(c.Key)) |
|
} |
|
|
|
return |
|
} |
|
|
|
func (e *etcdImpl) ChildrenW(key string) (children []string, stat zk.Stat, watch <-chan zk.Event, err error) { |
|
resp, stat, watch, err := e.watch(key, true) |
|
if err != nil { |
|
return nil, stat, nil, convertToZkError(err) |
|
} |
|
|
|
for _, c := range resp.Node.Nodes { |
|
children = append(children, path.Base(c.Key)) |
|
} |
|
|
|
return children, stat, watch, nil |
|
} |
|
|
|
func (e *etcdImpl) Exists(key string) (exist bool, stat zk.Stat, err error) { |
|
conn, err := e.pool.Get() |
|
if err != nil { |
|
return false, nil, err |
|
} |
|
|
|
defer e.pool.Put(conn) |
|
c := conn.(*PooledEtcdClient).c |
|
|
|
_, err = c.Get(key, true, false) |
|
if err == nil { |
|
return true, nil, nil |
|
} |
|
|
|
if ec, ok := err.(*etcd.EtcdError); ok { |
|
if ec.ErrorCode == etcderr.EcodeKeyNotFound { |
|
return false, nil, nil |
|
} |
|
} |
|
|
|
return false, nil, convertToZkError(err) |
|
} |
|
|
|
func (e *etcdImpl) ExistsW(key string) (exist bool, stat zk.Stat, watch <-chan zk.Event, err error) { |
|
_, stat, watch, err = e.watch(key, false) |
|
if err != nil { |
|
return false, nil, nil, convertToZkError(err) |
|
} |
|
|
|
return true, nil, watch, nil |
|
} |
|
|
|
// MAX_TTL is the number of seconds in 365 days (60s * 60min * 24h * 365d).
const MAX_TTL = 60 * 60 * 24 * 365
|
|
|
func (e *etcdImpl) doKeepAlive(key string, ttl uint64) error { |
|
conn, err := e.pool.Get() |
|
if err != nil { |
|
return err |
|
} |
|
|
|
defer e.pool.Put(conn) |
|
c := conn.(*PooledEtcdClient).c |
|
|
|
resp, err := c.Get(key, false, false) |
|
if err != nil { |
|
log.Error(err) |
|
return err |
|
} |
|
|
|
if resp.Node.Dir { |
|
return fmt.Errorf("can not set ttl to directory", key) |
|
} |
|
|
|
//log.Info("keep alive ", key) |
|
resp, err = c.CompareAndSwap(key, resp.Node.Value, ttl, resp.Node.Value, resp.Node.ModifiedIndex) |
|
if err == nil { |
|
return nil |
|
} |
|
|
|
if ec, ok := err.(*etcd.EtcdError); ok && ec.ErrorCode == etcderr.EcodeTestFailed { |
|
return nil |
|
} |
|
|
|
return err |
|
} |
|
|
|
//todo:add test for keepAlive |
|
func (e *etcdImpl) keepAlive(key string, ttl uint64) { |
|
go func() { |
|
for { |
|
time.Sleep(1 * time.Second) |
|
err := e.doKeepAlive(key, ttl) |
|
if err != nil { |
|
log.Error(err) |
|
return |
|
} |
|
} |
|
}() |
|
} |
|
|
|
func (e *etcdImpl) Create(wholekey string, value []byte, flags int32, aclv []zk.ACL) (keyCreated string, err error) { |
|
seq := (flags & zk.FlagSequence) != 0 |
|
tmp := (flags & zk.FlagEphemeral) != 0 |
|
ttl := uint64(MAX_TTL) |
|
if tmp { |
|
ttl = 5 |
|
} |
|
|
|
var resp *etcd.Response |
|
|
|
conn, err := e.pool.Get() |
|
if err != nil { |
|
return "", err |
|
} |
|
|
|
defer e.pool.Put(conn) |
|
c := conn.(*PooledEtcdClient).c |
|
|
|
fn := c.Create |
|
log.Info("create", wholekey) |
|
|
|
if seq { |
|
wholekey = path.Dir(wholekey) |
|
fn = c.CreateInOrder |
|
} else { |
|
for _, v := range aclv { |
|
if v.Perms == PERM_DIRECTORY { |
|
log.Info("etcdImpl:create directory", wholekey) |
|
fn = nil |
|
resp, err = c.CreateDir(wholekey, uint64(ttl)) |
|
if err != nil { |
|
return "", convertToZkError(err) |
|
} |
|
} |
|
} |
|
} |
|
|
|
if fn == nil { |
|
if tmp { |
|
e.keepAlive(wholekey, ttl) |
|
} |
|
return resp.Node.Key, nil |
|
} |
|
|
|
resp, err = fn(wholekey, string(value), uint64(ttl)) |
|
if err != nil { |
|
return "", convertToZkError(err) |
|
} |
|
|
|
if tmp { |
|
e.keepAlive(resp.Node.Key, ttl) |
|
} |
|
|
|
return resp.Node.Key, nil |
|
} |
|
|
|
func (e *etcdImpl) Set(key string, value []byte, version int32) (stat zk.Stat, err error) { |
|
if version == 0 { |
|
return nil, errors.New("invalid version") |
|
} |
|
|
|
conn, err := e.pool.Get() |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
defer e.pool.Put(conn) |
|
c := conn.(*PooledEtcdClient).c |
|
|
|
resp, err := c.Get(key, true, false) |
|
if resp == nil { |
|
return nil, convertToZkError(err) |
|
} |
|
|
|
_, err = c.Set(key, string(value), uint64(resp.Node.TTL)) |
|
return nil, convertToZkError(err) |
|
} |
|
|
|
func (e *etcdImpl) Delete(key string, version int32) (err error) { |
|
//todo: handle version |
|
conn, err := e.pool.Get() |
|
if err != nil { |
|
return err |
|
} |
|
|
|
defer e.pool.Put(conn) |
|
c := conn.(*PooledEtcdClient).c |
|
|
|
resp, err := c.Get(key, true, false) |
|
if resp == nil { |
|
return convertToZkError(err) |
|
} |
|
|
|
if resp.Node.Dir { |
|
_, err = c.DeleteDir(key) |
|
} else { |
|
_, err = c.Delete(key, false) |
|
} |
|
|
|
return convertToZkError(err) |
|
} |
|
|
|
func (e *etcdImpl) GetACL(key string) ([]zk.ACL, zk.Stat, error) { |
|
return nil, nil, nil |
|
} |
|
|
|
func (e *etcdImpl) SetACL(key string, aclv []zk.ACL, version int32) (zk.Stat, error) { |
|
return nil, nil |
|
} |
|
|
|
func (e *etcdImpl) Close() { |
|
//how to implement this |
|
}
|
|
|