壹影博客.
我在下午4点钟开始想你
Java Netty快速入门与SpringBoot整合教程
  • 2024-4-20日
  • 0评论
  • 79围观

Java Netty快速入门与SpringBoot整合教程

Netty是一个基于NIO的网络通信框架,它使用异步事件驱动的机制,可以快速开发高性能、高可靠性的网络服务器和客户端程序。Netty是一个Java开源项目,原由JBOSS提供,现在是Github上的独立项目。Netty对Java NIO进行了封装,降低了使用难度和门槛。

 

一、基本使用

 使用Netty记住三行核心代码在加一个handler处理器
 * 第一行:创建netty服务端的对象  new ServerBootstrap();
 * 第二行:绑定配置参数 bootstrap.group
 * 第三行:启动netty服务端 绑定端口号 bootstrap.bind(1234).sync()
 * 添加处理器:socketChannel.pipeline().addLast(.....)

主要使用的io.netty包下在类,废话不多说直接看看Netty是如何使用的吧

//文件:com.yyge.netty.NettyServer.java

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 问:有了NIO之后为什么还要用Netty?
 * Netty本质上就是一个对NIO程序的封装,对NIO程序做了优化和封装
 * 解决传统NIO程序写起来非常复杂的问题,Netty简化了开发
 *
 * 核心就三行代码 在加一个handler处理器
 * 第一行:创建netty服务端的对象  new ServerBootstrap();
 * 第二行:绑定配置参数 bootstrap.group
 * 第三行:启动netty服务端 绑定端口号 bootstrap.bind(1234).sync()
 * 添加处理器:socketChannel.pipeline().addLast(.....)
 *
 * Netty底层线程模型比Redis性能还要高
 * Netty机器配置较高的话可以支持单机百万并发
 * Reactor响应式变成设计模式
 */
public class NettyServer {
    public static void main(String[] args) {
        //创建两个现场组bossGroup 和 workerGroup,含有的子线程NioEventLoop的个数默认为CPU的核数的两倍
        //一主多从  一般主selector是一个 也可以多个 建议是一个(老板一个 员工多个...)
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);  //内部线程池 - 主
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(10); //内部线程池 - 从
        try{
            //创建服务端的启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();

            //使用链式编程来配置参数
            bootstrap.group(bossGroup,workerGroup) //设置两个线程组

                    //使用NioServerSocketChannel 作为服务器的通道实现
                    .channel(NioServerSocketChannel.class)

                    //初始化服务器连接队列大小,服务器处理客户端连接请求是顺序处理的,所以同一件只能处理一个客户端连接
                    // 多个客户端同时来的时候,服务端讲不能处理的客户端连接请求放在队列中等待处理
                    .option(ChannelOption.SO_BACKLOG,1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });

                   System.out.println("netty server start...");
                   //绑定一个端口号并且同步,生成了一个ChannelFuture异步对象,通过isDone()能方法可以片段异步事件的执行情况
                   //启动服务器(绑定端口号) ,bind是异步操作,sync方法是等待异步操作执行完毕
            ChannelFuture channelFuture = bootstrap.bind(1234).sync();
            //给cf注册监听器,监听我们关心的事件

        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }}

如上代码中有一个new NettyServerHandler() 下面是 NettyServerHandler.java

//com.yyge.netty.NettyServerHandler.java

/**
 *   绑定的处理器处理事件
 *   自定义Handle需要继承netty规定好的某个HandlerAdapter
 */


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.Data;

import java.nio.ByteBuffer;

@Data
public class NettyServerHandler extends ChannelInboundHandlerAdapter {


    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
    }

    /**
     * 当客户端连接到服务器完成就会触发该方法
     * @param ctx
     * @throws Exception 抛出异常
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx){
        System.out.println("客户端连接通过建立完成");
    }

    /**
     * 读取客户端发送的数据
     * @param ctx 上下文对象,含通道channel,管代pipeline
     * @param msg 客户端发送的消息
     * @throws Exception 抛出异常
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf =(ByteBuf) msg;
        System.out.println("接收到客户端消息:"+buf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 数据读取完毕处理方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
        super.channelReadComplete(ctx);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        super.channelWritabilityChanged(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

客户端与服务端的几大部分事件都可以在NettyServerHandler中写,比如连接事件、收发消息 等等

二、整合SpringBootz整合Netty

 思路:因为SpringBoot 项目如果我Web项目启动的时候会占用一个端口,并且我们是与这个端口做请求交互的,那么Netty是与客户端建立连接 ,也要占用一个端口 ,我们如何整合呢,方案是直接在Spring容器初始化完毕时去开启Netty服务,整个过程遵循如下步骤

 第一步: SpringBoot 加载完毕后 创建Netty对象 --ServerBootstrap 并且完成相关的配置

// com.yyge.init.NettyInit.java;

import com.yyge.service.handler.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * 初始化netty
 */
@Slf4j
@Data
@Component
public class NettyInit implements ApplicationRunner {

    private Map<String, ChannelHandlerContext> map =new HashMap<>();

    //这里是一个测试 如果你需要从外部注入类似于redisTemplate这类服务
    //则可以在new NettyServerHandler 的时候将对象注入 否则在内部取到的是空值
    //因为 NettyServerHandler 每一次连接客户端都会new一个新的对象
    @Autowired
    private RedisTemplate redisTemplate;

    public void initNetty(){
        log.info("Netty通知组件正在初始化...");
        //log.info("TRC20转账==> fromAddress:{}, toAddress:{},price:{}", fromAddress, toAddress, amount);
        //创建两个现场组bossGroup 和 workerGroup,含有的子线程NioEventLoop的个数默认为CPU的核数的两倍
        //一主多从  一般主selector是一个 也可以多个 建议是一个(老板一个 员工多个...)
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);  //内部线程池 - 主
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(10); //内部线程池 - 从
        try{
            //创建服务端的启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();

            //使用链式编程来配置参数
            bootstrap.group(bossGroup,workerGroup) //设置两个线程组

                    //使用NioServerSocketChannel 作为服务器的通道实现
                    .channel(NioServerSocketChannel.class)

                    //初始化服务器连接队列大小,服务器处理客户端连接请求是顺序处理的,所以同一件只能处理一个客户端连接
                    // 多个客户端同时来的时候,服务端讲不能处理的客户端连接请求放在队列中等待处理
                    .option(ChannelOption.SO_BACKLOG,1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 这里建议直接new 不要使用@Autowre注入 每一次连接都会调用 !!!!!
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });
                   //绑定一个端口号并且同步,生成了一个ChannelFuture异步对象,通过isDone()能方法可以片段异步事件的执行情况
                   //启动服务器(绑定端口号) ,bind是异步操作,sync方法是等待异步操作执行完毕
            ChannelFuture channelFuture = bootstrap.bind(1234).sync();
            log.info("Netty通知组件初始化完成 ==> localhost:1234");
            //给cf注册监听器,监听我们关心的事件

        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
     //初始化netty
      this.initNetty();
    }
}

如上NettyInit .java 继承自 ApplicationRunner 当Bean加载完毕后悔自动执行Run方法内的内容

我们在Run内部去实例化了 Netty的对象 并且指定端口完成相关配置

第二步:编写Netty处理类 - NettyServerHandler.java 代码如下

// com.yyge.service.handler.NettyServerHandler.java;
/**
 *   绑定的处理器处理事件
 *   自定义Handle需要继承netty规定好的某个HandlerAdapter
 */

import io.netty.buffer.ByteBuf;;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

@Data
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    //注入任务调度
    private Map<String,Object> data =  new HashMap<>();

    // 存储定时任务的引用,以便之后取消任务
    private ScheduledFuture<?> scheduledFuture;

    //!!!如果您有Utils或者是其他的Service 需要通过构造器注入的方式来进行注入 拿到对象
    // 该NettyServerHandler类 是每一个连接一个线程 不能在内部通过@Autowired 注入对象 否则为null
    public NettyServerHandler() {

    }


    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
    }

    /**
     * 当客户端连接到服务器完成就会出发该方法
     * @param ctx
     * @throws Exception 抛出异常
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx){
        log.info("客户端连接通过建立完成:{}",ctx.toString());
    }

    /**
     * 读取客户端发送的数据
     * @param ctx 上下文对象,含通道channel,管代pipeline
     * @param msg 客户端发送的消息
     * @throws Exception 抛出异常
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf =(ByteBuf) msg;
        System.out.println("接收到客户端消息:"+buf.toString(CharsetUtil.UTF_8));
        this.data.put("key",buf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 数据读取完毕处理方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        String key = (String) this.data.get("key");
        //开启一个线程
        Runnable runnable = () -> {
            System.out.println("每10秒执行一次任务");
        };

        // 安排一个定时任务
        scheduledFuture = ctx.channel().eventLoop().scheduleAtFixedRate(runnable, 0, 10, TimeUnit.SECONDS);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {;
        super.channelWritabilityChanged(ctx);
    }

    /**
     * 如果发生异常 或者是用户主动断开连接 会被触发
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        System.out.println("有异常发生么..........");
        //super.exceptionCaught(ctx, cause);
    }



    /**
     * 停止定时任务
     */
    public void stopTask() {
        // 如果scheduledFuture不为null且未被取消,则取消任务
        if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
            scheduledFuture.cancel(false);
            scheduledFuture = null; // 将引用置为null,避免内存泄漏
        }
    }

}

第三步:在NettyServerHandler .java类中完成您的相关业务逻辑

 注意点:NettyInit .java内 尽量去new NettyServerHandler 处理类 因为 每一次连接新的客户端都会new 保证了对象的安全性 在new NettyServerHandler () 的时候可以把必要的工具类传入进去 在NettyServerHandler 的构造函数内进行赋值

三、NettyServerHandler 相关操作

1. 给客户端发消息

//发送消息 例子
//ctx 内部遍历
String str = "订单失败!----->>@!";
ByteBuf buf = Unpooled.copiedBuffer(str.getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);

 2.关闭连接

ctx.close(); //断开连接

3.创建定时任务

import com.tron.context.QuartzContext;
import com.tron.context.RedisContext;
import com.tron.entity.PayOrder;
import com.tron.utils.tron.TronUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

@Data
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    //注入任务调度
    private Map<String,Object> data =  new HashMap<>();

    // 存储定时任务的引用,以便之后取消任务
    private ScheduledFuture<?> scheduledFuture;

    public NettyServerHandler() {
      ///....
    }

    
    /**
     * 数据读取完毕处理方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        String key = (String) this.data.get("key");
        System.out.println(tronUtils);
        //开启一个线程
        Runnable runnable = () -> {
            System.out.println("每10秒执行一次");
            //stopTask() 关闭定时任务
        };

        // 安排一个定时任务
        scheduledFuture = ctx.channel().eventLoop().scheduleAtFixedRate(runnable, 0, 10, TimeUnit.SECONDS);
    }

    public void stopTask() {
        // 如果scheduledFuture不为null且未被取消,则取消任务
        if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
            scheduledFuture.cancel(false);
            scheduledFuture = null; // 将引用置为null,避免内存泄漏
        }
    }
}

四、Demo代码项目下载

 gitee:https://gitee.com/gityyge/java-demos/tree/master/netty

 如果您觉得本篇文章对你有帮助的话欢迎Star,我们将持续更新干货教程和demo

发表评论