Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
305 lines
6.9 KiB
305 lines
6.9 KiB
// Copyright 2019 Lunny Xiao. All rights reserved. |
|
// Use of this source code is governed by a MIT-style |
|
// license that can be found in the LICENSE file. |
|
|
|
package levelqueue |
|
|
|
import ( |
|
"bytes" |
|
"encoding/binary" |
|
"sync" |
|
|
|
"github.com/syndtr/goleveldb/leveldb" |
|
"github.com/syndtr/goleveldb/leveldb/errors" |
|
) |
|
|
|
// lowKeyStr is the unprefixed name of the key that stores the id of the
// left end of the queue.
const lowKeyStr = "low"

// highKeyStr is the unprefixed name of the key that stores the id of the
// right end of the queue.
const highKeyStr = "high"
|
|
|
// Queue defines a queue struct |
|
type Queue struct { |
|
db *leveldb.DB |
|
highLock sync.Mutex |
|
lowLock sync.Mutex |
|
low int64 |
|
high int64 |
|
lowKey []byte |
|
highKey []byte |
|
prefix []byte |
|
closeUnderlyingDB bool |
|
} |
|
|
|
// Open opens a queue from the db path or creates a |
|
// queue if it doesn't exist. |
|
// The keys will not be prefixed by default |
|
func Open(dataDir string) (*Queue, error) { |
|
db, err := leveldb.OpenFile(dataDir, nil) |
|
if err != nil { |
|
if !errors.IsCorrupted(err) { |
|
return nil, err |
|
} |
|
db, err = leveldb.RecoverFile(dataDir, nil) |
|
if err != nil { |
|
return nil, err |
|
} |
|
} |
|
return NewQueue(db, []byte{}, true) |
|
} |
|
|
|
// NewQueue creates a queue from a db. The keys will be prefixed with prefix |
|
// and at close the db will be closed as per closeUnderlyingDB |
|
func NewQueue(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Queue, error) { |
|
var err error |
|
|
|
var queue = &Queue{ |
|
db: db, |
|
closeUnderlyingDB: closeUnderlyingDB, |
|
} |
|
|
|
queue.prefix = make([]byte, len(prefix)) |
|
copy(queue.prefix, prefix) |
|
queue.lowKey = withPrefix(prefix, []byte(lowKeyStr)) |
|
queue.highKey = withPrefix(prefix, []byte(highKeyStr)) |
|
|
|
queue.low, err = queue.readID(queue.lowKey) |
|
if err == leveldb.ErrNotFound { |
|
queue.low = 1 |
|
err = db.Put(queue.lowKey, id2bytes(1), nil) |
|
} |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
queue.high, err = queue.readID(queue.highKey) |
|
if err == leveldb.ErrNotFound { |
|
err = db.Put(queue.highKey, id2bytes(0), nil) |
|
} |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
return queue, nil |
|
} |
|
|
|
func (queue *Queue) readID(key []byte) (int64, error) { |
|
bs, err := queue.db.Get(key, nil) |
|
if err != nil { |
|
return 0, err |
|
} |
|
return bytes2id(bs) |
|
} |
|
|
|
func (queue *Queue) highincrement() (int64, error) { |
|
id := queue.high + 1 |
|
queue.high = id |
|
err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil) |
|
if err != nil { |
|
queue.high = queue.high - 1 |
|
return 0, err |
|
} |
|
return id, nil |
|
} |
|
|
|
func (queue *Queue) highdecrement() (int64, error) { |
|
queue.high = queue.high - 1 |
|
err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil) |
|
if err != nil { |
|
queue.high = queue.high + 1 |
|
return 0, err |
|
} |
|
return queue.high, nil |
|
} |
|
|
|
func (queue *Queue) lowincrement() (int64, error) { |
|
queue.low = queue.low + 1 |
|
err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil) |
|
if err != nil { |
|
queue.low = queue.low - 1 |
|
return 0, err |
|
} |
|
return queue.low, nil |
|
} |
|
|
|
func (queue *Queue) lowdecrement() (int64, error) { |
|
queue.low = queue.low - 1 |
|
err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil) |
|
if err != nil { |
|
queue.low = queue.low + 1 |
|
return 0, err |
|
} |
|
return queue.low, nil |
|
} |
|
|
|
// Len returns the length of the queue |
|
func (queue *Queue) Len() int64 { |
|
queue.lowLock.Lock() |
|
queue.highLock.Lock() |
|
l := queue.high - queue.low + 1 |
|
queue.highLock.Unlock() |
|
queue.lowLock.Unlock() |
|
return l |
|
} |
|
|
|
func id2bytes(id int64) []byte { |
|
var buf = make([]byte, 8) |
|
binary.PutVarint(buf, id) |
|
return buf |
|
} |
|
|
|
// bytes2id decodes the varint at the start of b into an id. Bytes after
// the varint are ignored; the reader's error is returned when b does not
// contain a complete varint.
func bytes2id(b []byte) (int64, error) {
	reader := bytes.NewReader(b)
	id, err := binary.ReadVarint(reader)
	return id, err
}
|
|
|
// withPrefix returns value prefixed by prefix and a '-' separator in a newly
// allocated slice. With an empty prefix, value itself is returned unchanged.
func withPrefix(prefix []byte, value []byte) []byte {
	if len(prefix) == 0 {
		return value
	}
	joined := make([]byte, 0, len(prefix)+1+len(value))
	joined = append(joined, prefix...)
	joined = append(joined, '-')
	joined = append(joined, value...)
	return joined
}
|
|
|
// RPush pushes a data from right of queue |
|
func (queue *Queue) RPush(data []byte) error { |
|
queue.highLock.Lock() |
|
id, err := queue.highincrement() |
|
if err != nil { |
|
queue.highLock.Unlock() |
|
return err |
|
} |
|
err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil) |
|
queue.highLock.Unlock() |
|
return err |
|
} |
|
|
|
// LPush pushes a data from left of queue |
|
func (queue *Queue) LPush(data []byte) error { |
|
queue.lowLock.Lock() |
|
id, err := queue.lowdecrement() |
|
if err != nil { |
|
queue.lowLock.Unlock() |
|
return err |
|
} |
|
err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil) |
|
queue.lowLock.Unlock() |
|
return err |
|
} |
|
|
|
// RPop pop a data from right of queue |
|
func (queue *Queue) RPop() ([]byte, error) { |
|
queue.highLock.Lock() |
|
defer queue.highLock.Unlock() |
|
currentID := queue.high |
|
|
|
res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) |
|
if err != nil { |
|
if err == leveldb.ErrNotFound { |
|
return nil, ErrNotFound |
|
} |
|
return nil, err |
|
} |
|
|
|
_, err = queue.highdecrement() |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) |
|
if err != nil { |
|
return nil, err |
|
} |
|
return res, nil |
|
} |
|
|
|
// RHandle receives a user callback function to handle the right element of the queue, if function return nil, then delete the element, otherwise keep the element. |
|
func (queue *Queue) RHandle(h func([]byte) error) error { |
|
queue.highLock.Lock() |
|
defer queue.highLock.Unlock() |
|
currentID := queue.high |
|
|
|
res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) |
|
if err != nil { |
|
if err == leveldb.ErrNotFound { |
|
return ErrNotFound |
|
} |
|
return err |
|
} |
|
|
|
if err = h(res); err != nil { |
|
return err |
|
} |
|
|
|
_, err = queue.highdecrement() |
|
if err != nil { |
|
return err |
|
} |
|
|
|
return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) |
|
} |
|
|
|
// LPop pop a data from left of queue |
|
func (queue *Queue) LPop() ([]byte, error) { |
|
queue.lowLock.Lock() |
|
defer queue.lowLock.Unlock() |
|
currentID := queue.low |
|
|
|
res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) |
|
if err != nil { |
|
if err == leveldb.ErrNotFound { |
|
return nil, ErrNotFound |
|
} |
|
return nil, err |
|
} |
|
|
|
_, err = queue.lowincrement() |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) |
|
if err != nil { |
|
return nil, err |
|
} |
|
return res, nil |
|
} |
|
|
|
// LHandle receives a user callback function to handle the left element of the queue, if function return nil, then delete the element, otherwise keep the element. |
|
func (queue *Queue) LHandle(h func([]byte) error) error { |
|
queue.lowLock.Lock() |
|
defer queue.lowLock.Unlock() |
|
currentID := queue.low |
|
|
|
res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) |
|
if err != nil { |
|
if err == leveldb.ErrNotFound { |
|
return ErrNotFound |
|
} |
|
return err |
|
} |
|
|
|
if err = h(res); err != nil { |
|
return err |
|
} |
|
|
|
_, err = queue.lowincrement() |
|
if err != nil { |
|
return err |
|
} |
|
|
|
return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) |
|
} |
|
|
|
// Close closes the queue (and the underlying db is set to closeUnderlyingDB) |
|
func (queue *Queue) Close() error { |
|
if !queue.closeUnderlyingDB { |
|
queue.db = nil |
|
return nil |
|
} |
|
err := queue.db.Close() |
|
queue.db = nil |
|
return err |
|
}
|
|
|