// TsRNP "Mirocod" platform for project development
// https://git.mirocod.ru
// 398 lines, 10 KiB
package couchbase |
|
|
|
import ( |
|
"log" |
|
"sync" |
|
"time" |
|
|
|
"fmt" |
|
"github.com/couchbase/gomemcached" |
|
"github.com/couchbase/gomemcached/client" |
|
"github.com/couchbase/goutils/logging" |
|
) |
|
|
|
// A UprFeed streams mutation events from a bucket. |
|
// |
|
// Events from the bucket can be read from the channel 'C'. Remember |
|
// to call Close() on it when you're done, unless its channel has |
|
// closed itself already. |
|
type UprFeed struct { |
|
C <-chan *memcached.UprEvent |
|
|
|
bucket *Bucket |
|
nodeFeeds map[string]*FeedInfo // The UPR feeds of the individual nodes |
|
output chan *memcached.UprEvent // Same as C but writeably-typed |
|
outputClosed bool |
|
quit chan bool |
|
name string // name of this UPR feed |
|
sequence uint32 // sequence number for this feed |
|
connected bool |
|
killSwitch chan bool |
|
closing bool |
|
wg sync.WaitGroup |
|
dcp_buffer_size uint32 |
|
data_chan_size int |
|
} |
|
|
|
// UprFeed from a single connection |
|
type FeedInfo struct { |
|
uprFeed *memcached.UprFeed // UPR feed handle |
|
host string // hostname |
|
connected bool // connected |
|
quit chan bool // quit channel |
|
} |
|
|
|
type FailoverLog map[uint16]memcached.FailoverLog |
|
|
|
// GetFailoverLogs, get the failover logs for a set of vbucket ids |
|
func (b *Bucket) GetFailoverLogs(vBuckets []uint16) (FailoverLog, error) { |
|
|
|
// map vbids to their corresponding hosts |
|
vbHostList := make(map[string][]uint16) |
|
vbm := b.VBServerMap() |
|
if len(vbm.VBucketMap) < len(vBuckets) { |
|
return nil, fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v", |
|
vbm.VBucketMap, vBuckets) |
|
} |
|
|
|
for _, vb := range vBuckets { |
|
masterID := vbm.VBucketMap[vb][0] |
|
master := b.getMasterNode(masterID) |
|
if master == "" { |
|
return nil, fmt.Errorf("No master found for vb %d", vb) |
|
} |
|
|
|
vbList := vbHostList[master] |
|
if vbList == nil { |
|
vbList = make([]uint16, 0) |
|
} |
|
vbList = append(vbList, vb) |
|
vbHostList[master] = vbList |
|
} |
|
|
|
failoverLogMap := make(FailoverLog) |
|
for _, serverConn := range b.getConnPools(false /* not already locked */) { |
|
|
|
vbList := vbHostList[serverConn.host] |
|
if vbList == nil { |
|
continue |
|
} |
|
|
|
mc, err := serverConn.Get() |
|
if err != nil { |
|
logging.Infof("No Free connections for vblist %v", vbList) |
|
return nil, fmt.Errorf("No Free connections for host %s", |
|
serverConn.host) |
|
|
|
} |
|
// close the connection so that it doesn't get reused for upr data |
|
// connection |
|
defer mc.Close() |
|
failoverlogs, err := mc.UprGetFailoverLog(vbList) |
|
if err != nil { |
|
return nil, fmt.Errorf("Error getting failover log %s host %s", |
|
err.Error(), serverConn.host) |
|
|
|
} |
|
|
|
for vb, log := range failoverlogs { |
|
failoverLogMap[vb] = *log |
|
} |
|
} |
|
|
|
return failoverLogMap, nil |
|
} |
|
|
|
func (b *Bucket) StartUprFeed(name string, sequence uint32) (*UprFeed, error) { |
|
return b.StartUprFeedWithConfig(name, sequence, 10, DEFAULT_WINDOW_SIZE) |
|
} |
|
|
|
// StartUprFeed creates and starts a new Upr feed |
|
// No data will be sent on the channel unless vbuckets streams are requested |
|
func (b *Bucket) StartUprFeedWithConfig(name string, sequence uint32, data_chan_size int, dcp_buffer_size uint32) (*UprFeed, error) { |
|
|
|
feed := &UprFeed{ |
|
bucket: b, |
|
output: make(chan *memcached.UprEvent, data_chan_size), |
|
quit: make(chan bool), |
|
nodeFeeds: make(map[string]*FeedInfo, 0), |
|
name: name, |
|
sequence: sequence, |
|
killSwitch: make(chan bool), |
|
dcp_buffer_size: dcp_buffer_size, |
|
data_chan_size: data_chan_size, |
|
} |
|
|
|
err := feed.connectToNodes() |
|
if err != nil { |
|
return nil, fmt.Errorf("Cannot connect to bucket %s", err.Error()) |
|
} |
|
feed.connected = true |
|
go feed.run() |
|
|
|
feed.C = feed.output |
|
return feed, nil |
|
} |
|
|
|
// UprRequestStream starts a stream for a vb on a feed |
|
func (feed *UprFeed) UprRequestStream(vb uint16, opaque uint16, flags uint32, |
|
vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error { |
|
|
|
defer func() { |
|
if r := recover(); r != nil { |
|
log.Panicf("Panic in UprRequestStream. Feed %v Bucket %v", feed, feed.bucket) |
|
} |
|
}() |
|
|
|
vbm := feed.bucket.VBServerMap() |
|
if len(vbm.VBucketMap) < int(vb) { |
|
return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v", |
|
vb, vbm.VBucketMap) |
|
} |
|
|
|
if int(vb) >= len(vbm.VBucketMap) { |
|
return fmt.Errorf("Invalid vbucket id %d", vb) |
|
} |
|
|
|
masterID := vbm.VBucketMap[vb][0] |
|
master := feed.bucket.getMasterNode(masterID) |
|
if master == "" { |
|
return fmt.Errorf("Master node not found for vbucket %d", vb) |
|
} |
|
singleFeed := feed.nodeFeeds[master] |
|
if singleFeed == nil { |
|
return fmt.Errorf("UprFeed for this host not found") |
|
} |
|
|
|
if err := singleFeed.uprFeed.UprRequestStream(vb, opaque, flags, |
|
vuuid, startSequence, endSequence, snapStart, snapEnd); err != nil { |
|
return err |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// UprCloseStream ends a vbucket stream. |
|
func (feed *UprFeed) UprCloseStream(vb, opaqueMSB uint16) error { |
|
|
|
defer func() { |
|
if r := recover(); r != nil { |
|
log.Panicf("Panic in UprCloseStream. Feed %v Bucket %v ", feed, feed.bucket) |
|
} |
|
}() |
|
|
|
vbm := feed.bucket.VBServerMap() |
|
if len(vbm.VBucketMap) < int(vb) { |
|
return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v", |
|
vb, vbm.VBucketMap) |
|
} |
|
|
|
if int(vb) >= len(vbm.VBucketMap) { |
|
return fmt.Errorf("Invalid vbucket id %d", vb) |
|
} |
|
|
|
masterID := vbm.VBucketMap[vb][0] |
|
master := feed.bucket.getMasterNode(masterID) |
|
if master == "" { |
|
return fmt.Errorf("Master node not found for vbucket %d", vb) |
|
} |
|
singleFeed := feed.nodeFeeds[master] |
|
if singleFeed == nil { |
|
return fmt.Errorf("UprFeed for this host not found") |
|
} |
|
|
|
if err := singleFeed.uprFeed.CloseStream(vb, opaqueMSB); err != nil { |
|
return err |
|
} |
|
return nil |
|
} |
|
|
|
// Goroutine that runs the feed |
|
func (feed *UprFeed) run() { |
|
retryInterval := initialRetryInterval |
|
bucketOK := true |
|
for { |
|
// Connect to the UPR feed of each server node: |
|
if bucketOK { |
|
// Run until one of the sub-feeds fails: |
|
select { |
|
case <-feed.killSwitch: |
|
case <-feed.quit: |
|
return |
|
} |
|
//feed.closeNodeFeeds() |
|
retryInterval = initialRetryInterval |
|
} |
|
|
|
if feed.closing == true { |
|
// we have been asked to shut down |
|
return |
|
} |
|
|
|
// On error, try to refresh the bucket in case the list of nodes changed: |
|
logging.Infof("go-couchbase: UPR connection lost; reconnecting to bucket %q in %v", |
|
feed.bucket.Name, retryInterval) |
|
|
|
if err := feed.bucket.Refresh(); err != nil { |
|
// if we fail to refresh the bucket, exit the feed |
|
// MB-14917 |
|
logging.Infof("Unable to refresh bucket %s ", err.Error()) |
|
close(feed.output) |
|
feed.outputClosed = true |
|
feed.closeNodeFeeds() |
|
return |
|
} |
|
|
|
// this will only connect to nodes that are not connected or changed |
|
// user will have to reconnect the stream |
|
err := feed.connectToNodes() |
|
if err != nil { |
|
logging.Infof("Unable to connect to nodes..exit ") |
|
close(feed.output) |
|
feed.outputClosed = true |
|
feed.closeNodeFeeds() |
|
return |
|
} |
|
bucketOK = err == nil |
|
|
|
select { |
|
case <-time.After(retryInterval): |
|
case <-feed.quit: |
|
return |
|
} |
|
if retryInterval *= 2; retryInterval > maximumRetryInterval { |
|
retryInterval = maximumRetryInterval |
|
} |
|
} |
|
} |
|
|
|
func (feed *UprFeed) connectToNodes() (err error) { |
|
nodeCount := 0 |
|
for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) { |
|
|
|
// this maybe a reconnection, so check if the connection to the node |
|
// already exists. Connect only if the node is not found in the list |
|
// or connected == false |
|
nodeFeed := feed.nodeFeeds[serverConn.host] |
|
|
|
if nodeFeed != nil && nodeFeed.connected == true { |
|
continue |
|
} |
|
|
|
var singleFeed *memcached.UprFeed |
|
var name string |
|
if feed.name == "" { |
|
name = "DefaultUprClient" |
|
} else { |
|
name = feed.name |
|
} |
|
singleFeed, err = serverConn.StartUprFeed(name, feed.sequence, feed.dcp_buffer_size, feed.data_chan_size) |
|
if err != nil { |
|
logging.Errorf("go-couchbase: Error connecting to upr feed of %s: %v", serverConn.host, err) |
|
feed.closeNodeFeeds() |
|
return |
|
} |
|
// add the node to the connection map |
|
feedInfo := &FeedInfo{ |
|
uprFeed: singleFeed, |
|
connected: true, |
|
host: serverConn.host, |
|
quit: make(chan bool), |
|
} |
|
feed.nodeFeeds[serverConn.host] = feedInfo |
|
go feed.forwardUprEvents(feedInfo, feed.killSwitch, serverConn.host) |
|
feed.wg.Add(1) |
|
nodeCount++ |
|
} |
|
if nodeCount == 0 { |
|
return fmt.Errorf("No connection to bucket") |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// Goroutine that forwards Upr events from a single node's feed to the aggregate feed. |
|
func (feed *UprFeed) forwardUprEvents(nodeFeed *FeedInfo, killSwitch chan bool, host string) { |
|
singleFeed := nodeFeed.uprFeed |
|
|
|
defer func() { |
|
feed.wg.Done() |
|
if r := recover(); r != nil { |
|
//if feed is not closing, re-throw the panic |
|
if feed.outputClosed != true && feed.closing != true { |
|
panic(r) |
|
} else { |
|
logging.Errorf("Panic is recovered. Since feed is closed, exit gracefully") |
|
|
|
} |
|
} |
|
}() |
|
|
|
for { |
|
select { |
|
case <-nodeFeed.quit: |
|
nodeFeed.connected = false |
|
return |
|
|
|
case event, ok := <-singleFeed.C: |
|
if !ok { |
|
if singleFeed.Error != nil { |
|
logging.Errorf("go-couchbase: Upr feed from %s failed: %v", host, singleFeed.Error) |
|
} |
|
killSwitch <- true |
|
return |
|
} |
|
if feed.outputClosed == true { |
|
// someone closed the node feed |
|
logging.Infof("Node need closed, returning from forwardUprEvent") |
|
return |
|
} |
|
feed.output <- event |
|
if event.Status == gomemcached.NOT_MY_VBUCKET { |
|
logging.Infof(" Got a not my vbucket error !! ") |
|
if err := feed.bucket.Refresh(); err != nil { |
|
logging.Errorf("Unable to refresh bucket %s ", err.Error()) |
|
feed.closeNodeFeeds() |
|
return |
|
} |
|
// this will only connect to nodes that are not connected or changed |
|
// user will have to reconnect the stream |
|
if err := feed.connectToNodes(); err != nil { |
|
logging.Errorf("Unable to connect to nodes %s", err.Error()) |
|
return |
|
} |
|
|
|
} |
|
} |
|
} |
|
} |
|
|
|
func (feed *UprFeed) closeNodeFeeds() { |
|
for _, f := range feed.nodeFeeds { |
|
logging.Infof(" Sending close to forwardUprEvent ") |
|
close(f.quit) |
|
f.uprFeed.Close() |
|
} |
|
feed.nodeFeeds = nil |
|
} |
|
|
|
// Close a Upr feed. |
|
func (feed *UprFeed) Close() error { |
|
select { |
|
case <-feed.quit: |
|
return nil |
|
default: |
|
} |
|
|
|
feed.closing = true |
|
feed.closeNodeFeeds() |
|
close(feed.quit) |
|
|
|
feed.wg.Wait() |
|
if feed.outputClosed == false { |
|
feed.outputClosed = true |
|
close(feed.output) |
|
} |
|
|
|
return nil |
|
}
|
|
|