Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
296 lines
8.5 KiB
296 lines
8.5 KiB
// Copyright 2013 The ql Authors. All rights reserved. |
|
// Use of this source code is governed by a BSD-style |
|
// license that can be found in the LICENSES/QL-LICENSE file. |
|
|
|
// Copyright 2015 PingCAP, Inc. |
|
// |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
package tidb |
|
|
|
import ( |
|
"net/http" |
|
"time" |
|
// For pprof |
|
_ "net/http/pprof" |
|
"net/url" |
|
"os" |
|
"strings" |
|
"sync" |
|
|
|
"github.com/juju/errors" |
|
"github.com/ngaut/log" |
|
"github.com/pingcap/tidb/ast" |
|
"github.com/pingcap/tidb/context" |
|
"github.com/pingcap/tidb/domain" |
|
"github.com/pingcap/tidb/executor" |
|
"github.com/pingcap/tidb/kv" |
|
"github.com/pingcap/tidb/parser" |
|
"github.com/pingcap/tidb/sessionctx/autocommit" |
|
"github.com/pingcap/tidb/sessionctx/variable" |
|
"github.com/pingcap/tidb/store/hbase" |
|
"github.com/pingcap/tidb/store/localstore" |
|
"github.com/pingcap/tidb/store/localstore/boltdb" |
|
"github.com/pingcap/tidb/store/localstore/engine" |
|
"github.com/pingcap/tidb/store/localstore/goleveldb" |
|
"github.com/pingcap/tidb/util/types" |
|
) |
|
|
|
// Engine prefix name |
|
const ( |
|
EngineGoLevelDBMemory = "memory://" |
|
EngineGoLevelDBPersistent = "goleveldb://" |
|
EngineBoltDB = "boltdb://" |
|
EngineHBase = "hbase://" |
|
defaultMaxRetries = 30 |
|
retrySleepInterval = 500 * time.Millisecond |
|
) |
|
|
|
type domainMap struct { |
|
domains map[string]*domain.Domain |
|
mu sync.Mutex |
|
} |
|
|
|
func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) { |
|
key := store.UUID() |
|
dm.mu.Lock() |
|
defer dm.mu.Unlock() |
|
d = dm.domains[key] |
|
if d != nil { |
|
return |
|
} |
|
|
|
lease := time.Duration(0) |
|
if !localstore.IsLocalStore(store) { |
|
lease = schemaLease |
|
} |
|
d, err = domain.NewDomain(store, lease) |
|
if err != nil { |
|
return nil, errors.Trace(err) |
|
} |
|
dm.domains[key] = d |
|
return |
|
} |
|
|
|
var ( |
|
domap = &domainMap{ |
|
domains: map[string]*domain.Domain{}, |
|
} |
|
stores = make(map[string]kv.Driver) |
|
// EnablePprof indicates whether to enable HTTP Pprof or not. |
|
EnablePprof = os.Getenv("TIDB_PPROF") != "0" |
|
// PprofAddr is the pprof url. |
|
PprofAddr = "localhost:8888" |
|
// store.UUID()-> IfBootstrapped |
|
storeBootstrapped = make(map[string]bool) |
|
|
|
// schemaLease is the time for re-updating remote schema. |
|
// In online DDL, we must wait 2 * SchemaLease time to guarantee |
|
// all servers get the neweset schema. |
|
// Default schema lease time is 1 second, you can change it with a proper time, |
|
// but you must know that too little may cause badly performance degradation. |
|
// For production, you should set a big schema lease, like 300s+. |
|
schemaLease = 1 * time.Second |
|
) |
|
|
|
// SetSchemaLease changes the default schema lease time for DDL. |
|
// This function is very dangerous, don't use it if you really know what you do. |
|
// SetSchemaLease only affects not local storage after bootstrapped. |
|
func SetSchemaLease(lease time.Duration) { |
|
schemaLease = lease |
|
} |
|
|
|
// What character set should the server translate a statement to after receiving it? |
|
// For this, the server uses the character_set_connection and collation_connection system variables. |
|
// It converts statements sent by the client from character_set_client to character_set_connection |
|
// (except for string literals that have an introducer such as _latin1 or _utf8). |
|
// collation_connection is important for comparisons of literal strings. |
|
// For comparisons of strings with column values, collation_connection does not matter because columns |
|
// have their own collation, which has a higher collation precedence. |
|
// See: https://dev.mysql.com/doc/refman/5.7/en/charset-connection.html |
|
func getCtxCharsetInfo(ctx context.Context) (string, string) { |
|
sessionVars := variable.GetSessionVars(ctx) |
|
charset := sessionVars.Systems["character_set_connection"] |
|
collation := sessionVars.Systems["collation_connection"] |
|
return charset, collation |
|
} |
|
|
|
// Parse parses a query string to raw ast.StmtNode. |
|
func Parse(ctx context.Context, src string) ([]ast.StmtNode, error) { |
|
log.Debug("compiling", src) |
|
charset, collation := getCtxCharsetInfo(ctx) |
|
stmts, err := parser.Parse(src, charset, collation) |
|
if err != nil { |
|
log.Warnf("compiling %s, error: %v", src, err) |
|
return nil, errors.Trace(err) |
|
} |
|
return stmts, nil |
|
} |
|
|
|
// Compile is safe for concurrent use by multiple goroutines. |
|
func Compile(ctx context.Context, rawStmt ast.StmtNode) (ast.Statement, error) { |
|
compiler := &executor.Compiler{} |
|
st, err := compiler.Compile(ctx, rawStmt) |
|
if err != nil { |
|
return nil, errors.Trace(err) |
|
} |
|
return st, nil |
|
} |
|
|
|
func runStmt(ctx context.Context, s ast.Statement, args ...interface{}) (ast.RecordSet, error) { |
|
var err error |
|
var rs ast.RecordSet |
|
// before every execution, we must clear affectedrows. |
|
variable.GetSessionVars(ctx).SetAffectedRows(0) |
|
if s.IsDDL() { |
|
err = ctx.FinishTxn(false) |
|
if err != nil { |
|
return nil, errors.Trace(err) |
|
} |
|
} |
|
rs, err = s.Exec(ctx) |
|
// All the history should be added here. |
|
se := ctx.(*session) |
|
se.history.add(0, s) |
|
// MySQL DDL should be auto-commit |
|
if s.IsDDL() || autocommit.ShouldAutocommit(ctx) { |
|
if err != nil { |
|
ctx.FinishTxn(true) |
|
} else { |
|
err = ctx.FinishTxn(false) |
|
} |
|
} |
|
return rs, errors.Trace(err) |
|
} |
|
|
|
// GetRows gets all the rows from a RecordSet. |
|
func GetRows(rs ast.RecordSet) ([][]types.Datum, error) { |
|
if rs == nil { |
|
return nil, nil |
|
} |
|
var rows [][]types.Datum |
|
defer rs.Close() |
|
// Negative limit means no limit. |
|
for { |
|
row, err := rs.Next() |
|
if err != nil { |
|
return nil, errors.Trace(err) |
|
} |
|
if row == nil { |
|
break |
|
} |
|
rows = append(rows, row.Data) |
|
} |
|
return rows, nil |
|
} |
|
|
|
// RegisterStore registers a kv storage with unique name and its associated Driver. |
|
func RegisterStore(name string, driver kv.Driver) error { |
|
name = strings.ToLower(name) |
|
|
|
if _, ok := stores[name]; ok { |
|
return errors.Errorf("%s is already registered", name) |
|
} |
|
|
|
stores[name] = driver |
|
return nil |
|
} |
|
|
|
// RegisterLocalStore registers a local kv storage with unique name and its associated engine Driver. |
|
func RegisterLocalStore(name string, driver engine.Driver) error { |
|
d := localstore.Driver{Driver: driver} |
|
return RegisterStore(name, d) |
|
} |
|
|
|
// NewStore creates a kv Storage with path. |
|
// |
|
// The path must be a URL format 'engine://path?params' like the one for |
|
// tidb.Open() but with the dbname cut off. |
|
// Examples: |
|
// goleveldb://relative/path |
|
// boltdb:///absolute/path |
|
// hbase://zk1,zk2,zk3/hbasetbl?tso=127.0.0.1:1234 |
|
// |
|
// The engine should be registered before creating storage. |
|
func NewStore(path string) (kv.Storage, error) { |
|
return newStoreWithRetry(path, defaultMaxRetries) |
|
} |
|
|
|
func newStoreWithRetry(path string, maxRetries int) (kv.Storage, error) { |
|
url, err := url.Parse(path) |
|
if err != nil { |
|
return nil, errors.Trace(err) |
|
} |
|
|
|
name := strings.ToLower(url.Scheme) |
|
d, ok := stores[name] |
|
if !ok { |
|
return nil, errors.Errorf("invalid uri format, storage %s is not registered", name) |
|
} |
|
|
|
var s kv.Storage |
|
for i := 1; i <= maxRetries; i++ { |
|
s, err = d.Open(path) |
|
if err == nil || !kv.IsRetryableError(err) { |
|
break |
|
} |
|
sleepTime := time.Duration(uint64(retrySleepInterval) * uint64(i)) |
|
log.Warnf("Waiting store to get ready, sleep %v and try again...", sleepTime) |
|
time.Sleep(sleepTime) |
|
} |
|
return s, errors.Trace(err) |
|
} |
|
|
|
var queryStmtTable = []string{"explain", "select", "show", "execute", "describe", "desc", "admin"} |
|
|
|
func trimSQL(sql string) string { |
|
// Trim space. |
|
sql = strings.TrimSpace(sql) |
|
// Trim leading /*comment*/ |
|
// There may be multiple comments |
|
for strings.HasPrefix(sql, "/*") { |
|
i := strings.Index(sql, "*/") |
|
if i != -1 && i < len(sql)+1 { |
|
sql = sql[i+2:] |
|
sql = strings.TrimSpace(sql) |
|
continue |
|
} |
|
break |
|
} |
|
// Trim leading '('. For `(select 1);` is also a query. |
|
return strings.TrimLeft(sql, "( ") |
|
} |
|
|
|
// IsQuery checks if a sql statement is a query statement. |
|
func IsQuery(sql string) bool { |
|
sqlText := strings.ToLower(trimSQL(sql)) |
|
for _, key := range queryStmtTable { |
|
if strings.HasPrefix(sqlText, key) { |
|
return true |
|
} |
|
} |
|
|
|
return false |
|
} |
|
|
|
func init() { |
|
// Register default memory and goleveldb storage |
|
RegisterLocalStore("memory", goleveldb.MemoryDriver{}) |
|
RegisterLocalStore("goleveldb", goleveldb.Driver{}) |
|
RegisterLocalStore("boltdb", boltdb.Driver{}) |
|
RegisterStore("hbase", hbasekv.Driver{}) |
|
|
|
// start pprof handlers |
|
if EnablePprof { |
|
go http.ListenAndServe(PprofAddr, nil) |
|
} |
|
}
|
|
|