问题背景
研究分布式锁,基于ZK实现,需要整合到SpringBoot使用
前言
- 参考自SpringBoot集成Curator实现Zookeeper基本操作,Zookeeper入门
- 本篇的代码笔者有自己运行过,需要注意组件的版本号是否兼容,否则会有比较多的坑
实现
搭建Zookeeper容器
采用Docker compose快速搭建ZK容器,很快,几分钟就好了,而且是集群方式搭建。详情见笔者的Docker搭建zookeeper
引入依赖
需要注意的点:
Curator 2.x.x-
兼容两个zk 3.4.x
和zk 3.5.x
,Curator 3.x.x
-兼容兼容zk 3.5
,根据搭建的zk的版本使用对应的curator依赖。引入的zk依赖,如果项目中有使用logback
日志 ,需要排除zk中的log4j12
依赖,详情见下面笔者给出的依赖:
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
ZK客户端的配置类
配置ZK的参数,使用
@ConfigurationProperties
可以令配置热更新,比如搭配Apollo、Nacos,如果使用@Valid
则无法热更新,必须重启项目才能生效
@Component
@ConfigurationProperties(prefix = "curator")
@Data
public class ZKClientProps {
private String connectString;
private int retryCount;
private int elapsedTimeMs;
private int sessionTimeoutMs;
private int connectionTimeoutMs;
}
对应yml如下:
#curator配置
curator:
connectString: 192.168.163.128:2181,192.168.163.128:2182,192.168.163.128:2183 # zookeeper 地址
retryCount: 1 # 重试次数
elapsedTimeMs: 2000 # 重试间隔时间
sessionTimeoutMs: 60000 # session超时时间
connectionTimeoutMs: 10000 # 连接超时时间
ZK客户端的工厂类
定制ZK客户端:
@Component
public class ZKClientFactory {
@Resource
private ZKClientProps zkClientProps;
public CuratorFramework createSimple() {
//重试策略:第一次重试等待1S,第二次重试等待2S,第三次重试等待4s
//第一个参数:等待时间的基础单位,单位为毫秒
//第二个参数:最大重试次数
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(zkClientProps.getElapsedTimeMs(), zkClientProps.getRetryCount());
//获取CuratorFramework示例的最简单方式
//第一个参数:zk的连接地址
//第二个参数:重试策略
return CuratorFrameworkFactory.newClient(zkClientProps.getConnectString(), retry);
}
public static CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy,
int connectionTimeoutMs, int sessionTimeoutMs) {
return CuratorFrameworkFactory.builder().connectString(connectionString)
.retryPolicy(retryPolicy).connectionTimeoutMs(connectionTimeoutMs).sessionTimeoutMs(sessionTimeoutMs).build();
}
}
注入bean
创建ZK的客户端,详情如下:
@Component
@Slf4j
public class ZKClient {
@Resource
private ZKClientFactory zkClientFactory;
public static final ZKClient INSTANCE = new ZKClient();
private ZKClient() {
}
public CuratorFramework getClient() {
return zkClientFactory.createSimple();
}
public boolean isNodeExist(String path) {
CuratorFramework client = getClient();
try {
client.start();
Stat stat = client.checkExists().forPath(path);
return stat != null;
} catch (Exception e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
return false;
}
public void createNode(String path, byte[] bytes) {
CuratorFramework client = getClient();
try {
// 必须start,否则报错
client.start();
client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, bytes);
} catch (Exception e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
}
public void deleteNode(String path) {
CuratorFramework client = getClient();
try {
client.start();
client.delete().forPath(path);
} catch (Exception e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
}
public List<String> getChildren(String path) {
List<String> result = new LinkedList<>();
CuratorFramework client = getClient();
try {
client.start();
result = client.getChildren().forPath(path);
} catch (Exception e) {
log.error("ZKClient getChildren error.");
}
return result;
}
}
构建测试类
测试基类,设置激活环境
@Slf4j
@ActiveProfiles("test")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = GmallZookeeperApplication.class)
@ContextConfiguration
public class BaseTest {
}
创建节点、删除节点、获取节点信息、分布式锁的方法如下:
@ActiveProfiles("company")
是激活笔者一个application-company.yml
文件
application.yml如下:
server:
port: 8022
spring:
profiles:
active: home
application-compay.yml如下:
#curator配置
curator:
connectString: 192.168.163.128:2181,192.168.163.128:2182,192.168.163.128:2183 # zookeeper 地址
retryCount: 1 # 重试次数
elapsedTimeMs: 2000 # 重试间隔时间
sessionTimeoutMs: 60000 # session超时时间
connectionTimeoutMs: 10000 # 连接超时时间
创建节点、删除节点、获取节点信息、分布式锁的方法如下:
@Slf4j
@ActiveProfiles("company")
public class ZKClientTest extends BaseTest{
@Resource
private ZKClient zkClient;
public static final int THREAD_NUM = 10;
@Test
public void distributedLock() throws InterruptedException, BrokenBarrierException {
String lockPath = "/test/distributed2/lock";
CuratorFramework client = zkClient.getClient();
client.start();
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
// 阻塞主线程,等待全部子线程执行完
CyclicBarrier cyclicBarrier = new CyclicBarrier(THREAD_NUM);
for (int i = 0; i < THREAD_NUM; i++) {
new Thread(() -> {
log.info("{}->尝试竞争锁", Thread.currentThread().getName());
try {
lock.acquire(); // 阻塞竞争锁
log.info("{}->成功获得锁", Thread.currentThread().getName());
Thread.sleep(2000);
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release(); //释放锁
} catch (Exception e) {
e.printStackTrace();
}
}
}, "Thread-" + i).start();
}
// 目的是为了等子线程抢完锁再结束子线程,否则无法看到日志效果
cyclicBarrier.await();
log.info("全部子线程已执行完毕");
}
@Test
public void createNode() {
// 创建一个ZNode节点
String data = "hello";
byte[] payload = data.getBytes(StandardCharsets.UTF_8);
String zkPath = "/test/CRUD/node-1";
zkClient.createNode(zkPath, payload);
log.info("createNode succeeded!");
}
@Test
public void getChildren() {
String zkPath = "/test/CRUD";
List<String> children = zkClient.getChildren(zkPath);
printList(children);
}
@Test
public void deleteNode() {
String parentPath = "/test";
log.info("======================Before delete===================");
List<String> before = zkClient.getChildren(parentPath);
printList(before);
String zkPath = "/test/CRUD/node-1";
zkClient.deleteNode(zkPath);
log.info("delete node secceeded!");
log.info("======================After delete===================");
List<String> after = zkClient.getChildren(parentPath);
printList(after);
}
private void printList(List<String> data) {
if (!CollectionUtils.isEmpty(data)) {
for (String datum : data) {
log.info("datum:{}", data);
}
}
}
}