Платформа ЦРНП "Мирокод" для разработки проектов
https://git.mirocod.ru
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
228 lines
6.1 KiB
228 lines
6.1 KiB
// Copyright 2012, Google Inc. All rights reserved. |
|
// Use of this source code is governed by a BSD-style |
|
// license that can be found in the LICENSE file. |
|
|
|
// Package pools provides functionality to manage and reuse resources |
|
// like connections. |
|
package pools |
|
|
|
import ( |
|
"fmt" |
|
"time" |
|
|
|
"github.com/ngaut/sync2" |
|
) |
|
|
|
// CLOSED_ERR is the error returned by Get, TryGet and SetCapacity once the
// pool has been closed.
var CLOSED_ERR = fmt.Errorf("ResourcePool is closed")
|
|
|
// Factory is a function that can be used to create a resource.
// The pool calls it from get whenever the slot it fetched holds no
// resource, or holds one that has exceeded the idle timeout.
// A non-nil error is handed back to the caller of Get/TryGet.
type Factory func() (Resource, error)
|
|
|
// Every resource needs to support the Resource interface.
// The pool calls Close on a resource when it discards it: either because
// the resource sat idle longer than the idle timeout, or because the pool
// is shrinking or closing.
// NOTE(review): an earlier version of this comment mentioned thread
// synchronization between Close() and IsClosed(); IsClosed is not part of
// this interface. Synchronizing access to a resource remains the
// responsibility of the caller.
type Resource interface {
	Close()
}
|
|
|
// ResourcePool allows you to use a pool of resources.
// Slots are kept in a buffered channel: a slot may carry a live resource or
// be empty, in which case the resource is created on demand by the factory.
type ResourcePool struct {
	// resources holds the currently available slots. Its buffer size is the
	// maximum capacity (MaxCap) and its length is what Available reports.
	resources chan resourceWrapper
	// factory creates a resource when a fetched slot has none.
	factory Factory
	// capacity is the current capacity; a value of 0 means the pool is closed.
	capacity sync2.AtomicInt64
	// idleTimeout is the age beyond which an idle resource is closed and
	// replaced on the next fetch; 0 disables the check.
	idleTimeout sync2.AtomicDuration

	// stats
	// waitCount is incremented each time Get had to block for a slot.
	waitCount sync2.AtomicInt64
	// waitTime accumulates the time spent blocked in Get.
	waitTime sync2.AtomicDuration
}
|
|
|
// resourceWrapper is one slot of the pool. A zero-value wrapper (nil
// resource) is an empty slot whose resource has yet to be created.
type resourceWrapper struct {
	// resource is the pooled resource, or nil for an empty slot.
	resource Resource
	// timeUsed is set by Put to the moment the resource was returned; get
	// compares it against the idle timeout.
	timeUsed time.Time
}
|
|
|
// NewResourcePool creates a new ResourcePool pool. |
|
// capacity is the initial capacity of the pool. |
|
// maxCap is the maximum capacity. |
|
// If a resource is unused beyond idleTimeout, it's discarded. |
|
// An idleTimeout of 0 means that there is no timeout. |
|
func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration) *ResourcePool { |
|
if capacity <= 0 || maxCap <= 0 || capacity > maxCap { |
|
panic(fmt.Errorf("Invalid/out of range capacity")) |
|
} |
|
rp := &ResourcePool{ |
|
resources: make(chan resourceWrapper, maxCap), |
|
factory: factory, |
|
capacity: sync2.AtomicInt64(capacity), |
|
idleTimeout: sync2.AtomicDuration(idleTimeout), |
|
} |
|
for i := 0; i < capacity; i++ { |
|
rp.resources <- resourceWrapper{} |
|
} |
|
return rp |
|
} |
|
|
|
// Close empties the pool calling Close on all its resources. |
|
// You can call Close while there are outstanding resources. |
|
// It waits for all resources to be returned (Put). |
|
// After a Close, Get and TryGet are not allowed. |
|
func (rp *ResourcePool) Close() { |
|
rp.SetCapacity(0) |
|
} |
|
|
|
func (rp *ResourcePool) IsClosed() (closed bool) { |
|
return rp.capacity.Get() == 0 |
|
} |
|
|
|
// Get will return the next available resource. If capacity |
|
// has not been reached, it will create a new one using the factory. Otherwise, |
|
// it will indefinitely wait till the next resource becomes available. |
|
func (rp *ResourcePool) Get() (resource Resource, err error) { |
|
return rp.get(true) |
|
} |
|
|
|
// TryGet will return the next available resource. If none is available, and capacity |
|
// has not been reached, it will create a new one using the factory. Otherwise, |
|
// it will return nil with no error. |
|
func (rp *ResourcePool) TryGet() (resource Resource, err error) { |
|
return rp.get(false) |
|
} |
|
|
|
func (rp *ResourcePool) get(wait bool) (resource Resource, err error) { |
|
// Fetch |
|
var wrapper resourceWrapper |
|
var ok bool |
|
select { |
|
case wrapper, ok = <-rp.resources: |
|
default: |
|
if !wait { |
|
return nil, nil |
|
} |
|
startTime := time.Now() |
|
wrapper, ok = <-rp.resources |
|
rp.recordWait(startTime) |
|
} |
|
if !ok { |
|
return nil, CLOSED_ERR |
|
} |
|
|
|
// Unwrap |
|
timeout := rp.idleTimeout.Get() |
|
if wrapper.resource != nil && timeout > 0 && wrapper.timeUsed.Add(timeout).Sub(time.Now()) < 0 { |
|
wrapper.resource.Close() |
|
wrapper.resource = nil |
|
} |
|
if wrapper.resource == nil { |
|
wrapper.resource, err = rp.factory() |
|
if err != nil { |
|
rp.resources <- resourceWrapper{} |
|
} |
|
} |
|
return wrapper.resource, err |
|
} |
|
|
|
// Put will return a resource to the pool. For every successful Get, |
|
// a corresponding Put is required. If you no longer need a resource, |
|
// you will need to call Put(nil) instead of returning the closed resource. |
|
// The will eventually cause a new resource to be created in its place. |
|
func (rp *ResourcePool) Put(resource Resource) { |
|
var wrapper resourceWrapper |
|
if resource != nil { |
|
wrapper = resourceWrapper{resource, time.Now()} |
|
} |
|
select { |
|
case rp.resources <- wrapper: |
|
default: |
|
panic(fmt.Errorf("Attempt to Put into a full ResourcePool")) |
|
} |
|
} |
|
|
|
// SetCapacity changes the capacity of the pool. |
|
// You can use it to shrink or expand, but not beyond |
|
// the max capacity. If the change requires the pool |
|
// to be shrunk, SetCapacity waits till the necessary |
|
// number of resources are returned to the pool. |
|
// A SetCapacity of 0 is equivalent to closing the ResourcePool. |
|
func (rp *ResourcePool) SetCapacity(capacity int) error { |
|
if capacity < 0 || capacity > cap(rp.resources) { |
|
return fmt.Errorf("capacity %d is out of range", capacity) |
|
} |
|
|
|
// Atomically swap new capacity with old, but only |
|
// if old capacity is non-zero. |
|
var oldcap int |
|
for { |
|
oldcap = int(rp.capacity.Get()) |
|
if oldcap == 0 { |
|
return CLOSED_ERR |
|
} |
|
if oldcap == capacity { |
|
return nil |
|
} |
|
if rp.capacity.CompareAndSwap(int64(oldcap), int64(capacity)) { |
|
break |
|
} |
|
} |
|
|
|
if capacity < oldcap { |
|
for i := 0; i < oldcap-capacity; i++ { |
|
wrapper := <-rp.resources |
|
if wrapper.resource != nil { |
|
wrapper.resource.Close() |
|
} |
|
} |
|
} else { |
|
for i := 0; i < capacity-oldcap; i++ { |
|
rp.resources <- resourceWrapper{} |
|
} |
|
} |
|
if capacity == 0 { |
|
close(rp.resources) |
|
} |
|
return nil |
|
} |
|
|
|
func (rp *ResourcePool) recordWait(start time.Time) { |
|
rp.waitCount.Add(1) |
|
rp.waitTime.Add(time.Now().Sub(start)) |
|
} |
|
|
|
func (rp *ResourcePool) SetIdleTimeout(idleTimeout time.Duration) { |
|
rp.idleTimeout.Set(idleTimeout) |
|
} |
|
|
|
func (rp *ResourcePool) StatsJSON() string { |
|
c, a, mx, wc, wt, it := rp.Stats() |
|
return fmt.Sprintf(`{"Capacity": %v, "Available": %v, "MaxCapacity": %v, "WaitCount": %v, "WaitTime": %v, "IdleTimeout": %v}`, c, a, mx, wc, int64(wt), int64(it)) |
|
} |
|
|
|
func (rp *ResourcePool) Stats() (capacity, available, maxCap, waitCount int64, waitTime, idleTimeout time.Duration) { |
|
return rp.Capacity(), rp.Available(), rp.MaxCap(), rp.WaitCount(), rp.WaitTime(), rp.IdleTimeout() |
|
} |
|
|
|
func (rp *ResourcePool) Capacity() int64 { |
|
return rp.capacity.Get() |
|
} |
|
|
|
func (rp *ResourcePool) Available() int64 { |
|
return int64(len(rp.resources)) |
|
} |
|
|
|
func (rp *ResourcePool) MaxCap() int64 { |
|
return int64(cap(rp.resources)) |
|
} |
|
|
|
func (rp *ResourcePool) WaitCount() int64 { |
|
return rp.waitCount.Get() |
|
} |
|
|
|
func (rp *ResourcePool) WaitTime() time.Duration { |
|
return rp.waitTime.Get() |
|
} |
|
|
|
func (rp *ResourcePool) IdleTimeout() time.Duration { |
|
return rp.idleTimeout.Get() |
|
}
|
|
|