12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- package cn.hhj.rpcsend;
- import cn.hhj.balance.LoadBalanceStrategy;
- import cn.hhj.balance.RandomLoadBalance;
- import cn.hhj.request.RpcRequest;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.serialization.ClassResolvers;
- import io.netty.handler.codec.serialization.ObjectDecoder;
- import io.netty.handler.codec.serialization.ObjectEncoder;
- import java.util.List;
- import java.util.concurrent.TimeUnit;
- public class RpcNetTransport extends SimpleChannelInboundHandler<Object> {
- private List<String> serviceAddress;
- private Object result;
- private NioEventLoopGroup bootStrap = new NioEventLoopGroup();
- private Bootstrap bootstrap = new Bootstrap();
- public RpcNetTransport(List<String> serviceAddress) {
- this.serviceAddress = serviceAddress;
- }
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
- this.result = msg;
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- System.out.println("客户端发送消息到服务端时产生异常");
- ctx.close();
- }
- public Object send(RpcRequest request) {
- bootstrap.group(bootStrap).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- socketChannel.pipeline().
- addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).
- addLast(new ObjectEncoder()).
- addLast(RpcNetTransport.this);
- }
- }).option(ChannelOption.TCP_NODELAY, true);
- connect(request);
- return result;
- }
- private String[] getUrl() {
- LoadBalanceStrategy loadBalanceStrategy = new RandomLoadBalance();
- return loadBalanceStrategy.selectHost(serviceAddress).split(":");
- }
- private void connect(RpcRequest request) {
- try{
- String[] urls = getUrl();
- ChannelFuture future = bootstrap.connect(urls[0], Integer.parseInt(urls[1])).sync();
- future.addListener((ChannelFutureListener) ch -> {
- if (!ch.isSuccess()) {
- ch.channel().eventLoop().schedule(() -> connect(request), 3L, TimeUnit.SECONDS);
- }
- });
- future.channel().writeAndFlush(request).sync();
- if (request != null) {
- future.channel().closeFuture().sync();
- }
- }catch (Exception e){ e.printStackTrace(); }
- }
- }
|