Java Netty快速入门与SpringBoot整合教程
Netty是一个基于NIO的网络通信框架,它使用异步事件驱动的机制,可以快速开发高性能、高可靠性的网络服务器和客户端程序。Netty是一个Java开源项目,原由JBOSS提供,现在是Github上的独立项目。Netty对Java NIO进行了封装,降低了使用难度和门槛。
一、基本使用
使用Netty记住三行核心代码在加一个handler处理器
* 第一行:创建netty服务端的对象 new ServerBootstrap();
* 第二行:绑定配置参数 bootstrap.group
* 第三行:启动netty服务端 绑定端口号 bootstrap.bind(1234).sync()
* 添加处理器:socketChannel.pipeline().addLast(.....)主要使用的io.netty包下在类,废话不多说直接看看Netty是如何使用的吧
//文件:com.yyge.netty.NettyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* 问:有了NIO之后为什么还要用Netty?
* Netty本质上就是一个对NIO程序的封装,对NIO程序做了优化和封装
* 解决传统NIO程序写起来非常复杂的问题,Netty简化了开发
*
* 核心就三行代码 在加一个handler处理器
* 第一行:创建netty服务端的对象 new ServerBootstrap();
* 第二行:绑定配置参数 bootstrap.group
* 第三行:启动netty服务端 绑定端口号 bootstrap.bind(1234).sync()
* 添加处理器:socketChannel.pipeline().addLast(.....)
*
* Netty底层线程模型比Redis性能还要高
* Netty机器配置较高的话可以支持单机百万并发
* Reactor响应式变成设计模式
*/
public class NettyServer {
public static void main(String[] args) {
//创建两个现场组bossGroup 和 workerGroup,含有的子线程NioEventLoop的个数默认为CPU的核数的两倍
//一主多从 一般主selector是一个 也可以多个 建议是一个(老板一个 员工多个...)
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); //内部线程池 - 主
NioEventLoopGroup workerGroup = new NioEventLoopGroup(10); //内部线程池 - 从
try{
//创建服务端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来配置参数
bootstrap.group(bossGroup,workerGroup) //设置两个线程组
//使用NioServerSocketChannel 作为服务器的通道实现
.channel(NioServerSocketChannel.class)
//初始化服务器连接队列大小,服务器处理客户端连接请求是顺序处理的,所以同一件只能处理一个客户端连接
// 多个客户端同时来的时候,服务端讲不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty server start...");
//绑定一个端口号并且同步,生成了一个ChannelFuture异步对象,通过isDone()能方法可以片段异步事件的执行情况
//启动服务器(绑定端口号) ,bind是异步操作,sync方法是等待异步操作执行完毕
ChannelFuture channelFuture = bootstrap.bind(1234).sync();
//给cf注册监听器,监听我们关心的事件
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}}
如上代码中有一个new NettyServerHandler() 下面是 NettyServerHandler.java
//com.yyge.netty.NettyServerHandler.java
/**
* 绑定的处理器处理事件
* 自定义Handle需要继承netty规定好的某个HandlerAdapter
*/
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.Data;
import java.nio.ByteBuffer;
@Data
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
}
/**
* 当客户端连接到服务器完成就会触发该方法
* @param ctx
* @throws Exception 抛出异常
*/
@Override
public void channelActive(ChannelHandlerContext ctx){
System.out.println("客户端连接通过建立完成");
}
/**
* 读取客户端发送的数据
* @param ctx 上下文对象,含通道channel,管代pipeline
* @param msg 客户端发送的消息
* @throws Exception 抛出异常
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf =(ByteBuf) msg;
System.out.println("接收到客户端消息:"+buf.toString(CharsetUtil.UTF_8));
}
/**
* 数据读取完毕处理方法
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
super.channelReadComplete(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
客户端与服务端的几大部分事件都可以在NettyServerHandler中写,比如连接事件、收发消息 等等
二、整合SpringBootz整合Netty
思路:因为SpringBoot 项目如果我Web项目启动的时候会占用一个端口,并且我们是与这个端口做请求交互的,那么Netty是与客户端建立连接 ,也要占用一个端口 ,我们如何整合呢,方案是直接在Spring容器初始化完毕时去开启Netty服务,整个过程遵循如下步骤
第一步: SpringBoot 加载完毕后 创建Netty对象 --ServerBootstrap 并且完成相关的配置
// com.yyge.init.NettyInit.java;
import com.yyge.service.handler.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* 初始化netty
*/
@Slf4j
@Data
@Component
public class NettyInit implements ApplicationRunner {
private Map<String, ChannelHandlerContext> map =new HashMap<>();
//这里是一个测试 如果你需要从外部注入类似于redisTemplate这类服务
//则可以在new NettyServerHandler 的时候将对象注入 否则在内部取到的是空值
//因为 NettyServerHandler 每一次连接客户端都会new一个新的对象
@Autowired
private RedisTemplate redisTemplate;
public void initNetty(){
log.info("Netty通知组件正在初始化...");
//log.info("TRC20转账==> fromAddress:{}, toAddress:{},price:{}", fromAddress, toAddress, amount);
//创建两个现场组bossGroup 和 workerGroup,含有的子线程NioEventLoop的个数默认为CPU的核数的两倍
//一主多从 一般主selector是一个 也可以多个 建议是一个(老板一个 员工多个...)
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); //内部线程池 - 主
NioEventLoopGroup workerGroup = new NioEventLoopGroup(10); //内部线程池 - 从
try{
//创建服务端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来配置参数
bootstrap.group(bossGroup,workerGroup) //设置两个线程组
//使用NioServerSocketChannel 作为服务器的通道实现
.channel(NioServerSocketChannel.class)
//初始化服务器连接队列大小,服务器处理客户端连接请求是顺序处理的,所以同一件只能处理一个客户端连接
// 多个客户端同时来的时候,服务端讲不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 这里建议直接new 不要使用@Autowre注入 每一次连接都会调用 !!!!!
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
//绑定一个端口号并且同步,生成了一个ChannelFuture异步对象,通过isDone()能方法可以片段异步事件的执行情况
//启动服务器(绑定端口号) ,bind是异步操作,sync方法是等待异步操作执行完毕
ChannelFuture channelFuture = bootstrap.bind(1234).sync();
log.info("Netty通知组件初始化完成 ==> localhost:1234");
//给cf注册监听器,监听我们关心的事件
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public void run(ApplicationArguments args) throws Exception {
//初始化netty
this.initNetty();
}
}
如上NettyInit .java 继承自 ApplicationRunner 当Bean加载完毕后悔自动执行Run方法内的内容
我们在Run内部去实例化了 Netty的对象 并且指定端口完成相关配置
第二步:编写Netty处理类 - NettyServerHandler.java 代码如下
// com.yyge.service.handler.NettyServerHandler.java;
/**
* 绑定的处理器处理事件
* 自定义Handle需要继承netty规定好的某个HandlerAdapter
*/
import io.netty.buffer.ByteBuf;;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@Data
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//注入任务调度
private Map<String,Object> data = new HashMap<>();
// 存储定时任务的引用,以便之后取消任务
private ScheduledFuture<?> scheduledFuture;
//!!!如果您有Utils或者是其他的Service 需要通过构造器注入的方式来进行注入 拿到对象
// 该NettyServerHandler类 是每一个连接一个线程 不能在内部通过@Autowired 注入对象 否则为null
public NettyServerHandler() {
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
}
/**
* 当客户端连接到服务器完成就会出发该方法
* @param ctx
* @throws Exception 抛出异常
*/
@Override
public void channelActive(ChannelHandlerContext ctx){
log.info("客户端连接通过建立完成:{}",ctx.toString());
}
/**
* 读取客户端发送的数据
* @param ctx 上下文对象,含通道channel,管代pipeline
* @param msg 客户端发送的消息
* @throws Exception 抛出异常
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf =(ByteBuf) msg;
System.out.println("接收到客户端消息:"+buf.toString(CharsetUtil.UTF_8));
this.data.put("key",buf.toString(CharsetUtil.UTF_8));
}
/**
* 数据读取完毕处理方法
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
String key = (String) this.data.get("key");
//开启一个线程
Runnable runnable = () -> {
System.out.println("每10秒执行一次任务");
};
// 安排一个定时任务
scheduledFuture = ctx.channel().eventLoop().scheduleAtFixedRate(runnable, 0, 10, TimeUnit.SECONDS);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {;
super.channelWritabilityChanged(ctx);
}
/**
* 如果发生异常 或者是用户主动断开连接 会被触发
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
System.out.println("有异常发生么..........");
//super.exceptionCaught(ctx, cause);
}
/**
* 停止定时任务
*/
public void stopTask() {
// 如果scheduledFuture不为null且未被取消,则取消任务
if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
scheduledFuture.cancel(false);
scheduledFuture = null; // 将引用置为null,避免内存泄漏
}
}
}
第三步:在NettyServerHandler .java类中完成您的相关业务逻辑
注意点:NettyInit .java内 尽量去new NettyServerHandler 处理类 因为 每一次连接新的客户端都会new 保证了对象的安全性 在new NettyServerHandler () 的时候可以把必要的工具类传入进去 在NettyServerHandler 的构造函数内进行赋值
三、NettyServerHandler 相关操作
1. 给客户端发消息
//发送消息 例子
//ctx 内部遍历
String str = "订单失败!----->>@!";
ByteBuf buf = Unpooled.copiedBuffer(str.getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
2.关闭连接
ctx.close(); //断开连接
3.创建定时任务
import com.tron.context.QuartzContext;
import com.tron.context.RedisContext;
import com.tron.entity.PayOrder;
import com.tron.utils.tron.TronUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@Data
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//注入任务调度
private Map<String,Object> data = new HashMap<>();
// 存储定时任务的引用,以便之后取消任务
private ScheduledFuture<?> scheduledFuture;
public NettyServerHandler() {
///....
}
/**
* 数据读取完毕处理方法
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
String key = (String) this.data.get("key");
System.out.println(tronUtils);
//开启一个线程
Runnable runnable = () -> {
System.out.println("每10秒执行一次");
//stopTask() 关闭定时任务
};
// 安排一个定时任务
scheduledFuture = ctx.channel().eventLoop().scheduleAtFixedRate(runnable, 0, 10, TimeUnit.SECONDS);
}
public void stopTask() {
// 如果scheduledFuture不为null且未被取消,则取消任务
if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
scheduledFuture.cancel(false);
scheduledFuture = null; // 将引用置为null,避免内存泄漏
}
}
}
四、Demo代码项目下载
gitee:https://gitee.com/gityyge/java-demos/tree/master/netty
如果您觉得本篇文章对你有帮助的话欢迎Star,我们将持续更新干货教程和demo
发表评论