pcm-coordinator/api/internal/mqs/queue.go

198 lines
3.9 KiB
Go

/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package mqs
import (
"fmt"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/queue"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/core/threading"
"k8s.io/apimachinery/pkg/util/json"
"sync"
"time"
)
var InsQueue *workQueue
type (
ConsumeHandle func(v string) error
ConsumeHandler interface {
Consume(value string) error
}
workQueues struct {
queues []queue.MessageQueue
group *service.ServiceGroup
}
workQueue struct {
topic string
Beta *Beta
handler ConsumeHandler
consumerRoutines *threading.RoutineGroup
}
Beta struct {
queue []t
// dirty defines all of the items that need to be processed.
dirty set
// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set
cond *sync.Cond
}
empty struct{}
t interface{}
set map[t]empty
)
func (s set) has(item t) bool {
_, exists := s[item]
return exists
}
func (s set) insert(item t) {
s[item] = empty{}
}
func (s set) delete(item t) {
delete(s, item)
}
func (s set) len() int {
return len(s)
}
func (b *Beta) Get() (item interface{}) {
b.cond.L.Lock()
defer b.cond.L.Unlock()
for len(b.queue) == 0 {
b.cond.Wait()
}
if len(b.queue) == 0 {
// We must be shutting down.
return nil
}
item = b.queue[0]
// The underlying array still exists and reference this object, so the object will not be garbage collected.
b.queue[0] = nil
b.queue = b.queue[1:]
b.processing.insert(item)
b.dirty.delete(item)
return item
}
func (b *Beta) Add(item interface{}) {
b.cond.L.Lock()
defer b.cond.L.Unlock()
if b.dirty.has(item) {
return
}
b.dirty.insert(item)
if b.processing.has(item) {
return
}
b.queue = append(b.queue, item)
b.cond.Signal()
}
func (w *workQueue) Start() {
w.startConsumers()
w.consumerRoutines.Wait()
}
func (w *workQueue) Stop() {
}
func (w workQueues) Start() {
for _, each := range w.queues {
w.group.Add(each)
}
w.group.Start()
}
func (w workQueues) Stop() {
w.group.Stop()
}
func (w *workQueue) startConsumers() {
w.consumerRoutines.Run(func() {
for {
item := w.Beta.Get()
println("开始消费 ")
if item != nil {
bytes, err := json.Marshal(item)
if err != nil {
return
}
w.consumeOne(string(bytes))
println("开始消费3")
}
time.Sleep(1 * time.Second)
}
})
}
func (w *workQueue) consumeOne(value string) error {
err := w.handler.Consume(value)
return err
}
func newWorkQueue(topic string, handler ConsumeHandler) queue.MessageQueue {
wq := &workQueue{
topic: topic,
Beta: &Beta{
dirty: set{},
processing: set{},
cond: sync.NewCond(&sync.Mutex{}),
},
consumerRoutines: threading.NewRoutineGroup(),
handler: handler}
InsQueue = wq
return wq
}
func MustNewQueue(topic string, handler ConsumeHandler) queue.MessageQueue {
q, err := NewQueue(topic, handler)
if err != nil {
fmt.Println("NewQueue报错")
}
return q
}
func NewQueue(topic string, handler ConsumeHandler) (queue.MessageQueue, error) {
if len(topic) == 0 {
return nil, errors.New("topic不能为空")
}
r := workQueues{
group: service.NewServiceGroup(),
}
r.queues = append(r.queues, newWorkQueue(topic, handler))
return r, nil
}