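Netty is a non-blocking, asynchronous networking framework built on top of Java NIO. NIO officially stands for "New I/O" but is commonly read as Non-blocking I/O, in contrast to BIO, or Blocking I/O. With non-blocking I/O a small number of threads can serve many connections instead of dedicating one thread to each, which usually saves server resources. Below we use Netty to write a socket client and a socket server; consider it a Hello World introduction to Netty!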
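As a rough illustration of what "non-blocking" means at the JDK level, here is a minimal sketch that uses plain java.nio rather than Netty. The class name NonBlockingAcceptDemo and its method are made up for this example. It puts a server channel into non-blocking mode and calls accept() while no client is connecting; instead of parking the thread, the call returns null right away.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, not part of Netty or of the original article.
class NonBlockingAcceptDemo {

    /** Returns true when accept() came back with no connection instead of blocking. */
    static boolean acceptReturnsWithoutClient() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0 = any free port
            server.configureBlocking(false);                    // switch to non-blocking mode
            SocketChannel client = server.accept();             // nobody is connecting
            if (client != null) {
                client.close(); // only reached if some stray client connected
                return false;
            }
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("accept() returned without blocking: " + acceptReturnsWithoutClient());
    }
}
```

With a blocking ServerSocket the same accept() call would hold the thread until a client showed up. Netty builds its event loops on this non-blocking mode so that one thread can look after many channels.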
Step 1: Use Maven to import the Netty-related jars, as follows:
<dependency>
    <groupId>org.jboss.marshalling</groupId>
    <artifactId>jboss-marshalling</artifactId>
    <version>2.0.5.Final</version>
</dependency>
<dependency>
    <groupId>org.jboss.marshalling</groupId>
    <artifactId>jboss-marshalling-serial</artifactId>
    <version>1.3.0.CR9</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.23.Final</version>
</dependency>
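Note: the second artifact, jboss-marshalling-serial, supplies the "serial" marshaller factory that the codec in the next step looks up. Without it on the classpath the client may never receive messages, so it is best to include it. It is declared with test scope here; if the factory cannot be found when you run the code, try removing that scope.
Step 2: Use JBoss Marshalling to encode and decode the data sent and received over the socket, as follows: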
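import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

public class MarshallingCodeCFactory {
    // Decoder: turns incoming bytes back into Java objects
    public static MarshallingDecoder buildMarshallingDecoder() {
        final MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration);
        // The second argument caps the size of a single serialized object at 1 MB
        return new MarshallingDecoder(provider, 1024 * 1024);
    }

    // Encoder: turns outgoing Java objects into bytes
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration);
        return new MarshallingEncoder(provider);
    }
}
Step 3: Write the socket server, which receives messages from the socket client and replies to them, as follows: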
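import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {
    public static void main(String[] args) throws InterruptedException {
        // 1. Boss group: accepts incoming client connections
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 2. Worker group: handles I/O and business logic for accepted connections
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup); // register both event loop groups
            b.channel(NioServerSocketChannel.class);
            b.option(ChannelOption.SO_BACKLOG, 1024); // queue length for pending connections
            // Per-connection socket options go on the accepted (child) channels
            b.childOption(ChannelOption.SO_SNDBUF, 32 * 1024); // send buffer size
            b.childOption(ChannelOption.SO_RCVBUF, 32 * 1024); // receive buffer size
            b.childOption(ChannelOption.SO_KEEPALIVE, true);   // enable TCP keep-alive
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // Set up Marshalling decoding and encoding
                    ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    ch.pipeline().addLast(new ServertHandler()); // server-side message handler
                }
            });
            ChannelFuture future = b.bind(8765).sync(); // bind the port
            // Block here until the server channel is closed
            future.channel().closeFuture().sync();
        } finally {
            // Release the event loop threads even if startup fails
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
The server-side message handler class is as follows: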
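import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// In Netty 4.1, channelRead is declared on ChannelInboundHandlerAdapter
public class ServertHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close(); // drop the connection after an error
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        Send send = (Send) msg;
        System.out.println("Server received: " + send);
        // Reply to the client (do whatever you like here; this example echoes the data back)
        Receive receive = new Receive();
        receive.setId(send.getId());
        receive.setMessage(send.getMessage());
        receive.setName(send.getName());
        ctx.writeAndFlush(receive);
    }
}
Step 4: Write the socket client, which sends messages to the server, as follows: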
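import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(worker)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        sc.pipeline().addLast(new ClientHandler()); // client-side message handler
                    }
                });
            // Connect to the socket server and send five messages
            ChannelFuture f = b.connect("127.0.0.1", 8765).sync();
            for (int i = 1; i <= 5; i++) {
                Send send = new Send();
                send.setId(i);
                send.setMessage("message" + i);
                send.setName("name" + i);
                f.channel().writeAndFlush(send);
            }
            // Block here until the connection is closed
            f.channel().closeFuture().sync();
        } finally {
            worker.shutdownGracefully();
        }
    }
}
The client-side message handler class is as follows: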
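import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// In Netty 4.1, channelRead is declared on ChannelInboundHandlerAdapter
public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        Receive receive = (Receive) msg;
        System.out.println("Server replied: " + receive);
    }
}
Step 5: The entity classes for the sending and receiving sides: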
Sender entity class:
import java.io.Serializable;

public class Send implements Serializable {
    private static final long serialVersionUID = 1L;
    private Integer id;
    private String name;
    private String message;
    //getter/setter.....

    @Override
    public String toString() {
        return "Send [id=" + id + ", name=" + name + ", message=" + message + "]";
    }
}
Receiver entity class:
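import java.io.Serializable;

public class Receive implements Serializable {
    private static final long serialVersionUID = 1L;
    private Integer id;
    private String name;
    private String message;
    //getter/setter.....

    @Override
    public String toString() {
        return "Receive [id=" + id + ", name=" + name + ", message=" + message + "]";
    }
}
Summary: the code also gives a feel for Netty's asynchronous, non-blocking style. Nothing in our code sits in a loop waiting on a read; incoming messages are delivered to callback methods on a handler class derived from ChannelHandlerAdapter. Because a few event loop threads can serve many connections this way, a socket program written like this typically needs fewer threads, and so fewer resources, than a traditional thread-per-connection blocking socket program.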
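To make the callback idea concrete without Netty, here is a JDK-only sketch of a tiny event loop. It is not how Netty is implemented, only a loose analogy: a single thread waits on a java.nio Selector and hands each complete message to a handler callback, roughly the role channelRead plays above. The names MiniEventLoop, MessageHandler, serveOnce and demo are made up for this example, and the sketch assumes one client that sends a short message and then closes the connection.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical demo class, not part of Netty or of the original article.
class MiniEventLoop {

    /** Hypothetical callback, loosely playing the role of channelRead. */
    interface MessageHandler {
        void onMessage(String message);
    }

    /** Serves a single client: collects its bytes until it closes, then calls the handler. */
    static void serveOnce(ServerSocketChannel server, MessageHandler handler) throws IOException {
        try (Selector selector = Selector.open()) {
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            ByteArrayOutputStream received = new ByteArrayOutputStream();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (true) {
                selector.select(); // wait until at least one channel is ready
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isAcceptable()) {
                        SocketChannel client = server.accept();
                        if (client != null) {
                            client.configureBlocking(false);
                            client.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        buffer.clear();
                        int n = client.read(buffer);
                        if (n < 0) { // peer closed the connection: message is complete
                            client.close();
                            handler.onMessage(
                                new String(received.toByteArray(), StandardCharsets.UTF_8));
                            return;
                        }
                        received.write(buffer.array(), 0, n);
                    }
                }
            }
        }
    }

    /** Sends text over a loopback connection and returns what the handler was given. */
    static String demo(String text) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0 = any free port
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                client.write(ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)));
            } // closing the client marks the end of the message
            StringBuilder seen = new StringBuilder();
            serveOnce(server, message -> seen.append(message));
            return seen.toString();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("handler got: " + demo("hello netty"));
    }
}
```

Netty layers thread pools, a handler pipeline, codecs and buffer management on top of this basic pattern, which is why the Server and Client classes above only need to register handlers rather than manage selectors themselves.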