Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
306 lines
8.5 KiB
306 lines
8.5 KiB
// Copyright (c) 2017 Couchbase, Inc. |
|
// |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
package zap |
|
|
|
import ( |
|
"bytes" |
|
"encoding/binary" |
|
"fmt" |
|
"math" |
|
"reflect" |
|
"sort" |
|
|
|
index "github.com/blevesearch/bleve_index_api" |
|
segment "github.com/blevesearch/scorch_segment_api/v2" |
|
"github.com/golang/snappy" |
|
) |
|
|
|
var reflectStaticSizedocValueReader int |
|
|
|
func init() { |
|
var dvi docValueReader |
|
reflectStaticSizedocValueReader = int(reflect.TypeOf(dvi).Size()) |
|
} |
|
|
|
type docNumTermsVisitor func(docNum uint64, terms []byte) error |
|
|
|
type docVisitState struct { |
|
dvrs map[uint16]*docValueReader |
|
segment *SegmentBase |
|
} |
|
|
|
type docValueReader struct { |
|
field string |
|
curChunkNum uint64 |
|
chunkOffsets []uint64 |
|
dvDataLoc uint64 |
|
curChunkHeader []MetaData |
|
curChunkData []byte // compressed data cache |
|
uncompressed []byte // temp buf for snappy decompression |
|
} |
|
|
|
func (di *docValueReader) size() int { |
|
return reflectStaticSizedocValueReader + SizeOfPtr + |
|
len(di.field) + |
|
len(di.chunkOffsets)*SizeOfUint64 + |
|
len(di.curChunkHeader)*reflectStaticSizeMetaData + |
|
len(di.curChunkData) |
|
} |
|
|
|
func (di *docValueReader) cloneInto(rv *docValueReader) *docValueReader { |
|
if rv == nil { |
|
rv = &docValueReader{} |
|
} |
|
|
|
rv.field = di.field |
|
rv.curChunkNum = math.MaxUint64 |
|
rv.chunkOffsets = di.chunkOffsets // immutable, so it's sharable |
|
rv.dvDataLoc = di.dvDataLoc |
|
rv.curChunkHeader = rv.curChunkHeader[:0] |
|
rv.curChunkData = nil |
|
rv.uncompressed = rv.uncompressed[:0] |
|
|
|
return rv |
|
} |
|
|
|
func (di *docValueReader) curChunkNumber() uint64 { |
|
return di.curChunkNum |
|
} |
|
|
|
func (s *SegmentBase) loadFieldDocValueReader(field string, |
|
fieldDvLocStart, fieldDvLocEnd uint64) (*docValueReader, error) { |
|
// get the docValue offset for the given fields |
|
if fieldDvLocStart == fieldNotUninverted { |
|
// no docValues found, nothing to do |
|
return nil, nil |
|
} |
|
|
|
// read the number of chunks, and chunk offsets position |
|
var numChunks, chunkOffsetsPosition uint64 |
|
|
|
if fieldDvLocEnd-fieldDvLocStart > 16 { |
|
numChunks = binary.BigEndian.Uint64(s.mem[fieldDvLocEnd-8 : fieldDvLocEnd]) |
|
// read the length of chunk offsets |
|
chunkOffsetsLen := binary.BigEndian.Uint64(s.mem[fieldDvLocEnd-16 : fieldDvLocEnd-8]) |
|
// acquire position of chunk offsets |
|
chunkOffsetsPosition = (fieldDvLocEnd - 16) - chunkOffsetsLen |
|
} else { |
|
return nil, fmt.Errorf("loadFieldDocValueReader: fieldDvLoc too small: %d-%d", fieldDvLocEnd, fieldDvLocStart) |
|
} |
|
|
|
fdvIter := &docValueReader{ |
|
curChunkNum: math.MaxUint64, |
|
field: field, |
|
chunkOffsets: make([]uint64, int(numChunks)), |
|
} |
|
|
|
// read the chunk offsets |
|
var offset uint64 |
|
for i := 0; i < int(numChunks); i++ { |
|
loc, read := binary.Uvarint(s.mem[chunkOffsetsPosition+offset : chunkOffsetsPosition+offset+binary.MaxVarintLen64]) |
|
if read <= 0 { |
|
return nil, fmt.Errorf("corrupted chunk offset during segment load") |
|
} |
|
fdvIter.chunkOffsets[i] = loc |
|
offset += uint64(read) |
|
} |
|
|
|
// set the data offset |
|
fdvIter.dvDataLoc = fieldDvLocStart |
|
|
|
return fdvIter, nil |
|
} |
|
|
|
func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error { |
|
// advance to the chunk where the docValues |
|
// reside for the given docNum |
|
destChunkDataLoc, curChunkEnd := di.dvDataLoc, di.dvDataLoc |
|
start, end := readChunkBoundary(int(chunkNumber), di.chunkOffsets) |
|
if start >= end { |
|
di.curChunkHeader = di.curChunkHeader[:0] |
|
di.curChunkData = nil |
|
di.curChunkNum = chunkNumber |
|
di.uncompressed = di.uncompressed[:0] |
|
return nil |
|
} |
|
|
|
destChunkDataLoc += start |
|
curChunkEnd += end |
|
|
|
// read the number of docs reside in the chunk |
|
numDocs, read := binary.Uvarint(s.mem[destChunkDataLoc : destChunkDataLoc+binary.MaxVarintLen64]) |
|
if read <= 0 { |
|
return fmt.Errorf("failed to read the chunk") |
|
} |
|
chunkMetaLoc := destChunkDataLoc + uint64(read) |
|
|
|
offset := uint64(0) |
|
if cap(di.curChunkHeader) < int(numDocs) { |
|
di.curChunkHeader = make([]MetaData, int(numDocs)) |
|
} else { |
|
di.curChunkHeader = di.curChunkHeader[:int(numDocs)] |
|
} |
|
for i := 0; i < int(numDocs); i++ { |
|
di.curChunkHeader[i].DocNum, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) |
|
offset += uint64(read) |
|
di.curChunkHeader[i].DocDvOffset, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64]) |
|
offset += uint64(read) |
|
} |
|
|
|
compressedDataLoc := chunkMetaLoc + offset |
|
dataLength := curChunkEnd - compressedDataLoc |
|
di.curChunkData = s.mem[compressedDataLoc : compressedDataLoc+dataLength] |
|
di.curChunkNum = chunkNumber |
|
di.uncompressed = di.uncompressed[:0] |
|
return nil |
|
} |
|
|
|
func (di *docValueReader) iterateAllDocValues(s *SegmentBase, visitor docNumTermsVisitor) error { |
|
for i := 0; i < len(di.chunkOffsets); i++ { |
|
err := di.loadDvChunk(uint64(i), s) |
|
if err != nil { |
|
return err |
|
} |
|
if di.curChunkData == nil || len(di.curChunkHeader) == 0 { |
|
continue |
|
} |
|
|
|
// uncompress the already loaded data |
|
uncompressed, err := snappy.Decode(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) |
|
if err != nil { |
|
return err |
|
} |
|
di.uncompressed = uncompressed |
|
|
|
start := uint64(0) |
|
for _, entry := range di.curChunkHeader { |
|
err = visitor(entry.DocNum, uncompressed[start:entry.DocDvOffset]) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
start = entry.DocDvOffset |
|
} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func (di *docValueReader) visitDocValues(docNum uint64, |
|
visitor index.DocValueVisitor) error { |
|
// binary search the term locations for the docNum |
|
start, end := di.getDocValueLocs(docNum) |
|
if start == math.MaxUint64 || end == math.MaxUint64 || start == end { |
|
return nil |
|
} |
|
|
|
var uncompressed []byte |
|
var err error |
|
// use the uncompressed copy if available |
|
if len(di.uncompressed) > 0 { |
|
uncompressed = di.uncompressed |
|
} else { |
|
// uncompress the already loaded data |
|
uncompressed, err = snappy.Decode(di.uncompressed[:cap(di.uncompressed)], di.curChunkData) |
|
if err != nil { |
|
return err |
|
} |
|
di.uncompressed = uncompressed |
|
} |
|
|
|
// pick the terms for the given docNum |
|
uncompressed = uncompressed[start:end] |
|
for { |
|
i := bytes.Index(uncompressed, termSeparatorSplitSlice) |
|
if i < 0 { |
|
break |
|
} |
|
|
|
visitor(di.field, uncompressed[0:i]) |
|
uncompressed = uncompressed[i+1:] |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func (di *docValueReader) getDocValueLocs(docNum uint64) (uint64, uint64) { |
|
i := sort.Search(len(di.curChunkHeader), func(i int) bool { |
|
return di.curChunkHeader[i].DocNum >= docNum |
|
}) |
|
if i < len(di.curChunkHeader) && di.curChunkHeader[i].DocNum == docNum { |
|
return ReadDocValueBoundary(i, di.curChunkHeader) |
|
} |
|
return math.MaxUint64, math.MaxUint64 |
|
} |
|
|
|
// VisitDocValues is an implementation of the |
|
// DocValueVisitable interface |
|
func (s *SegmentBase) VisitDocValues(localDocNum uint64, fields []string, |
|
visitor index.DocValueVisitor, dvsIn segment.DocVisitState) ( |
|
segment.DocVisitState, error) { |
|
dvs, ok := dvsIn.(*docVisitState) |
|
if !ok || dvs == nil { |
|
dvs = &docVisitState{} |
|
} else { |
|
if dvs.segment != s { |
|
dvs.segment = s |
|
dvs.dvrs = nil |
|
} |
|
} |
|
|
|
var fieldIDPlus1 uint16 |
|
if dvs.dvrs == nil { |
|
dvs.dvrs = make(map[uint16]*docValueReader, len(fields)) |
|
for _, field := range fields { |
|
if fieldIDPlus1, ok = s.fieldsMap[field]; !ok { |
|
continue |
|
} |
|
fieldID := fieldIDPlus1 - 1 |
|
if dvIter, exists := s.fieldDvReaders[fieldID]; exists && |
|
dvIter != nil { |
|
dvs.dvrs[fieldID] = dvIter.cloneInto(dvs.dvrs[fieldID]) |
|
} |
|
} |
|
} |
|
|
|
// find the chunkNumber where the docValues are stored |
|
docInChunk := localDocNum / uint64(s.chunkFactor) |
|
var dvr *docValueReader |
|
for _, field := range fields { |
|
if fieldIDPlus1, ok = s.fieldsMap[field]; !ok { |
|
continue |
|
} |
|
fieldID := fieldIDPlus1 - 1 |
|
if dvr, ok = dvs.dvrs[fieldID]; ok && dvr != nil { |
|
// check if the chunk is already loaded |
|
if docInChunk != dvr.curChunkNumber() { |
|
err := dvr.loadDvChunk(docInChunk, s) |
|
if err != nil { |
|
return dvs, err |
|
} |
|
} |
|
|
|
_ = dvr.visitDocValues(localDocNum, visitor) |
|
} |
|
} |
|
return dvs, nil |
|
} |
|
|
|
// VisitableDocValueFields returns the list of fields with |
|
// persisted doc value terms ready to be visitable using the |
|
// VisitDocumentFieldTerms method. |
|
func (s *SegmentBase) VisitableDocValueFields() ([]string, error) { |
|
return s.fieldDvNames, nil |
|
}
|
|
|