Mittwoch, 12. März 2014

Netty TCP client with reconnect handling

There are situations where it makes sense to have a netty client doing a reconnect in case of a lost connection. There are 2 states where we need to do a reconnect:

  • The client gets no connection at startup
  • The client lost the connection during processing
Getting both situations handled we need to implement a ChannelFutureListener which handles the reconnect during the bootstrap phase and of course a ChannelHandler which we can use to get informed if the channel gets inactive.
We starting with a small bootstraping client:

 public class Client  
 {  
   private EventLoopGroup loop = new NioEventLoopGroup();  
   public static void main( String[] args )  
   {  
     new Client().run();  
   }  
   public Bootstrap createBootstrap(Bootstrap bootstrap, EventLoopGroup eventLoop) {  
     if (bootstrap != null) {  
       final MyInboundHandler handler = new MyInboundHandler(this);  
       bootstrap.group(eventLoop);  
       bootstrap.channel(NioSocketChannel.class);  
       bootstrap.option(ChannelOption.SO_KEEPALIVE, true);  
       bootstrap.handler(new ChannelInitializer<SocketChannel>() {  
         @Override  
         protected void initChannel(SocketChannel socketChannel) throws Exception {  
           socketChannel.pipeline().addLast(handler);  
         }  
       });  
       bootstrap.remoteAddress("localhost", 8888);
       bootstrap.connect().addListener(new ConnectionListener(this)); 
     }  
     return bootstrap;  
   }  
   public void run() {  
     createBootstrap(new Bootstrap(), loop);
   }  
 } 
The createBootstrap method is public because we need it to be called from within the reconnect handler.

The following shows the ConnectionListener which is responsible for reconnecting during connect invocation. If the connect fails we schedule a reconnect within the eventloop of the channel which means basically create the bootstrap again and try another connect.

 public class ConnectionListener implements ChannelFutureListener {  
   private Client client;  
   public ConnectionListener(Client client) {  
     this.client = client;  
   }  
   @Override  
   public void operationComplete(ChannelFuture channelFuture) throws Exception {  
     if (!channelFuture.isSuccess()) {  
       System.out.println("Reconnect");  
       final EventLoop loop = channelFuture.channel().eventLoop();  
       loop.schedule(new Runnable() {  
         @Override  
         public void run() {  
           client.createBootstrap(new Bootstrap(), loop);  
         }  
       }, 1L, TimeUnit.SECONDS);  
     }  
   }  
 }  

The same as we do in the connection listener is done in the ChannelHandler below. If we lost the connection to the server we schedule a reconnect from within the handler which is also basically just create a new bootstrap and try to connect again.

 public class MyInboundHandler extends SimpleChannelInboundHandler {  
   private Client client;  
   public MyInboundHandler(Client client) {  
     this.client = client;  
   }  
   @Override  
   public void channelInactive(ChannelHandlerContext ctx) throws Exception {  
     final EventLoop eventLoop = ctx.channel().eventLoop();  
     eventLoop.schedule(new Runnable() {  
       @Override  
       public void run() {  
         client.createBootstrap(new Bootstrap(), eventLoop);  
       }  
     }, 1L, TimeUnit.SECONDS);  
     super.channelInactive(ctx);  
   }  
 }  

This is all to get a netty client with reconnection handling at least for me. I don't know if it is the right way to do it but it works at least for me. You are welcome to post better solutions or maybe the right solution or just some corrections. 

Kommentare:

  1. Hello There,

    You just saved my day with this solution. Worked perfectly for me :)

    Thank you very much.

    AntwortenLöschen
  2. A more standard way of achieving this would be using EventLoop.schedule(), see e.g. https://github.com/netty/netty/blob/master/example/src/main/java/io/netty/example/uptime/UptimeClientHandler.java

    AntwortenLöschen
    Antworten
    1. Is this not what I did in my handler? What do you mean is wrong here?

      Löschen
  3. just out of curiosity, will this create too much thread if many failures happens in a short amount of time?

    AntwortenLöschen
  4. No. You just schedule a new runnable with the existing event loop.

    AntwortenLöschen
  5. This worded great for me. I ported the code to scala at:

    https://github.com/trex-paxos/trex/blob/netty0.1/netty/src/main/scala/com/github/trex_paxos/netty/Client.scala

    I added a send method to transmit and the ability to pass in an optional handler to handle the response. A simple test driver is at:

    https://github.com/trex-paxos/trex/blob/netty0.1/netty/src/it/scala/com/github/trex_paxos/netty/TestClient.scala

    Many thanks!

    AntwortenLöschen