This commit is contained in:
2022-03-17 15:55:27 +08:00
commit bd5a9fad97
92 changed files with 13861 additions and 0 deletions

29
comet/CHANGELOG.md Normal file
View File

@@ -0,0 +1,29 @@
版本号`major.minor.patch`具体规则如下:
- major主版本号如有重大版本重构则该字段递增通常各主版本间接口不兼容。
- minor次版本号各次版本号间接口保持兼容如有接口新增或优化则该字段递增。
- patch补丁号如有功能改善或缺陷修复则该字段递增。
## version 3.0.2 @2021.10.28
优化 log 模块
配置文件更新
新增
```toml
[log]
Level="debug"
Mode="console"
Path=""
Display="json"
```
## example x.x.x @yy.mm.dd
**Feature**
**Bug Fixes**
**Improvement**
**Breaking Change**

159
comet/bucket.go Normal file
View File

@@ -0,0 +1,159 @@
package comet
import (
"sync"
"sync/atomic"
"gitlab.33.cn/chat/im/api/comet/grpc"
"gitlab.33.cn/chat/im/comet/conf"
)
// Bucket is a channel holder.
type Bucket struct {
c *conf.Bucket
cLock sync.RWMutex // protect the channels for chs
chs map[string]*Channel // map sub key to a channel
//group
groups map[string]*Group
routines []chan *grpc.BroadcastGroupReq
routinesNum uint64
}
// NewBucket new a bucket struct. store the key with im channel.
func NewBucket(c *conf.Bucket) (b *Bucket) {
b = new(Bucket)
b.chs = make(map[string]*Channel, c.Channel)
b.c = c
b.groups = make(map[string]*Group, c.Groups)
b.routines = make([]chan *grpc.BroadcastGroupReq, c.RoutineAmount)
for i := uint64(0); i < c.RoutineAmount; i++ {
c := make(chan *grpc.BroadcastGroupReq, c.RoutineSize)
b.routines[i] = c
go b.groupProc(c)
}
return
}
// ChannelCount channel count in the bucket
func (b *Bucket) ChannelCount() int {
return len(b.chs)
}
// Put put a channel according with sub key.
func (b *Bucket) Put(ch *Channel) (err error) {
b.cLock.Lock()
// close old channel
if dch := b.chs[ch.Key]; dch != nil {
dch.Close()
}
b.chs[ch.Key] = ch
b.cLock.Unlock()
return
}
// Del delete the channel by sub key.
func (b *Bucket) Del(dch *Channel) {
var (
ok bool
ch *Channel
)
b.cLock.Lock()
if ch, ok = b.chs[dch.Key]; ok {
if ch == dch {
//修改内容:获取channel下所有添加的群并逐个在群聊中删除该channel修改人dld;修改时间2021年5月8日16:36:00 c4f618a9-1c37-3459-3861-a24f26bb2d85
for id := range ch.Groups() {
if g := b.groups[id]; g != nil {
g.Del(ch)
}
}
//结束c4f618a9-1c37-3459-3861-a24f26bb2d85
delete(b.chs, ch.Key)
}
}
b.cLock.Unlock()
}
// Channel get a channel by sub key.
func (b *Bucket) Channel(key string) (ch *Channel) {
b.cLock.RLock()
ch = b.chs[key]
b.cLock.RUnlock()
return
}
// Broadcast push msgs to all channels in the bucket.
func (b *Bucket) Broadcast(p *grpc.Proto, op int32) {
var ch *Channel
b.cLock.RLock()
for _, ch = range b.chs {
_, _ = ch.Push(p)
}
b.cLock.RUnlock()
}
// group
// GroupCount room count in the bucket
func (b *Bucket) GroupCount() int {
return len(b.groups)
}
// GroupsCount get all group id where online number > 0.
func (b *Bucket) GroupsCount() (res map[string]int32) {
var (
groupID string
group *Group
)
b.cLock.RLock()
res = make(map[string]int32)
for groupID, group = range b.groups {
if group.Online > 0 {
res[groupID] = group.Online
}
}
b.cLock.RUnlock()
return
}
// Put put a group according with sub key.
func (b *Bucket) PutGroup(gid string) (group *Group, err error) {
var ok bool
b.cLock.Lock()
if group, ok = b.groups[gid]; !ok {
group = NewGroup(gid)
b.groups[gid] = group
}
b.cLock.Unlock()
return
}
// Group get a group by group id.
func (b *Bucket) Group(gid string) (group *Group) {
b.cLock.RLock()
group = b.groups[gid]
b.cLock.RUnlock()
return
}
// DelGroup delete a room by group id.
func (b *Bucket) DelGroup(group *Group) {
b.cLock.Lock()
delete(b.groups, group.ID)
b.cLock.Unlock()
group.Close()
}
// BroadcastGroup broadcast a message to specified group
func (b *Bucket) BroadcastGroup(arg *grpc.BroadcastGroupReq) {
num := atomic.AddUint64(&b.routinesNum, 1) % b.c.RoutineAmount
b.routines[num] <- arg
}
// group proc
func (b *Bucket) groupProc(c chan *grpc.BroadcastGroupReq) {
for {
arg := <-c
if group := b.Group(arg.GroupID); group != nil {
group.Push(arg.Proto)
}
}
}

102
comet/channel.go Normal file
View File

@@ -0,0 +1,102 @@
package comet
import (
"errors"
"sync"
"sync/atomic"
"github.com/Terry-Mao/goim/pkg/bufio"
"github.com/golang/protobuf/proto"
"gitlab.33.cn/chat/im/api/comet/grpc"
)
// Channel used by message pusher send msg to write goroutine.
type Channel struct {
CliProto Ring
signal chan *grpc.Proto
Writer bufio.Writer
Reader bufio.Reader
Seq int32
Key string
IP string
Port string
nodes map[string]*Node
mutex sync.RWMutex
}
// NewChannel new a channel.
func NewChannel(cli, svr int) *Channel {
c := new(Channel)
c.CliProto.Init(cli)
c.signal = make(chan *grpc.Proto, svr)
c.nodes = make(map[string]*Node)
return c
}
// Push server push message.
func (c *Channel) seqInc() int32 {
return atomic.AddInt32(&c.Seq, 1)
}
// Push server push message.
func (c *Channel) push(p *grpc.Proto) (err error) {
select {
case c.signal <- p:
default:
}
return
}
// Push server push message.
func (c *Channel) Push(p *grpc.Proto) (seq int32, err error) {
if p, ok := proto.Clone(p).(*grpc.Proto); ok {
if p.Op == int32(grpc.Op_ReceiveMsg) {
p.Seq = c.seqInc()
}
seq = p.Seq
return seq, c.push(p)
} else {
return 0, errors.New("protocol type gRPC proto failed")
}
}
// Ready check the channel ready or close?
func (c *Channel) Ready() *grpc.Proto {
return <-c.signal
}
// Signal send signal to the channel, protocol ready.
func (c *Channel) Signal() {
c.signal <- grpc.ProtoReady
}
// Close close the channel.
func (c *Channel) Close() {
c.signal <- grpc.ProtoFinish
}
// Close close the channel.
func (c Channel) Groups() map[string]*Node {
return c.nodes
}
//
func (c *Channel) DelNode(id string) {
c.mutex.Lock()
delete(c.nodes, id)
c.mutex.Unlock()
}
func (c *Channel) SetNode(id string, node *Node) {
c.mutex.Lock()
c.nodes[id] = node
c.mutex.Unlock()
}
func (c *Channel) GetNode(id string) *Node {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.nodes[id]
}

135
comet/cmd/main.go Normal file
View File

@@ -0,0 +1,135 @@
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"math/rand"
"net"
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
"syscall"
"time"
"github.com/Terry-Mao/goim/pkg/ip"
"github.com/opentracing/opentracing-go"
"github.com/rs/zerolog/log"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
xlog "gitlab.33.cn/chat/im-pkg/log"
"gitlab.33.cn/chat/im-pkg/trace"
"gitlab.33.cn/chat/im/comet"
"gitlab.33.cn/chat/im/comet/conf"
"gitlab.33.cn/chat/im/comet/grpc"
"gitlab.33.cn/chat/im/comet/http"
"gitlab.33.cn/chat/im/naming"
)
const (
srvName = "comet"
)
var (
// projectVersion 项目版本
projectVersion = "3.0.3"
// goVersion go版本
goVersion = ""
// gitCommit git提交commit id
gitCommit = ""
// buildTime 编译时间
buildTime = ""
isShowVersion = flag.Bool("version", false, "show project version")
)
// showVersion 显示项目版本信息
func showVersion(isShow bool) {
if isShow {
fmt.Printf("Project: %s\n", srvName)
fmt.Printf(" Version: %s\n", projectVersion)
fmt.Printf(" Go Version: %s\n", goVersion)
fmt.Printf(" Git Commit: %s\n", gitCommit)
fmt.Printf(" Build Time: %s\n", buildTime)
os.Exit(0)
}
}
func main() {
flag.Parse()
showVersion(*isShowVersion)
if err := conf.Init(); err != nil {
panic(err)
}
rand.Seed(time.Now().UTC().UnixNano())
runtime.GOMAXPROCS(runtime.NumCPU())
//log init
var err error
log.Logger, err = xlog.Init(conf.Conf.Log)
if err != nil {
panic(err)
}
log.Logger.With().Str("service", srvName)
byte, _ := json.Marshal(conf.Conf)
log.Info().Str("config", string(byte)).Send()
// trace init
tracer, tracerCloser := trace.Init(srvName, conf.Conf.Trace, config.Logger(jaeger.NullLogger))
//不然后续不会有Jaeger实例
opentracing.SetGlobalTracer(tracer)
srv := comet.New(conf.Conf)
rpcSrv := grpc.New(conf.Conf.RPCServer, srv)
httpSrv := http.Start(":8000", srv)
if err := comet.InitWebsocket(srv, conf.Conf.Websocket.Bind, runtime.NumCPU()); err != nil {
panic(err)
}
if err := comet.InitTCP(srv, conf.Conf.TCP.Bind, runtime.NumCPU()); err != nil {
panic(err)
}
// register comet
_, port, _ := net.SplitHostPort(conf.Conf.RPCServer.Addr)
addr := fmt.Sprintf("%s:%s", ip.InternalIP(), port)
if err := naming.Register(conf.Conf.Reg.RegAddrs, conf.Conf.Reg.SrvName, addr, conf.Conf.Reg.Schema, 15); err != nil {
panic(err)
}
fmt.Println("register ok")
// signal
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
for {
s := <-c
log.Info().Str("signal", s.String()).Send()
switch s {
case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
if err := naming.UnRegister(conf.Conf.Reg.SrvName, addr, conf.Conf.Reg.Schema); err != nil {
log.Error().Err(err).Msg("naming.UnRegister")
}
rpcSrv.GracefulStop()
srv.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := httpSrv.Shutdown(ctx); err != nil {
log.Fatal().Err(err).Msg("http server shutdown")
}
if err := tracerCloser.Close(); err != nil {
log.Error().Err(err).Msg("tracer close failed")
}
log.Info().Msg("comet exit")
xlog.Close()
//log.Flush()
return
case syscall.SIGHUP:
default:
return
}
}
}

126
comet/comet.go Normal file
View File

@@ -0,0 +1,126 @@
package comet
import (
"context"
"fmt"
"math/rand"
"net"
"time"
"github.com/Terry-Mao/goim/pkg/ip"
"github.com/zhenjl/cityhash"
"gitlab.33.cn/chat/im-pkg/trace"
comet "gitlab.33.cn/chat/im/api/comet/grpc"
logic "gitlab.33.cn/chat/im/api/logic/grpc"
"gitlab.33.cn/chat/im/comet/conf"
"gitlab.33.cn/chat/im/common"
"gitlab.33.cn/chat/im/naming"
"google.golang.org/grpc"
"google.golang.org/grpc/resolver"
)
// Comet is comet server.
type Comet struct {
c *conf.Config
round *Round
buckets []*Bucket // subkey bucket
bucketIdx uint32
serverID string
logicRPCClient logic.LogicClient
}
// NewServer returns a new Server.
func New(c *conf.Config) *Comet {
s := &Comet{
c: c,
round: NewRound(c),
logicRPCClient: newLogicClient(c),
}
// init bucket
s.buckets = make([]*Bucket, c.Bucket.Size)
s.bucketIdx = uint32(c.Bucket.Size)
for i := 0; i < c.Bucket.Size; i++ {
s.buckets[i] = NewBucket(c.Bucket)
}
addr := ip.InternalIP()
_, port, _ := net.SplitHostPort(c.RPCServer.Addr)
s.serverID = "grpc://" + addr + ":" + port
return s
}
func newLogicClient(c *conf.Config) logic.LogicClient {
rb := naming.NewResolver(c.Reg.RegAddrs, c.LogicRPCClient.Schema)
resolver.Register(rb)
addr := fmt.Sprintf("%s:///%s", c.LogicRPCClient.Schema, c.LogicRPCClient.SrvName) // "schema://[authority]/service"
fmt.Println("rpc client call addr:", addr)
conn, err := common.NewGRPCConn(addr, time.Duration(c.LogicRPCClient.Dial), grpc.WithUnaryInterceptor(trace.OpentracingClientInterceptor))
if err != nil {
panic(err)
}
return logic.NewLogicClient(conn)
}
// Buckets return all buckets.
func (s *Comet) Buckets() []*Bucket {
return s.buckets
}
// Bucket get the bucket by subkey.
func (s *Comet) Bucket(subKey string) *Bucket {
idx := cityhash.CityHash32([]byte(subKey), uint32(len(subKey))) % s.bucketIdx
return s.buckets[idx]
}
// RandServerHearbeat rand server heartbeat.
func (s *Comet) RandServerHearbeat() time.Duration {
return time.Duration(s.c.Protocol.MinHeartbeat) + time.Duration(rand.Int63n(int64(s.c.Protocol.MaxHeartbeat-s.c.Protocol.MinHeartbeat)))
}
// Close close the server.
func (s *Comet) Close() (err error) {
return
}
// Connect connected a connection.
func (s *Comet) Connect(c context.Context, p *comet.Proto) (key string, hb time.Duration, err error) {
var (
req logic.ConnectReq
reply *logic.ConnectReply
)
req.Server = s.serverID
req.Proto = p
reply, err = s.logicRPCClient.Connect(c, &req)
if err != nil {
return
}
return reply.Key, time.Duration(reply.Heartbeat), nil
}
// Disconnect disconnected a connection.
func (s *Comet) Disconnect(c context.Context, key string) (err error) {
_, err = s.logicRPCClient.Disconnect(context.Background(), &logic.DisconnectReq{
Server: s.serverID,
Key: key,
})
return
}
// Heartbeat heartbeat a connection session.
func (s *Comet) Heartbeat(ctx context.Context, key string) (err error) {
_, err = s.logicRPCClient.Heartbeat(ctx, &logic.HeartbeatReq{
Server: s.serverID,
Key: key,
})
return
}
// Receive receive a message.
func (s *Comet) Receive(ctx context.Context, key string, p *comet.Proto) (err error) {
_, err = s.logicRPCClient.Receive(ctx, &logic.ReceiveReq{Key: key, Proto: p})
return
}

70
comet/conf/comet.toml Normal file
View File

@@ -0,0 +1,70 @@
env="debug"
[log]
Level="debug"
Mode="console"
Path=""
Display="json"
[Trace]
ServiceName=""
Gen128Bit=true
[Trace.Sampler]
Type="const"
Param=1.0
[Trace.Reporter]
LogSpans=true
LocalAgentHostPort="172.16.101.130:6831"
[reg]
schema = "im"
srvName = "comet"
regAddrs = "127.0.0.1:2379"
[logicRPCClient]
schema = "im"
srvName = "logic"
dial = "1s"
timeout = "1s"
[RPCServer]
Network = "tcp"
Addr = ":3109"
Timeout = "1s"
KeepAliveMaxConnectionIdle = "60s"
KeepAliveMaxConnectionAge = "2h"
KeepAliveMaxMaxConnectionAgeGrace = "20s"
KeepAliveTime = "60s"
KeepAliveTimeout = "20s"
[tcp]
bind = [":3101"]
sndbuf = 4096
rcvbuf = 4096
keepalive = false
reader = 32
readBuf = 1024
readBufSize = 8192
writer = 32
writeBuf = 1024
writeBufSize = 8192
[websocket]
bind = [":3102"]
tlsOpen = false
tlsBind = [":3103"]
certFile = "../../cert.pem"
privateFile = "../../private.pem"
[protocol]
timer = 32
timerSize = 2048
svrProto = 10
cliProto = 5
handshakeTimeout = "8s"
minHeartbeat = "5s"
maxHeartbeat = "10s"
[bucket]
size = 32
channel = 1024

209
comet/conf/conf.go Normal file
View File

@@ -0,0 +1,209 @@
package conf
import (
"flag"
"github.com/uber/jaeger-client-go"
"os"
"time"
"github.com/BurntSushi/toml"
xtime "github.com/Terry-Mao/goim/pkg/time"
traceConfig "github.com/uber/jaeger-client-go/config"
xlog "gitlab.33.cn/chat/im-pkg/log"
)
const (
DebugMode = "debug"
ReleaseMode = "release"
TestMode = "test"
)
var (
confPath string
regAddress string
// Conf config
Conf *Config
)
func init() {
var (
defAddress = os.Getenv("REGADDRS")
)
flag.StringVar(&confPath, "conf", "comet.toml", "default config path.")
flag.StringVar(&regAddress, "reg", defAddress, "etcd register addrs. eg:127.0.0.1:2379")
}
// Init init config.
func Init() (err error) {
Conf = Default()
_, err = toml.DecodeFile(confPath, &Conf)
return
}
// Default new a config with specified defualt value.
func Default() *Config {
return &Config{
Env: "",
Log: xlog.Config{
Level: "debug",
Mode: "console",
Path: "",
Display: "console",
},
Trace: traceConfig.Configuration{
ServiceName: "answer",
Gen128Bit: true,
Sampler: &traceConfig.SamplerConfig{
Type: jaeger.SamplerTypeConst,
Param: 1,
},
Reporter: &traceConfig.ReporterConfig{
LogSpans: true,
LocalAgentHostPort: "127.0.0.1:6831",
},
},
Reg: &Reg{
Schema: "im",
SrvName: "comet",
RegAddrs: regAddress,
},
LogicRPCClient: &RPCClient{
Schema: "im",
SrvName: "logic",
Dial: xtime.Duration(time.Second),
Timeout: xtime.Duration(time.Second),
},
RPCServer: &RPCServer{
Network: "tcp",
Addr: ":3109",
Timeout: xtime.Duration(time.Second),
IdleTimeout: xtime.Duration(time.Second * 60),
MaxLifeTime: xtime.Duration(time.Hour * 2),
ForceCloseWait: xtime.Duration(time.Second * 20),
KeepAliveInterval: xtime.Duration(time.Second * 60),
KeepAliveTimeout: xtime.Duration(time.Second * 20),
},
TCP: &TCP{
Bind: []string{":3101"},
Sndbuf: 4096,
Rcvbuf: 4096,
KeepAlive: false,
Reader: 32,
ReadBuf: 1024,
ReadBufSize: 8192,
Writer: 32,
WriteBuf: 1024,
WriteBufSize: 8192,
},
Websocket: &Websocket{
Bind: []string{":3102"},
},
Protocol: &Protocol{
Timer: 32,
TimerSize: 2048,
Task: 32,
TaskSize: 2048,
CliProto: 5,
SvrProto: 10,
HandshakeTimeout: xtime.Duration(time.Second * 5),
TaskDuration: xtime.Duration(time.Second * 5),
MinHeartbeat: xtime.Duration(time.Minute * 10),
MaxHeartbeat: xtime.Duration(time.Minute * 30),
},
Bucket: &Bucket{
Size: 32,
Channel: 1024,
Groups: 1024,
RoutineAmount: 32,
RoutineSize: 1024,
},
}
}
// Config is comet config.
type Config struct {
Env string
Log xlog.Config
Trace traceConfig.Configuration
Reg *Reg
TCP *TCP
Websocket *Websocket
Protocol *Protocol
Bucket *Bucket
LogicRPCClient *RPCClient
RPCServer *RPCServer
}
// Reg is service register/discovery config
type Reg struct {
Schema string
SrvName string // call
RegAddrs string // etcd addrs, seperate by ','
}
// RPCClient is RPC client config.
type RPCClient struct {
Schema string
SrvName string // call
Dial xtime.Duration
Timeout xtime.Duration
}
// RPCServer is RPC server config.
type RPCServer struct {
Network string
Addr string
Timeout xtime.Duration
IdleTimeout xtime.Duration
MaxLifeTime xtime.Duration
ForceCloseWait xtime.Duration
KeepAliveInterval xtime.Duration
KeepAliveTimeout xtime.Duration
}
// TCP is tcp config.
type TCP struct {
Bind []string
Sndbuf int
Rcvbuf int
KeepAlive bool
Reader int
ReadBuf int
ReadBufSize int
Writer int
WriteBuf int
WriteBufSize int
}
// Websocket is websocket config.
type Websocket struct {
Bind []string
TLSOpen bool
TLSBind []string
CertFile string
PrivateFile string
}
// Protocol is protocol config.
type Protocol struct {
Timer int
TimerSize int
Task int
TaskSize int
SvrProto int
CliProto int
HandshakeTimeout xtime.Duration
MinHeartbeat xtime.Duration
MaxHeartbeat xtime.Duration
TaskDuration xtime.Duration
}
// Bucket is bucket config.
type Bucket struct {
Size int
Channel int
Groups int
RoutineAmount uint64
RoutineSize int
}

34
comet/errors/errors.go Normal file
View File

@@ -0,0 +1,34 @@
package errors
import (
"errors"
)
// .
var (
// server
ErrHandshake = errors.New("handshake failed")
ErrOperation = errors.New("request operation not valid")
// ring
ErrRingEmpty = errors.New("ring buffer empty")
ErrRingFull = errors.New("ring buffer full")
// timer
ErrTimerFull = errors.New("timer full")
ErrTimerEmpty = errors.New("timer empty")
ErrTimerNoItem = errors.New("timer item not exist")
// channel
ErrUnconnected = errors.New("client unconnected error")
ErrJoinGroupArg = errors.New("rpc joingroup arg error")
ErrPushMsgArg = errors.New("rpc pushmsg arg error")
ErrPushMsgsArg = errors.New("rpc pushmsgs arg error")
ErrMPushMsgArg = errors.New("rpc mpushmsg arg error")
ErrMPushMsgsArg = errors.New("rpc mpushmsgs arg error")
// bucket
ErrBroadCastArg = errors.New("rpc broadcast arg error")
ErrBroadCastRoomArg = errors.New("rpc broadcast room arg error")
// group
ErrGroupDroped = errors.New("group droped")
// rpc
ErrLogic = errors.New("logic rpc is not available")
)

119
comet/group.go Normal file
View File

@@ -0,0 +1,119 @@
package comet
import (
"fmt"
"sync"
"gitlab.33.cn/chat/im/api/comet/grpc"
)
// Group is a group and store channel group info.
type Group struct {
ID string
rLock sync.RWMutex
next *Node
drop bool
Online int32 // dirty read is ok
AllOnline int32
}
// NewGroup new a group struct, store channel group info.
func NewGroup(id string) (r *Group) {
r = new(Group)
r.ID = id
r.drop = false
r.next = nil
r.Online = 0
return
}
// Put put channel into the group.
func (r *Group) Put(ch *Channel) (err error) {
r.rLock.Lock()
if !r.drop && ch.GetNode(r.ID) == nil {
node := &Node{
Current: ch,
Next: nil,
Prev: nil,
}
ch.SetNode(r.ID, node)
if r.next != nil {
r.next.Prev = node
}
node.Next = r.next
node.Prev = nil
r.next = node // insert to header
r.Online++
} /* else { //del: 2021年7月21日16:32:54 dld
err = errors.ErrGroupDroped
}*/
r.rLock.Unlock()
return
}
// Del delete channel from the group.
func (r *Group) Del(ch *Channel) bool {
r.rLock.Lock()
if node := ch.GetNode(r.ID); node != nil {
if node.Next != nil {
// if not footer
node.Next.Prev = node.Prev
}
if node.Prev != nil {
// if not header
node.Prev.Next = node.Next
} else {
r.next = node.Next
}
r.Online--
//r.drop = (r.Online == 0)
ch.DelNode(r.ID) //2021年6月10日 删除对应node防止再次put的时候报ErrGroupDroped错误dld
}
r.rLock.Unlock()
return r.drop
}
// Push push msg to the group, if chan full discard it.
func (r *Group) Push(p *grpc.Proto) {
r.rLock.RLock()
for node := r.next; node != nil; node = node.Next {
if node.Current != nil {
_, _ = node.Current.Push(p)
}
}
r.rLock.RUnlock()
}
// group members Key,IP
func (r *Group) Members() ([]string, []string) {
r.rLock.RLock()
members := make([]string, 0)
mIp := make([]string, 0)
for node := r.next; node != nil; node = node.Next {
if node.Current != nil {
members = append(members, node.Current.Key)
mIp = append(mIp, fmt.Sprintf("%v:%v", node.Current.IP, node.Current.Port))
}
}
r.rLock.RUnlock()
return members, mIp
}
// Close close the group.
func (r *Group) Close() {
r.rLock.Lock()
for node := r.next; node != nil; node = node.Next {
if ch := node.Current; ch != nil {
ch.DelNode(r.ID)
}
}
r.rLock.Unlock()
}
// OnlineNum the group all online.
func (r *Group) OnlineNum() int32 {
if r.AllOnline > 0 {
return r.AllOnline
}
return r.Online
}

121
comet/group_test.go Normal file
View File

@@ -0,0 +1,121 @@
package comet
import (
"strconv"
"sync/atomic"
"testing"
"time"
)
func TestGroup_Put(t *testing.T) {
//one channel
var gid int32 = 0
ch := NewChannel(10, 10)
//one groups
{
g := NewGroup(strconv.Itoa(int(atomic.AddInt32(&gid, 1))))
for i := 0; i < 10; i++ {
go func() {
for {
err := g.Put(ch)
if err != nil {
t.Error(err)
}
}
}()
}
}
//many groups
{
for i := 0; i < 10; i++ {
g := NewGroup(strconv.Itoa(int(atomic.AddInt32(&gid, 1))))
go func() {
for {
err := g.Put(ch)
if err != nil {
t.Error(err)
}
}
}()
}
}
time.Sleep(10 * time.Second)
}
func TestGroup_Del(t *testing.T) {
//one channel
var gid int32 = 0
//var lc sync.RWMutex
var groups = make(map[int32]*Group)
ch := NewChannel(10, 10)
//init
for i := 0; i < 10; i++ {
g := NewGroup(strconv.Itoa(int(atomic.AddInt32(&gid, 1))))
groups[gid] = g
}
gid = 0
go func() {
for _, g := range groups {
err := g.Put(ch)
if err != nil {
t.Error(err)
}
}
}()
go func() {
for _, g := range groups {
g.Del(ch)
}
}()
time.Sleep(10 * time.Second)
}
func TestGroup_Del2(t *testing.T) {
//one channel
var gid int32 = 0
//var lc sync.RWMutex
var groups = make(map[int32]*Group)
ch := NewChannel(10, 10)
//init
for i := 0; i < 10; i++ {
g := NewGroup(strconv.Itoa(int(atomic.AddInt32(&gid, 1))))
groups[gid] = g
}
gid = 0
go func() {
for _, g := range groups {
err := g.Put(ch)
if err != nil {
t.Error(err)
}
}
}()
go func() {
for _, g := range groups {
g.Del(ch)
}
}()
time.Sleep(10 * time.Second)
}
func convertAddress(addr string) string {
if len(addr) == 0 {
return addr
}
switch addr[0] {
case 'g':
return addr[7:]
default:
return addr
}
}
func Test_convertAddress(t *testing.T) {
t.Log(convertAddress("grpc://172.16.101.107:3109"))
}

152
comet/grpc/server.go Normal file
View File

@@ -0,0 +1,152 @@
package grpc
import (
"context"
"gitlab.33.cn/chat/im-pkg/trace"
"github.com/rs/zerolog/log"
"net"
"time"
pb "gitlab.33.cn/chat/im/api/comet/grpc"
"gitlab.33.cn/chat/im/comet"
"gitlab.33.cn/chat/im/comet/conf"
"gitlab.33.cn/chat/im/comet/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
// New comet grpc server.
func New(c *conf.RPCServer, s *comet.Comet) *grpc.Server {
keepParams := grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: time.Duration(c.IdleTimeout),
MaxConnectionAgeGrace: time.Duration(c.ForceCloseWait),
Time: time.Duration(c.KeepAliveInterval),
Timeout: time.Duration(c.KeepAliveTimeout),
MaxConnectionAge: time.Duration(c.MaxLifeTime),
})
connectionTimeout := grpc.ConnectionTimeout(time.Duration(c.Timeout))
srv := grpc.NewServer(keepParams, connectionTimeout,
grpc.ChainUnaryInterceptor(
trace.OpentracingServerInterceptor,
))
pb.RegisterCometServer(srv, &server{srv: s})
lis, err := net.Listen(c.Network, c.Addr)
if err != nil {
panic(err)
}
go func() {
if err := srv.Serve(lis); err != nil {
panic(err)
}
}()
return srv
}
type server struct {
pb.UnimplementedCometServer
srv *comet.Comet
}
// PushMsg push a message to specified sub keys.
func (s *server) PushMsg(ctx context.Context, req *pb.PushMsgReq) (reply *pb.PushMsgReply, err error) {
if len(req.Keys) == 0 || req.Proto == nil {
return nil, errors.ErrPushMsgArg
}
index := make(map[string]int32)
var seq int32
for _, key := range req.Keys {
if channel := s.srv.Bucket(key).Channel(key); channel != nil {
if seq, err = channel.Push(req.Proto); err != nil {
return
}
index[key] = seq
}
}
return &pb.PushMsgReply{Index: index}, nil
}
// Broadcast broadcast msg to all user.
func (s *server) Broadcast(ctx context.Context, req *pb.BroadcastReq) (*pb.BroadcastReply, error) {
if req.Proto == nil {
return nil, errors.ErrBroadCastArg
}
// TODO use broadcast queue
go func() {
for _, bucket := range s.srv.Buckets() {
bucket.Broadcast(req.GetProto(), req.ProtoOp)
}
}()
return &pb.BroadcastReply{}, nil
}
func (s *server) BroadcastGroup(ctx context.Context, req *pb.BroadcastGroupReq) (*pb.BroadcastGroupReply, error) {
if req.Proto == nil || req.GroupID == "" {
return nil, errors.ErrBroadCastArg
}
for _, bucket := range s.srv.Buckets() {
bucket.BroadcastGroup(req)
}
return &pb.BroadcastGroupReply{}, nil
}
func (s *server) JoinGroups(ctx context.Context, req *pb.JoinGroupsReq) (*pb.JoinGroupsReply, error) {
if len(req.Keys) == 0 || len(req.Gid) == 0 {
return nil, errors.ErrJoinGroupArg
}
for _, key := range req.Keys {
var channel *comet.Channel
bucket := s.srv.Bucket(key)
if channel = bucket.Channel(key); channel == nil {
log.Error().Str("key", key).Msg("JoinGroups get channel err")
continue
//return &pb.JoinGroupsReply{}, errors.ErrUnconnected todo 2021_12_08_14_36:
}
for _, gid := range req.Gid {
var group *comet.Group
if group = bucket.Group(gid); group == nil {
group, _ = bucket.PutGroup(gid)
}
err := group.Put(channel)
if err != nil {
log.Error().Err(err).Str("key", key).Str("gid", gid).
Int32("channel.Seq", channel.Seq).Str("channel.Key", channel.Key).Str("channel.Ip", channel.IP).Str("channel.Port", channel.Port).
Msg("JoinGroups get channel err")
continue
//return &pb.JoinGroupsReply{}, err todo 2021_12_08_14_36:
}
}
}
return &pb.JoinGroupsReply{}, nil
}
func (s *server) LeaveGroups(ctx context.Context, req *pb.LeaveGroupsReq) (*pb.LeaveGroupsReply, error) {
if len(req.Keys) == 0 || len(req.Gid) == 0 {
return nil, errors.ErrPushMsgArg
}
for _, key := range req.Keys {
var channel *comet.Channel
bucket := s.srv.Bucket(key)
if channel = bucket.Channel(key); channel == nil {
continue
//return &pb.LeaveGroupsReply{}, errors.ErrUnconnected todo 2021_12_08_14_36:
}
for _, gid := range req.Gid {
if group := bucket.Group(gid); group != nil {
group.Del(channel)
}
}
}
return &pb.LeaveGroupsReply{}, nil
}
func (s *server) DelGroups(ctx context.Context, req *pb.DelGroupsReq) (*pb.DelGroupsReply, error) {
for _, gid := range req.Gid {
for _, bucket := range s.srv.Buckets() {
if g := bucket.Group(gid); g != nil {
bucket.DelGroup(g)
}
}
}
return &pb.DelGroupsReply{}, nil
}

36
comet/http/result.go Normal file
View File

@@ -0,0 +1,36 @@
package http
import "github.com/gin-gonic/gin"
const (
// OK ok
OK = 0
// RequestErr request error
RequestErr = -400
// ServerErr server error
ServerErr = -500
contextErrCode = "context/err/code"
)
type resp struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
func errors(c *gin.Context, code int, msg string) {
c.Set(contextErrCode, code)
c.JSON(200, resp{
Code: code,
Message: msg,
})
}
func result(c *gin.Context, data interface{}, code int) {
c.Set(contextErrCode, code)
c.JSON(200, resp{
Code: code,
Data: data,
})
}

46
comet/http/server.go Normal file
View File

@@ -0,0 +1,46 @@
package http
import (
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog/log"
"gitlab.33.cn/chat/im/comet"
"gitlab.33.cn/chat/im/comet/conf"
"net/http"
)
var (
srv *comet.Comet
)
func Start(addr string, s *comet.Comet) *http.Server {
srv = s
gin.ForceConsoleColor()
switch conf.Conf.Env {
case conf.DebugMode:
gin.SetMode(gin.DebugMode)
case conf.ReleaseMode:
gin.SetMode(gin.ReleaseMode)
}
engine := gin.Default()
SetupEngine(engine)
pprof.Register(engine)
srv := &http.Server{
Addr: addr,
Handler: engine,
}
go func() {
// service connections
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatal().Err(err).Msg("http listen failed")
}
}()
return srv
}
func SetupEngine(e *gin.Engine) *gin.Engine {
e.GET("/statics", Statics)
e.GET("/groupDetails", GroupDetails)
return e
}

75
comet/http/statics.go Normal file
View File

@@ -0,0 +1,75 @@
package http
import "github.com/gin-gonic/gin"
func Statics(c *gin.Context) {
var arg struct {
Drop bool `form:"isDrop"`
}
if err := c.BindQuery(&arg); err != nil {
errors(c, RequestErr, err.Error())
return
}
res := map[string]interface{}{
"buckets": groupsInfo(arg.Drop),
}
result(c, res, OK)
}
func groupsInfo(isDrop bool) []map[string]interface{} {
var res = make([]map[string]interface{}, len(srv.Buckets()))
for i, bucket := range srv.Buckets() {
if !isDrop {
if bucket.GroupCount() == 0 {
continue
}
if len(bucket.GroupsCount()) == 0 {
continue
}
}
item := map[string]interface{}{
"counts": bucket.GroupCount(),
"group-members": bucket.GroupsCount(),
}
res[i] = item
}
return res
}
func GroupDetails(c *gin.Context) {
var arg struct {
Groups []string `form:"groups" binding:"required"`
}
if err := c.BindQuery(&arg); err != nil {
errors(c, RequestErr, err.Error())
return
}
var groups = make([]interface{}, 0)
for _, gid := range arg.Groups {
gInfo := map[string]interface{}{
"gid": gid,
"members": groupsMembers(gid),
}
groups = append(groups, gInfo)
}
res := map[string]interface{}{
"groups": groups,
}
result(c, res, OK)
}
func groupsMembers(gid string) map[string]string {
members := make(map[string]string, 0)
for _, bucket := range srv.Buckets() {
g := bucket.Group(gid)
if g == nil {
continue
}
mems, ips := g.Members()
for i, mem := range mems {
members[mem] = ips[i]
}
}
return members
}

7
comet/nodes.go Normal file
View File

@@ -0,0 +1,7 @@
package comet
type Node struct {
Current *Channel
Next *Node
Prev *Node
}

44
comet/operation.go Normal file
View File

@@ -0,0 +1,44 @@
package comet
import (
"context"
"strconv"
"gitlab.33.cn/chat/im/api/comet/grpc"
"gitlab.33.cn/chat/im/dtask"
)
// Operate operate.
func (s *Comet) Operate(ctx context.Context, p *grpc.Proto, ch *Channel, tsk *dtask.Task) error {
switch p.Op {
case int32(grpc.Op_SendMsg):
//标明Ack的消息序列
p.Ack = p.Seq
err := s.Receive(ctx, ch.Key, p)
if err != nil {
//下层业务调用失败返回error的话会直接断开连接
return err
}
//p.Op = int32(grpc.Op_SendMsgReply)
case int32(grpc.Op_ReceiveMsgReply):
//从task中删除某一条
if j := tsk.Get(strconv.FormatInt(int64(p.Ack), 10)); j != nil {
j.Cancel()
}
err := s.Receive(ctx, ch.Key, p)
if err != nil {
//下层业务调用失败返回error的话会直接断开连接
return err
}
case int32(grpc.Op_SyncMsgReq):
err := s.Receive(ctx, ch.Key, p)
if err != nil {
//下层业务调用失败返回error的话会直接断开连接
return err
}
p.Op = int32(grpc.Op_SyncMsgReply)
default:
return s.Receive(ctx, ch.Key, p)
}
return nil
}

80
comet/ring.go Normal file
View File

@@ -0,0 +1,80 @@
package comet
import (
"gitlab.33.cn/chat/im/api/comet/grpc"
"gitlab.33.cn/chat/im/comet/errors"
)
// Ring ring proto buffer.
type Ring struct {
// read
rp uint64
num uint64
mask uint64
// TODO split cacheline, many cpu cache line size is 64
// pad [40]byte
// write
wp uint64
data []grpc.Proto
}
// NewRing new a ring buffer.
func NewRing(num int) *Ring {
r := new(Ring)
r.init(uint64(num))
return r
}
// Init init ring.
func (r *Ring) Init(num int) {
r.init(uint64(num))
}
func (r *Ring) init(num uint64) {
// 2^N
if num&(num-1) != 0 {
for num&(num-1) != 0 {
num &= (num - 1)
}
num = num << 1
}
r.data = make([]grpc.Proto, num)
r.num = num
r.mask = r.num - 1
}
// Get get a proto from ring.
func (r *Ring) Get() (proto *grpc.Proto, err error) {
if r.rp == r.wp {
return nil, errors.ErrRingEmpty
}
proto = &r.data[r.rp&r.mask]
return
}
// GetAdv incr read index.
func (r *Ring) GetAdv() {
r.rp++
}
// Set get a proto to write.
func (r *Ring) Set() (proto *grpc.Proto, err error) {
if r.wp-r.rp >= r.num {
return nil, errors.ErrRingFull
}
proto = &r.data[r.wp&r.mask]
return
}
// SetAdv incr write index.
func (r *Ring) SetAdv() {
r.wp++
}
// Reset reset ring.
func (r *Ring) Reset() {
r.rp = 0
r.wp = 0
// prevent pad compiler optimization
// r.pad = [40]byte{}
}

78
comet/round.go Normal file
View File

@@ -0,0 +1,78 @@
package comet
import (
"github.com/Terry-Mao/goim/pkg/bytes"
"github.com/Terry-Mao/goim/pkg/time"
"gitlab.33.cn/chat/im/comet/conf"
)
// RoundOptions round options.
type RoundOptions struct {
Timer int
TimerSize int
Reader int
ReadBuf int
ReadBufSize int
Writer int
WriteBuf int
WriteBufSize int
Task int
TaskSize int
}
// Round userd for connection round-robin get a reader/writer/timer for split big lock.
type Round struct {
readers []bytes.Pool
writers []bytes.Pool
timers []time.Timer
options RoundOptions
}
// NewRound new a round struct.
func NewRound(c *conf.Config) (r *Round) {
var i int
r = &Round{
options: RoundOptions{
Reader: c.TCP.Reader,
ReadBuf: c.TCP.ReadBuf,
ReadBufSize: c.TCP.ReadBufSize,
Writer: c.TCP.Writer,
WriteBuf: c.TCP.WriteBuf,
WriteBufSize: c.TCP.WriteBufSize,
Timer: c.Protocol.Timer,
TimerSize: c.Protocol.TimerSize,
Task: c.Protocol.Task,
TaskSize: c.Protocol.TaskSize,
}}
// reader
r.readers = make([]bytes.Pool, r.options.Reader)
for i = 0; i < r.options.Reader; i++ {
r.readers[i].Init(r.options.ReadBuf, r.options.ReadBufSize)
}
// writer
r.writers = make([]bytes.Pool, r.options.Writer)
for i = 0; i < r.options.Writer; i++ {
r.writers[i].Init(r.options.WriteBuf, r.options.WriteBufSize)
}
// timer
r.timers = make([]time.Timer, r.options.Timer)
for i = 0; i < r.options.Timer; i++ {
r.timers[i].Init(r.options.TimerSize)
}
return
}
// Timer get a timer.
func (r *Round) Timer(rn int) *time.Timer {
return &(r.timers[rn%r.options.Timer])
}
// Reader get a reader memory buffer.
func (r *Round) Reader(rn int) *bytes.Pool {
return &(r.readers[rn%r.options.Reader])
}
// Writer get a writer memory buffer pool.
func (r *Round) Writer(rn int) *bytes.Pool {
return &(r.writers[rn%r.options.Writer])
}

304
comet/tcp.go Normal file
View File

@@ -0,0 +1,304 @@
package comet
import (
"context"
"fmt"
"io"
"net"
"strconv"
"strings"
"time"
"github.com/Terry-Mao/goim/pkg/bufio"
"github.com/Terry-Mao/goim/pkg/bytes"
xtime "github.com/Terry-Mao/goim/pkg/time"
"github.com/golang/protobuf/proto"
"github.com/rs/zerolog/log"
"gitlab.33.cn/chat/im/api/comet/grpc"
"gitlab.33.cn/chat/im/dtask"
)
// InitTCP listen all tcp.bind and start accept connections.
func InitTCP(server *Comet, addrs []string, accept int) (err error) {
var (
bind string
listener *net.TCPListener
addr *net.TCPAddr
)
for _, bind = range addrs {
if addr, err = net.ResolveTCPAddr("tcp", bind); err != nil {
log.Error().Stack().Err(err).Msg(fmt.Sprintf("net.ResolveTCPAddr(tcp, %s)", bind))
return
}
if listener, err = net.ListenTCP("tcp", addr); err != nil {
log.Error().Stack().Err(err).Msg(fmt.Sprintf("net.ListenTCP(tcp, %s)", bind))
return
}
log.Info().Str("bind", bind).Msg("start tcp listen")
// split N core accept
for i := 0; i < accept; i++ {
go acceptTCP(server, listener)
}
}
return
}
// Accept accepts connections on the listener and serves requests
// for each incoming connection. Accept blocks; the caller typically
// invokes it in a go statement.
func acceptTCP(server *Comet, lis *net.TCPListener) {
var (
conn *net.TCPConn
err error
r int
)
for {
if conn, err = lis.AcceptTCP(); err != nil {
// if listener close then return
log.Error().Stack().Err(err).Msg(fmt.Sprintf("listener.Accept(\"%s\")", lis.Addr().String()))
continue
//return
}
log.Info().Str("remoteIP", conn.RemoteAddr().String()).Msg("accept tcp conn")
if err = conn.SetKeepAlive(server.c.TCP.KeepAlive); err != nil {
log.Error().Stack().Err(err).Msg("conn.SetKeepAlive()")
return
}
if err = conn.SetReadBuffer(server.c.TCP.Rcvbuf); err != nil {
log.Error().Stack().Err(err).Msg("conn.SetReadBuffer()")
return
}
if err = conn.SetWriteBuffer(server.c.TCP.Sndbuf); err != nil {
log.Error().Stack().Err(err).Msg("conn.SetWriteBuffer()")
return
}
go serveTCP(server, conn, r)
if r++; r == maxInt {
r = 0
}
}
}
func serveTCP(s *Comet, conn *net.TCPConn, r int) {
var (
// timer
tr = s.round.Timer(r)
rp = s.round.Reader(r)
wp = s.round.Writer(r)
)
s.ServeTCP(conn, rp, wp, tr)
}
// ServeTCP serve a tcp connection.
func (s *Comet) ServeTCP(conn *net.TCPConn, rp, wp *bytes.Pool, tr *xtime.Timer) {
var (
err error
hb time.Duration
p *grpc.Proto
b *Bucket
trd *xtime.TimerData
lastHb = time.Now()
rb = rp.Get()
wb = wp.Get()
ch = NewChannel(s.c.Protocol.CliProto, s.c.Protocol.SvrProto)
rr = &ch.Reader
wr = &ch.Writer
tsk *dtask.Task
)
ch.Reader.ResetBuffer(conn, rb.Bytes())
ch.Writer.ResetBuffer(conn, wb.Bytes())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// handshake
step := 0
trd = tr.Add(time.Duration(s.c.Protocol.HandshakeTimeout), func() {
conn.Close()
log.Error().Int("step", step).Str("key", ch.Key).Str("remoteIP", conn.RemoteAddr().String()).Msg("tcp handshake timeout")
})
ch.IP, ch.Port, _ = net.SplitHostPort(conn.RemoteAddr().String())
// must not setadv, only used in auth
step = 1
if p, err = ch.CliProto.Set(); err == nil {
if ch.Key, hb, err = s.authTCP(ctx, rr, wr, p); err == nil {
log.Info().Str("key", ch.Key).Str("remoteIP", conn.RemoteAddr().String()).Msg("authoried")
b = s.Bucket(ch.Key)
err = b.Put(ch)
}
}
step = 2
if err != nil {
conn.Close()
rp.Put(rb)
wp.Put(wb)
tr.Del(trd)
log.Error().Str("key", ch.Key).Err(err).Msg("handshake failed")
return
}
trd.Key = ch.Key
tr.Set(trd, hb)
step = 3
// hanshake ok start dispatch goroutine
tsk = dtask.NewTask()
go s.dispatchTCP(conn, wr, wp, wb, ch, tsk)
serverHeartbeat := s.RandServerHearbeat()
for {
if p, err = ch.CliProto.Set(); err != nil {
break
}
if err = p.ReadTCP(rr); err != nil {
log.Info().Err(err).Msg("ReadTCP failed")
break
}
if p.Op == int32(grpc.Op_Heartbeat) {
tr.Set(trd, hb)
p.Op = int32(grpc.Op_HeartbeatReply)
p.Body = nil
// NOTE: send server heartbeat for a long time
if now := time.Now(); now.Sub(lastHb) > serverHeartbeat {
if err1 := s.Heartbeat(ctx, ch.Key); err1 == nil {
lastHb = now
}
}
step++
} else {
if err = s.Operate(ctx, p, ch, tsk); err != nil {
break
}
}
// msg sent from client will be dispatched to client itself
ch.CliProto.SetAdv()
ch.Signal()
}
if err != nil && err != io.EOF && !strings.Contains(err.Error(), "closed") {
log.Error().Str("key", ch.Key).Err(err).Msg("server tcp failed")
}
b.Del(ch)
tr.Del(trd)
rp.Put(rb)
conn.Close()
ch.Close()
if err = s.Disconnect(ctx, ch.Key); err != nil {
log.Error().Str("key", ch.Key).Err(err).Msg("operator do disconnect")
}
}
// dispatch accepts connections on the listener and serves requests
// for each incoming connection. dispatch blocks; the caller typically
// invokes it in a go statement.
func (s *Comet) dispatchTCP(conn *net.TCPConn, wr *bufio.Writer, wp *bytes.Pool, wb *bytes.Buffer, ch *Channel, tsk *dtask.Task) {
var (
err error
finish bool
online int32
)
for {
var p = ch.Ready()
switch p {
case grpc.ProtoFinish:
finish = true
goto failed
case grpc.ProtoReady:
// fetch message from svrbox(client send)
for {
if p, err = ch.CliProto.Get(); err != nil {
break
}
if p.Op == int32(grpc.Op_HeartbeatReply) {
if err = p.WriteTCPHeart(wr, online); err != nil {
goto failed
}
} else if p.Op == int32(grpc.Op_ReceiveMsgReply) {
//skip
} else if p.Op == int32(grpc.Op_SendMsg) {
//skip
} else {
if err = p.WriteTCP(wr); err != nil {
goto failed
}
}
p.Body = nil // avoid memory leak
ch.CliProto.GetAdv()
}
default:
switch p.Op {
case int32(grpc.Op_SendMsgReply):
if err = p.WriteTCP(wr); err != nil {
goto failed
}
case int32(grpc.Op_RePush):
pro := proto.Clone(p)
if p, ok := pro.(*grpc.Proto); ok {
p.Op = int32(grpc.Op_ReceiveMsg)
if err = p.WriteTCP(wr); err != nil {
goto failed
}
} else {
log.Error().Msg("proto can`t clone Proto")
}
case int32(grpc.Op_ReceiveMsg):
if err = p.WriteTCP(wr); err != nil {
goto failed
}
p.Op = int32(grpc.Op_RePush)
seq := strconv.FormatInt(int64(p.Seq), 10)
if j := tsk.Get(seq); j != nil {
continue
}
//push into task pool
job, inserted := tsk.AddJobRepeat(time.Second*5, 0, func() {
if _, err = ch.Push(p); err != nil {
log.Error().Err(err).Msg("task job ch.Push error")
return
}
})
if !inserted {
log.Error().Err(err).Msg("tsk.AddJobRepeat error")
goto failed
}
tsk.Add(seq, job)
default:
continue
}
}
// only hungry flush response
if err = wr.Flush(); err != nil {
break
}
}
failed:
if err != nil {
log.Error().Str("key", ch.Key).Err(err).Msg("dispatch tcp failed")
}
tsk.Stop()
conn.Close()
wp.Put(wb)
// must ensure all channel message discard, for reader won't blocking Signal
for !finish {
finish = (ch.Ready() == grpc.ProtoFinish)
}
}
// auth for goim handshake with client, use rsa & aes.
func (s *Comet) authTCP(ctx context.Context, rr *bufio.Reader, wr *bufio.Writer, p *grpc.Proto) (key string, hb time.Duration, err error) {
for {
if err = p.ReadTCP(rr); err != nil {
return
}
if p.Op == int32(grpc.Op_Auth) {
break
} else {
log.Error().Int32("option", p.Op).Msg("tcp request option not auth")
}
}
if key, hb, err = s.Connect(ctx, p); err != nil {
log.Error().Err(err).Msg("can not call logic.Connect")
return
}
p.Op = int32(grpc.Op_AuthReply)
p.Body = nil
if err = p.WriteTCP(wr); err != nil {
return
}
err = wr.Flush()
return
}

341
comet/ws.go Normal file
View File

@@ -0,0 +1,341 @@
package comet
import (
"context"
"fmt"
"io"
"net"
"runtime"
"strconv"
"strings"
"time"
"github.com/Terry-Mao/goim/pkg/bytes"
xtime "github.com/Terry-Mao/goim/pkg/time"
"github.com/Terry-Mao/goim/pkg/websocket"
"github.com/rs/zerolog/log"
"gitlab.33.cn/chat/im/api/comet/grpc"
"gitlab.33.cn/chat/im/dtask"
)
const (
maxInt = 1<<31 - 1
)
// InitWebsocket listen all tcp.bind and start accept connections.
func InitWebsocket(server *Comet, addrs []string, accept int) (err error) {
var (
bind string
listener *net.TCPListener
addr *net.TCPAddr
)
for _, bind = range addrs {
if addr, err = net.ResolveTCPAddr("tcp", bind); err != nil {
log.Error().Stack().Err(err).Msg(fmt.Sprintf("net.ResolveTCPAddr(tcp, %s)", bind))
return
}
if listener, err = net.ListenTCP("tcp", addr); err != nil {
log.Error().Stack().Err(err).Msg(fmt.Sprintf("net.ListenTCP(tcp, %s)", bind))
return
}
log.Info().Str("bind", bind).Msg("start ws listen")
// split N core accept
for i := 0; i < accept; i++ {
go acceptWebsocket(server, listener)
}
}
return
}
// Accept accepts connections on the listener and serves requests
// for each incoming connection. Accept blocks; the caller typically
// invokes it in a go statement.
func acceptWebsocket(server *Comet, lis *net.TCPListener) {
defer func() {
buf := make([]byte, 1024*3)
runtime.Stack(buf, false)
log.Error().Str("panic", string(buf)).Msg("acceptWebsocket done")
if r := recover(); r != nil {
buf := make([]byte, 1024*3)
runtime.Stack(buf, false)
log.Error().Interface("recover", r).Str("panic", string(buf)).Msg("Recovered in acceptWebsocket")
}
}()
var (
conn *net.TCPConn
err error
r int
)
for {
if conn, err = lis.AcceptTCP(); err != nil {
// if listener close then return
log.Error().Stack().Err(err).Msg(fmt.Sprintf("listener.Accept(\"%s\")", lis.Addr().String()))
continue
//return
}
log.Info().Str("remoteIP", conn.RemoteAddr().String()).Msg("accept ws conn")
if err = conn.SetKeepAlive(server.c.TCP.KeepAlive); err != nil {
log.Error().Stack().Err(err).Msg("conn.SetKeepAlive()")
return
}
if err = conn.SetReadBuffer(server.c.TCP.Rcvbuf); err != nil {
log.Error().Stack().Err(err).Msg("conn.SetReadBuffer()")
return
}
if err = conn.SetWriteBuffer(server.c.TCP.Sndbuf); err != nil {
log.Error().Stack().Err(err).Msg("conn.SetWriteBuffer()")
return
}
go serveWebsocket(server, conn, r)
if r++; r == maxInt {
r = 0
}
}
}
func serveWebsocket(s *Comet, conn net.Conn, r int) {
var (
// timer
tr = s.round.Timer(r)
rp = s.round.Reader(r)
wp = s.round.Writer(r)
)
s.ServeWebsocket(conn, rp, wp, tr)
}
// ServeWebsocket serve a websocket connection.
func (s *Comet) ServeWebsocket(conn net.Conn, rp, wp *bytes.Pool, tr *xtime.Timer) {
var (
err error
hb time.Duration
p *grpc.Proto
b *Bucket
trd *xtime.TimerData
lastHB = time.Now()
rb = rp.Get()
ch = NewChannel(s.c.Protocol.CliProto, s.c.Protocol.SvrProto)
rr = &ch.Reader
wr = &ch.Writer
ws *websocket.Conn // websocket
req *websocket.Request
tsk *dtask.Task
)
// reader
ch.Reader.ResetBuffer(conn, rb.Bytes())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// handshake
step := 0
trd = tr.Add(time.Duration(s.c.Protocol.HandshakeTimeout), func() {
// NOTE: fix close block for tls
_ = conn.SetDeadline(time.Now().Add(time.Millisecond * 100))
_ = conn.Close()
log.Error().Int("step", step).Str("key", ch.Key).Str("remoteIP", conn.RemoteAddr().String()).Msg("ws handshake timeout")
})
// websocket
ch.IP, ch.Port, _ = net.SplitHostPort(conn.RemoteAddr().String())
step = 1
if req, err = websocket.ReadRequest(rr); err != nil || req.RequestURI != "/sub" {
conn.Close()
tr.Del(trd)
rp.Put(rb)
if err != io.EOF {
log.Error().Err(err).Msg("http.ReadRequest(rr)")
}
return
}
// writer
wb := wp.Get()
ch.Writer.ResetBuffer(conn, wb.Bytes())
step = 2
if ws, err = websocket.Upgrade(conn, rr, wr, req); err != nil {
conn.Close()
tr.Del(trd)
rp.Put(rb)
wp.Put(wb)
if err != io.EOF {
log.Error().Err(err).Msg("websocket.NewServerConn")
}
return
}
// must not setadv, only used in auth
step = 3
if p, err = ch.CliProto.Set(); err == nil {
if ch.Key, hb, err = s.authWebsocket(ctx, ws, p); err == nil {
log.Info().Str("key", ch.Key).Str("remoteIP", conn.RemoteAddr().String()).Msg("authoried")
b = s.Bucket(ch.Key)
err = b.Put(ch)
}
}
step = 4
if err != nil {
ws.Close()
rp.Put(rb)
wp.Put(wb)
tr.Del(trd)
if err != io.EOF && err != websocket.ErrMessageClose {
log.Error().Int("step", step).Str("key", ch.Key).Str("remoteIP", conn.RemoteAddr().String()).Msg("ws handshake failed")
}
return
}
trd.Key = ch.Key
tr.Set(trd, hb)
// hanshake ok start dispatch goroutine
step = 5
tsk = dtask.NewTask()
go s.dispatchWebsocket(ws, wp, wb, ch, tsk)
serverHeartbeat := s.RandServerHearbeat()
for {
if p, err = ch.CliProto.Set(); err != nil {
break
}
if err = p.ReadWebsocket(ws); err != nil {
break
}
if p.Op == int32(grpc.Op_Heartbeat) {
tr.Set(trd, hb)
p.Op = int32(grpc.Op_HeartbeatReply)
p.Body = nil
// NOTE: send server heartbeat for a long time
if now := time.Now(); now.Sub(lastHB) > serverHeartbeat {
if err1 := s.Heartbeat(ctx, ch.Key); err1 == nil {
lastHB = now
}
}
step++
} else {
if err = s.Operate(ctx, p, ch, tsk); err != nil {
break
}
}
ch.CliProto.SetAdv()
ch.Signal()
}
if err != nil && err != io.EOF && err != websocket.ErrMessageClose && !strings.Contains(err.Error(), "closed") {
log.Error().Err(err).Str("key", ch.Key).Msg("server ws failed")
}
b.Del(ch)
tr.Del(trd)
ws.Close()
ch.Close()
rp.Put(rb)
if err = s.Disconnect(ctx, ch.Key); err != nil {
log.Error().Err(err).Str("key", ch.Key).Msg("operator do disconnect")
}
}
// dispatch accepts connections on the listener and serves requests
// for each incoming connection. dispatch blocks; the caller typically
// invokes it in a go statement.
func (s *Comet) dispatchWebsocket(ws *websocket.Conn, wp *bytes.Pool, wb *bytes.Buffer, ch *Channel, tsk *dtask.Task) {
var (
err error
finish bool
online int32
)
for {
var p = ch.Ready()
switch p {
case grpc.ProtoFinish:
finish = true
goto failed
case grpc.ProtoReady:
// fetch message from svrbox(client send)
for {
if p, err = ch.CliProto.Get(); err != nil {
break
}
if p.Op == int32(grpc.Op_HeartbeatReply) {
if err = p.WriteWebsocketHeart(ws, online); err != nil {
goto failed
}
} else if p.Op == int32(grpc.Op_ReceiveMsgReply) {
//skip
} else if p.Op == int32(grpc.Op_SendMsg) {
//skip
} else {
if err = p.WriteWebsocket(ws); err != nil {
goto failed
}
}
p.Body = nil // avoid memory leak
ch.CliProto.GetAdv()
}
default:
switch p.Op {
case int32(grpc.Op_SendMsgReply):
if err = p.WriteWebsocket(ws); err != nil {
goto failed
}
case int32(grpc.Op_RePush):
p.Op = int32(grpc.Op_ReceiveMsg)
if err = p.WriteWebsocket(ws); err != nil {
goto failed
}
case int32(grpc.Op_ReceiveMsg):
if err = p.WriteWebsocket(ws); err != nil {
goto failed
}
p.Op = int32(grpc.Op_RePush)
seq := strconv.FormatInt(int64(p.Seq), 10)
if j := tsk.Get(seq); j != nil {
continue
}
//push into task pool
job, inserted := tsk.AddJobRepeat(time.Second*5, 0, func() {
if _, err = ch.Push(p); err != nil {
log.Error().Err(err).Msg("task job ch.Push error")
return
}
})
if !inserted {
log.Error().Err(err).Msg("tsk.AddJobRepeat error")
goto failed
}
tsk.Add(seq, job)
default:
continue
}
}
// only hungry flush response
if err = ws.Flush(); err != nil {
break
}
}
failed:
if err != nil && err != io.EOF && err != websocket.ErrMessageClose {
log.Error().Err(err).Str("key", ch.Key).Msg("dispatch ws error")
}
tsk.Stop()
ws.Close()
wp.Put(wb)
// must ensure all channel message discard, for reader won't blocking Signal
for !finish {
finish = (ch.Ready() == grpc.ProtoFinish)
}
}
// auth for goim handshake with client, use rsa & aes.
func (s *Comet) authWebsocket(ctx context.Context, ws *websocket.Conn, p *grpc.Proto) (key string, hb time.Duration, err error) {
for {
if err = p.ReadWebsocket(ws); err != nil {
return
}
if p.Op == int32(grpc.Op_Auth) {
break
} else {
log.Error().Int32("operation", p.Op).Msg("ws request operation not auth")
}
}
if key, hb, err = s.Connect(ctx, p); err != nil {
log.Error().Err(err).Msg("can not call logic.Connect")
return
}
p.Op = int32(grpc.Op_AuthReply)
p.Body = nil
if err = p.WriteWebsocket(ws); err != nil {
return
}
err = ws.Flush()
return
}