RpcNetTransport.java 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package cn.hhj.rpcsend;
  2. import cn.hhj.balance.LoadBalanceStrategy;
  3. import cn.hhj.balance.RandomLoadBalance;
  4. import cn.hhj.request.RpcRequest;
  5. import io.netty.bootstrap.Bootstrap;
  6. import io.netty.channel.*;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioSocketChannel;
  10. import io.netty.handler.codec.serialization.ClassResolvers;
  11. import io.netty.handler.codec.serialization.ObjectDecoder;
  12. import io.netty.handler.codec.serialization.ObjectEncoder;
  13. import java.util.List;
  14. import java.util.concurrent.TimeUnit;
  15. public class RpcNetTransport extends SimpleChannelInboundHandler<Object> {
  16. private List<String> serviceAddress;
  17. private Object result;
  18. private NioEventLoopGroup bootStrap = new NioEventLoopGroup();
  19. private Bootstrap bootstrap = new Bootstrap();
  20. public RpcNetTransport(List<String> serviceAddress) {
  21. this.serviceAddress = serviceAddress;
  22. }
  23. @Override
  24. protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
  25. this.result = msg;
  26. }
  27. @Override
  28. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  29. System.out.println("客户端发送消息到服务端时产生异常");
  30. ctx.close();
  31. }
  32. public Object send(RpcRequest request) {
  33. bootstrap.group(bootStrap).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
  34. @Override
  35. protected void initChannel(SocketChannel socketChannel) throws Exception {
  36. socketChannel.pipeline().
  37. addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).
  38. addLast(new ObjectEncoder()).
  39. addLast(RpcNetTransport.this);
  40. }
  41. }).option(ChannelOption.TCP_NODELAY, true);
  42. connect(request);
  43. return result;
  44. }
  45. private String[] getUrl() {
  46. LoadBalanceStrategy loadBalanceStrategy = new RandomLoadBalance();
  47. return loadBalanceStrategy.selectHost(serviceAddress).split(":");
  48. }
  49. private void connect(RpcRequest request) {
  50. try{
  51. String[] urls = getUrl();
  52. ChannelFuture future = bootstrap.connect(urls[0], Integer.parseInt(urls[1])).sync();
  53. future.addListener((ChannelFutureListener) ch -> {
  54. if (!ch.isSuccess()) {
  55. ch.channel().eventLoop().schedule(() -> connect(request), 3L, TimeUnit.SECONDS);
  56. }
  57. });
  58. future.channel().writeAndFlush(request).sync();
  59. if (request != null) {
  60. future.channel().closeFuture().sync();
  61. }
  62. }catch (Exception e){ e.printStackTrace(); }
  63. }
  64. }