Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
312 lines
5.4 KiB
312 lines
5.4 KiB
package nodb |
|
|
|
import ( |
|
"bufio" |
|
"bytes" |
|
"errors" |
|
"io" |
|
"os" |
|
"time" |
|
|
|
"gitea.com/lunny/log" |
|
"gitea.com/lunny/nodb/store/driver" |
|
) |
|
|
|
const ( |
|
maxReplBatchNum = 100 |
|
maxReplLogSize = 1 * 1024 * 1024 |
|
) |
|
|
|
var ( |
|
ErrSkipEvent = errors.New("skip to next event") |
|
) |
|
|
|
var ( |
|
errInvalidBinLogEvent = errors.New("invalid binglog event") |
|
errInvalidBinLogFile = errors.New("invalid binlog file") |
|
) |
|
|
|
type replBatch struct { |
|
wb driver.IWriteBatch |
|
events [][]byte |
|
l *Nodb |
|
|
|
lastHead *BinLogHead |
|
} |
|
|
|
func (b *replBatch) Commit() error { |
|
b.l.commitLock.Lock() |
|
defer b.l.commitLock.Unlock() |
|
|
|
err := b.wb.Commit() |
|
if err != nil { |
|
b.Rollback() |
|
return err |
|
} |
|
|
|
if b.l.binlog != nil { |
|
if err = b.l.binlog.Log(b.events...); err != nil { |
|
b.Rollback() |
|
return err |
|
} |
|
} |
|
|
|
b.events = [][]byte{} |
|
b.lastHead = nil |
|
|
|
return nil |
|
} |
|
|
|
func (b *replBatch) Rollback() error { |
|
b.wb.Rollback() |
|
b.events = [][]byte{} |
|
b.lastHead = nil |
|
return nil |
|
} |
|
|
|
func (l *Nodb) replicateEvent(b *replBatch, event []byte) error { |
|
if len(event) == 0 { |
|
return errInvalidBinLogEvent |
|
} |
|
|
|
b.events = append(b.events, event) |
|
|
|
logType := uint8(event[0]) |
|
switch logType { |
|
case BinLogTypePut: |
|
return l.replicatePutEvent(b, event) |
|
case BinLogTypeDeletion: |
|
return l.replicateDeleteEvent(b, event) |
|
default: |
|
return errInvalidBinLogEvent |
|
} |
|
} |
|
|
|
func (l *Nodb) replicatePutEvent(b *replBatch, event []byte) error { |
|
key, value, err := decodeBinLogPut(event) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
b.wb.Put(key, value) |
|
|
|
return nil |
|
} |
|
|
|
func (l *Nodb) replicateDeleteEvent(b *replBatch, event []byte) error { |
|
key, err := decodeBinLogDelete(event) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
b.wb.Delete(key) |
|
|
|
return nil |
|
} |
|
|
|
func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) error) error { |
|
head := &BinLogHead{} |
|
var err error |
|
|
|
for { |
|
if err = head.Read(rb); err != nil { |
|
if err == io.EOF { |
|
break |
|
} else { |
|
return err |
|
} |
|
} |
|
|
|
var dataBuf bytes.Buffer |
|
|
|
if _, err = io.CopyN(&dataBuf, rb, int64(head.PayloadLen)); err != nil { |
|
return err |
|
} |
|
|
|
err = f(head, dataBuf.Bytes()) |
|
if err != nil && err != ErrSkipEvent { |
|
return err |
|
} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func (l *Nodb) ReplicateFromReader(rb io.Reader) error { |
|
b := new(replBatch) |
|
|
|
b.wb = l.ldb.NewWriteBatch() |
|
b.l = l |
|
|
|
f := func(head *BinLogHead, event []byte) error { |
|
if b.lastHead == nil { |
|
b.lastHead = head |
|
} else if !b.lastHead.InSameBatch(head) { |
|
if err := b.Commit(); err != nil { |
|
log.Fatalf("replication error %s, skip to next", err.Error()) |
|
return ErrSkipEvent |
|
} |
|
b.lastHead = head |
|
} |
|
|
|
err := l.replicateEvent(b, event) |
|
if err != nil { |
|
log.Fatalf("replication error %s, skip to next", err.Error()) |
|
return ErrSkipEvent |
|
} |
|
return nil |
|
} |
|
|
|
err := ReadEventFromReader(rb, f) |
|
if err != nil { |
|
b.Rollback() |
|
return err |
|
} |
|
return b.Commit() |
|
} |
|
|
|
func (l *Nodb) ReplicateFromData(data []byte) error { |
|
rb := bytes.NewReader(data) |
|
|
|
err := l.ReplicateFromReader(rb) |
|
|
|
return err |
|
} |
|
|
|
func (l *Nodb) ReplicateFromBinLog(filePath string) error { |
|
f, err := os.Open(filePath) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
rb := bufio.NewReaderSize(f, 4096) |
|
|
|
err = l.ReplicateFromReader(rb) |
|
|
|
f.Close() |
|
|
|
return err |
|
} |
|
|
|
// try to read events, if no events read, try to wait the new event singal until timeout seconds |
|
func (l *Nodb) ReadEventsToTimeout(info *BinLogAnchor, w io.Writer, timeout int) (n int, err error) { |
|
lastIndex := info.LogFileIndex |
|
lastPos := info.LogPos |
|
|
|
n = 0 |
|
if l.binlog == nil { |
|
//binlog not supported |
|
info.LogFileIndex = 0 |
|
info.LogPos = 0 |
|
return |
|
} |
|
|
|
n, err = l.ReadEventsTo(info, w) |
|
if err == nil && info.LogFileIndex == lastIndex && info.LogPos == lastPos { |
|
//no events read |
|
select { |
|
case <-l.binlog.Wait(): |
|
case <-time.After(time.Duration(timeout) * time.Second): |
|
} |
|
return l.ReadEventsTo(info, w) |
|
} |
|
return |
|
} |
|
|
|
func (l *Nodb) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) { |
|
n = 0 |
|
if l.binlog == nil { |
|
//binlog not supported |
|
info.LogFileIndex = 0 |
|
info.LogPos = 0 |
|
return |
|
} |
|
|
|
index := info.LogFileIndex |
|
offset := info.LogPos |
|
|
|
filePath := l.binlog.FormatLogFilePath(index) |
|
|
|
var f *os.File |
|
f, err = os.Open(filePath) |
|
if os.IsNotExist(err) { |
|
lastIndex := l.binlog.LogFileIndex() |
|
|
|
if index == lastIndex { |
|
//no binlog at all |
|
info.LogPos = 0 |
|
} else { |
|
//slave binlog info had lost |
|
info.LogFileIndex = -1 |
|
} |
|
} |
|
|
|
if err != nil { |
|
if os.IsNotExist(err) { |
|
err = nil |
|
} |
|
return |
|
} |
|
|
|
defer f.Close() |
|
|
|
var fileSize int64 |
|
st, _ := f.Stat() |
|
fileSize = st.Size() |
|
|
|
if fileSize == info.LogPos { |
|
return |
|
} |
|
|
|
if _, err = f.Seek(offset, os.SEEK_SET); err != nil { |
|
//may be invliad seek offset |
|
return |
|
} |
|
|
|
var lastHead *BinLogHead = nil |
|
|
|
head := &BinLogHead{} |
|
|
|
batchNum := 0 |
|
|
|
for { |
|
if err = head.Read(f); err != nil { |
|
if err == io.EOF { |
|
//we will try to use next binlog |
|
if index < l.binlog.LogFileIndex() { |
|
info.LogFileIndex += 1 |
|
info.LogPos = 0 |
|
} |
|
err = nil |
|
return |
|
} else { |
|
return |
|
} |
|
|
|
} |
|
|
|
if lastHead == nil { |
|
lastHead = head |
|
batchNum++ |
|
} else if !lastHead.InSameBatch(head) { |
|
lastHead = head |
|
batchNum++ |
|
if batchNum > maxReplBatchNum || n > maxReplLogSize { |
|
return |
|
} |
|
} |
|
|
|
if err = head.Write(w); err != nil { |
|
return |
|
} |
|
|
|
if _, err = io.CopyN(w, f, int64(head.PayloadLen)); err != nil { |
|
return |
|
} |
|
|
|
n += (head.Len() + int(head.PayloadLen)) |
|
info.LogPos = info.LogPos + int64(head.Len()) + int64(head.PayloadLen) |
|
} |
|
|
|
return |
|
}
|
|
|