当先锋百科网

首页 1 2 3 4 5 6 7

是在zK分布式锁的基础上优化,之前是所有线程是对同一个节点进行监视,只要释放锁,就会通知其他所有线程。高并发容易出现资源竞争激烈情况。优化是在一个线程只关心自己的上一个节点锁,释放锁的时候只会通知一个线程取获取锁,这样减少了资源的竞争。

如果想看上一篇博客,可点击。这2篇博客都是写ZK分布式锁的,之间没有依赖关系,这边只是对上一篇的另一种实现,在性能上有了一定的提升,如果想学习一下分布式锁的思想可都看一下,必定会为你有一个很大的提升。

废话不多说了,直接上源码实现:

1、Maven项目的依赖包

<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.7.10</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.1.2</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-core</artifactId>
			<version>1.1.2</version>
		</dependency>
		<dependency>
			<groupId>com.101tec</groupId>
			<artifactId>zkclient</artifactId>
			<version>0.9</version>
		</dependency>

2、分布式锁的接口定义

//分布式锁定义
public interface IDistriLock {
	public void Lock();
	public void UnLock();
}

3、分布式锁的抽象类定义

package com.hongying.distrilock;
import org.I0Itec.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * ZK分布式锁实现
 * 
 * @author aaa 在永久节点下创建临时顺序节点,判断临时顺序节点是否最小的节点,是:获取锁;
 *         否:对上一个节点进行监听,上一个节点删除(释放锁),当前节点则获取锁。
 */
public abstract class ZKDisAbstracLock implements IDistriLock {
	Logger log=LoggerFactory.getLogger(ZKDisAbstracLock.class);
	String zkServers = "127.0.0.1:2181";
	int connectionTimeout = 30000;
	int sessionTimeout=30000;
	String DistrickPath = "/DistrictLock";// 分布式锁节点
	String SubDistrictPath = "/DistrictLock/SubDistrictLock";// 分布式锁子节点锁顺序节点
	ZkClient zk = new ZkClient(zkServers, sessionTimeout,connectionTimeout);
	Object lockObj=new Object();//创建父节点时的锁对象
	String currentThreadPath;//当前线程创建的临时节点Path
	public ZKDisAbstracLock() {
		// 未创建父分布式节点则创建
		synchronized (lockObj) {
			if (!zk.exists(DistrickPath)) {
				zk.createPersistent(DistrickPath);
			}
		}
	}
	@Override
	public void Lock() {
		if (TryLock()) {
			log.info("线程:"+Thread.currentThread().getName()+",获取分布式锁成功..");
		} else {
			//线程等待释放锁之后重新获取锁
			WaitLock();
			Lock();
		}
	}
	@Override
	public void UnLock() {
		//直接关闭ZK连接,ZK并不能立马释放临时节点,为了提高性能,手动删除临时节点
		if(currentThreadPath!=null){
			zk.delete(currentThreadPath);
		}
		zk.close();//关闭ZK则临时节点自动释放锁
		log.info("线程:"+Thread.currentThread().getName()+",关闭ZK连接");
	}
	//试着获取锁,异常则获取锁失败
	public abstract boolean TryLock();
	//线程等待释放锁
	public abstract void WaitLock();
}

4、ZK分布式锁的实现

package com.hongying.distrilock;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.I0Itec.zkclient.IZkDataListener;
public class ZKDistriLockImp extends ZKDisAbstracLock {
	CountDownLatch latch=null;
	@Override
	public boolean TryLock() {
		try{
			if(currentThreadPath==null){
				//未创建临时顺序节点则创建
				String seqNode=zk.createEphemeralSequential(SubDistrictPath, "1");
				log.info("线程:"+Thread.currentThread().getName()+",创建的节点:"+seqNode);
				currentThreadPath=seqNode;
			}
			
			//判断当前节点名称是否最小节点,是则获取锁,否则对上一个节点添加监听后等待锁释放
			List<String> subNodes=zk.getChildren(DistrickPath);
			subNodes.sort(new Comparator<String>(){
				@Override
				public int compare(String o1, String o2) {
					if(o1.isEmpty()||o2.isEmpty()){
						return -1;
					}if(o1.length() > o2.length()){
	                    return 1;
	                }
	                if(o1.length() < o2.length()){
	                    return -1;
	                }
	                if(o1.compareTo(o2) > 0){
	                    return 1;
	                }
	                if(o1.compareTo(o2) < 0){
	                    return -1;
	                }
	                if(o1.compareTo(o2) == 0){
	                    return 0;
	                }
					return 0;
				}
			});
			//排序后的节点打印出来
//			for(String n:subNodes){
//				log.info("线程:"+Thread.currentThread().getName()+",排序后节点:"+n);
//			}
			//判断当前节点是否最小节点
			String prevNode="";//上一个节点
			boolean isMinNode=false;
			for(int i=0;i<subNodes.size();i++){
				if(currentThreadPath.equals(DistrickPath+"/"+subNodes.get(0))){
					//第一个就是这个节点表示最小,直接返回
					isMinNode=true;
					break;
				}else{
					//不是最小节点找到上一个节点,作为监听的节点
					if(currentThreadPath.equals(DistrickPath+"/"+subNodes.get(i))){
						prevNode=DistrickPath+"/"+subNodes.get(i-1);
					}
				}
			}
			if(isMinNode){
				log.info("线程:"+Thread.currentThread().getName()+",成功获取锁..");
				return true;
			}
			else{
				//注意:这里判断节点是否存在非常重要,在并发下,当获取上一个要监视的节点,如果在添加监视的同时,删除了这个节点。
				//这个节点本身删除了,这个通知永远不会收到通知,导致后续对这个节点的监视永远不会触发锁释放
				if(!zk.exists(prevNode)){
					//当前节点不存在了则重新获取要监视的节点
				    return 	TryLock();
				}
				latch=new CountDownLatch(1);//如果当前线程未获取锁,则需要休眠
				log.info("线程:"+Thread.currentThread().getName()+",对节点:"+prevNode+",进行监视");
				//对当前的上一个节点进行监听
				zk.subscribeDataChanges(prevNode, new IZkDataListener() {
					@Override
					public void handleDataDeleted(String dataPath) throws Exception {
						log.info("线程:"+Thread.currentThread().getName()+",节点已释放,下个节点可获取锁...");
						latch.countDown();
					}
					@Override
					public void handleDataChange(String dataPath, Object data) throws Exception {
						int i=10;
					}
				});
				return false;
			}
		}
		catch(Exception e){
			e.printStackTrace();
			log.info("试着获取锁异常:",e.getMessage());
			//如果发生异常重新试着获取锁
			return TryLock();
		}
	}
	@Override
	public void WaitLock() {
		try {
			log.info("线程:"+Thread.currentThread().getName()+ ",已经休眠...");
			if(latch!=null){
				latch.await();//线程等待,直到监视的节点删除了,释放锁时候
			}
			log.info("线程:"+Thread.currentThread().getName()+",被唤醒..");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

5、分布式锁的测试案例

package com.hongying.distrilock;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OrderServiceTest implements Runnable  {
	private static Logger logger = LoggerFactory.getLogger(OrderServiceTest.class);
    private static int count = 700;//并发线程数量。缺点:线程过多或业务执行时间长,连接ZK Session超时。
    private static CountDownLatch cdl = new CountDownLatch(count);
    IDistriLock lock = new ZKDistriLockImp();
    public void run() {
    	//创建订单
        createOrderNum();
    }
    
    public void createOrderNum() {
        lock.Lock();
        String orderNum = OrderFactory.GetOrder();
        logger.info(Thread.currentThread().getName() + "创建了订单号:【" + orderNum
                + "】!");
        try {
			Thread.sleep(15);//时间长会导致大部分线程都休眠,但是都会连接着ZK,导致连接Session过长失去ZK连接,爆出异常
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
        lock.UnLock();
    }
    
    public static void main(String[] args) {
        for (int i = 0; i < count; i++) {
            new Thread(new OrderServiceTest()).start();
            //发令枪里面的数字减一
            cdl.countDown();
        }
    }
}

6、总结

以上是自己对ZK分布式锁的另外一种优化,个人采用以上的简单并发测试没有任何问题。但是从代码的逻辑来看当并发量上1000后,业务逻辑复杂,每个线程对锁的占用时间增长,ZK的连接数量急剧上升,会占用大量的ZK连接,会出现ZK连接不上等异常信息出现,这个就需要进一步优化对ZK的连接了。

小弟不才,以上只是自己对ZK的理解并实现。如有任何分歧或问题都可以留言,进一步学习提升。