Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
396 lines
9.6 KiB
396 lines
9.6 KiB
// The `fwd` package provides a buffered reader |
|
// and writer. Each has methods that help improve |
|
// the encoding/decoding performance of some binary |
|
// protocols. |
|
// |
|
// The `fwd.Writer` and `fwd.Reader` type provide similar |
|
// functionality to their counterparts in `bufio`, plus |
|
// a few extra utility methods that simplify read-ahead |
|
// and write-ahead. I wrote this package to improve serialization |
|
// performance for http://github.com/tinylib/msgp, |
|
// where it provided about a 2x speedup over `bufio` for certain |
|
// workloads. However, care must be taken to understand the semantics of the |
|
// extra methods provided by this package, as they allow |
|
// the user to access and manipulate the buffer memory |
|
// directly. |
|
// |
|
// The extra methods for `fwd.Reader` are `Peek`, `Skip` |
|
// and `Next`. `(*fwd.Reader).Peek`, unlike `(*bufio.Reader).Peek`, |
|
// will re-allocate the read buffer in order to accommodate arbitrarily |
|
// large read-ahead. `(*fwd.Reader).Skip` skips the next `n` bytes |
|
// in the stream, and uses the `io.Seeker` interface if the underlying |
|
// stream implements it. `(*fwd.Reader).Next` returns a slice pointing |
|
// to the next `n` bytes in the read buffer (like `Peek`), but also |
|
// increments the read position. This allows users to process streams |
|
// in arbitrary block sizes without having to manage appropriately-sized |
|
// slices. Additionally, obviating the need to copy the data from the |
|
// buffer to another location in memory can improve performance dramatically |
|
// in CPU-bound applications. |
|
// |
|
// `fwd.Writer` only has one extra method, which is `(*fwd.Writer).Next`, which |
|
// returns a slice pointing to the next `n` bytes of the writer, and increments |
|
// the write position by the length of the returned slice. This allows users |
|
// to write directly to the end of the buffer. |
|
// |
|
package fwd |
|
|
|
import "io" |
|
|
|
const ( |
|
// DefaultReaderSize is the default size of the read buffer |
|
DefaultReaderSize = 2048 |
|
|
|
// minimum read buffer; straight from bufio |
|
minReaderSize = 16 |
|
) |
|
|
|
// NewReader returns a new *Reader that reads from 'r' |
|
func NewReader(r io.Reader) *Reader { |
|
return NewReaderSize(r, DefaultReaderSize) |
|
} |
|
|
|
// NewReaderSize returns a new *Reader that |
|
// reads from 'r' and has a buffer size 'n'. |
|
func NewReaderSize(r io.Reader, n int) *Reader { |
|
buf := make([]byte, 0, max(n, minReaderSize)) |
|
return NewReaderBuf(r, buf) |
|
} |
|
|
|
// NewReaderBuf returns a new *Reader that |
|
// reads from 'r' and uses 'buf' as a buffer. |
|
// 'buf' is not used when has smaller capacity than 16, |
|
// custom buffer is allocated instead. |
|
func NewReaderBuf(r io.Reader, buf []byte) *Reader { |
|
if cap(buf) < minReaderSize { |
|
buf = make([]byte, 0, minReaderSize) |
|
} |
|
buf = buf[:0] |
|
rd := &Reader{ |
|
r: r, |
|
data: buf, |
|
} |
|
if s, ok := r.(io.Seeker); ok { |
|
rd.rs = s |
|
} |
|
return rd |
|
} |
|
|
|
// Reader is a buffered look-ahead reader |
|
type Reader struct { |
|
r io.Reader // underlying reader |
|
|
|
// data[n:len(data)] is buffered data; data[len(data):cap(data)] is free buffer space |
|
data []byte // data |
|
n int // read offset |
|
state error // last read error |
|
|
|
// if the reader past to NewReader was |
|
// also an io.Seeker, this is non-nil |
|
rs io.Seeker |
|
} |
|
|
|
// Reset resets the underlying reader |
|
// and the read buffer. |
|
func (r *Reader) Reset(rd io.Reader) { |
|
r.r = rd |
|
r.data = r.data[0:0] |
|
r.n = 0 |
|
r.state = nil |
|
if s, ok := rd.(io.Seeker); ok { |
|
r.rs = s |
|
} else { |
|
r.rs = nil |
|
} |
|
} |
|
|
|
// more() does one read on the underlying reader |
|
func (r *Reader) more() { |
|
// move data backwards so that |
|
// the read offset is 0; this way |
|
// we can supply the maximum number of |
|
// bytes to the reader |
|
if r.n != 0 { |
|
if r.n < len(r.data) { |
|
r.data = r.data[:copy(r.data[0:], r.data[r.n:])] |
|
} else { |
|
r.data = r.data[:0] |
|
} |
|
r.n = 0 |
|
} |
|
var a int |
|
a, r.state = r.r.Read(r.data[len(r.data):cap(r.data)]) |
|
if a == 0 && r.state == nil { |
|
r.state = io.ErrNoProgress |
|
return |
|
} else if a > 0 && r.state == io.EOF { |
|
// discard the io.EOF if we read more than 0 bytes. |
|
// the next call to Read should return io.EOF again. |
|
r.state = nil |
|
} |
|
r.data = r.data[:len(r.data)+a] |
|
} |
|
|
|
// pop error |
|
func (r *Reader) err() (e error) { |
|
e, r.state = r.state, nil |
|
return |
|
} |
|
|
|
// pop error; EOF -> io.ErrUnexpectedEOF |
|
func (r *Reader) noEOF() (e error) { |
|
e, r.state = r.state, nil |
|
if e == io.EOF { |
|
e = io.ErrUnexpectedEOF |
|
} |
|
return |
|
} |
|
|
|
// buffered bytes |
|
func (r *Reader) buffered() int { return len(r.data) - r.n } |
|
|
|
// Buffered returns the number of bytes currently in the buffer |
|
func (r *Reader) Buffered() int { return len(r.data) - r.n } |
|
|
|
// BufferSize returns the total size of the buffer |
|
func (r *Reader) BufferSize() int { return cap(r.data) } |
|
|
|
// Peek returns the next 'n' buffered bytes, |
|
// reading from the underlying reader if necessary. |
|
// It will only return a slice shorter than 'n' bytes |
|
// if it also returns an error. Peek does not advance |
|
// the reader. EOF errors are *not* returned as |
|
// io.ErrUnexpectedEOF. |
|
func (r *Reader) Peek(n int) ([]byte, error) { |
|
// in the degenerate case, |
|
// we may need to realloc |
|
// (the caller asked for more |
|
// bytes than the size of the buffer) |
|
if cap(r.data) < n { |
|
old := r.data[r.n:] |
|
r.data = make([]byte, n+r.buffered()) |
|
r.data = r.data[:copy(r.data, old)] |
|
r.n = 0 |
|
} |
|
|
|
// keep filling until |
|
// we hit an error or |
|
// read enough bytes |
|
for r.buffered() < n && r.state == nil { |
|
r.more() |
|
} |
|
|
|
// we must have hit an error |
|
if r.buffered() < n { |
|
return r.data[r.n:], r.err() |
|
} |
|
|
|
return r.data[r.n : r.n+n], nil |
|
} |
|
|
|
// Skip moves the reader forward 'n' bytes. |
|
// Returns the number of bytes skipped and any |
|
// errors encountered. It is analogous to Seek(n, 1). |
|
// If the underlying reader implements io.Seeker, then |
|
// that method will be used to skip forward. |
|
// |
|
// If the reader encounters |
|
// an EOF before skipping 'n' bytes, it |
|
// returns io.ErrUnexpectedEOF. If the |
|
// underlying reader implements io.Seeker, then |
|
// those rules apply instead. (Many implementations |
|
// will not return `io.EOF` until the next call |
|
// to Read.) |
|
func (r *Reader) Skip(n int) (int, error) { |
|
|
|
// fast path |
|
if r.buffered() >= n { |
|
r.n += n |
|
return n, nil |
|
} |
|
|
|
// use seeker implementation |
|
// if we can |
|
if r.rs != nil { |
|
return r.skipSeek(n) |
|
} |
|
|
|
// loop on filling |
|
// and then erasing |
|
o := n |
|
for r.buffered() < n && r.state == nil { |
|
r.more() |
|
// we can skip forward |
|
// up to r.buffered() bytes |
|
step := min(r.buffered(), n) |
|
r.n += step |
|
n -= step |
|
} |
|
// at this point, n should be |
|
// 0 if everything went smoothly |
|
return o - n, r.noEOF() |
|
} |
|
|
|
// Next returns the next 'n' bytes in the stream. |
|
// Unlike Peek, Next advances the reader position. |
|
// The returned bytes point to the same |
|
// data as the buffer, so the slice is |
|
// only valid until the next reader method call. |
|
// An EOF is considered an unexpected error. |
|
// If an the returned slice is less than the |
|
// length asked for, an error will be returned, |
|
// and the reader position will not be incremented. |
|
func (r *Reader) Next(n int) ([]byte, error) { |
|
|
|
// in case the buffer is too small |
|
if cap(r.data) < n { |
|
old := r.data[r.n:] |
|
r.data = make([]byte, n+r.buffered()) |
|
r.data = r.data[:copy(r.data, old)] |
|
r.n = 0 |
|
} |
|
|
|
// fill at least 'n' bytes |
|
for r.buffered() < n && r.state == nil { |
|
r.more() |
|
} |
|
|
|
if r.buffered() < n { |
|
return r.data[r.n:], r.noEOF() |
|
} |
|
out := r.data[r.n : r.n+n] |
|
r.n += n |
|
return out, nil |
|
} |
|
|
|
// skipSeek uses the io.Seeker to seek forward. |
|
// only call this function when n > r.buffered() |
|
func (r *Reader) skipSeek(n int) (int, error) { |
|
o := r.buffered() |
|
// first, clear buffer |
|
n -= o |
|
r.n = 0 |
|
r.data = r.data[:0] |
|
|
|
// then seek forward remaning bytes |
|
i, err := r.rs.Seek(int64(n), 1) |
|
return int(i) + o, err |
|
} |
|
|
|
// Read implements `io.Reader` |
|
func (r *Reader) Read(b []byte) (int, error) { |
|
// if we have data in the buffer, just |
|
// return that. |
|
if r.buffered() != 0 { |
|
x := copy(b, r.data[r.n:]) |
|
r.n += x |
|
return x, nil |
|
} |
|
var n int |
|
// we have no buffered data; determine |
|
// whether or not to buffer or call |
|
// the underlying reader directly |
|
if len(b) >= cap(r.data) { |
|
n, r.state = r.r.Read(b) |
|
} else { |
|
r.more() |
|
n = copy(b, r.data) |
|
r.n = n |
|
} |
|
if n == 0 { |
|
return 0, r.err() |
|
} |
|
return n, nil |
|
} |
|
|
|
// ReadFull attempts to read len(b) bytes into |
|
// 'b'. It returns the number of bytes read into |
|
// 'b', and an error if it does not return len(b). |
|
// EOF is considered an unexpected error. |
|
func (r *Reader) ReadFull(b []byte) (int, error) { |
|
var n int // read into b |
|
var nn int // scratch |
|
l := len(b) |
|
// either read buffered data, |
|
// or read directly for the underlying |
|
// buffer, or fetch more buffered data. |
|
for n < l && r.state == nil { |
|
if r.buffered() != 0 { |
|
nn = copy(b[n:], r.data[r.n:]) |
|
n += nn |
|
r.n += nn |
|
} else if l-n > cap(r.data) { |
|
nn, r.state = r.r.Read(b[n:]) |
|
n += nn |
|
} else { |
|
r.more() |
|
} |
|
} |
|
if n < l { |
|
return n, r.noEOF() |
|
} |
|
return n, nil |
|
} |
|
|
|
// ReadByte implements `io.ByteReader` |
|
func (r *Reader) ReadByte() (byte, error) { |
|
for r.buffered() < 1 && r.state == nil { |
|
r.more() |
|
} |
|
if r.buffered() < 1 { |
|
return 0, r.err() |
|
} |
|
b := r.data[r.n] |
|
r.n++ |
|
return b, nil |
|
} |
|
|
|
// WriteTo implements `io.WriterTo` |
|
func (r *Reader) WriteTo(w io.Writer) (int64, error) { |
|
var ( |
|
i int64 |
|
ii int |
|
err error |
|
) |
|
// first, clear buffer |
|
if r.buffered() > 0 { |
|
ii, err = w.Write(r.data[r.n:]) |
|
i += int64(ii) |
|
if err != nil { |
|
return i, err |
|
} |
|
r.data = r.data[0:0] |
|
r.n = 0 |
|
} |
|
for r.state == nil { |
|
// here we just do |
|
// 1:1 reads and writes |
|
r.more() |
|
if r.buffered() > 0 { |
|
ii, err = w.Write(r.data) |
|
i += int64(ii) |
|
if err != nil { |
|
return i, err |
|
} |
|
r.data = r.data[0:0] |
|
r.n = 0 |
|
} |
|
} |
|
if r.state != io.EOF { |
|
return i, r.err() |
|
} |
|
return i, nil |
|
} |
|
|
|
func min(a int, b int) int { |
|
if a < b { |
|
return a |
|
} |
|
return b |
|
} |
|
|
|
func max(a int, b int) int { |
|
if a < b { |
|
return b |
|
} |
|
return a |
|
}
|
|
|