This commit is contained in:
2022-03-17 15:54:23 +08:00
commit 437c38533d
25 changed files with 6943 additions and 0 deletions

170
chat/frame.go Normal file
View File

@@ -0,0 +1,170 @@
package chat
import (
"github.com/golang/protobuf/proto"
comet "gitlab.33.cn/chat/im/api/comet/grpc"
"gitlab.33.cn/chat/imparse"
)
const (
PrivateFrameType imparse.FrameType = "private"
GroupFrameType imparse.FrameType = "group"
SignalFrameType imparse.FrameType = "signal"
)
type Option func(*Options)
type Options struct {
mid int64
createTime uint64
target string
transmissionMethod imparse.Channel
}
func WithMid(t int64) Option {
return func(o *Options) {
o.mid = t
}
}
func WithCreateTime(t uint64) Option {
return func(o *Options) {
o.createTime = t
}
}
func WithTarget(t string) Option {
return func(o *Options) {
o.target = t
}
}
func WithTransmissionMethod(t imparse.Channel) Option {
return func(o *Options) {
o.transmissionMethod = t
}
}
//标准帧 定义了标准协议的Ack数据和Push数据格式
type StandardFrame struct {
body imparse.BizProto
base *comet.Proto
mid int64
createTime uint64
key string
from string
target string
transmissionMethod imparse.Channel
}
func NewStandardFrame(base *comet.Proto, key, from string, opts ...Option) *StandardFrame {
options := Options{}
for _, o := range opts {
o(&options)
}
return &StandardFrame{base: base, key: key, from: from, mid: options.mid, createTime: options.createTime, target: options.target, transmissionMethod: options.transmissionMethod}
}
func (f *StandardFrame) Data() ([]byte, error) {
p := comet.Proto{
Ver: f.base.GetVer(),
Op: f.base.GetOp(),
Seq: f.base.GetSeq(),
Ack: f.base.GetAck(),
}
var err error
p.Body, err = f.body.PushBody()
if err != nil {
return nil, err
}
//send transfer msg
return proto.Marshal(&p)
}
func (f *StandardFrame) AckData() ([]byte, error) {
p := comet.Proto{
Ver: f.base.GetVer(),
Op: int32(comet.Op_SendMsgReply),
Seq: f.base.GetSeq(),
Ack: f.base.GetAck(),
}
var err error
p.Body, err = f.body.AckBody()
if err != nil {
return nil, err
}
//send msg ack
return proto.Marshal(&p)
}
func (f *StandardFrame) PushData() ([]byte, error) {
p := comet.Proto{
Ver: f.base.GetVer(),
Op: int32(comet.Op_ReceiveMsg),
Seq: f.base.GetSeq(),
Ack: f.base.GetAck(),
}
var err error
p.Body, err = f.body.PushBody()
if err != nil {
return nil, err
}
//send to client B
return proto.Marshal(&p)
}
func (f *StandardFrame) GetMid() int64 {
return f.mid
}
func (f *StandardFrame) SetMid(mid int64) {
f.mid = mid
}
func (f *StandardFrame) GetCreateTime() uint64 {
return f.createTime
}
func (f *StandardFrame) SetCreateTime(createTime uint64) {
f.createTime = createTime
}
func (f *StandardFrame) GetTarget() string {
return f.target
}
func (f *StandardFrame) SetTarget(target string) {
f.target = target
}
func (f *StandardFrame) GetTransmissionMethod() imparse.Channel {
return f.transmissionMethod
}
func (f *StandardFrame) SetTransmissionMethod(transmissionMethod imparse.Channel) {
f.transmissionMethod = transmissionMethod
}
func (f *StandardFrame) GetFrom() string {
return f.from
}
func (f *StandardFrame) SetFrom(from string) {
f.from = from
}
func (f *StandardFrame) GetKey() string {
return f.key
}
func (f *StandardFrame) SetKey(key string) {
f.key = key
}
func (f *StandardFrame) GetBody() imparse.BizProto {
return f.body
}
func (f *StandardFrame) SetBody(body imparse.BizProto) {
f.body = body
}

145
chat/group.go Normal file
View File

@@ -0,0 +1,145 @@
package chat
import (
"context"
"fmt"
"strconv"
"time"
"github.com/golang/protobuf/proto"
"gitlab.33.cn/chat/imparse"
biz "gitlab.33.cn/chat/imparse/proto"
"gitlab.33.cn/chat/imparse/util"
)
//private
type GroupFrame struct {
*StandardFrame
base *biz.Common
stored bool
}
func NewGroupFrame(standardFrame *StandardFrame, bizPro *biz.Common) *GroupFrame {
frame := &GroupFrame{
StandardFrame: standardFrame,
base: bizPro,
}
frame.SetBody(frame)
bizPro.From = frame.GetFrom()
frame.SetTarget(bizPro.GetTarget())
frame.SetTransmissionMethod(imparse.GroupCast)
return frame
}
func (p *GroupFrame) Type() imparse.FrameType {
return GroupFrameType
}
func (p *GroupFrame) Filter(ctx context.Context, db imparse.Cache, filters ...imparse.Filter) (uint64, error) {
//查询是否有重复消息
msg, err := db.GetMsg(ctx, p.GetFrom(), p.base.GetSeq())
if err != nil {
return 0, err
}
if msg != nil {
p.stored = true
p.base.Mid, err = strconv.ParseInt(msg.Mid, 10, 64)
if err != nil {
return 0, err
}
p.base.Datetime = msg.CreateTime
} else {
for _, filter := range filters {
err = filter(ctx, p)
if err != nil {
return 0, err
}
}
p.stored = false
p.base.Mid, err = db.GetMid(ctx)
if err != nil {
return 0, err
}
p.base.Datetime = uint64(util.TimeNowUnixNano() / int64(time.Millisecond))
err := db.AddMsg(ctx, p.GetFrom(), &imparse.MsgIndex{
Mid: strconv.FormatInt(p.base.GetMid(), 10),
Seq: p.base.GetSeq(),
SenderId: p.GetFrom(),
CreateTime: p.base.GetDatetime(),
})
if err != nil {
return 0, err
}
}
return p.base.GetDatetime(), nil
}
func (p *GroupFrame) Transport(ctx context.Context, exec imparse.Exec) error {
if p.stored {
return nil
}
data, err := p.PushData()
if err != nil {
return err
}
return exec.Transport(ctx, p.base.GetMid(), p.GetKey(), p.GetFrom(), p.GetTarget(), p.GetTransmissionMethod(), p.Type(), data)
}
func (p *GroupFrame) Ack(ctx context.Context, exec imparse.Exec) (int64, error) {
ackBytes, err := p.AckData()
if err != nil {
return 0, err
}
return p.base.GetMid(), exec.RevAck(ctx, p.base.GetMid(), []string{p.GetKey()}, ackBytes)
}
func (p *GroupFrame) AckBody() ([]byte, error) {
body, err := proto.Marshal(&biz.CommonAck{
Mid: p.base.GetMid(),
Datetime: p.base.GetDatetime(),
})
if err != nil {
return nil, fmt.Errorf("marshal CommonAck err: %v", err)
}
data, err := proto.Marshal(&biz.Proto{
EventType: biz.Proto_commonAck,
Body: body,
})
if err != nil {
return nil, fmt.Errorf("marshal Proto err: %v", err)
}
return data, err
}
func (p *GroupFrame) PushBody() ([]byte, error) {
var err error
var data []byte
pro := biz.Proto{
EventType: biz.Proto_common,
}
pro.Body, err = proto.Marshal(p.base)
if err != nil {
return nil, fmt.Errorf("marshal Common err: %v", err)
}
data, err = proto.Marshal(&pro)
if err != nil {
return nil, fmt.Errorf("marshal Proto err: %v", err)
}
return data, err
}
//
func (p *GroupFrame) GetChannelType() biz.Channel {
return p.base.ChannelType
}
func (p *GroupFrame) GetMsgType() biz.MsgType {
return p.base.MsgType
}
func (p *GroupFrame) GetBase() *biz.Common {
return p.base
}

85
chat/parser.go Normal file
View File

@@ -0,0 +1,85 @@
package chat
import (
"io"
"io/ioutil"
"github.com/golang/protobuf/proto"
comet "gitlab.33.cn/chat/im/api/comet/grpc"
"gitlab.33.cn/chat/imparse"
biz "gitlab.33.cn/chat/imparse/proto"
)
type _handleEvent func(*StandardFrame, []byte) (imparse.Frame, error)
var events = map[biz.Proto_EventType]_handleEvent{
biz.Proto_common: func(s *StandardFrame, body []byte) (imparse.Frame, error) {
var pro biz.Common
err := proto.Unmarshal(body, &pro)
if err != nil {
return nil, err
}
switch pro.ChannelType {
case biz.Channel_ToUser:
return NewPrivateFrame(s, &pro), nil
case biz.Channel_ToGroup:
return NewGroupFrame(s, &pro), nil
default:
return nil, imparse.ErrExecSupport
}
},
biz.Proto_Signal: func(s *StandardFrame, body []byte) (imparse.Frame, error) {
var pro biz.Signal
err := proto.Unmarshal(body, &pro)
if err != nil {
return nil, err
}
return NewNoticeFrame(s, &pro), nil
},
}
//标准解析器 定义了解析标准协议的方法
type StandardParse struct {
}
func (s *StandardParse) NewFrame(key, from string, in io.Reader, opts ...Option) (imparse.Frame, error) {
data, err := ioutil.ReadAll(in)
if err != nil {
return nil, err
}
//
var p comet.Proto
err = proto.Unmarshal(data, &p)
if err != nil {
return nil, err
}
switch p.GetVer() {
case 0, 1:
default:
}
switch p.GetOp() {
case int32(comet.Op_SendMsg):
case int32(comet.Op_Auth):
}
//业务服务协议解析
var pro biz.Proto
err = proto.Unmarshal(p.Body, &pro)
if err != nil {
return nil, err
}
//解析event事件
event, ok := events[pro.EventType]
if !ok || event == nil {
return nil, imparse.ErrorEnvType
}
frame, err := event(NewStandardFrame(&p, key, from, opts...), pro.Body)
if err != nil {
return nil, err
}
return frame, err
}

146
chat/private.go Normal file
View File

@@ -0,0 +1,146 @@
package chat
import (
"context"
"fmt"
"strconv"
"time"
"github.com/golang/protobuf/proto"
"gitlab.33.cn/chat/imparse"
biz "gitlab.33.cn/chat/imparse/proto"
"gitlab.33.cn/chat/imparse/util"
)
//private
type PrivateFrame struct {
*StandardFrame
base *biz.Common
stored bool
}
func NewPrivateFrame(standardFrame *StandardFrame, bizPro *biz.Common) *PrivateFrame {
frame := &PrivateFrame{
StandardFrame: standardFrame,
base: bizPro,
}
frame.SetBody(frame)
bizPro.From = frame.GetFrom()
frame.SetTarget(bizPro.GetTarget())
frame.SetTransmissionMethod(imparse.UniCast)
return frame
}
func (p *PrivateFrame) Type() imparse.FrameType {
return PrivateFrameType
}
func (p *PrivateFrame) Filter(ctx context.Context, db imparse.Cache, filters ...imparse.Filter) (uint64, error) {
//查询是否有重复消息
msg, err := db.GetMsg(ctx, p.GetFrom(), p.base.GetSeq())
if err != nil {
return 0, err
}
if msg != nil {
p.stored = true
p.base.Mid, err = strconv.ParseInt(msg.Mid, 10, 64)
if err != nil {
return 0, err
}
p.base.Datetime = msg.CreateTime
} else {
for _, filter := range filters {
err = filter(ctx, p)
if err != nil {
return 0, err
}
}
p.stored = false
p.base.Mid, err = db.GetMid(ctx)
if err != nil {
return 0, err
}
p.base.Datetime = uint64(util.TimeNowUnixNano() / int64(time.Millisecond))
err := db.AddMsg(ctx, p.GetFrom(), &imparse.MsgIndex{
Mid: strconv.FormatInt(p.base.GetMid(), 10),
Seq: p.base.GetSeq(),
SenderId: p.GetFrom(),
CreateTime: p.base.GetDatetime(),
})
if err != nil {
return 0, err
}
}
p.mid = p.base.Mid
p.createTime = p.base.Datetime
return p.base.GetDatetime(), nil
}
func (p *PrivateFrame) Transport(ctx context.Context, exec imparse.Exec) error {
if p.stored {
return nil
}
data, err := p.PushData()
if err != nil {
return err
}
return exec.Transport(ctx, p.base.GetMid(), p.GetKey(), p.GetFrom(), p.GetTarget(), p.GetTransmissionMethod(), p.Type(), data)
}
func (p *PrivateFrame) Ack(ctx context.Context, exec imparse.Exec) (int64, error) {
ackBytes, err := p.AckData()
if err != nil {
return 0, err
}
return p.base.GetMid(), exec.RevAck(ctx, p.base.GetMid(), []string{p.GetKey()}, ackBytes)
}
func (p *PrivateFrame) AckBody() ([]byte, error) {
body, err := proto.Marshal(&biz.CommonAck{
Mid: p.base.GetMid(),
Datetime: p.base.GetDatetime(),
})
if err != nil {
return nil, fmt.Errorf("marshal CommonAck err: %v", err)
}
data, err := proto.Marshal(&biz.Proto{
EventType: biz.Proto_commonAck,
Body: body,
})
if err != nil {
return nil, fmt.Errorf("marshal Proto err: %v", err)
}
return data, err
}
func (p *PrivateFrame) PushBody() ([]byte, error) {
var err error
var data []byte
pro := biz.Proto{
EventType: biz.Proto_common,
}
pro.Body, err = proto.Marshal(p.base)
if err != nil {
return nil, fmt.Errorf("marshal Common err: %v", err)
}
data, err = proto.Marshal(&pro)
if err != nil {
return nil, fmt.Errorf("marshal Proto err: %v", err)
}
return data, err
}
//
func (p *PrivateFrame) GetChannelType() biz.Channel {
return p.base.ChannelType
}
func (p *PrivateFrame) GetMsgType() biz.MsgType {
return p.base.MsgType
}
func (p *PrivateFrame) GetBase() *biz.Common {
return p.base
}

101
chat/signal.go Normal file
View File

@@ -0,0 +1,101 @@
package chat
import (
"context"
"fmt"
"time"
"github.com/golang/protobuf/proto"
"gitlab.33.cn/chat/imparse"
biz "gitlab.33.cn/chat/imparse/proto"
"gitlab.33.cn/chat/imparse/util"
)
//private
type SignalFrame struct {
*StandardFrame
base *biz.Signal
mid int64
createTime uint64
}
func NewNoticeFrame(standardFrame *StandardFrame, bizPro *biz.Signal) *SignalFrame {
frame := &SignalFrame{
StandardFrame: standardFrame,
base: bizPro,
}
frame.SetBody(frame)
return frame
}
func (p *SignalFrame) Type() imparse.FrameType {
return SignalFrameType
}
func (p *SignalFrame) Filter(ctx context.Context, db imparse.Cache, filters ...imparse.Filter) (uint64, error) {
var err error
for _, filter := range filters {
err = filter(ctx, p)
if err != nil {
return 0, err
}
}
p.mid, err = db.GetMid(ctx)
if err != nil {
return 0, err
}
p.createTime = uint64(util.TimeNowUnixNano() / int64(time.Millisecond))
return p.createTime, nil
}
func (p *SignalFrame) Transport(ctx context.Context, exec imparse.Exec) error {
data, err := p.PushData()
if err != nil {
return err
}
return exec.Transport(ctx, p.mid, p.GetKey(), p.GetFrom(), p.GetTarget(), p.GetTransmissionMethod(), p.Type(), data)
}
func (p *SignalFrame) Ack(ctx context.Context, exec imparse.Exec) (int64, error) {
return p.mid, nil
}
func (p *SignalFrame) AckBody() ([]byte, error) {
body, err := proto.Marshal(p.base)
if err != nil {
return nil, fmt.Errorf("marshal NotifyMsg err: %v", err)
}
if err != nil {
return nil, fmt.Errorf("marshal NotifyMsgAck err: %v", err)
}
data, err := proto.Marshal(&biz.Proto{
EventType: biz.Proto_Signal,
Body: body,
})
if err != nil {
return nil, fmt.Errorf("marshal Proto err: %v", err)
}
return data, err
}
func (p *SignalFrame) PushBody() ([]byte, error) {
var err error
var data []byte
pro := biz.Proto{
EventType: biz.Proto_Signal,
}
pro.Body, err = proto.Marshal(p.base)
if err != nil {
return nil, fmt.Errorf("marshal NotifyMsg err: %v", err)
}
data, err = proto.Marshal(&pro)
if err != nil {
return nil, fmt.Errorf("marshal Proto err: %v", err)
}
return data, err
}
func (p *SignalFrame) GetBase() *biz.Signal {
return p.base
}

171
chat/test/action_test.go Normal file
View File

@@ -0,0 +1,171 @@
package test
import (
"bytes"
"context"
"math/rand"
"os"
"testing"
"time"
"github.com/golang/protobuf/proto"
comet "gitlab.33.cn/chat/im/api/comet/grpc"
"gitlab.33.cn/chat/imparse"
"gitlab.33.cn/chat/imparse/chat"
biz "gitlab.33.cn/chat/imparse/proto"
)
type msg []byte
var sourceData = []msg{}
func TestMain(t *testing.M) {
sourceData = genTestFrames()
os.Exit(t.Run())
}
func genTestFrames() []msg {
p := comet.Proto{
Ver: 0,
Op: int32(comet.Op_SendMsg),
Seq: 1,
Ack: 0,
Body: nil,
}
pro := biz.Proto{
EventType: biz.Proto_common,
Body: nil,
}
comm := biz.Common{
ChannelType: biz.Channel_ToUser,
Mid: 0,
Seq: "client-msg",
From: "client-from",
Target: "client-target",
MsgType: biz.MsgType_Text,
Msg: nil,
}
data2, err := proto.Marshal(&comm)
if err != nil {
panic(err)
}
pro.Body = data2
data1, err := proto.Marshal(&pro)
if err != nil {
panic(err)
}
p.Body = data1
data, err := proto.Marshal(&p)
if err != nil {
panic(err)
}
return []msg{data}
}
type testDB struct {
}
func (db *testDB) GetMsg(ctx context.Context, from, seq string) (*imparse.MsgIndex, error) {
return nil, nil
}
func (db *testDB) AddMsg(ctx context.Context, uid string, m *imparse.MsgIndex) error {
return nil
}
func (db *testDB) GetMid(ctx context.Context) (id int64, err error) {
//将时间戳设置成种子数
rand.Seed(time.Now().UnixNano())
id = int64(rand.Intn(100))
return
}
type testExec struct {
}
func (e *testExec) Transport(ctx context.Context, id int64, key, from, target string, ch imparse.Channel, frameType imparse.FrameType, data []byte) error {
return nil
}
func (e *testExec) RevAck(ctx context.Context, id int64, keys []string, data []byte) error {
return nil
}
func TestStandard_Filter(t *testing.T) {
//全局初始化
var db testDB
var e testExec
var exec = imparse.NewStandardAnswer(&db, &e, nil, nil)
var parser chat.StandardParse
//局部初始化
//来源模式 1. ws推送2. http推送3. 测试命令行
from := "server-client"
key := "server-key"
for _, m := range sourceData {
ctx := context.Background()
f, err := parser.NewFrame(key, from, bytes.NewReader(m))
if err != nil {
t.Error(err)
return
}
_, err = exec.Filter(ctx, f)
if err != nil {
t.Error(err)
continue
}
err = exec.Transport(ctx, f)
if err != nil {
t.Error(err)
continue
}
_, err = exec.Ack(ctx, f)
if err != nil {
t.Error(err)
continue
}
}
}
func BenchmarkCreateFrame(b *testing.B) {
//全局初始化
var db testDB
var e testExec
var exec = imparse.NewStandardAnswer(&db, &e, nil, nil)
var parser chat.StandardParse
data := sourceData[0]
//局部初始化
//来源模式 1. ws推送2. http推送3. 测试命令行
from := "server-client"
key := "server-key"
for n := 0; n < b.N; n++ {
ctx := context.Background()
f, err := parser.NewFrame(key, from, bytes.NewReader(data))
if err != nil {
b.Error(err)
return
}
_, err = exec.Filter(ctx, f)
if err != nil {
b.Error(err)
continue
}
err = exec.Transport(ctx, f)
if err != nil {
b.Error(err)
continue
}
_, err = exec.Ack(ctx, f)
if err != nil {
b.Error(err)
continue
}
}
}