百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

实战Netty!基于私有协议,怎样快速开发网络通信服务

wptr33 2025-07-01 23:41 61 浏览

私有协议

编写目的

本文档用于描述边缘计算单元(以下简称边缘盒)与上位机配置软件(以下简称上位机)之间进行的数据交互通信协议。

通信方式

边缘盒作为服务端,上位机作为客户端,采用TCP/IP协议的socket连接,端口号默认为6000,数据包采用字节的二进制数据传输。

数据包

包头(10字节)

负载(消息内容)

数据包由包头和消息内容组成,包头固定10个字节,其内容如下:

标志(4)

负载长度(2)

协议版本号(1)

包类型(1)

校验位(1)

Reserve(1)

标志: 包的前导字符,固定CYRC;

负载长度: 负载字节数(不包括包头的长度);

协议版本号: 标识通信协议的版本,初版值为0x10;

包类型: 标识数据包的操作类型,具体见下表:

取值

含义

说明

1

查询

上位机发送查询消息。

2

设置

上位机发送设置消息。

3

查询应答

边缘盒对查询请求的应答。

4

设置应答

边缘盒对设置请求的应答。

5

订阅

上位机发送给边缘盒订阅数据主动上报请求。

6

主动上报

边缘盒主动向上位机发送数据。

7

心跳

上位机发送心跳消息

8

心跳应答

边缘盒对心跳消息的应答

其他

保留


校验位: 负载数据所有字节之和;

Reserve: 预留,值填0;

包体负载(消息内容)表示具体的数据对象,其内容如下:

对象标识(1)

对象数据内容(0…n)

对于查询、心跳等包类型,包体负载(消息内容)只需要对象标识,对象数据内容省略。

对象标识: 标识数据表的操作对象,具体如下:

取值

含义

说明

0

心跳

上位机连接后间隔时间发送心跳消息给边缘盒。

具体的协议内容就不做展示了,下面就开始服务的编写。

服务开发

这里我们开发一个上位机的配置软件(客户端),我们首先要来分析,怎么对数据包进行编解码,其实工作中,这个也是服务开发的核心所在,也是难点所在。

编写消息类

public class MyProtocol
{
    /**
     * 消息的开头的信息标志
     */
    private String head = "CYRC";
    /**
     * 消息的长度
     */
    private int contentLength;
    /**
     * 消息的内容
     */
    private byte[] content;

    public MyProtocol(int contentLength, byte[] content)
    {
        this.contentLength = contentLength;
        this.content = content;
    }

    public String getHead()
    {
        return head;
    }

    public void setHead(String head)
    {
        this.head = head;
    }

    public int getContentLength()
    {
        return contentLength;
    }

    public void setContentLength(int contentLength)
    {
        this.contentLength = contentLength;
    }

    public byte[] getContent()
    {
        return content;
    }

    public void setContent(byte[] content)
    {
        this.content = content;
    }

    public String byteToHex(byte[] bytes, int cnt)
    {
        String strHex;
        StringBuilder sb = new StringBuilder();
        for (int n = 0; n < cnt; n++)
        {
            strHex = Integer.toHexString(bytes[n] & 0xFF);
            sb.append((strHex.length() == 1) ? "0" + strHex : strHex);
            sb.append(" ");
        }
        return sb.toString().trim();
    }

    @Override
    public String toString()
    {
        return "MyProtocol [head=" + head + ", contentLength="
                + contentLength + ", content=" + byteToHex(content, contentLength) + "]";
    }
}
复制代码

MyDecoder解码器

@Slf4j
public class MyDecoder extends ByteToMessageDecoder
{
    /**
     * <pre>
     * 协议开始的标准head_data,CYRC,占据4个字节.
     * 表示数据的长度contentLength,占据2个字节.
     * </pre>
     */
    public final int BASE_LENGTH = 10;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception
    {
        if (buffer.readableBytes() >= BASE_LENGTH)
        {
            if (buffer.readableBytes() > 2048)
            {
                buffer.skipBytes(buffer.readableBytes());
            }

            // 记录包头开始的index
            int beginReader;
            //CYRC   43 59 52 43
            while (true)
            {
                // 获取包头开始的index
                beginReader = buffer.readerIndex();
                // 标记包头开始的index
                buffer.markReaderIndex();
                // 读到了协议的开始标志,结束while循环
                int head1 = buffer.readUnsignedShort();
                int head2 = buffer.readUnsignedShort();
                if (head1 == 17241 && head2 == 21059)
                {
                    break;
                }

                // 未读到包头,略过一个字节
                // 每次略过,一个字节,去读取,包头信息的开始标记
                buffer.resetReaderIndex();
                buffer.readByte();
                // 当略过,一个字节之后,数据包的长度,又变得不满足
                // 此时,应该结束。等待后面的数据到达
                if (buffer.readableBytes() < BASE_LENGTH)
                {
                    return;
                }
            }

            // 消息的长度
            int length = buffer.readUnsignedShort() + 4;
            // 判断请求数据包数据是否到齐
            if (buffer.readableBytes() < length)
            {
                // 还原读指针
                buffer.readerIndex(beginReader);
                return;
            }

            // 读取data数据
            byte[] data = new byte[length];
            buffer.readBytes(data);

            MyProtocol protocol = new MyProtocol(data.length, data);
            out.add(protocol);
        }
    }
}
复制代码

MyEncoder编码器

public class MyEncoder extends MessageToByteEncoder<MyProtocol>
{
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, MyProtocol myProtocol, ByteBuf out) throws Exception
    {
        // 1.写入消息的开头的信息标志(CYCR)
        out.writeBytes(myProtocol.getHead().getBytes());
        // 2.写入消息的长度(负载长度)
        out.writeShort(myProtocol.getContentLength() - 4);
        // 3.写入消息的内容(byte[]类型)
        out.writeBytes(myProtocol.getContent());
    }
}
复制代码

自定义ChannelInboundHandlerAdapter

@Slf4j
public class BootNettyClientChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter
{
    public BootNettyClientChannelInboundHandlerAdapter()
    {

    }

    /**
     * 从服务端收到新的数据时,这个方法会在收到消息时被调用
     *
     * @param ctx
     * @param msg
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
    {
        MyProtocol protocol = (MyProtocol) msg;
        log.info("接收到服务端的消息:" + protocol);
    }

    /**
     * 从服务端收到新的数据、读取完成时调用
     *
     * @param ctx
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws IOException
    {
        ctx.flush();
    }

    /**
     * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     *
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws InterruptedException
    {
        log.error("exceptionCaught:{}", cause.getMessage());
        ctx.close();//抛出异常,断开与客户端的连接
    }

    /**
     * 客户端与服务端第一次建立连接时 执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {
        super.channelActive(ctx);
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        log.info("channelActive------TCP客户端新建连接------clientIp:{}", clientIp);
    }

    /**
     * 客户端与服务端 断连时 执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception
    {
        super.channelInactive(ctx);
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        ctx.close(); //断开连接时,必须关闭,否则造成资源浪费
        log.info("channelInactive------TCP客户端断开连接----------clientIp:{}", clientIp);
    }
}
复制代码

BootNettyClient客户端

@Slf4j
public class BootNettyClient
{
    public void connect(String host,int port)
    {
        /**
         * 客户端的NIO线程组
         *
         */
        EventLoopGroup group = new NioEventLoopGroup();
        try
        {
            /**
             * Bootstrap 是一个启动NIO服务的辅助启动类 客户端的
             */
            Bootstrap bootstrap = new Bootstrap();
            /**
             * 设置group
             */
            bootstrap = bootstrap.group(group);
            /**
             * 关联客户端通道
             */
            bootstrap = bootstrap.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true);
            /**
             * 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息
             */
            bootstrap = bootstrap.handler(new ChannelInitializer<SocketChannel>()
            {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception
                {
                    ChannelPipeline pipeline = channel.pipeline();
                    // 添加自定义协议的编解码工具
                    pipeline.addLast(new MyDecoder());
                    pipeline.addLast(new MyEncoder());
                    /**
                     * 自定义ChannelInboundHandlerAdapter
                     */
                    pipeline.addLast(new BootNettyClientChannelInboundHandlerAdapter());
                }
            });
            /**
             * 连接服务端
             */
            ChannelFuture f = bootstrap.connect(host, port).sync();
            log.info("TCP客户端连接成功, 地址是: " + host + ":" + port);
            /**
             * 等待连接端口关闭
             */
            f.channel().closeFuture().sync();
        }
        catch (Exception e)
        {
            log.error("启动netty client失败:", e);
        }
        finally
        {
            /**
             * 退出,释放资源
             */
            group.shutdownGracefully();
        }
    }
}
复制代码

NettyClientApplication程序启动类

@SpringBootApplication
public class NettyClientApplication implements CommandLineRunner {

	public static void main(String[] args) {
		SpringApplication.run(NettyClientApplication.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		new BootNettyClient().connect("172.16.1.100", 6000);
	}
}
复制代码

测试

利用网络调试助手工具,开启一个服务端,模拟发送数据

发送一个完整的包(43 59 52 43 00 01 10 02 00 00 0f),如下图,客户端完整接收数据。

半包测试数据(43 59 52 43 00 01 10 02 00),无日志打印,说明客户端没有接收该不完整数据。

粘包数据测试,两个包一起发送(43 59 52 43 00 01 10 02 00 00 0f 43 59 52 43 00 01 10 02 00 00 0f),如下图,客户端同时接收到两条数据。

粘包数据测试,一个半包发送(43 59 52 43 00 01 10 02 00 00 0f 43 59 52 43 00 01 10 02),如下图,可以看出,只接收到前面完整包的数据,后面的半包数据被忽略。

业务代码编写

业务代码,无非就是将收到的数据进行一些逻辑处理,数据的解析。编写一个接收消息处理类即可。示例如下

通信参数对象:

序号

名称

字节数

取值范围

备注

1

对象标识

1

3

对象标识号:3

2

IP地址

4


每个字节表示一段地址值(A.B.C.D,第一字节对应A,依次类推)

3

端口

2



4

标志

1

[0-1]

0:网络通信,1: 485通信(端口赋波特率,IP赋值0)

@Slf4j
public class ClientService {

  /** 接收边缘盒子消息 */
  public void receiveData(MyProtocol myProtocol) {
    try {
      byte[] data = myProtocol.getContent();
      int type = data[1];
      int objId = data[4];

      // 心跳应答
      if (type == PackageTypeConstant.HEART_BEAT_REPLY) {
        log.info("--------收到心跳回复----------");
      }
      // 查询应答
      else if (type == PackageTypeConstant.QUERY_RESULT) {
        switch (objId) {
          case ObjectIdConstant.SIGNAL:
            {
              reSignalParameter(data); // 接收通信参数
              break;
            }
    
  					//.....
     
          default:
            {
              break;
            }
        }
      }
    } catch (Exception e) {
      log.error("错误的消息指令..", e);
    }
  }

  private void reSignalParameter(byte[] data) {
    EventCenterService.getInstance()
        .submitEvent(
            new IEvent() {
              @Override
              public void execute() {
                try {
                  SignalParameter sp = new SignalParameter();

                  int idx = 5;
                  byte temp1;
                  byte temp2;
                  int a = (data[idx++] & 0xFF);
                  int b = (data[idx++] & 0xFF);
                  int c = (data[idx++] & 0xFF);
                  int d = (data[idx++] & 0xFF);
                  String ip = a + "." + b + "." + c + "." + d;

                  temp1 = data[idx++];
                  temp2 = data[idx++];
                  // 端口号
                  int port = ((char) (temp1 & 0xFF) << 8) | (char) (temp2 & 0xFF);
                  int sign = data[idx];

                  sp.setIp(ip);
                  sp.setPort(port);
                  sp.setSign(sign);

                  DataConfig.signalParameterList.add(sp);
                } catch (Exception e) {
                  log.error("通信参数解析出错:", e);
                }
              }
            });
  }
}
复制代码

后记

工作中,利用netty开发网络通信服务,数据的编解码处理好了,后面的业务代码相对就很容易了。

本篇文章,是我在工作中的一些实战经验,希望对netty感兴趣的小伙伴有点帮助。关于netty的原理这篇文章就不做过多介绍了,前面的文章也讲了很多,后面时间主要讲讲netty实际的运用。

相关推荐

oracle数据导入导出_oracle数据导入导出工具

关于oracle的数据导入导出,这个功能的使用场景,一般是换服务环境,把原先的oracle数据导入到另外一台oracle数据库,或者导出备份使用。只不过oracle的导入导出命令不好记忆,稍稍有点复杂...

继续学习Python中的while true/break语句

上次讲到if语句的用法,大家在微信公众号问了小编很多问题,那么小编在这几种解决一下,1.else和elif是子模块,不能单独使用2.一个if语句中可以包括很多个elif语句,但结尾只能有一个else解...

python continue和break的区别_python中break语句和continue语句的区别

python中循环语句经常会使用continue和break,那么这2者的区别是?continue是跳出本次循环,进行下一次循环;break是跳出整个循环;例如:...

简单学Python——关键字6——break和continue

Python退出循环,有break语句和continue语句两种实现方式。break语句和continue语句的区别:break语句作用是终止循环。continue语句作用是跳出本轮循环,继续下一次循...

2-1,0基础学Python之 break退出循环、 continue继续循环 多重循

用for循环或者while循环时,如果要在循环体内直接退出循环,可以使用break语句。比如计算1至100的整数和,我们用while来实现:sum=0x=1whileTrue...

Python 中 break 和 continue 傻傻分不清

大家好啊,我是大田。今天分享一下break和continue在代码中的执行效果是什么,进一步区分出二者的区别。一、continue例1:当小明3岁时不打印年龄,其余年龄正常循环打印。可以看...

python中的流程控制语句:continue、break 和 return使用方法

Python中,continue、break和return是控制流程的关键语句,用于在循环或函数中提前退出或跳过某些操作。它们的用途和区别如下:1.continue(跳过当前循环的剩余部分,进...

L017:continue和break - 教程文案

continue和break在Python中,continue和break是用于控制循环(如for和while)执行流程的关键字,它们的作用如下:1.continue:跳过当前迭代,...

作为前端开发者,你都经历过怎样的面试?

已经裸辞1个月了,最近开始投简历找工作,遇到各种各样的面试,今天分享一下。其实在职的时候也做过面试官,面试官时,感觉自己问的问题很难区分候选人的能力,最好的办法就是看看候选人的github上的代码仓库...

面试被问 const 是否不可变?这样回答才显功底

作为前端开发者,我在学习ES6特性时,总被const的"善变"搞得一头雾水——为什么用const声明的数组还能push元素?为什么基本类型赋值就会报错?直到翻遍MDN文档、对着内存图反...

2023金九银十必看前端面试题!2w字精品!

导文2023金九银十必看前端面试题!金九银十黄金期来了想要跳槽的小伙伴快来看啊CSS1.请解释CSS的盒模型是什么,并描述其组成部分。答案:CSS的盒模型是用于布局和定位元素的概念。它由内容区域...

前端面试总结_前端面试题整理

记得当时大二的时候,看到实验室的学长学姐忙于各种春招,有些收获了大厂offer,有些还在苦苦面试,其实那时候的心里还蛮忐忑的,不知道自己大三的时候会是什么样的一个水平,所以从19年的寒假放完,大二下学...

由浅入深,66条JavaScript面试知识点(七)

作者:JakeZhang转发链接:https://juejin.im/post/5ef8377f6fb9a07e693a6061目录由浅入深,66条JavaScript面试知识点(一)由浅入深,66...

2024前端面试真题之—VUE篇_前端面试题vue2020及答案

添加图片注释,不超过140字(可选)1.vue的生命周期有哪些及每个生命周期做了什么?beforeCreate是newVue()之后触发的第一个钩子,在当前阶段data、methods、com...

今年最常见的前端面试题,你会做几道?

在面试或招聘前端开发人员时,期望、现实和需求之间总是存在着巨大差距。面试其实是一个交流想法的地方,挑战人们的思考方式,并客观地分析给定的问题。可以通过面试了解人们如何做出决策,了解一个人对技术和解决问...