java使用java netty教程5写得和硬件交互的程序,采用TCP,硬件上行一次数据后就会自动断开,怎么回写数据?

Netty网络聊天室之心跳检测及断线重连
前面介绍了Netty服务端客户端基本通信框架的搭建过程。下面将介绍Netty如何进行心跳检测以及处理客户端的断线重连。为了适应恶劣的网络环境,比如网络超时、闪断,客户端进程僵死,需要机制来保证双方的通信能正常工作或者自动恢复。对于服务端来说,当客户端由于某些原因导致无法与服务端通信的,服务端需要主动注销与客户端的连接,减少无效链接的资源消耗。对于客户端来说,当服务进程宕机后进行重启,客户端应该自动能发起重连操作。(一)心跳监测客户端采用心跳机制,来确保服务端能及时发现无效的客户端链接。这里有个问题,心跳机制的发起方应该由服务端还是客户端。假设服务端出现宕机,客户端唯一能做的就是保证能及时发现服务端重启后能进行重连。因此,可以只由服务端来发起心跳检测。一旦服务端发现客户端连接超时多次,则 立即关闭链路。心跳检测具体的设计思路如下:1.服务端定时查看客户端链路是否空闲,一旦持续时间T没有收到客户端的请求包,则主动发送Ping包给客户端,同时心跳超时次数加1。2.客户端收到服务的Ping请求,则立即发送一个Pong应答包。3.服务端每次收到客户端的数据包,则重置超时次数。若连续N次未收到心跳应答包,则关闭链接。心跳检测示例代码如下:1.服务端NettyChatServer类的ChannelPipeline增加空闲状态处理器(IdleStateHandler)。该类用于检测通信Channel的读写状态超时,以此来实现心跳检测。IdleStateHandler的构造函数有三个参数,依次为读超时秒数,写超时秒数,读写超时秒数。我们只需要用到第一个参数。2.ChatServerHandler类必须覆写userEventTriggered()方法处理超时逻辑。当超时次数少于指定次数时,向客户端发送Ping包;当超时次数大于指定次数时,注销客户端链接。package com.kingston.
import io.netty.channel.C
import io.netty.channel.ChannelHandlerA
import io.netty.channel.ChannelHandlerC
import io.netty.channel.ChannelP
import io.netty.handler.timeout.IdleS
import io.netty.handler.timeout.IdleStateE
import java.io.IOE
import java.util.M
import java.util.concurrent.ConcurrentHashM
import com.kingston.base.ServerM
import com.kingston.net.P
import com.kingston.net.PacketM
import com.kingston.net.PacketT
import com.kingston.service.login.ClientHeartB
import com.kingston.service.login.LoginManagerP
import com.kingston.service.login.ServerL
public class ChatServerHandler extends ChannelHandlerAdapter{
//客户端超时次数
private Map&ChannelHandlerContext,Integer& clientOvertimeMap = new ConcurrentHashMap&&();
private final int MAX_OVERTIME
//超时次数超过该值则注销连接
public void channelRead(ChannelHandlerContext context,Object msg)
throws Exception{
packet = (Packet)
if(packet.getPacketType() == PacketType.ServerLogin ){
ServerLogin loginPact = (ServerLogin)
LoginManagerProxy.getManager().validateLogin(context,loginPact.getUserId(), loginPact.getUserPwd());
if(validateSession(packet)){
PacketManager.execPacket(packet);
clientOvertimeMap.remove(context);//只要接受到数据包,则清空超时次数
boolean validateSession(Packet loginPact){
public void close(ChannelHandlerContext ctx,ChannelPromise promise){
System.err.println("TCP closed...");
ctx.close(promise);
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.err.println("客户端关闭1");
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.disconnect(promise);
System.err.println("客户端关闭2");
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.err.println("业务逻辑出错");
cause.printStackTrace();
ctx.fireExceptionCaught(cause);
Channel channel = ctx.channel();
if(cause instanceof
IOException && channel.isActive()){
System.err.println("simpleclient"+channel.remoteAddress()+"异常");
ctx.close();
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
//心跳包检测读超时
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent)
if (e.state() == IdleState.READER_IDLE) {
System.err.println("客户端读超时");
int overtimeTimes = clientOvertimeMap.getOrDefault(ctx, 0);
if(overtimeTimes & MAX_OVERTIME){
ServerManager.sendPacketTo(new ClientHeartBeat(), ctx);
addUserOvertime(ctx);
ServerManager.ungisterUserContext(ctx);
private void addUserOvertime(ChannelHandlerContext ctx){
int oldTimes = 0;
if(clientOvertimeMap.containsKey(ctx)){
oldTimes = clientOvertimeMap.get(ctx);
clientOvertimeMap.put(ctx, (int)(oldTimes+1));
3.增加下发包ClientHeartBeat类定义。客户端在收到该包的时候,需要向服务端发送一个应答包。package com.kingston.service.
import io.netty.buffer.ByteB
import com.kingston.base.ServerM
import com.kingston.net.P
import com.kingston.net.PacketT
public class ClientHeartBeat extends Packet{
public void writePacketMsg(ByteBuf buf) {
// TODO Auto-generated method stub
public void readFromBuff(ByteBuf buf) {
// TODO Auto-generated method stub
public PacketType getPacketType() {
return PacketType.ClientHeartB
public void execPacket() {
System.err.println("收到服务端的ping请求后,回复一个pong响应");
ServerManager.sendServerRequest(new ServerHeartBeat());
4.服务端在收到应答包后,重置超时次数为0心跳调试技巧:如果需要演示心跳超时,只需在客户端启动后在任意代码里加个断点,这样服务端就会检测到客户端读超时。(二)客户端断线重连当服务端宕机后,客户端需要定时检测服务端开启状态,重新连接。实现逻辑也比较简单,只要在NettyChatClient类断开链接的逻辑后加上重连逻辑即可(reConnectServer()方法)。每次重连检测不必过于频繁,可以让线程休眠一段时间。package com.kingston.
import java.net.InetSocketA
import io.netty.bootstrap.B
import io.netty.channel.ChannelF
import io.netty.channel.ChannelI
import io.netty.channel.ChannelP
import io.netty.channel.EventLoopG
import io.netty.channel.nio.NioEventLoopG
import io.netty.channel.socket.SocketC
import io.netty.channel.socket.nio.NioSocketC
import io.netty.handler.codec.LengthFieldP
import com.kingston.net.codec.PacketD
import com.kingston.net.codec.PacketE
public class NettyChatClient {
public void connect(String host,int port) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b
= new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer&SocketChannel&(){
protected void initChannel(SocketChannel arg0)
throws Exception {
ChannelPipeline pipeline = arg0.pipeline();
pipeline.addLast(new PacketDecoder(,2,0,2));
pipeline.addLast(new LengthFieldPrepender(2));
pipeline.addLast(new PacketEncoder());
pipeline.addLast(new HeartBeatReqHandler());
pipeline.addLast(new NettyClientHandler());
ChannelFuture f = b.connect(new InetSocketAddress(host, port),
new InetSocketAddress(NettyContants.LOCAL_SERVER_IP, NettyContants.LOCAL_SERVER_PORT))
f.channel().closeFuture().sync();
}catch(Exception e){
e.printStackTrace();
group.shutdownGracefully();
//这里不再是优雅关闭了
reConnectServer();
* 断线重连
private void reConnectServer(){
Thread.sleep(5000);
System.err.println("客户端进行断线重连");
connect(NettyContants.REMOTE_SERVER_IP,
NettyContants.REMOTE_SERVER_PORT);
} catch (Exception e) {
e.printStackTrace();
}调试技巧:启动服务端与客户端后,单方面关闭服务端,即可看见客户端定时重连了。需要保证客户端重连成功后,能够与服务端收发数据,同时客户端也不无须继续检测重连。全部代码已在github上托管(代码经过多次重构,与博客上的代码略有不同)完整服务端代码请移步 --& 完整客户端代码请移步 --&
没有更多推荐了,
加入CSDN,享受更精准的内容推荐,与500万程序员共同成长!netty处理tcp粘包/拆包问题&&
&所谓的粘包/拆包,用一个例子来说明就是:
加入客户端向服务端发送1000条数据,如果不加以处理的话,那么服务端接收的数据可能就是如图所示了:
&数据要么几段粘在了一起,要么一段数据被拆成了几段,这肯定会造成很大的影响。
而解决后的所接收的正确数据该如下所示:
简单讲了一下粘包/拆包是什么样的问题,详细解释可见csdn博客
客户端代码:
package com.netty.
import java.io.IOE
import java.net.S
import java.net.UnknownHostE
import java.nio.ByteB
* @author Chalmers 日 下午2:35:39
public class Client {
public static void main(String[] args) throws UnknownHostException,
IOException {
Socket socket = new Socket(&127.0.0.1&, 9090);
String message = &hello&;
byte[] bytes = message.getBytes();
// 设置空间大小为一个存储了长度的int型数据(长度)加上转换后的byte数组
ByteBuffer buffer = ByteBuffer.allocate(4 + bytes.length);
// 将长度存入
buffer.putInt(bytes.length);
// 将数据存入
buffer.put(bytes);
// 转换成字节数组
byte[] array = buffer.array();
// 向服务端发送1000次
for (int i = 0; i & 1000; i++) {
socket.getOutputStream().write(array);
socket.close();
处理问题代码:
package com.netty.
import org.jboss.netty.buffer.ChannelB
import org.jboss.netty.channel.C
import org.jboss.netty.channel.ChannelHandlerC
import org.jboss.netty.handler.codec.frame.FrameD
* @author Chalmers 日 下午2:23:49
public class MyDecoder extends FrameDecoder {
protected Object decode(ChannelHandlerContext chc, Channel channel,
ChannelBuffer buffer) throws Exception {
// 如果buffer中的可读字节大于4个(即除了长度以外还有数据,因为长度可能是为0的)
if (buffer.readableBytes() & 4) {
// 标记,指向当前指针位置,读取数据时使用
buffer.markReaderIndex();
// 取得长度
int len = buffer.readInt();
// 如果剩余可读字节小于长度的话,则表明发生了拆包现象,那么不对它进行处理
if (buffer.readableBytes() & len) {
// 重置标记
buffer.resetReaderIndex();
// 返回null,表示等待
// 对数据进行处理
byte[] bytes = new byte[len];
buffer.readBytes(bytes);
// 将数据返回到ServerHandler中进行处理
return new String(bytes);
package com.netty.
import org.jboss.netty.channel.ChannelHandlerC
import org.jboss.netty.channel.MessageE
import org.jboss.netty.channel.SimpleChannelH
* @author Chalmers 日 下午2:22:41
public class ServerHandler extends SimpleChannelHandler {
int count = 1;
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
// 对从MyDecoder中传递过来的数据进行处理
System.out.println((String) e.getMessage() + &
& + count);
服务端代码:
package com.netty.
import java.net.InetSocketA
import java.util.concurrent.ExecutorS
import java.util.concurrent.E
import org.jboss.netty.bootstrap.ServerB
import org.jboss.netty.channel.ChannelP
import org.jboss.netty.channel.ChannelPipelineF
import org.jboss.netty.channel.C
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelF
import org.jboss.netty.handler.codec.string.StringE
* @author Chalmers 日 下午2:21:33
public class Server {
public static void main(String[] args) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
ExecutorService boss = Executors.newCachedThreadPool();
ExecutorService worker = Executors.newCachedThreadPool();
serverBootstrap.setFactory(new NioServerSocketChannelFactory(boss,
serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast(&decoder&, new MyDecoder());
pipeline.addLast(&encoder&, new StringEncoder());
pipeline.addLast(&handler&, new ServerHandler());
serverBootstrap.bind(new InetSocketAddress(9090));
System.out.println(&start...&);
被转藏 : 1次
被转藏 : 1次3个netty5的例子,简单介绍netty的用法
日期: 17:04:01
来源:csdn
3个netty5的例子,简单介绍netty的用法
这是一个netty快速入门的例子,也是我的学习笔记,比较简单,翻译于官方的文档整理后把所有代码注释放在每一行代码中间,简单明了地介绍一些基础的用法。
首页这是基于netty5的例子,如果需要使用请依赖netty5的包。maven引用方式
&dependency&
&groupId&io.netty&/groupId&
&artifactId&netty-all&/artifactId&
&version&5.0.0.Alpha2&/version&
&/dependency&
0.Netty Server
package com.tjbsl.netty.demo0.
import com.tjbsl.netty.demo3.time.TimeServerH
import io.netty.bootstrap.ServerB
import io.netty.channel.ChannelF
import io.netty.channel.ChannelI
import io.netty.channel.ChannelO
import io.netty.channel.EventLoopG
import io.netty.channel.nio.NioEventLoopG
import io.netty.channel.socket.SocketC
import io.netty.channel.socket.nio.NioServerSocketC
* 处理数据
public class NettyServer {
public NettyServer(int port) {
this.port =
public void run() throws Exception {
* NioEventLoopGroup 是用来处理I/O操作的多线程事件循环器,
* Netty提供了许多不同的EventLoopGroup的实现用来处理不同传输协议。
* 在这个例子中我们实现了一个服务端的应用,
* 因此会有2个NioEventLoopGroup会被使用。
* 第一个经常被叫做‘boss’,用来接收进来的连接。
* 第二个经常被叫做‘worker’,用来处理已经被接收的连接,
* 一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。
* 如何知道多少个线程已经被使用,如何映射到已经创建的Channels上都需要依赖于EventLoopGroup的实现,
* 并且可以通过构造函数来配置他们的关系。
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
System.out.println("准备运行端口:" + port);
* ServerBootstrap 是一个启动NIO服务的辅助启动类
* 你可以在这个服务中直接使用Channel
ServerBootstrap b = new ServerBootstrap();
* 这一步是必须的,如果没有设置group将会报java.lang.IllegalStateException: group not set异常
b = b.group(bossGroup, workerGroup);
* ServerSocketChannel以NIO的selector为基础进行实现的,用来接收新的连接
* 这里告诉Channel如何获取新的连接.
b = b.channel(NioServerSocketChannel.class);
* 这里的事件处理类经常会被用来处理一个最近的已经接收的Channel。
* ChannelInitializer是一个特殊的处理类,
* 他的目的是帮助使用者配置一个新的Channel。
* 也许你想通过增加一些处理类比如NettyServerHandler来配置一个新的Channel
* 或者其对应的ChannelPipeline来实现你的网络程序。
* 当你的程序变的复杂时,可能你会增加更多的处理类到pipline上,
* 然后提取这些匿名类到最顶层的类上。
b = b.childHandler(new ChannelInitializer&SocketChannel&() { // (4)
public void initChannel(SocketChannel ch) throws Exception {
//ch.pipeline().addLast(new DiscardServerHandler());//demo1.discard
//ch.pipeline().addLast(new ResponseServerHandler());//demo2.echo
ch.pipeline().addLast(new TimeServerHandler());//demo3.time
* 你可以设置这里指定的通道实现的配置参数。
* 我们正在写一个TCP/IP的服务端,
* 因此我们被允许设置socket的参数选项比如tcpNoDelay和keepAlive。
* 请参考ChannelOption和详细的ChannelConfig实现的接口文档以此可以对ChannelOptions的有一个大概的认识。
b = b.option(ChannelOption.SO_BACKLOG, 128);
* option()是提供给NioServerSocketChannel用来接收进来的连接。
* childOption()是提供给由父管道ServerChannel接收到的连接,
* 在这个例子中也是NioServerSocketChannel。
b = b.childOption(ChannelOption.SO_KEEPALIVE, true);
* 绑定端口并启动去接收进来的连接
ChannelFuture f = b.bind(port).sync();
* 这里会一直等待,直到socket被关闭
f.channel().closeFuture().sync();
} finally {
* 优雅关闭
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
public static void main(String[] args) throws Exception {
if (args.length & 0) {
port = Integer.parseInt(args[0]);
port = 8000;
new NettyServer(port).run();
//通过cmd窗口的telnet 127.0.0.1 8000运行
1.DISCARD服务(丢弃服务,指的是会忽略所有接收的数据的一种协议)
package com.tjbsl.netty.demo1.
import io.netty.buffer.ByteB
import io.netty.channel.ChannelHandlerA
import io.netty.channel.ChannelHandlerC
import io.netty.util.CharsetU
import io.netty.util.ReferenceCountU
* 服务端处理通道.这里只是打印一下请求的内容,并不对请求进行任何的响应
* DiscardServerHandler 继承自 ChannelHandlerAdapter,
* 这个类实现了ChannelHandler接口,
* ChannelHandler提供了许多事件处理的接口方法,
* 然后你可以覆盖这些方法。
* 现在仅仅只需要继承ChannelHandlerAdapter类而不是你自己去实现接口方法。
public class DiscardServerHandler extends ChannelHandlerAdapter {
* 这里我们覆盖了chanelRead()事件处理方法。
* 每当从客户端收到新的数据时,
* 这个方法会在收到消息时被调用,
* 这个例子中,收到的消息的类型是ByteBuf
* @param ctx 通道处理的上下文信息
* @param msg 接收的消息
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf)
while (in.isReadable()) {
System.out.print((char) in.readByte());
System.out.flush();
//这一句和上面注释的的效果都是打印输入的字符
System.out.println(in.toString(CharsetUtil.US_ASCII));
}finally {
* ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放。
* 请记住处理器的职责是释放所有传递到处理器的引用计数对象。
ReferenceCountUtil.release(msg);
* 这个方法会在发生异常时触发
* @param ctx
* @param cause
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
* 发生异常后,关闭连接
cause.printStackTrace();
ctx.close();
以上是一个丢弃服务的处理方式,你可以运行后通过telnet来发送消息,来查看是否正常运行,注意console里会打印你的输入内容。
2.ECHO服务(响应式协议)
到目前为止,我们虽然接收到了数据,但没有做任何的响应。然而一个服务端通常会对一个请求作出响应。让我们学习怎样在ECHO协议的实现下编写一个响应消息给客户端,这个协议针对任何接收的数据都会返回一个响应。
和discard server唯一不同的是把在此之前我们实现的channelRead()方法,返回所有的数据替代打印接收数据到控制台上的逻辑。
说明NettyServer 还是用上面已经提供的类,只是把这段里的注销部分修改成如下。
package com.tjbsl.netty.demo2.
import io.netty.buffer.ByteB
import io.netty.channel.ChannelHandlerA
import io.netty.channel.ChannelHandlerC
import io.netty.util.CharsetU
* 服务端处理通道.
* ResponseServerHandler 继承自 ChannelHandlerAdapter,
* 这个类实现了ChannelHandler接口,
* ChannelHandler提供了许多事件处理的接口方法,
* 然后你可以覆盖这些方法。
* 现在仅仅只需要继承ChannelHandlerAdapter类而不是你自己去实现接口方法。
* 用来对请求响应
public class ResponseServerHandler extends ChannelHandlerAdapter {
* 这里我们覆盖了chanelRead()事件处理方法。
* 每当从客户端收到新的数据时,
* 这个方法会在收到消息时被调用,
*ChannelHandlerContext对象提供了许多操作,
* 使你能够触发各种各样的I/O事件和操作。
* 这里我们调用了write(Object)方法来逐字地把接受到的消息写入
* @param ctx 通道处理的上下文信息
* @param msg 接收的消息
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf)
System.out.println(in.toString(CharsetUtil.UTF_8));
ctx.write(msg);
//cxt.writeAndFlush(msg)
//请注意,这里我并不需要显式的释放,因为在进入的时候netty已经自动释放
// ReferenceCountUtil.release(msg);
* ctx.write(Object)方法不会使消息写入到通道上,
* 他被缓冲在了内部,你需要调用ctx.flush()方法来把缓冲区中数据强行输出。
* 或者你可以在channelRead方法中用更简洁的cxt.writeAndFlush(msg)以达到同样的目的
* @param ctx
* @throws Exception
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
* 这个方法会在发生异常时触发
* @param ctx
* @param cause
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
* 发生异常后,关闭连接
cause.printStackTrace();
ctx.close();
同样以上运行后,可以通过telnet发送数据,console里会打印出你发送的数据,同时你的命令行界面里应该也会接收到相同的数据。
3.TIME服务(时间协议的服务)
在这个部分被实现的协议是TIME协议。和之前的例子不同的是在不接受任何请求时他会发送一个含32位的整数的消息,并且一旦消息发送就会立即关闭连接。在这个例子中,你会学习到如何构建和发送一个消息,然后在完成时主动关闭连接。
因为我们将会忽略任何接收到的数据,而只是在连接被创建发送一个消息,所以这次我们不能使用channelRead()方法了,代替他的是,我们需要覆盖channelActive()方法,下面的就是实现的内容:
说明NettyServer 还是用上面已经提供的类,只是把这段里的注销部分修改成如下。
//ch.pipeline().addLast(new DiscardServerHandler());
//ch.pipeline().addLast(new ResponseServerHandler());
ch.pipeline().addLast(new TimeServerHandler());
TimeServerHandler类的如下:
package com.tjbsl.netty.demo3.
import io.netty.buffer.ByteB
import io.netty.channel.ChannelF
import io.netty.channel.ChannelFutureL
import io.netty.channel.ChannelHandlerA
import io.netty.channel.ChannelHandlerC
import io.netty.util.CharsetU
import java.util.S
public class TimeServerHandler extends ChannelHandlerAdapter {
* channelActive()方法将会在连接被建立并且准备进行通信时被调用。
* 因此让我们在这个方法里完成一个代表当前时间的32位整数消息的构建工作。
* @param ctx
public void channelActive(final ChannelHandlerContext ctx) {
/*Scanner cin=new Scanner(System.in);
System.out.println("请输入发送信息:");
String name=cin.nextLine();*/
String name="HelloWorld!";
* 为了发送一个新的消息,我们需要分配一个包含这个消息的新的缓冲。
* 因为我们需要写入一个32位的整数,因此我们需要一个至少有4个字节的ByteBuf。
* 通过ChannelHandlerContext.alloc()得到一个当前的ByteBufAllocator,
* 然后分配一个新的缓冲。
final ByteBuf time = ctx.alloc().buffer(4);
time.writeBytes(name.getBytes());
* 和往常一样我们需要编写一个构建好的消息
* 。但是等一等,flip在哪?难道我们使用NIO发送消息时不是调用java.nio.ByteBuffer.flip()吗?
* ByteBuf之所以没有这个方法因为有两个指针,
* 一个对应读操作一个对应写操作。
* 当你向ByteBuf里写入数据的时候写指针的索引就会增加,
* 同时读指针的索引没有变化。
* 读指针索引和写指针索引分别代表了消息的开始和结束。
* 比较起来,NIO缓冲并没有提供一种简洁的方式来计算出消息内容的开始和结尾,
* 除非你调用flip方法。
* 当你忘记调用flip方法而引起没有数据或者错误数据被发送时,
* 你会陷入困境。这样的一个错误不会发生在Netty上,
* 因为我们对于不同的操作类型有不同的指针。
* 你会发现这样的使用方法会让你过程变得更加的容易,
* 因为你已经习惯一种没有使用flip的方式。
* 另外一个点需要注意的是ChannelHandlerContext.write()(和writeAndFlush())方法会返回一个ChannelFuture对象,
* 一个ChannelFuture代表了一个还没有发生的I/O操作。
* 这意味着任何一个请求操作都不会马上被执行,
* 因为在Netty里所有的操作都是异步的。
* 因此你需要在write()方法返回的ChannelFuture完成后调用close()方法,
* 然后当他的写操作已经完成他会通知他的监听者。
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
* 当一个写请求已经完成是如何通知到我们?
* 这个只需要简单地在返回的ChannelFuture上增加一个ChannelFutureListener。
* 这里我们构建了一个匿名的ChannelFutureListener类用来在操作完成时关闭Channel。
f.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
assert f ==
* 请注意,close()方法也可能不会立马关闭,他也会返回一个ChannelFuture。
ctx.close();
//接收结果
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf)
System.out.println("client:"+buf.toString(CharsetUtil.UTF_8));
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
4.Time客户端
不像DISCARD和ECHO的服务端,对于TIME协议我们需要一个客户端因为人们不能把一个32位的二进制数据翻译成一个日期或者日历。在这一部分,我们将会讨论如何确保服务端是正常工作的,并且学习怎样用Netty编写一个客户端。
在Netty中,编写服务端和客户端最大的并且唯一不同的使用了不同的BootStrap和Channel的实现。
package com.tjbsl.netty.demo3.time.
import io.netty.bootstrap.B
import io.netty.channel.ChannelF
import io.netty.channel.ChannelI
import io.netty.channel.ChannelO
import io.netty.channel.EventLoopG
import io.netty.channel.nio.NioEventLoopG
import io.netty.channel.socket.SocketC
import io.netty.channel.socket.nio.NioSocketC
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = "127.0.0.1";
int port =8000;
EventLoopGroup workerGroup = new NioEventLoopGroup();
* 如果你只指定了一个EventLoopGroup,
* 那他就会即作为一个‘boss’线程,
* 也会作为一个‘workder’线程,
* 尽管客户端不需要使用到‘boss’线程。
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
* 代替NioServerSocketChannel的是NioSocketChannel,这个类在客户端channel被创建时使用
b.channel(NioSocketChannel.class); // (3)
* 不像在使用ServerBootstrap时需要用childOption()方法,
* 因为客户端的SocketChannel没有父channel的概念。
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer&SocketChannel&() {
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
//用connect()方法代替了bind()方法
ChannelFuture f = b.connect(host, port).sync();
//等到运行结束,关闭
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
本页内容版权归属为原作者,如有侵犯您的权益,请通知我们删除。
新建虚拟机 开始自定义硬件 移除声卡和打印机 虚拟机首页 开始安装操作系统 1选项:安装或者升级现有的系统 2选项:使用基本的显卡驱动安装系统 3选项:拯救模式安装系统 4选项:从本地磁盘启动 5选项:测试内存 检测你电脑的媒介是否正常 选择系统安装过程中所使用的语言 选择系统输入键盘的种类 选择硬盘驱动器的类型,“Basic Storage Device”(基础存储设备),“Speciallzed Storage Dvices”(特殊存储设备) 是否清空磁盘数据 设置主机名 选择时区 设置root用户的
本节主要内容: 1. 重要概念 2. Actor模型 3. Akka架构简介 多核处理器的出现使并发编程(Concurrent Programming)成为开发人员必备的一项技能,许多现代编程语言都致力于解决并发编程问题。并发编程虽然能够提高程序的性能,但传统并发编程的共享内存通信机制对开发人员的编程技能要求很高,需要开发人员通过自身的专业编程技能去避免死锁、互斥等待及竞争条件(Race Condition)等,熟悉Java语言并发编程的读者们对这些问题的理解会比较深刻,这些问题使得并发编程比顺序编程要困
目录 目录 前言 虚拟化 虚拟机Virtual Machine 虚拟化的分类 x86 CPU架构与虚拟化的关系 全虚拟化 Full virtualization 半虚拟化 Paravirtualization 硬件辅助虚拟化 HVM 内存虚拟化 内存虚拟化的映射实现 总线虚拟化 前言 现在市场上最常见的虚拟化软件有VMWare workstation(VMWare)、VirtualBox(Oracle)、Hyper-V(Microsoft)、KVM(Redhat)、Xen等,这些软件统称之为VMM(Vir
参考: 从源代码剖析Mahout推荐引擎
mahout 推荐系统示例
Mahout推荐算法API详解 使用Mahout实现协同过滤
Mahout的taste推荐系统里的几种Recommender分析 前言:Mahout框架集成了大量的常用的机器学习算法,且都支持在Hadoop分布式环境下运行,很大程度上节约了数据处理的时间成本,其中的推荐算法引擎有cf.taste包实现,它提供了一套完整的推荐算法工具库,同时规范了数据结构,并标准了程序开发过程。 1:Mahout推荐算
OpenStack入门 之 架构分析 写在前面 学习目标: 了解 OpenStack 各组件的逻辑关系; 了解 OpenStack 的各组件的通信和部署关系; 了解 OpenStack 的工作流程; 接下来我会掌握: OpenStack 组件间的逻辑关系; OpenStack 的API; OpenStack 组件间的通信关系; OpenStack 中几种不同的存储; OpenStack 工作流程; OpenStack 的部署架构; OpenStack 各组件之间的关系有:逻辑关系,通信关系,部署关系… 1
1. 概述 本文是对spark1.6.0分布式集群的安装的一个详细说明,旨在帮助相关人员按照本说明能够快速搭建并使用spark集群。
2. 安装环境 本安装说明的示例环境部署如下: IP 外网IP hostname 备注 10.47.110.38 120.27.153.137 iZqZ Master、Slaver 10.24.35.51 114.55.56.190 iZ23pd81xqaZ Slaver 10.45.53.136 114.55.11.55 iZ23mr5ukpzZ Sl
今天主要是收集了些拼写检查方面的资料和 尝试使用一下拼写检查的功能--=遇到了不少问题 拼写检查的四种配置目前我只算是成功了半个吧 --------------------------------- 拼写检查功能,能在搜索时,提供一个较好用户体验,所以,主流的搜索引擎都有这个功能。在这之前,笔者先简单的说一下什么是拼写检查,其实很好理解,就是你输入的搜索词,可能是你输错了,也有可能在它的检索库里面根本不存在这个词,但是这时候它能给你返回,相似或相近的结果来帮助你校正。 举个例子,假如你在百度里面输入在在线
最近,参与了公司的一个大数据接口平台的开发,具体的处理过程是这样的。我们公司负责数据的入库,也就是一个etl过程,使用MR将数据入到hive里面,然后同步到impala,然后此接口平台提供查询接口,前台会将sql语句以参数传过来,然后接口平台通过调用impala提供的java api接口,将数据查询出来返回给用户。另外,如果查询的数据量很大,那么前台就会传一个taskId过来,第一次只需将数据查询出来,入到impala临时表,下次再查便将数据返回。那么,如何记录此任务的状态变化呢,这里我们就使用到了hba
CDH的简单介绍 大家经常说CDH,其全称是:Cloudera’s Distribution Including Apache Hadoop,简单的说是Cloudera公司的Hadoop平台,是在Apache原生的Hadoop组件基础上进行了封装和加强。CDH里面有些什么东西呢?如下图: 那么这个CDH软件如何安装呢?Cloudera公司提供了一套安装CDH,管理、维护CDH各组件的一个软件,叫做Cloudera Manager(以下简称为CM)。CM本身是一种主从结构,由CM Server和CM age
Deep Visual-Semantic Alignments for Generating Image Descriptions Andrej Karpathy Li Fei-Fei 摘要 这篇文章的作者提出了一种方法,可以用于生成图像的自然语言描述。主要包含了两个部分(1)视觉语义的对齐模型;(2)为新图像生成文本描述的 Multimodal RNN 模型。 其中视觉语义的对齐模型主要由3部分组成: 应用于图像区域的卷积神经网络(Convolution Neural Networks)。 应用于语句的
Copyright (C)
ITfish.net}

我要回帖

更多关于 java netty websocket 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信