package cn.hhj.rpcsend;

import cn.hhj.balance.LoadBalanceStrategy;
import cn.hhj.balance.RandomLoadBalance;
import cn.hhj.request.RpcRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Client-side transport that sends one {@link RpcRequest} to a service
 * address chosen by a load-balance strategy and returns the object read back
 * from the channel.
 *
 * <p>The instance is installed as the last inbound handler of the channel it
 * opens and stores the most recently received message in a single field.
 * Because that field is shared, concurrent calls to {@link #send(RpcRequest)}
 * on the same instance would overwrite each other's result; sequential calls
 * are fine.
 *
 * <p>The class is marked {@code @Sharable} because a connection retry (and
 * every further {@code send}) creates a new channel whose pipeline receives
 * this same handler instance; Netty rejects re-adding a handler that is not
 * declared sharable.
 */
@ChannelHandler.Sharable
public class RpcNetTransport extends SimpleChannelInboundHandler<Object> {

    /** Maximum number of connection attempts made by one {@code send} call. */
    private static final int MAX_CONNECT_ATTEMPTS = 3;

    /** Pause between two failed connection attempts, in seconds. */
    private static final long RETRY_DELAY_SECONDS = 3L;

    /** Candidate service addresses; each entry is split on {@code ':'} into host and port. */
    private final List<String> serviceAddress;

    /** Last message received; written on the event-loop thread, read by the caller of {@code send}. */
    private volatile Object result;

    /**
     * @param serviceAddress addresses handed to the load-balance strategy on
     *                       every connection attempt
     */
    public RpcNetTransport(List<String> serviceAddress) {
        this.serviceAddress = serviceAddress;
    }

    /** Stores the received message so that {@link #send(RpcRequest)} can return it. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        this.result = msg;
    }

    /** Reports the failure (including its cause) and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("客户端发送消息到服务端时产生异常");
        // Keep the cause visible; the fixed message alone hides what went wrong.
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Sends {@code request} and blocks until the channel has been closed.
     *
     * <p>Nothing in this class closes the channel after a successful read, so
     * the call presumably relies on the server closing the connection once it
     * has written its response — TODO confirm against the server side.
     *
     * @param request the request to write to the channel
     * @return the last message received on the channel, or {@code null} when
     *         {@code request} is {@code null}, no connection could be
     *         established, or nothing was received
     */
    public Object send(RpcRequest request) {
        // Never hand back the response of an earlier call.
        result = null;
        if (request == null) {
            // Writing null would only fail after a connection was already opened.
            System.err.println("RpcRequest must not be null; nothing was sent");
            return null;
        }

        // A Bootstrap accepts its group only once, so both are created per call;
        // the group is always shut down so its threads and channels are released.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // NOTE(review): ObjectDecoder performs Java-native deserialization of
                            // whatever the peer sends, with a frame limit of Integer.MAX_VALUE.
                            // Only connect to trusted servers, or move to a safer codec.
                            socketChannel.pipeline()
                                    .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
                                    .addLast(new ObjectEncoder())
                                    .addLast(RpcNetTransport.this);
                        }
                    })
                    .option(ChannelOption.TCP_NODELAY, true);
            connect(bootstrap, request);
        } finally {
            group.shutdownGracefully();
        }
        return result;
    }

    /** Picks one address via the load-balance strategy and splits it into {host, port}. */
    private String[] getUrl() {
        LoadBalanceStrategy loadBalanceStrategy = new RandomLoadBalance();
        return loadBalanceStrategy.selectHost(serviceAddress).split(":");
    }

    /**
     * Connects, writes the request and waits for the channel to close.
     *
     * <p>A failed connection attempt is retried on the calling thread (never
     * on an event-loop thread, where blocking is not allowed), selecting a
     * fresh address each time, up to {@link #MAX_CONNECT_ATTEMPTS} attempts.
     * Failures are reported and leave the result untouched.
     */
    private void connect(Bootstrap bootstrap, RpcRequest request) {
        try {
            for (int attempt = 1; attempt <= MAX_CONNECT_ATTEMPTS; attempt++) {
                String[] urls = getUrl();
                // await() (unlike sync()) does not rethrow, so the outcome can be inspected.
                ChannelFuture future = bootstrap.connect(urls[0], Integer.parseInt(urls[1])).await();
                if (future.isSuccess()) {
                    Channel channel = future.channel();
                    channel.writeAndFlush(request).sync();
                    channel.closeFuture().sync();
                    return;
                }

                Throwable cause = future.cause();
                if (cause != null) {
                    cause.printStackTrace();
                }
                if (attempt < MAX_CONNECT_ATTEMPTS) {
                    TimeUnit.SECONDS.sleep(RETRY_DELAY_SECONDS);
                }
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}