ServiceDiscoveryWithZk.java 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. package cn.hhj.disovery.impl;
  2. import cn.hhj.balance.LoadBalanceStrategy;
  3. import cn.hhj.balance.RandomLoadBalance;
  4. import cn.hhj.config.ZkConfig;
  5. import cn.hhj.disovery.IServiceDiscovery;
  6. import org.apache.curator.framework.CuratorFramework;
  7. import org.apache.curator.framework.CuratorFrameworkFactory;
  8. import org.apache.curator.framework.recipes.cache.PathChildrenCache;
  9. import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
  10. import org.apache.curator.retry.ExponentialBackoffRetry;
  11. import java.util.ArrayList;
  12. import java.util.List;
  13. public class ServiceDiscoveryWithZk implements IServiceDiscovery {
  14. private CuratorFramework curatorFramework =null;
  15. List<String> serviceRepos=new ArrayList<>(); //服务地址的本地缓存
  16. {
  17. //初始化zookeeper的连接, 会话超时时间是5s,衰减重试
  18. curatorFramework = CuratorFrameworkFactory.builder().
  19. connectString(ZkConfig.CONNECTION_STR).sessionTimeoutMs(5000).
  20. retryPolicy(new ExponentialBackoffRetry(1000, 3)).
  21. namespace("registry")
  22. .build();
  23. curatorFramework.start();
  24. }
  25. @Override
  26. public List<String> discovery(String serviceName) {
  27. String path="/"+serviceName;
  28. if(serviceRepos.isEmpty()) {
  29. try {
  30. serviceRepos = curatorFramework.getChildren().forPath(path);
  31. registryWatch(path);
  32. } catch (Exception e) {
  33. e.printStackTrace();
  34. }
  35. }
  36. return serviceRepos;
  37. }
  38. private void registryWatch(String path) throws Exception {
  39. PathChildrenCache nodeCache=new PathChildrenCache(curatorFramework,path,true);
  40. PathChildrenCacheListener cacheListener=(curatorFramework1, pathChildrenCacheEvent)->{
  41. System.out.println("客户端收到变更节点的事件");
  42. serviceRepos=curatorFramework1.getChildren().forPath(path);
  43. };
  44. nodeCache.getListenable().addListener(cacheListener);
  45. nodeCache.start();
  46. }
  47. }