当先锋百科网

首页 1 2 3 4 5 6 7

在这里插入图片描述

问题背景

研究分布式锁,基于ZK实现,需要整合到SpringBoot使用

前言

  1. 参考自SpringBoot集成Curator实现Zookeeper基本操作Zookeeper入门
  2. 本篇的代码笔者有自己运行过,需要注意组件的版本号是否兼容,否则会有比较多的坑

实现

搭建Zookeeper容器

采用Docker compose快速搭建ZK容器,很快,几分钟就好了,而且是集群方式搭建。详情见笔者的Docker搭建zookeeper

引入依赖

需要注意的点:Curator 2.x.x-兼容两个zk 3.4.xzk 3.5.xCurator 3.x.x-兼容兼容zk 3.5,根据搭建的zk的版本使用对应的curator依赖。引入的zk依赖,如果项目中有使用logback日志 ,需要排除zk中的log4j12依赖,详情见下面笔者给出的依赖:

<dependencies>
  <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-client</artifactId>
      <version>2.12.0</version>
  </dependency>

  <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>2.12.0</version>
  </dependency>

  <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>2.12.0</version>
  </dependency>

  <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.5.7</version>
      <exclusions>
          <exclusion>
              <artifactId>slf4j-log4j12</artifactId>
              <groupId>org.slf4j</groupId>
          </exclusion>
          <exclusion>
              <artifactId>slf4j-api</artifactId>
              <groupId>org.slf4j</groupId>
          </exclusion>
      </exclusions>
  </dependency>

ZK客户端的配置类

配置ZK的参数,使用@ConfigurationProperties可以令配置热更新,比如搭配Apollo、Nacos,如果使用@Valid则无法热更新,必须重启项目才能生效

@Component
@ConfigurationProperties(prefix = "curator")
@Data
public class ZKClientProps {

    private String connectString;
    private int retryCount;
    private int elapsedTimeMs;
    private int sessionTimeoutMs;
    private int connectionTimeoutMs;
}

对应yml如下:

#curator配置
curator:
  connectString: 192.168.163.128:2181,192.168.163.128:2182,192.168.163.128:2183 # zookeeper 地址
  retryCount: 1 # 重试次数
  elapsedTimeMs: 2000 # 重试间隔时间
  sessionTimeoutMs: 60000 # session超时时间
  connectionTimeoutMs: 10000 # 连接超时时间

ZK客户端的工厂类

定制ZK客户端:

@Component
public class ZKClientFactory {

    @Resource
    private ZKClientProps zkClientProps;
    public CuratorFramework createSimple() {
        //重试策略:第一次重试等待1S,第二次重试等待2S,第三次重试等待4s

        //第一个参数:等待时间的基础单位,单位为毫秒
        //第二个参数:最大重试次数
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(zkClientProps.getElapsedTimeMs(), zkClientProps.getRetryCount());

        //获取CuratorFramework示例的最简单方式
        //第一个参数:zk的连接地址
        //第二个参数:重试策略
        return CuratorFrameworkFactory.newClient(zkClientProps.getConnectString(), retry);
    }


    public static CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy,
                                                     int connectionTimeoutMs, int sessionTimeoutMs) {
        return CuratorFrameworkFactory.builder().connectString(connectionString)
                .retryPolicy(retryPolicy).connectionTimeoutMs(connectionTimeoutMs).sessionTimeoutMs(sessionTimeoutMs).build();
    }
}

注入bean

创建ZK的客户端,详情如下:

@Component
@Slf4j
public class ZKClient {

    @Resource
    private ZKClientFactory zkClientFactory;
    public static final ZKClient INSTANCE = new ZKClient();

    private ZKClient() {
    }

    public CuratorFramework getClient() {
        return zkClientFactory.createSimple();
    }

    public boolean isNodeExist(String path) {
        CuratorFramework client = getClient();
        try {
            client.start();
            Stat stat = client.checkExists().forPath(path);
            return stat != null;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(client);
        }
        return false;
    }

    public void createNode(String path, byte[] bytes) {
        CuratorFramework client = getClient();
        try {
            // 必须start,否则报错
            client.start();
            client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, bytes);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(client);
        }
    }

    public void deleteNode(String path) {
        CuratorFramework client = getClient();
        try {
            client.start();
            client.delete().forPath(path);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(client);
        }
    }

    public List<String> getChildren(String path) {
        List<String> result = new LinkedList<>();
        CuratorFramework client = getClient();
        try {
            client.start();
            result = client.getChildren().forPath(path);
        } catch (Exception e) {
            log.error("ZKClient getChildren error.");
        }
        return result;
    }

}

构建测试类

测试基类,设置激活环境

@Slf4j
@ActiveProfiles("test")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = GmallZookeeperApplication.class)
@ContextConfiguration
public class BaseTest {

}

创建节点、删除节点、获取节点信息、分布式锁的方法如下:@ActiveProfiles("company")是激活笔者一个application-company.yml文件

application.yml如下:

server:
  port: 8022

spring:
  profiles:
    active: home

application-compay.yml如下:

#curator配置
curator:
  connectString: 192.168.163.128:2181,192.168.163.128:2182,192.168.163.128:2183 # zookeeper 地址
  retryCount: 1 # 重试次数
  elapsedTimeMs: 2000 # 重试间隔时间
  sessionTimeoutMs: 60000 # session超时时间
  connectionTimeoutMs: 10000 # 连接超时时间

创建节点、删除节点、获取节点信息、分布式锁的方法如下:

@Slf4j
@ActiveProfiles("company")
public class ZKClientTest extends BaseTest{

    @Resource
    private ZKClient zkClient;
    public static final int THREAD_NUM = 10;

    @Test
    public void distributedLock() throws InterruptedException, BrokenBarrierException {
        String lockPath = "/test/distributed2/lock";
        CuratorFramework client = zkClient.getClient();
        client.start();
        InterProcessMutex lock = new InterProcessMutex(client, lockPath);

        // 阻塞主线程,等待全部子线程执行完
        CyclicBarrier cyclicBarrier = new CyclicBarrier(THREAD_NUM);

        for (int i = 0; i < THREAD_NUM; i++) {
            new Thread(() -> {
                log.info("{}->尝试竞争锁", Thread.currentThread().getName());
                try {
                    lock.acquire(); // 阻塞竞争锁

                    log.info("{}->成功获得锁", Thread.currentThread().getName());
                    Thread.sleep(2000);

                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        lock.release(); //释放锁
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

            }, "Thread-" + i).start();
        }

        // 目的是为了等子线程抢完锁再结束子线程,否则无法看到日志效果
        cyclicBarrier.await();
        log.info("全部子线程已执行完毕");
    }

    @Test
    public void createNode() {
        // 创建一个ZNode节点
        String data = "hello";
        byte[] payload = data.getBytes(StandardCharsets.UTF_8);
        String zkPath = "/test/CRUD/node-1";

        zkClient.createNode(zkPath, payload);
        log.info("createNode succeeded!");

    }

    @Test
    public void getChildren() {
        String zkPath = "/test/CRUD";

        List<String> children = zkClient.getChildren(zkPath);
        printList(children);
    }

    @Test
    public void deleteNode() {
        String parentPath = "/test";

        log.info("======================Before delete===================");
        List<String> before = zkClient.getChildren(parentPath);
        printList(before);

        String zkPath = "/test/CRUD/node-1";
        zkClient.deleteNode(zkPath);
        log.info("delete node secceeded!");

        log.info("======================After delete===================");
        List<String> after = zkClient.getChildren(parentPath);
        printList(after);
    }

    private void printList(List<String> data) {
        if (!CollectionUtils.isEmpty(data)) {
            for (String datum : data) {
                log.info("datum:{}", data);
            }
        }
    }
}