Files
chain33-im/comet/group.go
2022-03-17 15:55:27 +08:00

120 lines
2.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package comet
import (
"fmt"
"sync"
"gitlab.33.cn/chat/im/api/comet/grpc"
)
// Group is a set of channels that share one group ID. Members are kept in a
// doubly linked list of nodes so that a message can be pushed to each of them.
type Group struct {
// ID identifies the group; it is also the key passed to the per-channel
// node lookup (GetNode / SetNode / DelNode).
ID string
// rLock guards the member list and Online: Put, Del and Close take the
// write lock, Push and Members take the read lock.
rLock sync.RWMutex
// next is the head of the member list; Put inserts new nodes here.
next *Node
// drop, when true, makes Put ignore new channels, and Del returns its
// current value. The visible code only ever sets it to false (NewGroup).
drop bool
// Online is incremented by Put and decremented by Del for every node that
// is linked into or unlinked from the list.
Online int32 // dirty read is ok
// AllOnline, when positive, is reported by OnlineNum in place of Online.
// NOTE(review): it is not written anywhere in the visible code; presumably
// it is filled in by the caller — confirm.
AllOnline int32
}
// NewGroup returns an empty Group identified by id.
func NewGroup(id string) (r *Group) {
	// Every other field starts at its zero value: no members, drop unset and
	// both online counters at zero.
	r = &Group{ID: id}
	return r
}
// Put links ch into the group as the new head of the member list and records
// the created node on the channel under the group ID.
//
// The call is a silent no-op when the group is marked as dropped or when ch
// already has a node registered for this group ID. That case used to return
// errors.ErrGroupDroped; the error was removed on 2021-07-21 (dld), so the
// returned error is currently always nil.
func (r *Group) Put(ch *Channel) (err error) {
	r.rLock.Lock()
	// Deferred so that a panic while touching ch (for example a nil channel)
	// cannot leave the write lock held and block every later caller.
	defer r.rLock.Unlock()

	if r.drop || ch.GetNode(r.ID) != nil {
		return nil
	}

	node := &Node{Current: ch}
	ch.SetNode(r.ID, node)

	// Insert at the head: the previous head, if any, becomes the second node.
	if head := r.next; head != nil {
		head.Prev = node
	}
	node.Next = r.next
	r.next = node
	r.Online++
	return nil
}
// Del unlinks ch from the group's member list and removes the channel's node
// entry for this group ID. A channel that has no node for this group is left
// untouched.
//
// It returns the group's drop flag. The flag is no longer derived from the
// online count (that assignment was disabled), so Del never changes it.
func (r *Group) Del(ch *Channel) bool {
	r.rLock.Lock()
	// Deferred so that a panic while touching ch cannot leave the write lock
	// held, and so that drop is read while the lock is still owned.
	defer r.rLock.Unlock()

	node := ch.GetNode(r.ID)
	if node == nil {
		return r.drop
	}

	if node.Next != nil {
		// Not the last node: the successor now points back past it.
		node.Next.Prev = node.Prev
	}
	if node.Prev != nil {
		// Not the head: the predecessor now points forward past it.
		node.Prev.Next = node.Next
	} else {
		r.next = node.Next
	}
	r.Online--

	// 2021-06-10 (dld): remove the channel's node entry so that a later Put of
	// the same channel is accepted instead of failing with ErrGroupDroped.
	ch.DelNode(r.ID)
	return r.drop
}
// Push hands p to every channel currently linked into the group, walking the
// member list from the head.
//
// Delivery is best effort: both results of each Channel.Push call are
// discarded. Per the original note, a channel whose queue is full drops the
// message rather than blocking (behavior of Channel.Push, not shown here).
func (r *Group) Push(p *grpc.Proto) {
	r.rLock.RLock()
	// Deferred so that a panic inside a channel's Push cannot leave the read
	// lock held, which would block Put, Del and Close forever.
	defer r.rLock.RUnlock()

	for node := r.next; node != nil; node = node.Next {
		if ch := node.Current; ch != nil {
			_, _ = ch.Push(p)
		}
	}
}
// Members returns the Key of every channel in the group together with a
// parallel slice holding each channel's address formatted as "IP:Port".
//
// Entries follow list order, i.e. the most recently added channel comes
// first. Both slices are non-nil even when the group has no members.
func (r *Group) Members() ([]string, []string) {
	r.rLock.RLock()
	defer r.rLock.RUnlock()

	keys := make([]string, 0)
	addrs := make([]string, 0)
	for node := r.next; node != nil; node = node.Next {
		ch := node.Current
		if ch == nil {
			continue
		}
		keys = append(keys, ch.Key)
		addrs = append(addrs, fmt.Sprintf("%v:%v", ch.IP, ch.Port))
	}
	return keys, addrs
}
// Close detaches every member channel from the group and leaves the group
// empty: each channel's node entry for this group ID is removed, the member
// list is cleared and Online is reset to zero.
func (r *Group) Close() {
	r.rLock.Lock()
	defer r.rLock.Unlock()

	for node := r.next; node != nil; node = node.Next {
		if ch := node.Current; ch != nil {
			ch.DelNode(r.ID)
		}
	}

	// Once the channels have forgotten their nodes, Del can no longer find
	// and unlink them. Dropping the list here keeps a stale reference to the
	// closed group from still pushing to former members, and keeps Online in
	// step with the (now empty) list.
	r.next = nil
	r.Online = 0
}
// OnlineNum reports the group's online count: AllOnline when it is positive,
// otherwise the local Online counter. Both fields are read without taking the
// group lock.
func (r *Group) OnlineNum() int32 {
	if r.AllOnline <= 0 {
		return r.Online
	}
	return r.AllOnline
}