查看: 281|回复: 0
打印 上一主题 下一主题

[大数据] 基于zookeeper实现Java分布式锁

[复制链接]
  • TA的每日心情
    开心
    2017-7-5 20:14
  • 签到天数: 4 天

    [LV.2]偶尔看看I

    1698

    主题

    1707

    帖子

    7350

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    7350
    跳转到指定楼层
    楼主
    发表于 2019-7-30 19:42:34 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

    一 安装zookeeper
    http://www.zydgbbs.com/thread-2828-1-1.html
    实现第一部分即可

    二 引入依赖

    1. ? ?
    2. ? ?? ?org.apache.zookeeper
    3. ? ?? ?zookeeper
    4. ? ?? ?3.4.14
    5. ? ?? ?pom
    6. ? ?
    复制代码

    三 售票操作类
    TicketOperation.java
    1. import org.apache.zookeeper.*;
    2. import org.apache.zookeeper.data.Stat;

    3. import java.io.IOException;
    4. import java.util.Collections;
    5. import java.util.List;

    6. public class TicketOperation {

    7. ? ? private ZooKeeper zkClient;
    8. ? ? private static final String LOCK_ROOT_PATH = "/Locks";
    9. ? ? private static final String LOCK_NODE_NAME = "Lock_";
    10. ? ? private String lockPath;

    11. ? ? // 监控lockPath的前一个节点的watcher
    12. ? ? private Watcher watcher = new Watcher() {
    13. ? ?? ???@Override
    14. ? ?? ???public void process(WatchedEvent event) {
    15. ? ?? ?? ?? ?System.out.println(event.getPath() + " 前锁释放");
    16. ? ?? ?? ?? ?synchronized (this) {
    17. ? ?? ?? ?? ?? ? notifyAll();
    18. ? ?? ?? ?? ?}

    19. ? ?? ???}
    20. ? ? };

    21. ? ? public TicketOperation() throws IOException {
    22. ? ?? ???zkClient= new ZooKeeper("master:2181", 10000, new Watcher() {
    23. ? ?? ?? ?? ?@Override
    24. ? ?? ?? ?? ?public void process(WatchedEvent event) {
    25. ? ?? ?? ?? ?? ? if(event.getState()== Event.KeeperState.Disconnected){
    26. ? ?? ?? ?? ?? ?? ???System.out.println("失去连接");

    27. ? ?? ?? ?? ?? ? }
    28. ? ?? ?? ?? ?}
    29. ? ?? ???});
    30. ? ? }

    31. ? ? //获取锁的原语实现.
    32. ? ? public??void acquireLock() throws InterruptedException, KeeperException {
    33. ? ?? ???//创建锁节点
    34. ? ?? ???createLock();
    35. ? ?? ???//尝试获取锁
    36. ? ?? ???attemptLock();
    37. ? ? }

    38. ? ? //创建锁的原语实现。在lock节点下创建该线程的锁节点
    39. ? ? private void createLock() throws KeeperException, InterruptedException {
    40. ? ?? ???//如果根节点不存在,则创建根节点
    41. ? ?? ???Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);
    42. ? ?? ???if (stat == null) {
    43. ? ?? ?? ?? ?zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    44. ? ?? ???}

    45. ? ?? ???// 创建EPHEMERAL_SEQUENTIAL类型节点
    46. ? ?? ???String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
    47. ? ?? ?? ?? ?? ? Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
    48. ? ?? ?? ?? ?? ? CreateMode.EPHEMERAL_SEQUENTIAL);
    49. ? ?? ???System.out.println(Thread.currentThread().getName() + " 锁创建: " + lockPath);
    50. ? ?? ???this.lockPath=lockPath;
    51. ? ? }

    52. ? ? private void attemptLock() throws KeeperException, InterruptedException {
    53. ? ?? ???// 获取Lock所有子节点,按照节点序号排序
    54. ? ?? ???List lockPaths = null;

    55. ? ?? ???lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);

    56. ? ?? ???Collections.sort(lockPaths);

    57. ? ?? ???int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));

    58. ? ?? ???// 如果lockPath是序号最小的节点,则获取锁
    59. ? ?? ???if (index == 0) {
    60. ? ?? ?? ?? ?System.out.println(Thread.currentThread().getName() + " 锁获得, lockPath: " + lockPath);
    61. ? ?? ?? ?? ?return ;
    62. ? ?? ???} else {
    63. ? ?? ?? ?? ?// lockPath不是序号最小的节点,监控前一个节点
    64. ? ?? ?? ?? ?String preLockPath = lockPaths.get(index - 1);

    65. ? ?? ?? ?? ?Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);

    66. ? ?? ?? ?? ?// 假如前一个节点不存在了,比如说执行完毕,或者执行节点掉线,重新获取锁
    67. ? ?? ?? ?? ?if (stat == null) {
    68. ? ?? ?? ?? ?? ? attemptLock();
    69. ? ?? ?? ?? ?} else { // 阻塞当前进程,直到preLockPath释放锁,被watcher观察到,notifyAll后,重新acquireLock
    70. ? ?? ?? ?? ?? ? System.out.println(" 等待前锁释放,prelocakPath:"+preLockPath);
    71. ? ?? ?? ?? ?? ? synchronized (watcher) {
    72. ? ?? ?? ?? ?? ?? ???watcher.wait();
    73. ? ?? ?? ?? ?? ? }
    74. ? ?? ?? ?? ?? ? attemptLock();
    75. ? ?? ?? ?? ?}
    76. ? ?? ???}
    77. ? ? }

    78. ? ? //释放锁的原语实现
    79. ? ? public void releaseLock() throws KeeperException, InterruptedException {
    80. ? ?? ???zkClient.delete(lockPath, -1);
    81. ? ?? ???zkClient.close();
    82. ? ?? ???System.out.println(" 锁释放:" + lockPath);
    83. ? ? }

    84. }
    复制代码

    四 售票测试类
    TicketSellerTest.java
    1. import org.apache.zookeeper.*;

    2. import java.io.IOException;

    3. public class TicketSellerTest {

    4. ? ? private void sell(){
    5. ? ?? ???System.out.println("售票开始");
    6. ? ?? ???// 线程随机休眠数毫秒,模拟现实中的费时操作
    7. ? ?? ???int sleepMillis = (int) (Math.random() * 2000);
    8. ? ?? ???try {
    9. ? ?? ?? ?? ?//代表复杂逻辑执行了一段时间
    10. ? ?? ?? ?? ?Thread.sleep(sleepMillis);
    11. ? ?? ???} catch (InterruptedException e) {
    12. ? ?? ?? ?? ?e.printStackTrace();
    13. ? ?? ???}
    14. ? ?? ???System.out.println("售票结束");
    15. ? ? }

    16. ? ? public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {
    17. ? ?? ???TicketOperation lock = new TicketOperation();
    18. ? ?? ???lock.acquireLock();
    19. ? ?? ???sell();
    20. ? ?? ???lock.releaseLock();
    21. ? ? }

    22. ? ? public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
    23. ? ?? ???TicketSellerTest ticketSeller = new TicketSellerTest();
    24. ? ?? ???for(int i=0;i<1000;i++){
    25. ? ?? ?? ?? ?ticketSeller.sellTicketWithLock();
    26. ? ?? ???}

    27. ? ? }

    28. }
    复制代码

    资源帝国 - 论坛版权1、本主题所有言论和图片纯属会员个人意见,与本论坛立场无关
    2、本站所有主题由该帖子作者发表,该帖子作者与资源帝国享有帖子相关版权
    3、其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和资源帝国的同意
    4、帖子作者须承担一切因本文发表而直接或间接导致的民事或刑事法律责任
    5、本帖部分内容转载自其它媒体,但并不代表本站赞同其观点和对其真实性负责
    6、如本帖侵犯到任何版权问题,请立即告知本站,本站将及时予与删除并致以最深的歉意
    7、资源帝国管理员和版主有权不事先通知发贴者而删除本文

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|资源帝国 ( 皖ICP备14009953号 )?

    GMT+8, 2019-8-11 01:52 , Processed in 0.330012 second(s), 29 queries .

    Powered by Discuz! X3.2

    ? 2001-2017 Comsenz Inc.

    快速回复 返回顶部 返回列表