简介

目前来看,这个框架确实比较好用,Hooks已基本提供需要的回调,只需选择其中需要的实现就好了。

gmqtt地址:

git clone https://github.com/DrmagicE/gmqtt.git

0. hooks 列表

//OnAccept:   建立连接调用,返回false会关闭连接
OnStop:  server.Stop的时候调用 (2.6)         
OnSubscribe:订阅(2.2)
//OnSubscribed: 订阅成功        
OnUnsubscribe:  取消订阅(2.9//OnUnsubscribed: 已经取消订阅     
OnMsgArrived:   消息到达,返回false不会继续转发(2.3) 
OnConnect: 连接(2.1)     
OnConnected:  客户端成功连接后触发 [客户端上线]  (2.8)
OnSessionCreated: 新建session的时候触发(2.7)  [上线]
OnSessionResumed: 恢复session时触发 (2.7) 
OnSessionTerminated:  下线触发(2.7) [离线]
OnDeliver:  分发消息的时候触发(2.4//OnAcked:  客户端对qos1或qos2返回确认的时候调用
OnClose: TCP断掉后触发 (2.5)   [客户端主动断开]    
//OnMsgDropped: 丢弃报文后触发    

1. 主函数 funmain

main.go

package main

import (
    "context"
    gmqtt "github.com/DrmagicE/gmqtt"
    "log"
    "net"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    log.SetFlags(log.Ldate|log.Ltime|log.Lshortfile) //显示行号

    //监听 tcp 1883
    ln, err := net.Listen("tcp", ":1883")
    if err != nil {
        log.Fatalln(err.Error())
        return
    }


    //设置Hooks        nil改成对应的函数名
    hooks := gmqtt.Hooks{
        OnAccept:            nil,
        OnStop:              nil,
        OnSubscribe:         nil,
        OnSubscribed:        nil,
        OnUnsubscribe:       nil,
        OnUnsubscribed:      nil,
        OnMsgArrived:        nil,
        OnConnect:           nil,
        OnConnected:         nil,
        OnSessionCreated:    nil,
        OnSessionResumed:    nil,
        OnSessionTerminated: nil,
        OnDeliver:           nil,
        OnAcked:             nil,
        OnClose:             nil,
        OnMsgDropped:        nil,
    }

    //创建server
    server := gmqtt.NewServer(
        gmqtt.WithTCPListener(ln),
        gmqtt.WithHook(hooks),
    )

    //启动server
    log.Println("started...")
    server.Run()
    signalCh := make(chan os.Signal, 1)
    signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
    <-signalCh
    server.Stop(context.Background())
    log.Println("stopped")
}

2. Hook

hooks.go

2.1 onConnect连接

func OnConnectFunc(ctx context.Context, client gmqtt.Client) (code uint8) {
    username := client.OptionsReader().Username()
    //password := client.OptionsReader().Password()
    //在这里可以处理是否允许连接
    if username == "test"{
        return packets.CodeAccepted
    }
    return packets.CodeBadUsernameorPsw
}
client.OptionsReader().↓  //可以获取到的数据

  • 允许登陆 返回return packets.CodeAccepted
  • 用户名密码错误,返回return packets.CodeBadUsernameorPsw
  • 其余可以返回的Code如下:

2.2 onSubscribe 客户端订阅Topic

func OnSubscribeFunc(ctx context.Context, client gmqtt.Client, topic packets.Topic) (qos uint8){
    log.Info("clint:,client.OptionsReader().ClientID()," subcrive ",topic.Name," Qos:",topic.Qos)
    if client.OptionsReader().Username() == "test" {
        if topic.Qos > 1 {
            return packets.SUBSCRIBE_FAILURE
        }
        if topic.Name =="test0" {
            return packets.QOS_0
        }
        if topic.Name =="test" {
            return topic.Qos
        }
    }
    return packets.SUBSCRIBE_FAILURE
}
  • 返回最高允许订阅的Qos等级

2.3 onMsgArrived 客户端的消息到达

func OnMsgArrivedFunc(ctx context.Context, client gmqtt.Client, msg packets.Message) (valid bool) {
    if client.OptionsReader().Username() != "testpub" {
        client.Close()
        return false
    }
    //只有 qos1 & qos0 会被转发,QOS2直接拒绝
    if msg.Qos() == packets.QOS_2 {
        return false
    }
    //数据处理
    log.Infof("Receive Message=>[%s]:%s",msg.Topic(),Uint8ToString(msg.Payload()))
    //处理是否分发
    if Uint8ToString(msg.Payload())=="fuck" {
        return false  //拦截payload是“fuck”的数据包
    }
    return true
}

返回false 报文不会被继续转发

2.4 onDeliver 消息分发

如果一个客户端发布,两个客户端订阅,则onMsgArrived调用一次,OnDeliver调用2次

func OnDeliverFunc(ctx context.Context, client gmqtt.Client, msg packets.Message) {
    log.Infof("delivering message %s to client %s", msg.Payload(), client.OptionsReader().ClientID())
}

2.5 onClose TCP断掉(客户端关闭,掉线?)

func OnCloseFunc(ctx context.Context, client gmqtt.Client, err error) {
    log.Infof("client id:"+client.OptionsReader().ClientID()+"is closed with error:", err)
}

2.6 onStop Client.close

服务器关掉?

func OnStopFunc(ctx context.Context) {
    log.Infof("stop")
}

2.7 OnSessionXXX

OnSessionCreated: (上线)
func OnSessionCreatedFunc(ctx context.Context, client gmqtt.Client){
    log.Infof("Session*Client:",client.OptionsReader().ClientID(),"Create")
}

OnSessionResumed: 恢复?
//TODO:不知道这个什么时候会触发

func OnSessionResumed(ctx context.Context, client gmqtt.Client){
    log.Infof("Session*Client:",client.OptionsReader().ClientID(),"Resumed")

OnSessionTerminated: (离线)

func OnSessionTerminated(ctx context.Context, client gmqtt.Client, reason gmqtt.SessionTerminatedReason){
    log.Infof("Session*Client:",client.OptionsReader().ClientID(),"Terminated reason is",reason)

}

###2.8 OnConnected 客户端连接成功

func OnConnected(ctx context.Context, client gmqtt.Client){
    log.Infof(client.OptionsReader().ClientID(),"Connected!!!!!!***")
}

### 2.9 OnUnsubscribe 取消订阅

func OnUnsubscribeFunc(ctx context.Context, client gmqtt.Client, topicName string){
log.Infof(client.OptionsReader().ClientID(),” unsubcribe: “,topicName)
}

3.工具函数

tools.go

uint8[] 转 string
Mqtt的Payload是uint8切片,需要转成string才能处理

func Uint8ToString(bs []uint8) string {
    ba := []byte{}
    for _, b := range bs {
        ba = append(ba, byte(b))
    }
    return string(ba)
}

4. 主动定时发送数据

pub := server.PublishService()
    go func() {
        for true {
            <-time.NewTimer(5 * time.Second).C
            pub.Publish(gmqtt.NewMessage("test", []byte("helloworld"), packets.QOS_0))
        }
    }()

5. 接收消息并回应


来源:
作者:xuehu96
链接:https://blog.csdn.net/xuehu96/article/details/110958386