初始提交: Gitea 项目代码
This commit is contained in:
@@ -0,0 +1,71 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
backoffBegin = 50 * time.Millisecond
|
||||
backoffUpper = 2 * time.Second
|
||||
)
|
||||
|
||||
type (
|
||||
backoffFuncRetErr[T any] func() (retry bool, ret T, err error)
|
||||
backoffFuncErr func() (retry bool, err error)
|
||||
)
|
||||
|
||||
func mockBackoffDuration(d time.Duration) func() {
|
||||
oldBegin, oldUpper := backoffBegin, backoffUpper
|
||||
backoffBegin, backoffUpper = d, d
|
||||
return func() {
|
||||
backoffBegin, backoffUpper = oldBegin, oldUpper
|
||||
}
|
||||
}
|
||||
|
||||
func backoffRetErr[T any](ctx context.Context, begin, upper time.Duration, end <-chan time.Time, fn backoffFuncRetErr[T]) (ret T, err error) {
|
||||
d := begin
|
||||
for {
|
||||
// check whether the context has been cancelled or has reached the deadline, return early
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ret, ctx.Err()
|
||||
case <-end:
|
||||
return ret, context.DeadlineExceeded
|
||||
default:
|
||||
}
|
||||
|
||||
// call the target function
|
||||
retry, ret, err := fn()
|
||||
if err != nil {
|
||||
return ret, err
|
||||
}
|
||||
if !retry {
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// wait for a while before retrying, and also respect the context & deadline
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ret, ctx.Err()
|
||||
case <-time.After(d):
|
||||
d *= 2
|
||||
if d > upper {
|
||||
d = upper
|
||||
}
|
||||
case <-end:
|
||||
return ret, context.DeadlineExceeded
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func backoffErr(ctx context.Context, begin, upper time.Duration, end <-chan time.Time, fn backoffFuncErr) error {
|
||||
_, err := backoffRetErr(ctx, begin, upper, end, func() (retry bool, ret any, err error) {
|
||||
retry, err = fn()
|
||||
return retry, nil, err
|
||||
})
|
||||
return err
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
var pushBlockTime = 5 * time.Second
|
||||
|
||||
type baseQueue interface {
|
||||
PushItem(ctx context.Context, data []byte) error
|
||||
PopItem(ctx context.Context) ([]byte, error)
|
||||
HasItem(ctx context.Context, data []byte) (bool, error)
|
||||
Len(ctx context.Context) (int, error)
|
||||
Close() error
|
||||
RemoveAll(ctx context.Context) error
|
||||
}
|
||||
|
||||
func popItemByChan(ctx context.Context, popItemFn func(ctx context.Context) ([]byte, error)) (chanItem chan []byte, chanErr chan error) {
|
||||
chanItem = make(chan []byte)
|
||||
chanErr = make(chan error)
|
||||
go func() {
|
||||
for {
|
||||
it, err := popItemFn(ctx)
|
||||
if err != nil {
|
||||
close(chanItem)
|
||||
chanErr <- err
|
||||
return
|
||||
}
|
||||
if it == nil {
|
||||
close(chanItem)
|
||||
close(chanErr)
|
||||
return
|
||||
}
|
||||
chanItem <- it
|
||||
}
|
||||
}()
|
||||
return chanItem, chanErr
|
||||
}
|
||||
@@ -0,0 +1,131 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitea.dev/modules/container"
|
||||
)
|
||||
|
||||
var errChannelClosed = errors.New("channel is closed")
|
||||
|
||||
type baseChannel struct {
|
||||
c chan []byte
|
||||
set container.Set[string]
|
||||
mu sync.Mutex
|
||||
|
||||
isUnique bool
|
||||
}
|
||||
|
||||
var _ baseQueue = (*baseChannel)(nil)
|
||||
|
||||
func newBaseChannelGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) {
|
||||
q := &baseChannel{c: make(chan []byte, cfg.Length), isUnique: unique}
|
||||
if unique {
|
||||
q.set = container.Set[string]{}
|
||||
}
|
||||
return q, nil
|
||||
}
|
||||
|
||||
func newBaseChannelSimple(cfg *BaseConfig) (baseQueue, error) {
|
||||
return newBaseChannelGeneric(cfg, false)
|
||||
}
|
||||
|
||||
func newBaseChannelUnique(cfg *BaseConfig) (baseQueue, error) {
|
||||
return newBaseChannelGeneric(cfg, true)
|
||||
}
|
||||
|
||||
func (q *baseChannel) PushItem(ctx context.Context, data []byte) error {
|
||||
if q.c == nil {
|
||||
return errChannelClosed
|
||||
}
|
||||
|
||||
if q.isUnique {
|
||||
q.mu.Lock()
|
||||
has := q.set.Contains(string(data))
|
||||
q.mu.Unlock()
|
||||
if has {
|
||||
return ErrAlreadyInQueue
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case q.c <- data:
|
||||
if q.isUnique {
|
||||
q.mu.Lock()
|
||||
q.set.Add(string(data))
|
||||
q.mu.Unlock()
|
||||
}
|
||||
return nil
|
||||
case <-time.After(pushBlockTime):
|
||||
return context.DeadlineExceeded
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (q *baseChannel) PopItem(ctx context.Context) ([]byte, error) {
|
||||
select {
|
||||
case data, ok := <-q.c:
|
||||
if !ok {
|
||||
return nil, errChannelClosed
|
||||
}
|
||||
q.mu.Lock()
|
||||
q.set.Remove(string(data))
|
||||
q.mu.Unlock()
|
||||
return data, nil
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (q *baseChannel) HasItem(ctx context.Context, data []byte) (bool, error) {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
if !q.isUnique {
|
||||
return false, nil
|
||||
}
|
||||
return q.set.Contains(string(data)), nil
|
||||
}
|
||||
|
||||
func (q *baseChannel) Len(ctx context.Context) (int, error) {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
if q.c == nil {
|
||||
return 0, errChannelClosed
|
||||
}
|
||||
|
||||
return len(q.c), nil
|
||||
}
|
||||
|
||||
func (q *baseChannel) Close() error {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
close(q.c)
|
||||
if q.isUnique {
|
||||
q.set = container.Set[string]{}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *baseChannel) RemoveAll(ctx context.Context) error {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
for len(q.c) > 0 {
|
||||
<-q.c
|
||||
}
|
||||
|
||||
if q.isUnique {
|
||||
q.set = container.Set[string]{}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestBaseChannel(t *testing.T) {
|
||||
testQueueBasic(t, newBaseChannelSimple, &BaseConfig{ManagedName: "baseChannel", Length: 10}, false)
|
||||
testQueueBasic(t, newBaseChannelUnique, &BaseConfig{ManagedName: "baseChannel", Length: 10}, true)
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import "context"
|
||||
|
||||
type baseDummy struct{}
|
||||
|
||||
var _ baseQueue = (*baseDummy)(nil)
|
||||
|
||||
func newBaseDummy(cfg *BaseConfig, unique bool) (baseQueue, error) {
|
||||
return &baseDummy{}, nil
|
||||
}
|
||||
|
||||
func (q *baseDummy) PushItem(ctx context.Context, data []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *baseDummy) PopItem(ctx context.Context) ([]byte, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (q *baseDummy) Len(ctx context.Context) (int, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (q *baseDummy) HasItem(ctx context.Context, data []byte) (bool, error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (q *baseDummy) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *baseDummy) RemoveAll(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
|
||||
"gitea.dev/modules/nosql"
|
||||
"gitea.dev/modules/queue/lqinternal"
|
||||
|
||||
"gitea.com/lunny/levelqueue"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
)
|
||||
|
||||
type baseLevelQueue struct {
|
||||
internal atomic.Pointer[levelqueue.Queue]
|
||||
|
||||
conn string
|
||||
cfg *BaseConfig
|
||||
db *leveldb.DB
|
||||
}
|
||||
|
||||
var _ baseQueue = (*baseLevelQueue)(nil)
|
||||
|
||||
func newBaseLevelQueueGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) {
|
||||
if unique {
|
||||
return newBaseLevelQueueUnique(cfg)
|
||||
}
|
||||
return newBaseLevelQueueSimple(cfg)
|
||||
}
|
||||
|
||||
func newBaseLevelQueueSimple(cfg *BaseConfig) (baseQueue, error) {
|
||||
conn, db, err := prepareLevelDB(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
q := &baseLevelQueue{conn: conn, cfg: cfg, db: db}
|
||||
lq, err := levelqueue.NewQueue(db, []byte(cfg.QueueFullName), false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
q.internal.Store(lq)
|
||||
return q, nil
|
||||
}
|
||||
|
||||
func (q *baseLevelQueue) PushItem(ctx context.Context, data []byte) error {
|
||||
c := baseLevelQueueCommon(q.cfg, nil, func() baseLevelQueuePushPoper { return q.internal.Load() })
|
||||
return c.PushItem(ctx, data)
|
||||
}
|
||||
|
||||
func (q *baseLevelQueue) PopItem(ctx context.Context) ([]byte, error) {
|
||||
c := baseLevelQueueCommon(q.cfg, nil, func() baseLevelQueuePushPoper { return q.internal.Load() })
|
||||
return c.PopItem(ctx)
|
||||
}
|
||||
|
||||
func (q *baseLevelQueue) HasItem(ctx context.Context, data []byte) (bool, error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (q *baseLevelQueue) Len(ctx context.Context) (int, error) {
|
||||
return int(q.internal.Load().Len()), nil
|
||||
}
|
||||
|
||||
func (q *baseLevelQueue) Close() error {
|
||||
err := q.internal.Load().Close()
|
||||
_ = nosql.GetManager().CloseLevelDB(q.conn)
|
||||
q.db = nil // the db is not managed by us, it's managed by the nosql manager
|
||||
return err
|
||||
}
|
||||
|
||||
func (q *baseLevelQueue) RemoveAll(ctx context.Context) error {
|
||||
lqinternal.RemoveLevelQueueKeys(q.db, []byte(q.cfg.QueueFullName))
|
||||
lq, err := levelqueue.NewQueue(q.db, []byte(q.cfg.QueueFullName), false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
old := q.internal.Load()
|
||||
q.internal.Store(lq)
|
||||
_ = old.Close() // Not ideal for concurrency. Luckily, the levelqueue only sets its db=nil because it doesn't manage the db, so far so good
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitea.dev/modules/nosql"
|
||||
|
||||
"gitea.com/lunny/levelqueue"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
)
|
||||
|
||||
// baseLevelQueuePushPoper is the common interface for levelqueue.Queue and levelqueue.UniqueQueue
|
||||
type baseLevelQueuePushPoper interface {
|
||||
RPush(data []byte) error
|
||||
LPop() ([]byte, error)
|
||||
Len() int64
|
||||
}
|
||||
|
||||
type baseLevelQueueCommonImpl struct {
|
||||
length int
|
||||
internalFunc func() baseLevelQueuePushPoper
|
||||
mu *sync.Mutex
|
||||
}
|
||||
|
||||
func (q *baseLevelQueueCommonImpl) PushItem(ctx context.Context, data []byte) error {
|
||||
return backoffErr(ctx, backoffBegin, backoffUpper, time.After(pushBlockTime), func() (retry bool, err error) {
|
||||
if q.mu != nil {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
}
|
||||
|
||||
cnt := int(q.internalFunc().Len())
|
||||
if cnt >= q.length {
|
||||
return true, nil
|
||||
}
|
||||
retry, err = false, q.internalFunc().RPush(data)
|
||||
if err == levelqueue.ErrAlreadyInQueue {
|
||||
err = ErrAlreadyInQueue
|
||||
}
|
||||
return retry, err
|
||||
})
|
||||
}
|
||||
|
||||
func (q *baseLevelQueueCommonImpl) PopItem(ctx context.Context) ([]byte, error) {
|
||||
return backoffRetErr(ctx, backoffBegin, backoffUpper, infiniteTimerC, func() (retry bool, data []byte, err error) {
|
||||
if q.mu != nil {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
}
|
||||
|
||||
data, err = q.internalFunc().LPop()
|
||||
if err == levelqueue.ErrNotFound {
|
||||
return true, nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
return false, data, nil
|
||||
})
|
||||
}
|
||||
|
||||
func baseLevelQueueCommon(cfg *BaseConfig, mu *sync.Mutex, internalFunc func() baseLevelQueuePushPoper) *baseLevelQueueCommonImpl {
|
||||
return &baseLevelQueueCommonImpl{length: cfg.Length, mu: mu, internalFunc: internalFunc}
|
||||
}
|
||||
|
||||
func prepareLevelDB(cfg *BaseConfig) (conn string, db *leveldb.DB, err error) {
|
||||
if cfg.ConnStr == "" { // use data dir as conn str
|
||||
if !filepath.IsAbs(cfg.DataFullDir) {
|
||||
return "", nil, fmt.Errorf("invalid leveldb data dir (not absolute): %q", cfg.DataFullDir)
|
||||
}
|
||||
conn = cfg.DataFullDir
|
||||
} else {
|
||||
if !strings.HasPrefix(cfg.ConnStr, "leveldb://") {
|
||||
return "", nil, fmt.Errorf("invalid leveldb connection string: %q", cfg.ConnStr)
|
||||
}
|
||||
conn = cfg.ConnStr
|
||||
}
|
||||
for range 10 {
|
||||
if db, err = nosql.GetManager().GetLevelDB(conn); err == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
return conn, db, err
|
||||
}
|
||||
@@ -0,0 +1,77 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"gitea.dev/modules/queue/lqinternal"
|
||||
"gitea.dev/modules/setting"
|
||||
|
||||
"gitea.com/lunny/levelqueue"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
)
|
||||
|
||||
func TestBaseLevelDB(t *testing.T) {
|
||||
_, err := newBaseLevelQueueGeneric(&BaseConfig{ConnStr: "redis://"}, false)
|
||||
assert.ErrorContains(t, err, "invalid leveldb connection string")
|
||||
|
||||
_, err = newBaseLevelQueueGeneric(&BaseConfig{DataFullDir: "relative"}, false)
|
||||
assert.ErrorContains(t, err, "invalid leveldb data dir")
|
||||
|
||||
testQueueBasic(t, newBaseLevelQueueSimple, toBaseConfig("baseLevelQueue", setting.QueueSettings{Datadir: t.TempDir() + "/queue-test", Length: 10}), false)
|
||||
testQueueBasic(t, newBaseLevelQueueUnique, toBaseConfig("baseLevelQueueUnique", setting.QueueSettings{ConnStr: "leveldb://" + t.TempDir() + "/queue-test", Length: 10}), true)
|
||||
}
|
||||
|
||||
func TestCorruptedLevelQueue(t *testing.T) {
|
||||
// sometimes the levelqueue could be in a corrupted state, this test is to make sure it can recover from it
|
||||
dbDir := t.TempDir() + "/levelqueue-test"
|
||||
db, err := leveldb.OpenFile(dbDir, nil)
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
assert.NoError(t, db.Put([]byte("other-key"), []byte("other-value"), nil))
|
||||
|
||||
nameQueuePrefix := []byte("queue_name")
|
||||
nameSetPrefix := []byte("set_name")
|
||||
lq, err := levelqueue.NewUniqueQueue(db, nameQueuePrefix, nameSetPrefix, false)
|
||||
assert.NoError(t, err)
|
||||
assert.NoError(t, lq.RPush([]byte("item-1")))
|
||||
|
||||
itemKey := lqinternal.QueueItemKeyBytes(nameQueuePrefix, 1)
|
||||
itemValue, err := db.Get(itemKey, nil)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, []byte("item-1"), itemValue)
|
||||
|
||||
// there should be 5 keys in db: queue low, queue high, 1 queue item, 1 set item, and "other-key"
|
||||
keys := lqinternal.ListLevelQueueKeys(db)
|
||||
assert.Len(t, keys, 5)
|
||||
|
||||
// delete the queue item key, to corrupt the queue
|
||||
assert.NoError(t, db.Delete(itemKey, nil))
|
||||
// now the queue is corrupted, it never works again
|
||||
_, err = lq.LPop()
|
||||
assert.ErrorIs(t, err, levelqueue.ErrNotFound)
|
||||
assert.NoError(t, lq.Close())
|
||||
|
||||
// remove all the queue related keys to reset the queue
|
||||
lqinternal.RemoveLevelQueueKeys(db, nameQueuePrefix)
|
||||
lqinternal.RemoveLevelQueueKeys(db, nameSetPrefix)
|
||||
// now there should be only 1 key in db: "other-key"
|
||||
keys = lqinternal.ListLevelQueueKeys(db)
|
||||
assert.Len(t, keys, 1)
|
||||
assert.Equal(t, []byte("other-key"), keys[0])
|
||||
|
||||
// re-create a queue from db
|
||||
lq, err = levelqueue.NewUniqueQueue(db, nameQueuePrefix, nameSetPrefix, false)
|
||||
assert.NoError(t, err)
|
||||
assert.NoError(t, lq.RPush([]byte("item-new-1")))
|
||||
// now the queue works again
|
||||
itemValue, err = lq.LPop()
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, []byte("item-new-1"), itemValue)
|
||||
assert.NoError(t, lq.Close())
|
||||
}
|
||||
@@ -0,0 +1,88 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"gitea.dev/modules/nosql"
|
||||
"gitea.dev/modules/queue/lqinternal"
|
||||
|
||||
"gitea.com/lunny/levelqueue"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
)
|
||||
|
||||
type baseLevelQueueUnique struct {
|
||||
internal atomic.Pointer[levelqueue.UniqueQueue]
|
||||
|
||||
conn string
|
||||
cfg *BaseConfig
|
||||
db *leveldb.DB
|
||||
|
||||
mu sync.Mutex // the levelqueue.UniqueQueue is not thread-safe, there is no mutex protecting the underlying queue&set together
|
||||
}
|
||||
|
||||
var _ baseQueue = (*baseLevelQueueUnique)(nil)
|
||||
|
||||
func newBaseLevelQueueUnique(cfg *BaseConfig) (baseQueue, error) {
|
||||
conn, db, err := prepareLevelDB(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
q := &baseLevelQueueUnique{conn: conn, cfg: cfg, db: db}
|
||||
lq, err := levelqueue.NewUniqueQueue(db, []byte(cfg.QueueFullName), []byte(cfg.SetFullName), false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
q.internal.Store(lq)
|
||||
return q, nil
|
||||
}
|
||||
|
||||
func (q *baseLevelQueueUnique) PushItem(ctx context.Context, data []byte) error {
|
||||
c := baseLevelQueueCommon(q.cfg, &q.mu, func() baseLevelQueuePushPoper { return q.internal.Load() })
|
||||
return c.PushItem(ctx, data)
|
||||
}
|
||||
|
||||
func (q *baseLevelQueueUnique) PopItem(ctx context.Context) ([]byte, error) {
|
||||
c := baseLevelQueueCommon(q.cfg, &q.mu, func() baseLevelQueuePushPoper { return q.internal.Load() })
|
||||
return c.PopItem(ctx)
|
||||
}
|
||||
|
||||
func (q *baseLevelQueueUnique) HasItem(ctx context.Context, data []byte) (bool, error) {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
return q.internal.Load().Has(data)
|
||||
}
|
||||
|
||||
func (q *baseLevelQueueUnique) Len(ctx context.Context) (int, error) {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
return int(q.internal.Load().Len()), nil
|
||||
}
|
||||
|
||||
func (q *baseLevelQueueUnique) Close() error {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
err := q.internal.Load().Close()
|
||||
q.db = nil // the db is not managed by us, it's managed by the nosql manager
|
||||
_ = nosql.GetManager().CloseLevelDB(q.conn)
|
||||
return err
|
||||
}
|
||||
|
||||
func (q *baseLevelQueueUnique) RemoveAll(ctx context.Context) error {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
lqinternal.RemoveLevelQueueKeys(q.db, []byte(q.cfg.QueueFullName))
|
||||
lqinternal.RemoveLevelQueueKeys(q.db, []byte(q.cfg.SetFullName))
|
||||
lq, err := levelqueue.NewUniqueQueue(q.db, []byte(q.cfg.QueueFullName), []byte(q.cfg.SetFullName), false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
old := q.internal.Load()
|
||||
q.internal.Store(lq)
|
||||
_ = old.Close() // Not ideal for concurrency. Luckily, the levelqueue only sets its db=nil because it doesn't manage the db, so far so good
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,138 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitea.dev/modules/graceful"
|
||||
"gitea.dev/modules/log"
|
||||
"gitea.dev/modules/nosql"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
type baseRedis struct {
|
||||
client redis.UniversalClient
|
||||
isUnique bool
|
||||
cfg *BaseConfig
|
||||
|
||||
mu sync.Mutex // the old implementation is not thread-safe, the queue operation and set operation should be protected together
|
||||
}
|
||||
|
||||
var _ baseQueue = (*baseRedis)(nil)
|
||||
|
||||
func newBaseRedisGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) {
|
||||
client := nosql.GetManager().GetRedisClient(cfg.ConnStr)
|
||||
|
||||
var err error
|
||||
for range 10 {
|
||||
err = client.Ping(graceful.GetManager().ShutdownContext()).Err()
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
log.Warn("Redis is not ready, waiting for 1 second to retry: %v", err)
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &baseRedis{cfg: cfg, client: client, isUnique: unique}, nil
|
||||
}
|
||||
|
||||
func newBaseRedisSimple(cfg *BaseConfig) (baseQueue, error) {
|
||||
return newBaseRedisGeneric(cfg, false)
|
||||
}
|
||||
|
||||
func newBaseRedisUnique(cfg *BaseConfig) (baseQueue, error) {
|
||||
return newBaseRedisGeneric(cfg, true)
|
||||
}
|
||||
|
||||
func (q *baseRedis) PushItem(ctx context.Context, data []byte) error {
|
||||
return backoffErr(ctx, backoffBegin, backoffUpper, time.After(pushBlockTime), func() (retry bool, err error) {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
cnt, err := q.client.LLen(ctx, q.cfg.QueueFullName).Result()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if int(cnt) >= q.cfg.Length {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if q.isUnique {
|
||||
added, err := q.client.SAdd(ctx, q.cfg.SetFullName, data).Result()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if added == 0 {
|
||||
return false, ErrAlreadyInQueue
|
||||
}
|
||||
}
|
||||
return false, q.client.RPush(ctx, q.cfg.QueueFullName, data).Err()
|
||||
})
|
||||
}
|
||||
|
||||
func (q *baseRedis) PopItem(ctx context.Context) ([]byte, error) {
|
||||
return backoffRetErr(ctx, backoffBegin, backoffUpper, infiniteTimerC, func() (retry bool, data []byte, err error) {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
data, err = q.client.LPop(ctx, q.cfg.QueueFullName).Bytes()
|
||||
if err == redis.Nil {
|
||||
return true, nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return true, nil, nil
|
||||
}
|
||||
if q.isUnique {
|
||||
// the data has been popped, even if there is any error we can't do anything
|
||||
_ = q.client.SRem(ctx, q.cfg.SetFullName, data).Err()
|
||||
}
|
||||
return false, data, err
|
||||
})
|
||||
}
|
||||
|
||||
func (q *baseRedis) HasItem(ctx context.Context, data []byte) (bool, error) {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
if !q.isUnique {
|
||||
return false, nil
|
||||
}
|
||||
return q.client.SIsMember(ctx, q.cfg.SetFullName, data).Result()
|
||||
}
|
||||
|
||||
func (q *baseRedis) Len(ctx context.Context) (int, error) {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
cnt, err := q.client.LLen(ctx, q.cfg.QueueFullName).Result()
|
||||
return int(cnt), err
|
||||
}
|
||||
|
||||
func (q *baseRedis) Close() error {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
return q.client.Close()
|
||||
}
|
||||
|
||||
func (q *baseRedis) RemoveAll(ctx context.Context) error {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
c1 := q.client.Del(ctx, q.cfg.QueueFullName)
|
||||
// the "set" must be cleared after the "list" because there is no transaction.
|
||||
// it's better to have duplicate items than losing items.
|
||||
c2 := q.client.Del(ctx, q.cfg.SetFullName)
|
||||
if c1.Err() != nil {
|
||||
return c1.Err()
|
||||
}
|
||||
if c2.Err() != nil {
|
||||
return c2.Err()
|
||||
}
|
||||
return nil // actually, checking errors doesn't make sense here because the state could be out-of-sync
|
||||
}
|
||||
@@ -0,0 +1,71 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"os/exec"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gitea.dev/modules/nosql"
|
||||
"gitea.dev/modules/setting"
|
||||
"gitea.dev/modules/test"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func waitRedisReady(conn string, dur time.Duration) (ready bool) {
|
||||
ctxTimed, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||
defer cancel()
|
||||
for t := time.Now(); ; time.Sleep(50 * time.Millisecond) {
|
||||
ret := nosql.GetManager().GetRedisClient(conn).Ping(ctxTimed)
|
||||
if ret.Err() == nil {
|
||||
return true
|
||||
}
|
||||
if time.Since(t) > dur {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func redisServerCmd(t *testing.T) *exec.Cmd {
|
||||
redisServerProg, err := exec.LookPath("redis-server")
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
c := &exec.Cmd{
|
||||
Path: redisServerProg,
|
||||
Args: []string{redisServerProg, "--bind", "127.0.0.1", "--port", "6379"},
|
||||
Dir: t.TempDir(),
|
||||
Stdin: os.Stdin,
|
||||
Stdout: os.Stdout,
|
||||
Stderr: os.Stderr,
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
func TestBaseRedis(t *testing.T) {
|
||||
var redisServer *exec.Cmd
|
||||
defer func() {
|
||||
if redisServer != nil {
|
||||
_ = redisServer.Process.Signal(os.Interrupt)
|
||||
_ = redisServer.Wait()
|
||||
}
|
||||
}()
|
||||
if !waitRedisReady("redis://127.0.0.1:6379/0", 0) {
|
||||
redisServer = redisServerCmd(t)
|
||||
if redisServer == nil && test.AllowSkipExternalService() {
|
||||
t.Skip("redis server command not found, skipped")
|
||||
}
|
||||
require.NotNil(t, redisServer)
|
||||
assert.NoError(t, redisServer.Start())
|
||||
require.True(t, waitRedisReady("redis://127.0.0.1:6379/0", 5*time.Second), "start redis-server")
|
||||
}
|
||||
|
||||
testQueueBasic(t, newBaseRedisSimple, toBaseConfig("baseRedis", setting.QueueSettings{Length: 10}), false)
|
||||
testQueueBasic(t, newBaseRedisUnique, toBaseConfig("baseRedisUnique", setting.QueueSettings{Length: 10}), true)
|
||||
}
|
||||
@@ -0,0 +1,140 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func testQueueBasic(t *testing.T, newFn func(cfg *BaseConfig) (baseQueue, error), cfg *BaseConfig, isUnique bool) {
|
||||
t.Run(fmt.Sprintf("testQueueBasic-%s-unique:%v", cfg.ManagedName, isUnique), func(t *testing.T) {
|
||||
q, err := newFn(cfg)
|
||||
assert.NoError(t, err)
|
||||
|
||||
ctx := t.Context()
|
||||
_ = q.RemoveAll(ctx)
|
||||
cnt, err := q.Len(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, cnt)
|
||||
|
||||
// push the first item
|
||||
err = q.PushItem(ctx, []byte("foo"))
|
||||
assert.NoError(t, err)
|
||||
|
||||
cnt, err = q.Len(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, cnt)
|
||||
|
||||
// push a duplicate item
|
||||
err = q.PushItem(ctx, []byte("foo"))
|
||||
if !isUnique {
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
assert.ErrorIs(t, err, ErrAlreadyInQueue)
|
||||
}
|
||||
|
||||
// check the duplicate item
|
||||
cnt, err = q.Len(ctx)
|
||||
assert.NoError(t, err)
|
||||
has, err := q.HasItem(ctx, []byte("foo"))
|
||||
assert.NoError(t, err)
|
||||
if !isUnique {
|
||||
assert.Equal(t, 2, cnt)
|
||||
assert.False(t, has) // non-unique queues don't check for duplicates
|
||||
} else {
|
||||
assert.Equal(t, 1, cnt)
|
||||
assert.True(t, has)
|
||||
}
|
||||
|
||||
// push another item
|
||||
err = q.PushItem(ctx, []byte("bar"))
|
||||
assert.NoError(t, err)
|
||||
|
||||
// pop the first item (and the duplicate if non-unique)
|
||||
it, err := q.PopItem(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "foo", string(it))
|
||||
|
||||
if !isUnique {
|
||||
it, err = q.PopItem(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "foo", string(it))
|
||||
}
|
||||
|
||||
// pop another item
|
||||
it, err = q.PopItem(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "bar", string(it))
|
||||
|
||||
// pop an empty queue (timeout, cancel)
|
||||
ctxTimed, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
|
||||
it, err = q.PopItem(ctxTimed)
|
||||
assert.ErrorIs(t, err, context.DeadlineExceeded)
|
||||
assert.Nil(t, it)
|
||||
cancel()
|
||||
|
||||
ctxTimed, cancel = context.WithTimeout(ctx, 10*time.Millisecond)
|
||||
cancel()
|
||||
it, err = q.PopItem(ctxTimed)
|
||||
assert.ErrorIs(t, err, context.Canceled)
|
||||
assert.Nil(t, it)
|
||||
|
||||
// test blocking push if queue is full
|
||||
for i := 0; i < cfg.Length; i++ {
|
||||
err = q.PushItem(ctx, fmt.Appendf(nil, "item-%d", i))
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
ctxTimed, cancel = context.WithTimeout(ctx, 10*time.Millisecond)
|
||||
err = q.PushItem(ctxTimed, []byte("item-full"))
|
||||
assert.ErrorIs(t, err, context.DeadlineExceeded)
|
||||
cancel()
|
||||
|
||||
// test blocking push if queue is full (with custom pushBlockTime)
|
||||
oldPushBlockTime := pushBlockTime
|
||||
timeStart := time.Now()
|
||||
pushBlockTime = 30 * time.Millisecond
|
||||
err = q.PushItem(ctx, []byte("item-full"))
|
||||
assert.ErrorIs(t, err, context.DeadlineExceeded)
|
||||
assert.GreaterOrEqual(t, time.Since(timeStart), pushBlockTime*2/3)
|
||||
pushBlockTime = oldPushBlockTime
|
||||
|
||||
// remove all
|
||||
cnt, err = q.Len(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, cfg.Length, cnt)
|
||||
|
||||
_ = q.RemoveAll(ctx)
|
||||
|
||||
cnt, err = q.Len(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, cnt)
|
||||
})
|
||||
}
|
||||
|
||||
func TestBaseDummy(t *testing.T) {
|
||||
q, err := newBaseDummy(&BaseConfig{}, true)
|
||||
assert.NoError(t, err)
|
||||
|
||||
ctx := t.Context()
|
||||
assert.NoError(t, q.PushItem(ctx, []byte("foo")))
|
||||
|
||||
cnt, err := q.Len(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, cnt)
|
||||
|
||||
has, err := q.HasItem(ctx, []byte("foo"))
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, has)
|
||||
|
||||
it, err := q.PopItem(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.Nil(t, it)
|
||||
|
||||
assert.NoError(t, q.RemoveAll(ctx))
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"gitea.dev/modules/setting"
|
||||
)
|
||||
|
||||
type BaseConfig struct {
|
||||
ManagedName string
|
||||
DataFullDir string // the caller must prepare an absolute path
|
||||
|
||||
ConnStr string
|
||||
Length int
|
||||
|
||||
QueueFullName, SetFullName string
|
||||
}
|
||||
|
||||
func toBaseConfig(managedName string, queueSetting setting.QueueSettings) *BaseConfig {
|
||||
baseConfig := &BaseConfig{
|
||||
ManagedName: managedName,
|
||||
DataFullDir: queueSetting.Datadir,
|
||||
|
||||
ConnStr: queueSetting.ConnStr,
|
||||
Length: queueSetting.Length,
|
||||
}
|
||||
|
||||
// queue name and set name
|
||||
baseConfig.QueueFullName = managedName + queueSetting.QueueName
|
||||
baseConfig.SetFullName = baseConfig.QueueFullName + queueSetting.SetName
|
||||
if baseConfig.SetFullName == baseConfig.QueueFullName {
|
||||
baseConfig.SetFullName += "_unique"
|
||||
}
|
||||
return baseConfig
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package lqinternal
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
)
|
||||
|
||||
func QueueItemIDBytes(id int64) []byte {
|
||||
buf := make([]byte, 8)
|
||||
binary.PutVarint(buf, id)
|
||||
return buf
|
||||
}
|
||||
|
||||
func QueueItemKeyBytes(prefix []byte, id int64) []byte {
|
||||
key := make([]byte, len(prefix), len(prefix)+1+8)
|
||||
copy(key, prefix)
|
||||
key = append(key, '-')
|
||||
return append(key, QueueItemIDBytes(id)...)
|
||||
}
|
||||
|
||||
func RemoveLevelQueueKeys(db *leveldb.DB, namePrefix []byte) {
|
||||
keyPrefix := make([]byte, len(namePrefix)+1)
|
||||
copy(keyPrefix, namePrefix)
|
||||
keyPrefix[len(namePrefix)] = '-'
|
||||
|
||||
it := db.NewIterator(nil, &opt.ReadOptions{Strict: opt.NoStrict})
|
||||
defer it.Release()
|
||||
for it.Next() {
|
||||
if bytes.HasPrefix(it.Key(), keyPrefix) {
|
||||
_ = db.Delete(it.Key(), nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ListLevelQueueKeys(db *leveldb.DB) (res [][]byte) {
|
||||
it := db.NewIterator(nil, &opt.ReadOptions{Strict: opt.NoStrict})
|
||||
defer it.Release()
|
||||
for it.Next() {
|
||||
res = append(res, it.Key())
|
||||
}
|
||||
return res
|
||||
}
|
||||
@@ -0,0 +1,115 @@
|
||||
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"maps"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitea.dev/modules/log"
|
||||
"gitea.dev/modules/setting"
|
||||
)
|
||||
|
||||
// Manager is a manager for the queues created by "CreateXxxQueue" functions, these queues are called "managed queues".
|
||||
type Manager struct {
|
||||
mu sync.Mutex
|
||||
|
||||
qidCounter int64
|
||||
Queues map[int64]ManagedWorkerPoolQueue
|
||||
}
|
||||
|
||||
type ManagedWorkerPoolQueue interface {
|
||||
GetName() string
|
||||
GetType() string
|
||||
GetItemTypeName() string
|
||||
GetWorkerNumber() int
|
||||
GetWorkerActiveNumber() int
|
||||
GetWorkerMaxNumber() int
|
||||
SetWorkerMaxNumber(num int)
|
||||
GetQueueItemNumber() int
|
||||
|
||||
// FlushWithContext tries to make the handler process all items in the queue synchronously.
|
||||
// It is for testing purpose only. It's not designed to be used in a cluster.
|
||||
// Negative timeout means discarding all items in the queue.
|
||||
FlushWithContext(ctx context.Context, timeout time.Duration) error
|
||||
|
||||
// RemoveAllItems removes all items in the base queue (on-the-fly items are not affected)
|
||||
RemoveAllItems(ctx context.Context) error
|
||||
}
|
||||
|
||||
var manager *Manager
|
||||
|
||||
func init() {
|
||||
manager = &Manager{
|
||||
Queues: make(map[int64]ManagedWorkerPoolQueue),
|
||||
}
|
||||
}
|
||||
|
||||
func GetManager() *Manager {
|
||||
return manager
|
||||
}
|
||||
|
||||
func (m *Manager) AddManagedQueue(managed ManagedWorkerPoolQueue) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.qidCounter++
|
||||
m.Queues[m.qidCounter] = managed
|
||||
}
|
||||
|
||||
func (m *Manager) GetManagedQueue(qid int64) ManagedWorkerPoolQueue {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return m.Queues[qid]
|
||||
}
|
||||
|
||||
func (m *Manager) ManagedQueues() map[int64]ManagedWorkerPoolQueue {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
queues := make(map[int64]ManagedWorkerPoolQueue, len(m.Queues))
|
||||
maps.Copy(queues, m.Queues)
|
||||
return queues
|
||||
}
|
||||
|
||||
// FlushAll tries to make all managed queues process all items synchronously, until timeout or the queue is empty.
|
||||
// It is for testing purpose only. It's not designed to be used in a cluster.
|
||||
// Negative timeout means discarding all items in the queue.
|
||||
func (m *Manager) FlushAll(ctx context.Context, timeout time.Duration) error {
|
||||
var finalErrors []error
|
||||
qs := m.ManagedQueues()
|
||||
for _, q := range qs {
|
||||
if err := q.FlushWithContext(ctx, timeout); err != nil {
|
||||
finalErrors = append(finalErrors, err)
|
||||
}
|
||||
}
|
||||
return errors.Join(finalErrors...)
|
||||
}
|
||||
|
||||
// CreateSimpleQueue creates a simple queue from global setting config provider by name
|
||||
func CreateSimpleQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
|
||||
return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, false)
|
||||
}
|
||||
|
||||
// CreateUniqueQueue creates a unique queue from global setting config provider by name
|
||||
func CreateUniqueQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
|
||||
return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, true)
|
||||
}
|
||||
|
||||
func createWorkerPoolQueue[T any](ctx context.Context, name string, cfgProvider setting.ConfigProvider, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] {
|
||||
queueSetting, err := setting.GetQueueSettings(cfgProvider, name)
|
||||
if err != nil {
|
||||
log.Error("Failed to get queue settings for %q: %v", name, err)
|
||||
return nil
|
||||
}
|
||||
w, err := NewWorkerPoolQueueWithContext(ctx, name, queueSetting, handler, unique)
|
||||
if err != nil {
|
||||
log.Error("Failed to create queue %q: %v", name, err)
|
||||
return nil
|
||||
}
|
||||
GetManager().AddManagedQueue(w)
|
||||
return w
|
||||
}
|
||||
@@ -0,0 +1,119 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"gitea.dev/modules/setting"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestManager(t *testing.T) {
|
||||
setting.AppDataPath = t.TempDir()
|
||||
|
||||
newQueueFromConfig := func(name, cfg string) (*WorkerPoolQueue[int], error) {
|
||||
cfgProvider, err := setting.NewConfigProviderFromData(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
qs, err := setting.GetQueueSettings(cfgProvider, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newWorkerPoolQueueForTest(name, qs, func(s ...int) (unhandled []int) { return nil }, false)
|
||||
}
|
||||
|
||||
// test invalid CONN_STR
|
||||
_, err := newQueueFromConfig("default", `
|
||||
[queue]
|
||||
DATADIR = temp-dir
|
||||
CONN_STR = redis://
|
||||
`)
|
||||
assert.ErrorContains(t, err, "invalid leveldb connection string")
|
||||
|
||||
// test default config
|
||||
q, err := newQueueFromConfig("default", "")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "default", q.GetName())
|
||||
assert.Equal(t, "level", q.GetType())
|
||||
assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/common"), q.baseConfig.DataFullDir)
|
||||
assert.Equal(t, 100000, q.baseConfig.Length)
|
||||
assert.Equal(t, 20, q.batchLength)
|
||||
assert.Empty(t, q.baseConfig.ConnStr)
|
||||
assert.Equal(t, "default_queue", q.baseConfig.QueueFullName)
|
||||
assert.Equal(t, "default_queue_unique", q.baseConfig.SetFullName)
|
||||
assert.NotZero(t, q.GetWorkerMaxNumber())
|
||||
assert.Equal(t, 0, q.GetWorkerNumber())
|
||||
assert.Equal(t, 0, q.GetWorkerActiveNumber())
|
||||
assert.Equal(t, 0, q.GetQueueItemNumber())
|
||||
assert.Equal(t, "int", q.GetItemTypeName())
|
||||
|
||||
// test inherited config
|
||||
cfgProvider, err := setting.NewConfigProviderFromData(`
|
||||
[queue]
|
||||
TYPE = channel
|
||||
DATADIR = queues/dir1
|
||||
LENGTH = 100
|
||||
BATCH_LENGTH = 20
|
||||
CONN_STR = "addrs=127.0.0.1:6379 db=0"
|
||||
QUEUE_NAME = _queue1
|
||||
|
||||
[queue.sub]
|
||||
TYPE = level
|
||||
DATADIR = queues/dir2
|
||||
LENGTH = 102
|
||||
BATCH_LENGTH = 22
|
||||
CONN_STR =
|
||||
QUEUE_NAME = _q2
|
||||
SET_NAME = _u2
|
||||
MAX_WORKERS = 123
|
||||
`)
|
||||
|
||||
assert.NoError(t, err)
|
||||
|
||||
q1 := createWorkerPoolQueue[string](t.Context(), "no-such", cfgProvider, nil, false)
|
||||
assert.Equal(t, "no-such", q1.GetName())
|
||||
assert.Equal(t, "dummy", q1.GetType()) // no handler, so it becomes dummy
|
||||
assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/dir1"), q1.baseConfig.DataFullDir)
|
||||
assert.Equal(t, 100, q1.baseConfig.Length)
|
||||
assert.Equal(t, 20, q1.batchLength)
|
||||
assert.Equal(t, "addrs=127.0.0.1:6379 db=0", q1.baseConfig.ConnStr)
|
||||
assert.Equal(t, "no-such_queue1", q1.baseConfig.QueueFullName)
|
||||
assert.Equal(t, "no-such_queue1_unique", q1.baseConfig.SetFullName)
|
||||
assert.NotZero(t, q1.GetWorkerMaxNumber())
|
||||
assert.Equal(t, 0, q1.GetWorkerNumber())
|
||||
assert.Equal(t, 0, q1.GetWorkerActiveNumber())
|
||||
assert.Equal(t, 0, q1.GetQueueItemNumber())
|
||||
assert.Equal(t, "string", q1.GetItemTypeName())
|
||||
qid1 := GetManager().qidCounter
|
||||
|
||||
q2 := createWorkerPoolQueue(t.Context(), "sub", cfgProvider, func(s ...int) (unhandled []int) { return nil }, false)
|
||||
assert.Equal(t, "sub", q2.GetName())
|
||||
assert.Equal(t, "level", q2.GetType())
|
||||
assert.Equal(t, filepath.Join(setting.AppDataPath, "queues/dir2"), q2.baseConfig.DataFullDir)
|
||||
assert.Equal(t, 102, q2.baseConfig.Length)
|
||||
assert.Equal(t, 22, q2.batchLength)
|
||||
assert.Empty(t, q2.baseConfig.ConnStr)
|
||||
assert.Equal(t, "sub_q2", q2.baseConfig.QueueFullName)
|
||||
assert.Equal(t, "sub_q2_u2", q2.baseConfig.SetFullName)
|
||||
assert.Equal(t, 123, q2.GetWorkerMaxNumber())
|
||||
assert.Equal(t, 0, q2.GetWorkerNumber())
|
||||
assert.Equal(t, 0, q2.GetWorkerActiveNumber())
|
||||
assert.Equal(t, 0, q2.GetQueueItemNumber())
|
||||
assert.Equal(t, "int", q2.GetItemTypeName())
|
||||
qid2 := GetManager().qidCounter
|
||||
|
||||
assert.Equal(t, q1, GetManager().ManagedQueues()[qid1])
|
||||
|
||||
GetManager().GetManagedQueue(qid1).SetWorkerMaxNumber(120)
|
||||
assert.Equal(t, 120, q1.workerMaxNum)
|
||||
|
||||
stop := runWorkerPoolQueue(q2)
|
||||
assert.NoError(t, GetManager().GetManagedQueue(qid2).FlushWithContext(t.Context(), 0))
|
||||
assert.NoError(t, GetManager().FlushAll(t.Context(), 0))
|
||||
stop()
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
// Package queue implements a specialized concurrent queue system for Gitea.
|
||||
//
|
||||
// Terminology:
|
||||
//
|
||||
// 1. Item:
|
||||
// - An item can be a simple value, such as an integer, or a more complex structure that has multiple fields.
|
||||
// Usually a item serves as a task or a message. Sets of items will be sent to a queue handler to be processed.
|
||||
// - It's represented as a JSON-marshaled binary slice in the queue
|
||||
// - Since the item is marshaled by JSON, and JSON doesn't have stable key-order/type support,
|
||||
// so the decoded handler item may not be the same as the original "pushed" one if you use map/any types,
|
||||
//
|
||||
// 2. Batch:
|
||||
// - A collection of items that are grouped together for processing. Each worker receives a batch of items.
|
||||
//
|
||||
// 3. Worker:
|
||||
// - Individual unit of execution designed to process items from the queue. It's a goroutine that calls the Handler.
|
||||
// - Workers will get new items through a channel (WorkerPoolQueue is responsible for the distribution).
|
||||
// - Workers operate in parallel. The default value of max workers is determined by the setting system.
|
||||
//
|
||||
// 4. Handler (represented by HandlerFuncT type):
|
||||
// - It's the function responsible for processing items. Each active worker will call it.
|
||||
// - If an item or some items are not successfully processed, the handler could return them as "unhandled items".
|
||||
// In such scenarios, the queue system ensures these unhandled items are returned to the base queue after a brief delay.
|
||||
// This mechanism is particularly beneficial in cases where the processing entity (like a document indexer) is
|
||||
// temporarily unavailable. It ensures that no item is skipped or lost due to transient failures in the processing
|
||||
// mechanism.
|
||||
//
|
||||
// 5. Base queue:
|
||||
// - Represents the underlying storage mechanism for the queue. There are several implementations:
|
||||
// - Channel: Uses Go's native channel constructs to manage the queue, suitable for in-memory queuing.
|
||||
// - LevelDB: Especially useful in persistent queues for single instances.
|
||||
// - Redis: Suitable for clusters, where we may have multiple nodes.
|
||||
// - Dummy: This is special, it's not a real queue, it's a immediate no-op queue, which is useful for tests.
|
||||
// - They all have the same abstraction, the same interface, and they are tested by the same testing code.
|
||||
//
|
||||
// 6. WorkerPoolQueue:
|
||||
// - It's responsible to glue all together, using the "base queue" to provide "worker pool" functionality. It creates
|
||||
// new workers if needed and can flush the queue, running all the items synchronously till it finishes.
|
||||
// - Its "Push" function doesn't block forever, it will return an error if the queue is full after the timeout.
|
||||
//
|
||||
// 7. Manager:
|
||||
// - The purpose of it is to serve as a centralized manager for multiple WorkerPoolQueue instances. Whenever we want
|
||||
// to create a new queue, flush, or get a specific queue, we could use it.
|
||||
//
|
||||
// A queue can be "simple" or "unique". A unique queue will try to avoid duplicate items.
|
||||
// Unique queue's "Has" function can be used to check whether an item is already in the queue,
|
||||
// although it's not 100% reliable due to the lack of proper transaction support.
|
||||
// Simple queue's "Has" function always returns "has=false".
|
||||
//
|
||||
// A WorkerPoolQueue is a generic struct; this means it will work with any type but just for that type.
|
||||
// If you want another kind of items to run, you would have to call the manager to create a new WorkerPoolQueue for you
|
||||
// with a different handler that works with this new type of item. As an example of this:
|
||||
//
|
||||
// func Init() error {
|
||||
// itemQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "queue-name", handler)
|
||||
// ...
|
||||
// }
|
||||
// func handler(items ...*mypkg.QueueItem) []*mypkg.QueueItem { ... }
|
||||
package queue
|
||||
|
||||
import "gitea.dev/modules/util"
|
||||
|
||||
type HandlerFuncT[T any] func(...T) (unhandled []T)
|
||||
|
||||
var ErrAlreadyInQueue = util.NewAlreadyExistErrorf("already in queue")
|
||||
@@ -0,0 +1,40 @@
|
||||
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// testStateRecorder is used to record state changes for testing, to help debug async behaviors
|
||||
type testStateRecorder struct {
|
||||
records []string
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
var testRecorder = &testStateRecorder{}
|
||||
|
||||
func (t *testStateRecorder) Record(format string, args ...any) {
|
||||
t.mu.Lock()
|
||||
t.records = append(t.records, fmt.Sprintf(format, args...))
|
||||
if len(t.records) > 1000 {
|
||||
t.records = t.records[len(t.records)-1000:]
|
||||
}
|
||||
t.mu.Unlock()
|
||||
}
|
||||
|
||||
func (t *testStateRecorder) Records() []string {
|
||||
t.mu.Lock()
|
||||
r := make([]string, len(t.records))
|
||||
copy(r, t.records)
|
||||
t.mu.Unlock()
|
||||
return r
|
||||
}
|
||||
|
||||
func (t *testStateRecorder) Reset() {
|
||||
t.mu.Lock()
|
||||
t.records = nil
|
||||
t.mu.Unlock()
|
||||
}
|
||||
@@ -0,0 +1,372 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime/pprof"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"gitea.dev/modules/log"
|
||||
)
|
||||
|
||||
var (
|
||||
infiniteTimerC = make(chan time.Time)
|
||||
batchDebounceDuration = 100 * time.Millisecond
|
||||
workerIdleDuration = 1 * time.Second
|
||||
shutdownDefaultTimeout = 2 * time.Second
|
||||
|
||||
unhandledItemRequeueDuration atomic.Int64 // to avoid data race during test
|
||||
)
|
||||
|
||||
func init() {
|
||||
unhandledItemRequeueDuration.Store(int64(time.Second))
|
||||
}
|
||||
|
||||
// workerGroup is a group of workers to work with a WorkerPoolQueue
|
||||
type workerGroup[T any] struct {
|
||||
q *WorkerPoolQueue[T]
|
||||
wg sync.WaitGroup
|
||||
|
||||
ctxWorker context.Context
|
||||
ctxWorkerCancel context.CancelFunc
|
||||
|
||||
batchBuffer []T
|
||||
popItemChan chan []byte
|
||||
popItemErr chan error
|
||||
}
|
||||
|
||||
func (wg *workerGroup[T]) doPrepareWorkerContext() {
|
||||
wg.ctxWorker, wg.ctxWorkerCancel = context.WithCancel(wg.q.ctxRun)
|
||||
}
|
||||
|
||||
// doDispatchBatchToWorker dispatches a batch of items to worker's channel.
|
||||
// If the channel is full, it tries to start a new worker if possible.
|
||||
func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushChan chan flushType) {
|
||||
batch := wg.batchBuffer
|
||||
wg.batchBuffer = nil
|
||||
|
||||
if len(batch) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
full := false
|
||||
select {
|
||||
case q.batchChan <- batch:
|
||||
default:
|
||||
full = true
|
||||
}
|
||||
|
||||
// TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum"
|
||||
// The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later
|
||||
// So ideally, it should check whether there are enough workers by some approaches, and start new workers if necessary.
|
||||
// This data-race is not serious, as long as a new worker will be started soon to make sure there are enough workers,
|
||||
// so no need to hugely refactor at the moment.
|
||||
q.workerNumMu.Lock()
|
||||
noWorker := q.workerNum == 0
|
||||
if full || noWorker {
|
||||
if q.workerNum < q.workerMaxNum || noWorker && q.workerMaxNum <= 0 {
|
||||
q.workerNum++
|
||||
q.doStartNewWorker(wg)
|
||||
}
|
||||
}
|
||||
q.workerNumMu.Unlock()
|
||||
|
||||
if full {
|
||||
select {
|
||||
case q.batchChan <- batch:
|
||||
case flush := <-flushChan:
|
||||
q.doWorkerHandle(batch)
|
||||
q.doFlush(wg, flush)
|
||||
case <-q.ctxRun.Done():
|
||||
wg.batchBuffer = batch // return the batch to buffer, the "doRun" function will handle it
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// doWorkerHandle calls the safeHandler to handle a batch of items, and it increases/decreases the active worker number.
|
||||
// If the context has been canceled, it should not be caller because the "Push" still needs the context, in such case, call q.safeHandler directly
|
||||
func (q *WorkerPoolQueue[T]) doWorkerHandle(batch []T) {
|
||||
q.workerNumMu.Lock()
|
||||
q.workerActiveNum++
|
||||
q.workerNumMu.Unlock()
|
||||
|
||||
defer func() {
|
||||
q.workerNumMu.Lock()
|
||||
q.workerActiveNum--
|
||||
q.workerNumMu.Unlock()
|
||||
}()
|
||||
|
||||
unhandled := q.safeHandler(batch...)
|
||||
// if none of the items were handled, it should back-off for a few seconds
|
||||
// in this case the handler (eg: document indexer) may have encountered some errors/failures
|
||||
if len(unhandled) == len(batch) && unhandledItemRequeueDuration.Load() != 0 {
|
||||
if q.isFlushing.Load() {
|
||||
return // do not requeue items when flushing, since all items failed, requeue them will continue failing.
|
||||
}
|
||||
log.Error("Queue %q failed to handle batch of %d items, backoff for a few seconds", q.GetName(), len(batch))
|
||||
// TODO: ideally it shouldn't "sleep" here (blocks the worker, then blocks flush).
|
||||
// It could debounce the requeue operation, and try to requeue the items in the future.
|
||||
select {
|
||||
case <-q.ctxRun.Done():
|
||||
case <-time.After(time.Duration(unhandledItemRequeueDuration.Load())):
|
||||
}
|
||||
}
|
||||
for _, item := range unhandled {
|
||||
if err := q.Push(item); err != nil {
|
||||
if !q.basePushForShutdown(item) {
|
||||
log.Error("Failed to requeue item for queue %q when calling handler: %v", q.GetName(), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// basePushForShutdown tries to requeue items into the base queue when the WorkerPoolQueue is shutting down.
|
||||
// If the queue is shutting down, it returns true and try to push the items
|
||||
// Otherwise it does nothing and returns false
|
||||
func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool {
|
||||
shutdownTimeout := time.Duration(q.shutdownTimeout.Load())
|
||||
if shutdownTimeout == 0 {
|
||||
return false
|
||||
}
|
||||
ctxShutdown, ctxShutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
|
||||
defer ctxShutdownCancel()
|
||||
for _, item := range items {
|
||||
// if there is still any error, the queue can do nothing instead of losing the items
|
||||
if err := q.baseQueue.PushItem(ctxShutdown, q.marshal(item)); err != nil {
|
||||
log.Error("Failed to requeue item for queue %q when shutting down: %v", q.GetName(), err)
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func resetIdleTicker(t *time.Ticker, dur time.Duration) {
|
||||
t.Reset(dur)
|
||||
select {
|
||||
case <-t.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// doStartNewWorker starts a new worker for the queue, the worker reads from worker's channel and handles the items.
|
||||
func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
|
||||
wp.wg.Go(func() {
|
||||
log.Debug("Queue %q starts new worker", q.GetName())
|
||||
defer log.Debug("Queue %q stops idle worker", q.GetName())
|
||||
|
||||
t := time.NewTicker(workerIdleDuration)
|
||||
defer t.Stop()
|
||||
|
||||
keepWorking := true
|
||||
stopWorking := func() {
|
||||
q.workerNumMu.Lock()
|
||||
keepWorking = false
|
||||
q.workerNum--
|
||||
q.workerNumMu.Unlock()
|
||||
}
|
||||
for keepWorking {
|
||||
select {
|
||||
case <-wp.ctxWorker.Done():
|
||||
stopWorking()
|
||||
case batch, ok := <-q.batchChan:
|
||||
if !ok {
|
||||
stopWorking()
|
||||
continue
|
||||
}
|
||||
q.doWorkerHandle(batch)
|
||||
// reset the idle ticker, and drain the tick after reset in case a tick is already triggered
|
||||
resetIdleTicker(t, workerIdleDuration) // key code for TestWorkerPoolQueueWorkerIdleReset
|
||||
case <-t.C:
|
||||
q.workerNumMu.Lock()
|
||||
keepWorking = q.workerNum <= 1 // keep the last worker running
|
||||
if !keepWorking {
|
||||
q.workerNum--
|
||||
}
|
||||
q.workerNumMu.Unlock()
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// doFlush flushes the queue: it tries to read all items from the queue and handles them.
|
||||
// It is for testing purpose only. It's not designed to work for a cluster.
|
||||
func (q *WorkerPoolQueue[T]) doFlush(wg *workerGroup[T], flush flushType) {
|
||||
q.isFlushing.Store(true)
|
||||
defer q.isFlushing.Store(false)
|
||||
|
||||
log.Debug("Queue %q starts flushing", q.GetName())
|
||||
defer log.Debug("Queue %q finishes flushing", q.GetName())
|
||||
|
||||
// stop all workers, and prepare a new worker context to start new workers
|
||||
wg.ctxWorkerCancel()
|
||||
wg.wg.Wait()
|
||||
|
||||
defer func() {
|
||||
close(flush.c)
|
||||
wg.doPrepareWorkerContext()
|
||||
}()
|
||||
|
||||
if flush.timeout < 0 {
|
||||
// discard everything
|
||||
wg.batchBuffer = nil
|
||||
for {
|
||||
select {
|
||||
case <-wg.popItemChan:
|
||||
case <-wg.popItemErr:
|
||||
case <-q.batchChan:
|
||||
case <-q.ctxRun.Done():
|
||||
return
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// drain the batch channel first
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case batch := <-q.batchChan:
|
||||
q.doWorkerHandle(batch)
|
||||
default:
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
// drain the popItem channel
|
||||
emptyCounter := 0
|
||||
for {
|
||||
select {
|
||||
case <-q.ctxRun.Done():
|
||||
log.Debug("Queue %q is shutting down", q.GetName())
|
||||
return
|
||||
case data, dataOk := <-wg.popItemChan:
|
||||
if !dataOk {
|
||||
return
|
||||
}
|
||||
emptyCounter = 0
|
||||
if v, jsonOk := q.unmarshal(data); !jsonOk {
|
||||
continue
|
||||
} else {
|
||||
q.doWorkerHandle([]T{v})
|
||||
}
|
||||
case err := <-wg.popItemErr:
|
||||
if !q.isCtxRunCanceled() {
|
||||
log.Error("Failed to pop item from queue %q (doFlush): %v", q.GetName(), err)
|
||||
}
|
||||
return
|
||||
case <-time.After(20 * time.Millisecond):
|
||||
// There is no reliable way to make sure all queue items are consumed by the Flush, there always might be some items stored in some buffers/temp variables.
|
||||
// If we run Gitea in a cluster, we can even not guarantee all items are consumed in a deterministic instance.
|
||||
// Luckily, the "Flush" trick is only used in tests, so far so good.
|
||||
if cnt, _ := q.baseQueue.Len(q.ctxRun); cnt == 0 && len(wg.popItemChan) == 0 {
|
||||
emptyCounter++
|
||||
}
|
||||
if emptyCounter >= 2 {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (q *WorkerPoolQueue[T]) isCtxRunCanceled() bool {
|
||||
select {
|
||||
case <-q.ctxRun.Done():
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
var skipFlushChan = make(chan flushType) // an empty flush chan, used to skip reading other flush requests
|
||||
|
||||
// doRun is the main loop of the queue. All related "doXxx" functions are executed in its context.
|
||||
func (q *WorkerPoolQueue[T]) doRun() {
|
||||
pprof.SetGoroutineLabels(q.ctxRun)
|
||||
|
||||
log.Debug("Queue %q starts running", q.GetName())
|
||||
defer log.Debug("Queue %q stops running", q.GetName())
|
||||
|
||||
wg := &workerGroup[T]{q: q}
|
||||
wg.doPrepareWorkerContext()
|
||||
wg.popItemChan, wg.popItemErr = popItemByChan(q.ctxRun, q.baseQueue.PopItem)
|
||||
|
||||
defer func() {
|
||||
q.ctxRunCancel()
|
||||
|
||||
// drain all data on the fly
|
||||
// since the queue is shutting down, the items can't be dispatched to workers because the context is canceled
|
||||
// it can't call doWorkerHandle either, because there is no chance to push unhandled items back to the queue
|
||||
var unhandled []T
|
||||
close(q.batchChan)
|
||||
for batch := range q.batchChan {
|
||||
unhandled = append(unhandled, batch...)
|
||||
}
|
||||
unhandled = append(unhandled, wg.batchBuffer...)
|
||||
for data := range wg.popItemChan {
|
||||
if v, ok := q.unmarshal(data); ok {
|
||||
unhandled = append(unhandled, v)
|
||||
}
|
||||
}
|
||||
|
||||
shutdownTimeout := time.Duration(q.shutdownTimeout.Load())
|
||||
if shutdownTimeout != 0 {
|
||||
// if there is a shutdown context, try to push the items back to the base queue
|
||||
q.basePushForShutdown(unhandled...)
|
||||
workerDone := make(chan struct{})
|
||||
// the only way to wait for the workers, because the handlers do not have context to wait for
|
||||
go func() { wg.wg.Wait(); close(workerDone) }()
|
||||
select {
|
||||
case <-workerDone:
|
||||
case <-time.After(shutdownTimeout):
|
||||
log.Error("Queue %q is shutting down, but workers are still running after timeout", q.GetName())
|
||||
}
|
||||
} else {
|
||||
// if there is no shutdown context, just call the handler to try to handle the items. if the handler fails again, the items are lost
|
||||
q.safeHandler(unhandled...)
|
||||
}
|
||||
|
||||
close(q.shutdownDone)
|
||||
}()
|
||||
|
||||
var batchDispatchC <-chan time.Time = infiniteTimerC
|
||||
for {
|
||||
select {
|
||||
case flush := <-q.flushChan:
|
||||
// before flushing, it needs to try to dispatch the batch to worker first, in case there is no worker running
|
||||
// after the flushing, there is at least one worker running, so "doFlush" could wait for workers to finish
|
||||
// since we are already in a "flush" operation, so the dispatching function shouldn't read the flush chan.
|
||||
q.doDispatchBatchToWorker(wg, skipFlushChan)
|
||||
q.doFlush(wg, flush)
|
||||
case <-q.ctxRun.Done():
|
||||
log.Debug("Queue %q is shutting down", q.GetName())
|
||||
return
|
||||
case data, dataOk := <-wg.popItemChan:
|
||||
if !dataOk {
|
||||
return
|
||||
}
|
||||
if v, jsonOk := q.unmarshal(data); !jsonOk {
|
||||
testRecorder.Record("pop:corrupted:%s", data) // in rare cases the levelqueue(leveldb) might be corrupted
|
||||
continue
|
||||
} else {
|
||||
wg.batchBuffer = append(wg.batchBuffer, v)
|
||||
}
|
||||
if len(wg.batchBuffer) >= q.batchLength {
|
||||
q.doDispatchBatchToWorker(wg, q.flushChan)
|
||||
} else if batchDispatchC == infiniteTimerC {
|
||||
batchDispatchC = time.After(batchDebounceDuration)
|
||||
} // else: batchDispatchC is already a debounce timer, it will be triggered soon
|
||||
case <-batchDispatchC:
|
||||
batchDispatchC = infiniteTimerC
|
||||
q.doDispatchBatchToWorker(wg, q.flushChan)
|
||||
case err := <-wg.popItemErr:
|
||||
if !q.isCtxRunCanceled() {
|
||||
log.Error("Failed to pop item from queue %q (doRun): %v", q.GetName(), err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,260 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"runtime/pprof"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"gitea.dev/modules/json"
|
||||
"gitea.dev/modules/log"
|
||||
"gitea.dev/modules/process"
|
||||
"gitea.dev/modules/setting"
|
||||
)
|
||||
|
||||
// WorkerPoolQueue is a queue that uses a pool of workers to process items
|
||||
// It can use different underlying (base) queue types
|
||||
type WorkerPoolQueue[T any] struct {
|
||||
ctxRun context.Context
|
||||
ctxRunCancel process.FinishedFunc
|
||||
|
||||
shutdownDone chan struct{}
|
||||
shutdownTimeout atomic.Int64 // in case some buggy handlers (workers) would hang forever, "shutdown" should finish in predictable time
|
||||
|
||||
origHandler HandlerFuncT[T]
|
||||
safeHandler HandlerFuncT[T]
|
||||
|
||||
baseQueueType string
|
||||
baseConfig *BaseConfig
|
||||
baseQueue baseQueue
|
||||
|
||||
batchChan chan []T
|
||||
flushChan chan flushType
|
||||
isFlushing atomic.Bool
|
||||
|
||||
batchLength int
|
||||
workerNum int
|
||||
workerMaxNum int
|
||||
workerActiveNum int
|
||||
workerNumMu sync.Mutex
|
||||
}
|
||||
|
||||
type flushType struct {
|
||||
timeout time.Duration
|
||||
c chan struct{}
|
||||
}
|
||||
|
||||
var _ ManagedWorkerPoolQueue = (*WorkerPoolQueue[any])(nil)
|
||||
|
||||
func (q *WorkerPoolQueue[T]) GetName() string {
|
||||
return q.baseConfig.ManagedName
|
||||
}
|
||||
|
||||
func (q *WorkerPoolQueue[T]) GetType() string {
|
||||
return q.baseQueueType
|
||||
}
|
||||
|
||||
func (q *WorkerPoolQueue[T]) GetItemTypeName() string {
|
||||
var t T
|
||||
return fmt.Sprintf("%T", t)
|
||||
}
|
||||
|
||||
func (q *WorkerPoolQueue[T]) GetWorkerNumber() int {
|
||||
q.workerNumMu.Lock()
|
||||
defer q.workerNumMu.Unlock()
|
||||
return q.workerNum
|
||||
}
|
||||
|
||||
func (q *WorkerPoolQueue[T]) GetWorkerActiveNumber() int {
|
||||
q.workerNumMu.Lock()
|
||||
defer q.workerNumMu.Unlock()
|
||||
return q.workerActiveNum
|
||||
}
|
||||
|
||||
func (q *WorkerPoolQueue[T]) GetWorkerMaxNumber() int {
|
||||
q.workerNumMu.Lock()
|
||||
defer q.workerNumMu.Unlock()
|
||||
return q.workerMaxNum
|
||||
}
|
||||
|
||||
func (q *WorkerPoolQueue[T]) SetWorkerMaxNumber(num int) {
|
||||
q.workerNumMu.Lock()
|
||||
defer q.workerNumMu.Unlock()
|
||||
q.workerMaxNum = num
|
||||
}
|
||||
|
||||
func (q *WorkerPoolQueue[T]) GetQueueItemNumber() int {
|
||||
cnt, err := q.baseQueue.Len(q.ctxRun)
|
||||
if err != nil {
|
||||
log.Error("Failed to get number of items in queue %q: %v", q.GetName(), err)
|
||||
}
|
||||
return cnt
|
||||
}
|
||||
|
||||
func (q *WorkerPoolQueue[T]) FlushWithContext(ctx context.Context, timeout time.Duration) (err error) {
|
||||
if q.isBaseQueueDummy() {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Debug("Try to flush queue %q with timeout %v", q.GetName(), timeout)
|
||||
defer log.Debug("Finish flushing queue %q, err: %v", q.GetName(), err)
|
||||
|
||||
var after <-chan time.Time
|
||||
after = infiniteTimerC
|
||||
if timeout > 0 {
|
||||
after = time.After(timeout)
|
||||
}
|
||||
flush := flushType{timeout: timeout, c: make(chan struct{})}
|
||||
|
||||
// send flush request
|
||||
// if it blocks, it means that there is a flush in progress or the queue hasn't been started yet
|
||||
select {
|
||||
case q.flushChan <- flush:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-q.ctxRun.Done():
|
||||
return q.ctxRun.Err()
|
||||
case <-after:
|
||||
return context.DeadlineExceeded
|
||||
}
|
||||
|
||||
// wait for flush to finish
|
||||
select {
|
||||
case <-flush.c:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-q.ctxRun.Done():
|
||||
return q.ctxRun.Err()
|
||||
case <-after:
|
||||
return context.DeadlineExceeded
|
||||
}
|
||||
}
|
||||
|
||||
// RemoveAllItems removes all items in the baes queue
|
||||
func (q *WorkerPoolQueue[T]) RemoveAllItems(ctx context.Context) error {
|
||||
return q.baseQueue.RemoveAll(ctx)
|
||||
}
|
||||
|
||||
func (q *WorkerPoolQueue[T]) marshal(data T) []byte {
|
||||
bs, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
log.Error("Failed to marshal item for queue %q: %v", q.GetName(), err)
|
||||
return nil
|
||||
}
|
||||
return bs
|
||||
}
|
||||
|
||||
func (q *WorkerPoolQueue[T]) unmarshal(data []byte) (t T, ok bool) {
|
||||
if err := json.Unmarshal(data, &t); err != nil {
|
||||
log.Error("Failed to unmarshal item from queue %q: %v", q.GetName(), err)
|
||||
return t, false
|
||||
}
|
||||
return t, true
|
||||
}
|
||||
|
||||
func (q *WorkerPoolQueue[T]) isBaseQueueDummy() bool {
|
||||
_, isDummy := q.baseQueue.(*baseDummy)
|
||||
return isDummy
|
||||
}
|
||||
|
||||
// Push adds an item to the queue, it may block for a while and then returns an error if the queue is full
|
||||
func (q *WorkerPoolQueue[T]) Push(data T) error {
|
||||
if q.isBaseQueueDummy() && q.safeHandler != nil {
|
||||
// FIXME: the "immediate" queue is only for testing, but it really causes problems because its behavior is different from a real queue.
|
||||
// Even if tests pass, it doesn't mean that there is no bug in code.
|
||||
if data, ok := q.unmarshal(q.marshal(data)); ok {
|
||||
q.safeHandler(data)
|
||||
}
|
||||
}
|
||||
return q.baseQueue.PushItem(q.ctxRun, q.marshal(data))
|
||||
}
|
||||
|
||||
// Has only works for unique queues. Keep in mind that this check may not be reliable (due to lacking of proper transaction support)
|
||||
// There could be a small chance that duplicate items appear in the queue
|
||||
func (q *WorkerPoolQueue[T]) Has(data T) (bool, error) {
|
||||
return q.baseQueue.HasItem(q.ctxRun, q.marshal(data))
|
||||
}
|
||||
|
||||
func (q *WorkerPoolQueue[T]) Run() {
|
||||
q.doRun()
|
||||
}
|
||||
|
||||
func (q *WorkerPoolQueue[T]) Cancel() {
|
||||
q.ctxRunCancel()
|
||||
}
|
||||
|
||||
// ShutdownWait shuts down the queue, waits for all workers to finish their jobs, and pushes the unhandled items back to the base queue
|
||||
// It waits for all workers (handlers) to finish their jobs, in case some buggy handlers would hang forever, a reasonable timeout is needed
|
||||
func (q *WorkerPoolQueue[T]) ShutdownWait(timeout time.Duration) {
|
||||
q.shutdownTimeout.Store(int64(timeout))
|
||||
q.ctxRunCancel()
|
||||
<-q.shutdownDone
|
||||
}
|
||||
|
||||
func getNewQueueFn(t string) (string, func(cfg *BaseConfig, unique bool) (baseQueue, error)) {
|
||||
switch t {
|
||||
case "dummy", "immediate":
|
||||
return t, newBaseDummy
|
||||
case "channel":
|
||||
return t, newBaseChannelGeneric
|
||||
case "redis":
|
||||
return t, newBaseRedisGeneric
|
||||
default: // level(leveldb,levelqueue,persistable-channel)
|
||||
return "level", newBaseLevelQueueGeneric
|
||||
}
|
||||
}
|
||||
|
||||
func newWorkerPoolQueueForTest[T any](name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error) {
|
||||
return NewWorkerPoolQueueWithContext(context.Background(), name, queueSetting, handler, unique)
|
||||
}
|
||||
|
||||
func NewWorkerPoolQueueWithContext[T any](ctx context.Context, name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error) {
|
||||
if handler == nil {
|
||||
log.Debug("Use dummy queue for %q because handler is nil and caller doesn't want to process the queue items", name)
|
||||
queueSetting.Type = "dummy"
|
||||
}
|
||||
|
||||
var w WorkerPoolQueue[T]
|
||||
var err error
|
||||
queueType, newQueueFn := getNewQueueFn(queueSetting.Type)
|
||||
w.baseQueueType = queueType
|
||||
w.baseConfig = toBaseConfig(name, queueSetting)
|
||||
w.baseQueue, err = newQueueFn(w.baseConfig, unique)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.Trace("Created queue %q of type %q", name, queueType)
|
||||
|
||||
w.ctxRun, _, w.ctxRunCancel = process.GetManager().AddTypedContext(ctx, "Queue: "+w.GetName(), process.SystemProcessType, false)
|
||||
w.batchChan = make(chan []T)
|
||||
w.flushChan = make(chan flushType)
|
||||
w.shutdownDone = make(chan struct{})
|
||||
w.shutdownTimeout.Store(int64(shutdownDefaultTimeout))
|
||||
w.workerMaxNum = queueSetting.MaxWorkers
|
||||
w.batchLength = queueSetting.BatchLength
|
||||
|
||||
w.origHandler = handler
|
||||
w.safeHandler = func(t ...T) (unhandled []T) {
|
||||
defer func() {
|
||||
// FIXME: there is no ctx support in the handler, so process manager is unable to restore the labels
|
||||
// so here we explicitly set the "queue ctx" labels again after the handler is done
|
||||
pprof.SetGoroutineLabels(w.ctxRun)
|
||||
err := recover()
|
||||
if err != nil {
|
||||
log.Error("Recovered from panic in queue %q handler: %v\n%s", name, err, log.Stack(2))
|
||||
}
|
||||
}()
|
||||
if w.origHandler != nil {
|
||||
return w.origHandler(t...)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return &w, nil
|
||||
}
|
||||
@@ -0,0 +1,284 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"slices"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gitea.dev/modules/setting"
|
||||
"gitea.dev/modules/test"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func runWorkerPoolQueue[T any](q *WorkerPoolQueue[T]) func() {
|
||||
go q.Run()
|
||||
return func() {
|
||||
q.ShutdownWait(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWorkerPoolQueueUnhandled(t *testing.T) {
|
||||
oldUnhandledItemRequeueDuration := unhandledItemRequeueDuration.Load()
|
||||
unhandledItemRequeueDuration.Store(0)
|
||||
defer unhandledItemRequeueDuration.Store(oldUnhandledItemRequeueDuration)
|
||||
|
||||
mu := sync.Mutex{}
|
||||
|
||||
test := func(t *testing.T, queueSetting setting.QueueSettings) {
|
||||
queueSetting.Length = 100
|
||||
queueSetting.Type = "channel"
|
||||
queueSetting.Datadir = t.TempDir() + "/test-queue"
|
||||
m := map[int]int{}
|
||||
|
||||
// odds are handled once, evens are handled twice
|
||||
handler := func(items ...int) (unhandled []int) {
|
||||
testRecorder.Record("handle:%v", items)
|
||||
for _, item := range items {
|
||||
mu.Lock()
|
||||
if item%2 == 0 && m[item] == 0 {
|
||||
unhandled = append(unhandled, item)
|
||||
}
|
||||
m[item]++
|
||||
mu.Unlock()
|
||||
}
|
||||
return unhandled
|
||||
}
|
||||
|
||||
q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", queueSetting, handler, false)
|
||||
stop := runWorkerPoolQueue(q)
|
||||
for i := 0; i < queueSetting.Length; i++ {
|
||||
testRecorder.Record("push:%v", i)
|
||||
assert.NoError(t, q.Push(i))
|
||||
}
|
||||
assert.NoError(t, q.FlushWithContext(t.Context(), 0))
|
||||
stop()
|
||||
|
||||
ok := true
|
||||
for i := 0; i < queueSetting.Length; i++ {
|
||||
if i%2 == 0 {
|
||||
ok = ok && assert.Equal(t, 2, m[i], "test %s: item %d", t.Name(), i)
|
||||
} else {
|
||||
ok = ok && assert.Equal(t, 1, m[i], "test %s: item %d", t.Name(), i)
|
||||
}
|
||||
}
|
||||
if !ok {
|
||||
t.Logf("m: %v", m)
|
||||
t.Logf("records: %v", testRecorder.Records())
|
||||
}
|
||||
testRecorder.Reset()
|
||||
}
|
||||
|
||||
runCount := 2 // we can run these tests even hundreds times to see its stability
|
||||
t.Run("1/1", func(t *testing.T) {
|
||||
for range runCount {
|
||||
test(t, setting.QueueSettings{BatchLength: 1, MaxWorkers: 1})
|
||||
}
|
||||
})
|
||||
t.Run("3/1", func(t *testing.T) {
|
||||
for range runCount {
|
||||
test(t, setting.QueueSettings{BatchLength: 3, MaxWorkers: 1})
|
||||
}
|
||||
})
|
||||
t.Run("4/5", func(t *testing.T) {
|
||||
for range runCount {
|
||||
test(t, setting.QueueSettings{BatchLength: 4, MaxWorkers: 5})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestWorkerPoolQueuePersistence(t *testing.T) {
|
||||
runCount := 2 // we can run these tests even hundreds times to see its stability
|
||||
t.Run("1/1", func(t *testing.T) {
|
||||
for range runCount {
|
||||
testWorkerPoolQueuePersistence(t, setting.QueueSettings{BatchLength: 1, MaxWorkers: 1, Length: 100})
|
||||
}
|
||||
})
|
||||
t.Run("3/1", func(t *testing.T) {
|
||||
for range runCount {
|
||||
testWorkerPoolQueuePersistence(t, setting.QueueSettings{BatchLength: 3, MaxWorkers: 1, Length: 100})
|
||||
}
|
||||
})
|
||||
t.Run("4/5", func(t *testing.T) {
|
||||
for range runCount {
|
||||
testWorkerPoolQueuePersistence(t, setting.QueueSettings{BatchLength: 4, MaxWorkers: 5, Length: 100})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSettings) {
|
||||
testCount := queueSetting.Length
|
||||
queueSetting.Type = "level"
|
||||
queueSetting.Datadir = t.TempDir() + "/test-queue"
|
||||
|
||||
mu := sync.Mutex{}
|
||||
|
||||
var tasksQ1, tasksQ2 []string
|
||||
q1 := func() {
|
||||
startWhenAllReady := make(chan struct{}) // only start data consuming when the "testCount" tasks are all pushed into queue
|
||||
stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item
|
||||
|
||||
testHandler := func(data ...string) []string {
|
||||
<-startWhenAllReady
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
for _, s := range data {
|
||||
mu.Lock()
|
||||
tasksQ1 = append(tasksQ1, s)
|
||||
mu.Unlock()
|
||||
|
||||
if s == "task-20" {
|
||||
close(stopAt20Shutdown)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
q, _ := newWorkerPoolQueueForTest("pr_patch_checker_test", queueSetting, testHandler, true)
|
||||
stop := runWorkerPoolQueue(q)
|
||||
for i := range testCount {
|
||||
_ = q.Push("task-" + strconv.Itoa(i))
|
||||
}
|
||||
close(startWhenAllReady)
|
||||
<-stopAt20Shutdown // it's possible to have more than 20 tasks executed
|
||||
stop()
|
||||
}
|
||||
|
||||
q1() // run some tasks and shutdown at an intermediate point
|
||||
|
||||
time.Sleep(100 * time.Millisecond) // because the handler in q1 has a slight delay, we need to wait for it to finish
|
||||
|
||||
q2 := func() {
|
||||
testHandler := func(data ...string) []string {
|
||||
for _, s := range data {
|
||||
mu.Lock()
|
||||
tasksQ2 = append(tasksQ2, s)
|
||||
mu.Unlock()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
q, _ := newWorkerPoolQueueForTest("pr_patch_checker_test", queueSetting, testHandler, true)
|
||||
stop := runWorkerPoolQueue(q)
|
||||
assert.NoError(t, q.FlushWithContext(t.Context(), 0))
|
||||
stop()
|
||||
}
|
||||
|
||||
q2() // restart the queue to continue to execute the tasks in it
|
||||
|
||||
assert.NotEmpty(t, tasksQ1)
|
||||
assert.NotEmpty(t, tasksQ2)
|
||||
assert.Equal(t, testCount, len(tasksQ1)+len(tasksQ2))
|
||||
}
|
||||
|
||||
func TestWorkerPoolQueueActiveWorkers(t *testing.T) {
|
||||
defer test.MockVariableValue(&workerIdleDuration, 300*time.Millisecond)()
|
||||
|
||||
handler := func(items ...int) (unhandled []int) {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
return nil
|
||||
}
|
||||
|
||||
q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 1, Length: 100}, handler, false)
|
||||
stop := runWorkerPoolQueue(q)
|
||||
for i := range 5 {
|
||||
assert.NoError(t, q.Push(i))
|
||||
}
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
assert.Equal(t, 1, q.GetWorkerNumber())
|
||||
assert.Equal(t, 1, q.GetWorkerActiveNumber())
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
assert.Equal(t, 1, q.GetWorkerNumber())
|
||||
assert.Equal(t, 0, q.GetWorkerActiveNumber())
|
||||
time.Sleep(workerIdleDuration)
|
||||
assert.Equal(t, 1, q.GetWorkerNumber()) // there is at least one worker after the queue begins working
|
||||
stop()
|
||||
|
||||
q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 3, Length: 100}, handler, false)
|
||||
stop = runWorkerPoolQueue(q)
|
||||
for i := range 15 {
|
||||
assert.NoError(t, q.Push(i))
|
||||
}
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
assert.Equal(t, 3, q.GetWorkerNumber())
|
||||
assert.Equal(t, 3, q.GetWorkerActiveNumber())
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
assert.Equal(t, 3, q.GetWorkerNumber())
|
||||
assert.Equal(t, 0, q.GetWorkerActiveNumber())
|
||||
time.Sleep(workerIdleDuration)
|
||||
assert.Equal(t, 1, q.GetWorkerNumber()) // there is at least one worker after the queue begins working
|
||||
stop()
|
||||
}
|
||||
|
||||
func TestWorkerPoolQueueShutdown(t *testing.T) {
|
||||
oldUnhandledItemRequeueDuration := unhandledItemRequeueDuration.Load()
|
||||
unhandledItemRequeueDuration.Store(int64(100 * time.Millisecond))
|
||||
defer unhandledItemRequeueDuration.Store(oldUnhandledItemRequeueDuration)
|
||||
|
||||
// simulate a slow handler, it doesn't handle any item (all items will be pushed back to the queue)
|
||||
handlerCalled := make(chan struct{})
|
||||
handler := func(items ...int) (unhandled []int) {
|
||||
if items[0] == 0 {
|
||||
close(handlerCalled)
|
||||
}
|
||||
time.Sleep(400 * time.Millisecond)
|
||||
return items
|
||||
}
|
||||
|
||||
qs := setting.QueueSettings{Type: "level", Datadir: t.TempDir() + "/queue", BatchLength: 3, MaxWorkers: 4, Length: 20}
|
||||
q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false)
|
||||
stop := runWorkerPoolQueue(q)
|
||||
for i := 0; i < qs.Length; i++ {
|
||||
assert.NoError(t, q.Push(i))
|
||||
}
|
||||
<-handlerCalled
|
||||
time.Sleep(200 * time.Millisecond) // wait for a while to make sure all workers are active
|
||||
assert.Equal(t, 4, q.GetWorkerActiveNumber())
|
||||
stop() // stop triggers shutdown
|
||||
assert.Equal(t, 0, q.GetWorkerActiveNumber())
|
||||
|
||||
// no item was ever handled, so we still get all of them again
|
||||
q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false)
|
||||
assert.Equal(t, 20, q.GetQueueItemNumber())
|
||||
}
|
||||
|
||||
func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) {
|
||||
defer test.MockVariableValue(&workerIdleDuration, 10*time.Millisecond)()
|
||||
defer mockBackoffDuration(5 * time.Millisecond)()
|
||||
|
||||
var q *WorkerPoolQueue[int]
|
||||
var handledCount atomic.Int32
|
||||
var hasOnlyOneWorkerRunning atomic.Bool
|
||||
handler := func(items ...int) (unhandled []int) {
|
||||
handledCount.Add(int32(len(items)))
|
||||
// make each work have different duration, and check the active worker number periodically
|
||||
var activeNums []int
|
||||
for i := 0; i < 5-items[0]%2; i++ {
|
||||
time.Sleep(workerIdleDuration * 2)
|
||||
activeNums = append(activeNums, q.GetWorkerActiveNumber())
|
||||
}
|
||||
// When the queue never becomes empty, the existing workers should keep working
|
||||
// It is not 100% true at the moment because the data-race in workergroup.go is not resolved, see that TODO */
|
||||
// If the "active worker numbers" is like [2 2 ... 1 1], it means that an existing worker exited and the no new worker is started.
|
||||
if slices.Equal([]int{1, 1}, activeNums[len(activeNums)-2:]) {
|
||||
hasOnlyOneWorkerRunning.Store(true)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
|
||||
stop := runWorkerPoolQueue(q)
|
||||
for i := range 100 {
|
||||
assert.NoError(t, q.Push(i))
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
assert.Greater(t, int(handledCount.Load()), 4) // make sure there are enough items handled during the test
|
||||
assert.False(t, hasOnlyOneWorkerRunning.Load(), "a slow handler should not block other workers from starting")
|
||||
stop()
|
||||
}
|
||||
Reference in New Issue
Block a user