From 36c66303dff79ae6b6e9a2023901d23b2ce9683a Mon Sep 17 00:00:00 2001
From: zeripath <art27@cantab.net>
Date: Sun, 6 Feb 2022 06:55:44 +0000
Subject: [PATCH] Only attempt to flush queue if the underlying worker pool is
 not finished (#18593) (#18620)

* Only attempt to flush queue if the underlying worker pool is not finished (#18593)

Backport #18593

There is a possible race whereby a worker pool could be cancelled while the
underlying queue is not yet empty. This causes flush-all to cycle forever,
because flushing can never empty a pool that has already been cancelled.
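
The guard is essentially a non-blocking check of the pool's Done channel
before attempting a flush. A minimal standalone Go sketch of the pattern
(illustrative only, not the Gitea code; flushIfAlive is a made-up helper):

    package main

    import (
        "context"
        "fmt"
    )

    // flushIfAlive performs flush only when ctx has not been cancelled yet;
    // a cancelled pool is skipped rather than retried forever.
    func flushIfAlive(ctx context.Context, flush func()) bool {
        select {
        case <-ctx.Done():
            return false
        default:
            flush()
            return true
        }
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        cancel() // simulate a worker pool whose base context is already done
        ok := flushIfAlive(ctx, func() { fmt.Println("flushing") })
        fmt.Println(ok) // false: the flush is skipped instead of cycling
    }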

* On shutdown of Persistent Channel Queues close datachan and empty

Partial Backport #18415

Although we attempt to empty the datachan in queues, due to races we are
better off simply closing the channel and forcibly emptying it during
shutdown.

Fix #18618
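
In outline, the shutdown path now closes the data channel and drains it
synchronously instead of handing the work to a background goroutine. A
standalone sketch of the Go pattern (illustrative only, not the Gitea code):

    package main

    import "fmt"

    func main() {
        dataChan := make(chan string, 4)
        dataChan <- "a"
        dataChan <- "b"

        // Closing the channel rejects further sends and lets range terminate
        // once the buffered items have been drained.
        close(dataChan)
        for data := range dataChan {
            fmt.Println("redirecting", data) // e.g. push to the internal level queue
        }
    }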

Signed-off-by: Andrew Thornton <art27@cantab.net>

* Move zero workers warning to debug

Fix #18617

Signed-off-by: Andrew Thornton <art27@cantab.net>

* Update modules/queue/manager.go

Co-authored-by: Gusted <williamzijl7@hotmail.com>

* Update modules/queue/manager.go

Co-authored-by: Gusted <williamzijl7@hotmail.com>

Co-authored-by: Gusted <williamzijl7@hotmail.com>
---
 modules/queue/manager.go            | 15 +++++++++++++--
 modules/queue/queue_disk_channel.go | 16 +++++++---------
 modules/queue/workerpool.go         | 16 ++++++++++++----
 3 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index 23e96155a9..310f3cd4e1 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -72,6 +72,8 @@ type ManagedPool interface {
 	BoostWorkers() int
 	// SetPoolSettings sets the user updatable settings for the pool
 	SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
+	// Done returns a channel that will be closed when the Pool's baseCtx is closed
+	Done() <-chan struct{}
 }
 
 // ManagedQueueList implements the sort.Interface
@@ -141,7 +143,6 @@ func (m *Manager) Remove(qid int64) {
 	delete(m.Queues, qid)
 	m.mutex.Unlock()
 	log.Trace("Queue Manager removed: QID: %d", qid)
-
 }
 
 // GetManagedQueue by qid
@@ -193,6 +194,17 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
 				wg.Done()
 				continue
 			}
+
+			if pool, ok := mq.Managed.(ManagedPool); ok {
+				// There is no point in flushing a pool whose base ctx is already done.
+				select {
+				case <-pool.Done():
+					wg.Done()
+					continue
+				default:
+				}
+			}
+
 			allEmpty = false
 			if flushable, ok := mq.Managed.(Flushable); ok {
 				log.Debug("Flushing (flushable) queue: %s", mq.Name)
@@ -225,7 +237,6 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
 		wg.Wait()
 	}
 	return nil
-
 }
 
 // ManagedQueues returns the managed queues
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
index c3a1c5781e..72f330670a 100644
--- a/modules/queue/queue_disk_channel.go
+++ b/modules/queue/queue_disk_channel.go
@@ -173,7 +173,6 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
 		q.internal.(*LevelQueue).Shutdown()
 		GetManager().Remove(q.internal.(*LevelQueue).qid)
 	}
-
 }
 
 // Flush flushes the queue and blocks till the queue is empty
@@ -252,14 +251,13 @@ func (q *PersistableChannelQueue) Shutdown() {
 	q.channelQueue.Wait()
 	q.internal.(*LevelQueue).Wait()
 	// Redirect all remaining data in the chan to the internal channel
-	go func() {
-		log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
-		for data := range q.channelQueue.dataChan {
-			_ = q.internal.Push(data)
-			atomic.AddInt64(&q.channelQueue.numInQueue, -1)
-		}
-		log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
-	}()
+	close(q.channelQueue.dataChan)
+	log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
+	for data := range q.channelQueue.dataChan {
+		_ = q.internal.Push(data)
+		atomic.AddInt64(&q.channelQueue.numInQueue, -1)
+	}
+	log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
 
 	log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
 }
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 0176e2e0b2..dc6ff3b633 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -65,6 +65,11 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
 	return pool
 }
 
+// Done returns a channel that is closed when this worker pool's base context has been cancelled
+func (p *WorkerPool) Done() <-chan struct{} {
+	return p.baseCtx.Done()
+}
+
 // Push pushes the data to the internal channel
 func (p *WorkerPool) Push(data Data) {
 	atomic.AddInt64(&p.numInQueue, 1)
@@ -90,7 +95,7 @@ func (p *WorkerPool) zeroBoost() {
 		boost = p.maxNumberOfWorkers - p.numberOfWorkers
 	}
 	if mq != nil {
-		log.Warn("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout)
+		log.Debug("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout)
 
 		start := time.Now()
 		pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
@@ -98,7 +103,7 @@ func (p *WorkerPool) zeroBoost() {
 			mq.RemoveWorkers(pid)
 		}
 	} else {
-		log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout)
+		log.Debug("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout)
 	}
 	p.lock.Unlock()
 	p.addWorkers(ctx, cancel, boost)
@@ -326,7 +331,10 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
 	log.Trace("WorkerPool: %d Flush", p.qid)
 	for {
 		select {
-		case data := <-p.dataChan:
+		case data, ok := <-p.dataChan:
+			if !ok {
+				return nil
+			}
 			p.handle(data)
 			atomic.AddInt64(&p.numInQueue, -1)
 		case <-p.baseCtx.Done():
@@ -341,7 +349,7 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
 
 func (p *WorkerPool) doWork(ctx context.Context) {
 	delay := time.Millisecond * 300
-	var data = make([]Data, 0, p.batchLength)
+	data := make([]Data, 0, p.batchLength)
 	for {
 		select {
 		case <-ctx.Done():