Leader Election with ZooKeeper

顾小诺
2025-08-09

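ZooKeeper's election mechanism is used to pick a single leader among the nodes of a distributed system. The leader coordinates the operations of the other nodes, which keeps the system consistent and reliable. The basic idea and the steps for implementing an election with ZooKeeper are as follows: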

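I. Core principle

ZooKeeper implements elections with ephemeral sequential znodes:

  • Each client taking part in the election creates an ephemeral sequential node under an agreed directory, for example /election.

  • The nodes are ordered by the sequence number that ZooKeeper appends to each node name.

  • The node with the smallest sequence number is the current leader.

  • Every other node sets a watch on the node immediately before it, so that it is notified when that node changes.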
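
The ordering rule above can be sketched in plain Java, without a ZooKeeper connection. This is only an illustration: the class `ElectionOrder` and its methods are hypothetical names, not part of this article's code, and the sketch assumes every child name ends with the 10-digit, zero-padded counter that ZooKeeper appends to sequential nodes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Illustrative helper (hypothetical, not from the article): orders election znodes by sequence number. */
final class ElectionOrder {

    private ElectionOrder() {
    }

    /** Parses the 10-digit, zero-padded counter at the end of a sequential node name. */
    static long sequenceOf(String nodeName) {
        return Long.parseLong(nodeName.substring(nodeName.length() - 10));
    }

    /** Returns a copy of the children sorted by sequence number, smallest first. */
    static List<String> sorted(List<String> children) {
        List<String> copy = new ArrayList<>(children);
        copy.sort(Comparator.comparingLong(ElectionOrder::sequenceOf));
        return copy;
    }

    /** The child with the smallest sequence number is the leader. */
    static Optional<String> leaderOf(List<String> children) {
        return sorted(children).stream().findFirst();
    }

    /** The node a follower should watch: the one immediately before it. Empty for the leader. */
    static Optional<String> predecessorOf(List<String> children, String self) {
        List<String> ordered = sorted(children);
        int index = ordered.indexOf(self);
        if (index < 0) {
            throw new IllegalArgumentException("unknown node: " + self);
        }
        return index == 0 ? Optional.empty() : Optional.of(ordered.get(index - 1));
    }

    public static void main(String[] args) {
        List<String> children = List.of("node_0000000012", "node_0000000007", "node_0000000009");
        System.out.println(leaderOf(children));                         // Optional[node_0000000007]
        System.out.println(predecessorOf(children, "node_0000000012")); // Optional[node_0000000009]
        System.out.println(predecessorOf(children, "node_0000000007")); // Optional.empty
    }
}
```

Sorting by the numeric suffix rather than by the whole name keeps the order correct even if candidates use different name prefixes.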

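II. Steps

  1. Create the election root node

    • Create the root node for the election in ZooKeeper, for example /election. This node is normally persistent.

  2. Each instance creates an ephemeral sequential node

    • Each candidate creates an ephemeral sequential node under /election, for example /election/node_0000000001 (ZooKeeper appends a 10-digit, zero-padded sequence number).

  3. Fetch all children, sort them, and check whether your own node is the smallest

    • The client fetches all children of /election and sorts them.

    • If the node it created is the smallest one after sorting, it becomes the leader.

  4. If you are not the leader, watch the previous node

    • The client finds the node just before its own (the largest sequence number that is smaller than its own) and sets a watch on it.

    • When that node is deleted, its owner has gone offline. If it was the smallest node, the leader is gone. The watch fires and the client runs step 3 again.

  5. The leader does the management work

    • The leader coordinates and manages the business logic of the cluster.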
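
The steps above can be walked through with a small in-memory model. This is a sketch under stated assumptions, not ZooKeeper code: the class `ElectionSimulation`, its methods and the sample addresses are all hypothetical, a `TreeMap` stands in for the sorted children of /election, and `leave` stands in for an ephemeral node disappearing when its session ends.

```java
import java.util.Map;
import java.util.TreeMap;

/** In-memory stand-in for the /election directory (hypothetical; no ZooKeeper involved). */
final class ElectionSimulation {

    // sequence number -> candidate address, kept sorted like the ordered child list
    private final TreeMap<Integer, String> nodes = new TreeMap<>();
    private int nextSequence = 0;

    /** Step 2: a candidate registers and receives the sequence number of its node. */
    int join(String address) {
        int sequence = nextSequence++;
        nodes.put(sequence, address);
        return sequence;
    }

    /** Models a session ending: the candidate's ephemeral node disappears. */
    void leave(int sequence) {
        nodes.remove(sequence);
    }

    /** Step 3: the owner of the smallest sequence number leads. Null if nobody is registered. */
    String leader() {
        Map.Entry<Integer, String> first = nodes.firstEntry();
        return first == null ? null : first.getValue();
    }

    /** Step 4: the sequence number a follower would watch. Null for the leader. */
    Integer watchedBy(int sequence) {
        return nodes.lowerKey(sequence);
    }

    public static void main(String[] args) {
        ElectionSimulation election = new ElectionSimulation();
        int a = election.join("10.0.0.1:7000");
        int b = election.join("10.0.0.2:7000");
        int c = election.join("10.0.0.3:7000");

        System.out.println(election.leader());     // 10.0.0.1:7000
        System.out.println(election.watchedBy(c)); // 1

        election.leave(b);                         // a follower goes away
        System.out.println(election.leader());     // 10.0.0.1:7000
        System.out.println(election.watchedBy(c)); // 0

        election.leave(a);                         // the leader goes away
        System.out.println(election.leader());     // 10.0.0.3:7000
    }
}
```

The middle of the run shows why step 3 has to be repeated after a watch fires: when the node created by `b` disappears, the leader is unchanged, and `c` only needs to move its watch to the next smaller node.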

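III. Implementation

1. pom.xml dependencies

Note: my environment also uses Vert.x and JRaft, so the file below is my complete configuration rather than the minimum needed for the election.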

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.9</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.workflow</groupId>
    <artifactId>engine</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>workflow-engine</name>
    <description>Workflow engine for executing serial/parallel tasks</description>
    
    <properties>
        <java.version>11</java.version>
        <jackson.version>2.15.2</jackson.version>
    </properties>
    
    <dependencyManagement>
        <dependencies>
            <!-- Vert.x BOM: keeps the versions of all Vert.x components consistent -->
            <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-stack-depchain</artifactId>
                <version>4.5.15</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            
            <!-- Jackson BOM: keeps the versions of all Jackson components consistent -->
            <dependency>
                <groupId>com.fasterxml.jackson</groupId>
                <artifactId>jackson-bom</artifactId>
                <version>${jackson.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    
    <dependencies>
        <!-- Spring Boot Starters -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>
        
        <!-- Redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>

        <!-- MyBatis -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>
        
        <!-- PageHelper pagination plugin -->
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper-spring-boot-starter</artifactId>
            <version>1.4.6</version>
        </dependency>
        
        <!-- Multiple data sources -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
            <version>3.5.0</version>
        </dependency>
        
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        
        <!-- Utilities -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <!-- hutool -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.38</version>
        </dependency>
        
        <!-- Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Vert.x core, used for messaging -->
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Vert.x TCP EventBus Bridge, for connections from external clients -->
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-tcp-eventbus-bridge</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Vert.x Web, used for the client bridge -->
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Common classes shared by the Vert.x EventBus bridges -->
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-bridge-common</artifactId>
        </dependency>

        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-zookeeper</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- ZooKeeper client -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.8.0</version>
        </dependency>

        <!-- Declare the Jackson dependencies explicitly so that their versions stay consistent -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
        </dependency>
        
        <!-- Spring Boot Configuration Processor -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- JRaft, used for Raft communication -->
        <dependency>
            <groupId>com.alipay.sofa</groupId>
            <artifactId>jraft-core</artifactId>
            <version>1.3.14</version>
        </dependency>

        <dependency>
            <groupId>com.alipay.sofa</groupId>
            <artifactId>jraft-rheakv-core</artifactId>
            <version>1.3.14</version>
        </dependency>
        
        <dependency>
            <groupId>com.alipay.sofa</groupId>
            <artifactId>sofa-rpc-all</artifactId>
            <version>5.13.2</version>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project> 

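2. ZKClient utility class

Note: I use the Curator framework, which comes bundled with the vertx-zookeeper package. If you do not use Vert.x, you can add the Curator dependency on its own.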

package com.workflow.engine.util;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import com.workflow.engine.eventbus.ZkWatch;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class ZKClient {

    private final CuratorFramework client;

    public CuratorFramework getClient() {
        return client;
    }

    public ZKClient(String zkAdr) {
        client = CuratorFrameworkFactory.builder()
                .connectString(zkAdr)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))   // retry policy: 1s base sleep, at most 3 retries
                .sessionTimeoutMs(1000 * 6)
                .connectionTimeoutMs(1000 * 6)
                .build();
        client.start();
    }

    // Reuses an already started CuratorFramework, for example the one owned by the Vert.x cluster manager
    public ZKClient(CuratorFramework curator) {
        client = curator;
    }


    public boolean nodeExist(String path) throws Exception {
        Stat stat = client.checkExists().forPath(path);
        return stat != null;
    }

    public void createTmpNode(String path, String data) throws Exception {
        client.create()
                .creatingParentContainersIfNeeded()
                // PERSISTENT = permanent node, EPHEMERAL = temporary node
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }

    // Returns the actual path of the new node, including the sequence number added by ZooKeeper
    public String createTmpSequentialNode(String path, String data) throws Exception {
        return client.create()
                .creatingParentContainersIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }

    public void createNode(String path, String data, CreateMode createMode) throws Exception {
        // Normally you have to check that the parent exists before creating a child node,
        // because creating the child directly throws NoNodeException when the parent is missing.
        // With creatingParentContainersIfNeeded(), Curator creates all missing parents recursively.
        client.create()
                .creatingParentContainersIfNeeded()
                .withMode(createMode)
                .forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }

    public void deleteNode(String path) throws Exception {
        client.delete()
                .deletingChildrenIfNeeded().forPath(path);
    }

    public void watchChildNode(String path, ZkWatch zkWatch) throws Exception {
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, true);
        // Register the listener before starting the cache so that no change is missed
        pathChildrenCache.getListenable().addListener((curatorFramework, cacheEvent) -> {
            // Connection-state events (for example CONNECTION_LOST) carry no node data,
            // so path and data are passed as null for them
            ChildData childData = cacheEvent.getData();
            zkWatch.onEvent(
                    cacheEvent.getType(),
                    childData == null ? null : childData.getPath(),
                    childData == null ? null : childData.getData());
        });
        pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
    }

    public List<String> getChildren(String path) throws Exception {
        return client.getChildren().forPath(path);
    }

    public String getData(String path) throws Exception {
        return new String(client.getData().forPath(path), StandardCharsets.UTF_8);
    }

    public void close() {
        client.close();
    }

}

3. ZkWatch listener interface

package com.workflow.engine.eventbus;

import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;

public interface ZkWatch {
    void onEvent(PathChildrenCacheEvent.Type type, String path, byte[] data) throws Exception;
}

4. Leader election

package com.workflow.engine.eventbus;

import io.vertx.core.Vertx;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.ext.eventbus.bridge.tcp.TcpEventBusBridge;
import io.vertx.ext.bridge.BridgeOptions;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.spi.cluster.zookeeper.ZookeeperClusterManager;
import lombok.extern.slf4j.Slf4j;

import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import com.workflow.engine.constant.VertxAddressConstant;
import com.workflow.engine.eventbus.listener.ExternalNodeEventListener;
import com.workflow.engine.util.ZKClient;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

@Component
@Slf4j
public class VertxServer {

    @Value("${vertx.zookeeper.hosts}")
    private String zookeeperHosts;

    @Value("${tcp-eventbus-bridge.enabled:false}")
    private boolean bridgeEnabled;

    @Value("${tcp-eventbus-bridge.port:7001}")
    private int bridgePort;

    @Value("${tcp-eventbus-bridge.host:0.0.0.0}")
    private String bridgeHost;

    @Autowired
    private EventBusService eventBusService;

    @Autowired
    private Vertx vertx;

    private TcpEventBusBridge tcpBridge;

    //zkClient
    private ZKClient zkClient;

    @Value("${engine.address}")
    private String engineAddr;

    @Value("${engine.rootPath}")
    private String engineRootPath;

    // Child node names of all registered engines
    private List<String> handles = new CopyOnWriteArrayList<>();

    private String znodePath;

    private volatile boolean isLeader = false;

    @Autowired
    private ExternalNodeEventListener externalNodeEventListener;


    @PostConstruct
    public void startup() {
        log.info("VertxServer is starting up...");
        // 1. Initialize ZKClient, reusing the Curator client owned by the cluster manager
        ClusterManager clusterManager = ((VertxInternal) vertx).getClusterManager();
        if (clusterManager instanceof ZookeeperClusterManager) {
            this.zkClient = new ZKClient(((ZookeeperClusterManager) clusterManager).getCuratorFramework());
        } else {
            throw new IllegalStateException("ClusterManager is not an instance of ZookeeperClusterManager. Cannot initialize ZKClient.");
        }

        // 2. Start the TCP EventBus Bridge (if enabled)
        if (bridgeEnabled) {
            startTcpEventBusBridge();
        }

        // 3. Register this engine and watch for changes to the registered engines
        registerAndWatch();

        // 4. Watch the engine nodes and re-run the election whenever they change
        watchEngine();
        log.info("VertxServer startup complete.");
    }

    private void watchEngine() {
        try {
            // Run the election once at startup
            registerEventListener();
            zkClient.watchChildNode(engineRootPath, (type, path, data) -> {
                // A node changed: re-read the children and re-run the election
                registerEventListener();
            });
        } catch (Exception e) {
            log.error("watch engineHandlers fail on Zk({})", zookeeperHosts, e);
            System.exit(-1);
        }
    }

    // synchronized: this method runs on the startup thread and on the cache listener thread
    private synchronized void registerEventListener() {
        try {
            log.info("engine change");
            if (!zkClient.nodeExist(engineRootPath)) {
                zkClient.createNode(engineRootPath, "", CreateMode.PERSISTENT);
            }

            List<String> list = zkClient.getChildren(engineRootPath);

            if (CollectionUtils.isEmpty(list)) {
                log.warn("No engine nodes found under path: {}", engineRootPath);
                if (isLeader) {
                    log.info("This node [{}] is no longer the leader as no nodes are present.", engineAddr);
                    // Drop the leader-only consumer too, so the flag and the registration stay in sync
                    eventBusService.unregister(VertxAddressConstant.EXTERNAL_EVENT_INGRESS);
                    isLeader = false;
                }
                return;
            }
            // Sort the children. All names share the "engine-" prefix and the sequence number
            // is zero-padded, so lexical order matches creation order.
            Collections.sort(list);

            // The leader is the first node after sorting
            String leaderNode = list.get(0);
            String leaderAddr = zkClient.getData(engineRootPath + "/" + leaderNode);

            log.info("Current engine nodes: {}, Leader is: {}", list, leaderAddr);

            // Check whether this node is the leader
            if (engineAddr.equals(leaderAddr)) {
                if (!isLeader) {
                    log.info("This node [{}] has become the leader. Registering external event listener.", engineAddr);
                    // Register the consumer for the external ingress address
                    eventBusService.consumerOrder(VertxAddressConstant.EXTERNAL_EVENT_INGRESS, String.class,
                            MessageHandler.create(externalNodeEventListener::routeExternalEvent));
                    isLeader = true;
                } else {
                    log.info("This node [{}] is already the leader. Listener already registered.", engineAddr);
                }
            } else {
                if (isLeader) {
                    log.info("This node [{}] is no longer the leader.", engineAddr);
                    eventBusService.unregister(VertxAddressConstant.EXTERNAL_EVENT_INGRESS);
                    isLeader = false;
                }
            }
            log.info("new engineNode [{}]", list);
        } catch (Exception e) {
            log.error("engineAddrs fail on Zk[{}]", zookeeperHosts, e);
        }

    }

    /**
     * Registers this engine and watches for changes to the registered engines.
     */
    private void registerAndWatch() {
        try {
            // Node paths look like "/strategy_backend.engine/engine-N"
            // Make sure the root node exists first, otherwise an exception is thrown
            if (!zkClient.nodeExist(engineRootPath)) {
                zkClient.createNode(engineRootPath, "", CreateMode.PERSISTENT);
            }

            // 1. Check whether this engine address is already registered
            List<String> childNodes = zkClient.getChildren(engineRootPath);
            log.info("current childNodes [{}]", childNodes);
            if (!CollectionUtils.isEmpty(childNodes)) {
                // 2. Walk through every child node
                for (String nodeName : childNodes) {
                    // 3. Read the data stored in the child node
                    String nodeData = zkClient.getData(engineRootPath + "/" + nodeName);
                    // 4. Compare the node data with this node's engineAddr
                    if (engineAddr.equals(nodeData)) {
                        // Same address: it is already registered, so log an error and exit
                        log.error("Engine with address {} is already registered under node {}", engineAddr, nodeName);
                        System.exit(-1);
                        return; // do not continue
                    }
                }
            }

            // Register: create the ephemeral sequential node that takes part in the election
            this.znodePath = zkClient.createTmpSequentialNode(engineRootPath + "/engine-", engineAddr);
            log.info("Registered engine node with path: {}", znodePath);

            // Watch
            zkClient.watchChildNode(engineRootPath, (type, path, data) -> {
                // A node changed: re-read the list of children
                log.info("counter order handler node change");
                List<String> list = zkClient.getChildren(engineRootPath);
                handles.clear();
                if (!CollectionUtils.isEmpty(list)) {
                    handles.addAll(list);
                }
                log.info("new engineHandlers [{}]", list);
            });

            // Load the current children
            List<String> tmpData = zkClient.getChildren(engineRootPath);
            if (!CollectionUtils.isEmpty(tmpData)) {
                handles.addAll(tmpData);
            }


        } catch (Exception e) {
            log.error("register counter order handler fail on Zk({})", zookeeperHosts, e);
            System.exit(-1);
        }

    }


    // Returns the ID of the current cluster node
    public String getCurrentNodeId() {
        if (vertx != null) {
            ClusterManager clusterManager = ((VertxInternal) vertx).getClusterManager();
            return clusterManager.getNodeId();
        }
        return null;
    }
    
    // Returns the IDs of all nodes in the cluster
    public List<String> getNodes() {
        if (vertx != null) {
            ClusterManager clusterManager = ((VertxInternal) vertx).getClusterManager();
            return clusterManager.getNodes();
        }
        return new ArrayList<String>();
    }

    @PreDestroy
    public void shutdown() {
        if (vertx != null) {
            log.info("Shutting down the Vertx server...");

            // Close the TCP bridge
            if (tcpBridge != null) {
                tcpBridge.close();
                log.info("TCP EventBus Bridge closed");
            }

            vertx.close(ar -> {
                if (ar.succeeded()) {
                    log.info("Vertx server shut down successfully");
                } else {
                    log.error("Vertx server failed to shut down", ar.cause());
                }
            });
        }
    }

    /**
     * Starts the TCP EventBus Bridge.
     */
    private void startTcpEventBusBridge() {
        try {
            log.info("Starting TCP EventBus Bridge on {}:{}", bridgeHost, bridgePort);

            BridgeOptions bridgeOptions = createBridgeOptions();
            // Create the bridge with an event handler
            tcpBridge = TcpEventBusBridge.create(vertx, bridgeOptions, null, event -> {
                try {
                    log.debug("Bridge event: {} - raw message: {}", event.type(), event.getRawMessage());
                    switch (event.type()) {
                        case SOCKET_CREATED:
                            log.info("Bridge: client connected - {}", event.socket().remoteAddress());
                            break;
                        case SOCKET_CLOSED:
                            log.info("Bridge: client disconnected - {}", event.socket().remoteAddress());
                            break;
                        case SEND:
                        case PUBLISH:
                        case RECEIVE:
                            // Per-message logging (address and body) is disabled for these events
                            break;
                        case REGISTER:
                            log.info("Bridge: client registered a handler - address: {}", event.getRawMessage().getString("address"));
                            break;
                        case UNREGISTER:
                            log.info("Bridge: client unregistered a handler - address: {}", event.getRawMessage().getString("address"));
                            break;
                        default:
                            log.debug("Bridge: other event - {}", event.type());
                            break;
                    }

                    // Custom permission checks can be added here.
                    // For now every event is allowed through.
                    boolean shouldComplete = true;

                    event.complete(shouldComplete);
                } catch (Exception e) {
                    log.error("Error while handling bridge event", e);
                    event.complete(false); // reject the event on error
                }
            });

            tcpBridge.listen(bridgePort, bridgeHost, ar -> {
                if (ar.succeeded()) {
                    log.info("TCP EventBus Bridge started, external clients can connect to {}:{}", bridgeHost, bridgePort);
                    log.info("Python clients can now connect to the EventBus over TCP");
                } else {
                    log.error("TCP EventBus Bridge failed to start", ar.cause());
                }
            });
        } catch (Exception e) {
            log.error("Error while starting TCP EventBus Bridge", e);
        }
    }

    /**
     * Creates the bridge options.
     */
    private BridgeOptions createBridgeOptions() {
        BridgeOptions options = new BridgeOptions();
        // Empty PermittedOptions: no address restriction on inbound or outbound traffic
        options.addInboundPermitted(new PermittedOptions());
        options.addOutboundPermitted(new PermittedOptions());
        return options;
    }
}
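
The `isLeader` bookkeeping in `registerEventListener` is a small state machine: it acts only when leadership changes, not on every child event. Note that `VertxServer` re-reads all children whenever anything under the root changes, rather than watching only the previous node. The sketch below isolates that transition logic in plain Java so it can be tried without Vert.x or ZooKeeper. The class `LeadershipTracker`, its callbacks and the sample addresses are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Fires a callback once per leadership change (hypothetical sketch, not the article's class). */
final class LeadershipTracker {

    private final String selfAddress;
    private final Runnable onElected;
    private final Runnable onDemoted;
    private boolean leader = false;

    LeadershipTracker(String selfAddress, Runnable onElected, Runnable onDemoted) {
        this.selfAddress = selfAddress;
        this.onElected = onElected;
        this.onDemoted = onDemoted;
    }

    /**
     * Call after every child change with the address stored in the smallest node,
     * or with null when no children are left.
     */
    synchronized void onLeaderAddress(String leaderAddress) {
        boolean nowLeader = selfAddress.equals(leaderAddress);
        if (nowLeader && !leader) {
            onElected.run();   // for example: register the leader-only consumer
        } else if (!nowLeader && leader) {
            onDemoted.run();   // for example: unregister it again
        }
        leader = nowLeader;
    }

    synchronized boolean isLeader() {
        return leader;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        LeadershipTracker tracker = new LeadershipTracker(
                "10.0.0.2:7000", () -> events.add("elected"), () -> events.add("demoted"));

        tracker.onLeaderAddress("10.0.0.1:7000"); // another node leads: nothing happens
        tracker.onLeaderAddress("10.0.0.2:7000"); // this node takes over
        tracker.onLeaderAddress("10.0.0.2:7000"); // still leader: no second callback
        tracker.onLeaderAddress(null);            // no children left: step down
        System.out.println(events);               // [elected, demoted]
    }
}
```

Keeping the flag and the callbacks behind one synchronized method means a burst of watch events cannot register the leader-only consumer twice.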
