分布式锁功能:
在分布式场景中,我们为了保证数据的一致性,经常在程序运行的某一个点,需要进行同步操作,(java提供synchronized或者Reentrantlock实现),
使用curator基于zookeeper的特性提供的分布式锁来处理分布式场景的数据一致性,zookeeper本身的分布式是有写问题的,这里推荐使用Curator框架的
InterProcessMutex来实现。
package bjsxt.curator.lock; import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch; import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry; public class Lock2 { /** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.2.2:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 5000;// ms static int count = 10; public static void genarNo() {
try {
count--;
System.out.println(count);
} finally { }
} public static void main(String[] args) throws Exception { // 1 重试策略:初试时间为1s 重试10次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
// 2 通过工厂创建连接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy)
// .namespace("super")
.build();
// 3 开启连接
cf.start(); // 4 分布式锁
final InterProcessMutex lock = new InterProcessMutex(cf, "/super");
// final ReentrantLock reentrantLock = new ReentrantLock();
final CountDownLatch countdown = new CountDownLatch(1); for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
countdown.await();
// 加锁
lock.acquire();
// reentrantLock.lock();
// -------------业务处理开始
// genarNo();
SimpleDateFormat sdf = new SimpleDateFormat(
"HH:mm:ss|SSS");
System.out.println(sdf.format(new Date()));
// System.out.println(System.currentTimeMillis());
// -------------业务处理结束
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 释放
lock.release();
// reentrantLock.unlock();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}, "t" + i).start();
}
Thread.sleep(100);
countdown.countDown(); }
}
分布式计数器功能
分布式计数器,在单JVM中,我们可以通过AtomicInteger这种经典的方式实现,但是在分布式的场景下,就需要利用Curator框架的DistributedAtomicInteger来实现
package bjsxt.curator.atomicinteger; import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes; public class CuratorAtomicInteger { /** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.2.2:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 5000;// ms public static void main(String[] args) throws Exception { // 1 重试策略:初试时间为1s 重试10次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
// 2 通过工厂创建连接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy).build();
// 3 开启连接
cf.start();
// cf.delete().forPath("/super"); // 4 使用DistributedAtomicInteger
DistributedAtomicInteger atomicIntger = new DistributedAtomicInteger(
cf, "/super", new RetryNTimes(3, 1000)); AtomicValue<Integer> value = atomicIntger.add(1);
System.out.println(value.succeeded());
System.out.println(value.postValue()); // 最新值
System.out.println(value.preValue()); // 原始值 }
}
Curator框架,让一些很困难的问题,简单化了