关注

【即时通讯系统】环境搭建2——etcd

目录

一.etcd

1.1.什么是etcd

1.2.etcd在服务注册和发现的作用

1.3.安装etcd

二.搭建服务注册发现中心

2.1.etcd-cpp-apiv3的安装

2.2.etcd-cpp-apiv3的使用

2.2.1. 客户端初始化:etcd::Client 构造函数

2.2.2. 租约管理(Lease)

2.2.3. 键值操作

2.2.4. 监听变化(Watch)

2.2.5.使用示例

三.封装服务发现与注册功能

3.1.设计思想

3.2.代码+测试


一.etcd

1.1.什么是etcd

etcd 是一个分布式、高可用的键值存储系统,主要用于在分布式系统中可靠地存储和管理关键数据(如配置信息、元数据、服务发现地址),并为其提供强一致性的数据访问保证。

接下来,我们从几个关键层面来分解这个定义。

  • 键值存储

    • 这是它的数据模型,非常简单。你可以把它想象成一个可以远程访问的“字典”或“Map”。

    • 你通过一个唯一的“键”(Key)来存放或读取对应的“值”(Value)。例如,键可以是 /database/config/host,值可以是 192.168.1.100

    • 它还支持按“目录”结构组织键,方便管理,例如所有数据库配置都放在 /database/config/ 目录下。

  • 分布式

    • 一个 etcd 服务通常由 3、5 或 7 个节点(服务器)组成一个集群

    • 数据会自动在集群中的多个节点上复制和保存。这意味着即使其中一两个节点发生故障,整个集群依然可以正常工作,数据也不会丢失。这是实现“高可用”的基础。

  • 强一致性

    • 这是 etcd 最核心、最重要的特性。

    • 它意味着,当你向 etcd 集群写入一个新数据后,之后任何客户端从集群中任何一个健康的节点读取这个数据,保证都能读到最新写入的值,绝对不会读到旧数据。

    • 这是通过一个名为 Raft 的共识算法实现的。Raft 算法确保集群中所有节点对数据的修改顺序达成完全一致。写入操作必须得到集群中多数节点的确认才会成功,从而保证了数据的一致性。

  • 高可用

    • 基于其分布式架构和 Raft 算法,当少数节点宕机、网络出现分区时,只要集群中还能形成“多数派”(例如 3 个节点中存活 2 个,5 个节点中存活 3 个),整个集群就能继续提供读写服务。

关键能力:Watch 和 Lease

除了基本的“读/写/删”,etcd 提供了两个至关重要的高级功能:

  • Watch(监听)

    • 客户端可以“监视”一个键或一个目录。当被监视的键的值发生变化(被修改、删除)时,etcd 会主动、实时地通知监听的客户端。

    • 作用:这使得客户端无需不断轮询查询,就能立刻感知到配置变更,从而实现动态配置更新。

  • Lease(租约)

    • 你可以为一个键值对附加一个具有 TTL(生存时间) 的租约。例如,设置一个 10 秒的租约。

    • 客户端(比如一个微服务实例)必须定期“续租”。如果客户端崩溃了,无法续租,10 秒后,这个租约及其关联的所有键值对都会自动删除

    • 作用:这是实现服务发现节点健康检查的基石。服务实例注册时带一个租约,它正常运行时不断续租,注册信息就存在;它一旦宕机,注册信息会自动清理,其他服务就知道它不可用了。

与类似技术的简单对比

  • 与 Redis 对比

    • Redis:更侧重高性能缓存、丰富的数据结构、消息队列。虽然可以持久化,但其主从复制模式在默认情况下是异步的,不保证强一致性(有数据丢失风险)。

    • etcd:首要目标是强一致性和可靠性,性能其次。数据结构简单(键值),核心是提供分布式系统所需的协调原语(Watch, Lease)。

  • 与 ZooKeeper 对比

    • ZooKeeper:是 etcd 的前辈,设计目标高度相似(分布式协调)。但其 API 相对复杂,使用自定义协议。

    • etcd:设计更简洁,使用 HTTP/gRPC + JSON/Protocol Buffers 作为通信协议,提供了更清晰、易用的 API。在 Kubernetes 生态的带动下,已成为该领域的事实标准。

1.2.etcd在服务注册和发现的作用

在没有服务发现之前,服务间调用通常采用硬编码或配置文件的方式指定对方的地址。当服务实例增减或发生故障时,需要手动修改所有调用方的配置,这在分布式、弹性伸缩的环境中是完全不可行的。

服务发现的目标就是解决这个问题:让服务消费者能自动、实时地找到它依赖的服务提供者的当前可用实例列表

etcd 以其强一致性Watch 机制Lease 特性,完美地支撑了这一过程。整个流程可以分为两个核心部分:服务注册(写) 和 服务发现(读+监听)

1. 服务注册:服务如何“上线”?

当一个服务实例(例如 用户服务 UserService)启动并准备好接收请求时,它需要向整个系统宣告自己的存在。

详细步骤:

  1. 生成唯一标识:服务实例启动后,通常会生成一个唯一标识符,例如 userservice-instance-192.168.1.101:8080

  2. 创建租约服务实例向 etcd 请求创建一个具有 TTL 的租约(例如 30 秒)。这个租约是这个实例的“心跳契约”。

  3. 写入键值对:服务实例将自身的关键信息(主要是网络地址,如 192.168.1.101:8080,也可以包含元数据如版本、权重等)作为一个,以一个特定的写入 etcd。

    • 键的格式通常是有层次的,例如:

      • /services/userservice/192.168.1.101:8080

      • /services/userservice/instances/instance-01

    • 关键操作:在写入这个键值对时,将步骤2创建的租约ID与之关联

  4. 定期续约服务实例必须周期性地(比如每 10 秒) 向 etcd 发送“续租”请求,刷新这个租约的 TTL,告诉 etcd:“我还活着”。

etcd 在此环节的作用:

  • 租约机制:这是实现自动健康检查和实例下线清理的核心。如果服务实例崩溃、网络分区或进程僵死,它将无法继续“续租”。

  • 自动过期:当租约的 TTL 到期(例如实例宕机后 30 秒),etcd 会自动删除所有与该租约绑定的键值对。

  • 结果:这意味着,etcd 中 /services/userservice/ 目录下存储的,永远都是当前健康存活的服务实例列表。宕机的实例会被自动移除,无需人工干预。

2. 服务发现:消费者如何找到提供者?

当另一个服务(例如 API 网关 或 订单服务)需要调用 用户服务 时,它需要获取后者的可用地址列表。

详细步骤:

  1. 初次获取列表:服务消费者(调用方)首先查询 etcd 中特定前缀的键(例如 /services/userservice/)。

  2. 解析实例信息:etcd 返回该目录下所有的键值对。消费者解析这些值,得到当前所有可用的 UserService 实例地址列表,例如 [192.168.1.101:8080, 192.168.1.102:8080]

  3. 建立 Watch 监听:这是实现动态发现的关键。服务消费者在获取到初始列表后,立即在同一个前缀(/services/userservice/)上创建一个 Watch

  4. 处理变更事件

    • 新增事件:当一个新的 UserService 实例启动并完成注册(写入新键值对)时,etcd 会通过 Watch 通道立即通知所有监听者。消费者收到通知后,将新实例地址加入自己的本地可用列表。

    • 删除事件:当一个已有的 UserService 实例宕机(租约过期,键值对被删除)时,etcd 同样会立即通知。消费者收到通知后,将失效的实例地址从本地列表中移除。

etcd 在此环节的作用:

  • 强一致性读取:保证消费者获取到的列表是最新的、一致的。

  • Watch 机制:提供了实时、推送式的变更通知。消费者无需定时轮询(拉取),从而减少了延迟和 etcd 服务器的压力。这是实现服务列表动态、实时更新的技术保障。

1.3.安装etcd

注意:ubuntu24.04可能使用下面这种方法是安装不了的。所以我建议还是使用ubuntu22.04来。

安装Etcd: 

sudo apt-get install etcd

启动Etcd服务: 

sudo systemctl start etcd

设置Etcd开机自启: 

sudo systemctl start etcd

节点配置

如果是单节点集群其实就可以不用进行配置,默认 etcd 的集群节点通信端口为 2380, 客户端访问端口为2379。

etcd 的两个端口分工明确:

  • 2379 端口:用于接收客户端请求。所有需要读取或写入数据的操作(比如存储配置、查询键值)都通过这个端口进行。这是 etcd 对外提供的服务端口。

  • 2380 端口:用于 etcd 集群内部节点之间的通信。当多个 etcd 节点组成一个集群时,它们通过这个端口交换数据、选举领导者、同步状态。如果只运行单节点,这个端口虽然默认打开,但实际上没有数据流量。

两个端口不能互换使用:客户端无法通过 2380 访问数据,节点之间也不会通过 2379 进行内部协调。它们各自服务于完全不同的通信场景。

总结:2379 对外服务,2380 对内协作。两者分工明确,各司其职。

如果说你去配置我们的集群模式,则可以去配置文件里面进行修改:/etc/default/etcd

但是,我们这里是不用进行配置的,我们这里只需要一台主机即可,所以我们不需要进行任何配置。

我们也不需要进行修改

运行验证

etcdctl put mykey "this is awesome"

如果出现报错:

则 sudo vi /etc/profile 在末尾声明环境变量 ETCDCTL_API=3 以确定etcd版本。

export ETCDCTL_API=3

我们进行重新加载一下配置文件

source /etc/profile

然后我们就发现,我们可以正常运行这个etcd了

二.搭建服务注册发现中心

使用Etcd作为服务注册发现中心,你需要定义服务的注册和发现逻辑。

这通常涉及到以下几个操作: 

  • 1. 服务注册:服务启动时,向Etcd注册自己的地址和端口。 
  • 2. 服务发现:客户端通过Etcd获取服务的地址和端口,用于远程调用。 
  • 3. 健康检查:服务定期向Etcd发送心跳,以维持其注册信息的有效性。 

etcd 采用 golang 编写,v3版本通信采用grpc API,即(HTTP2+protobuf); 

官方只维护了go语言版本的client库, 因此需要找到C/C++ 非官方的client 开发库:

2.1.etcd-cpp-apiv3的安装

etcd-cpp-apiv3 是一个 etcd 的 C++版本客户端API。它依赖于mipsasm, boost, protobuf, gRPC, cpprestsdk 等库。

它的GitHub地址是GitHub - etcd-cpp-apiv3/etcd-cpp-apiv3: The etcd-cpp-apiv3 is a C++ library for etcd's v3 client APIs, i.e., ETCDCTL_API=3. · GitHub

etcd-cpp-apiv3的依赖安装:

sudo apt-get install libboost-all-dev libssl-dev && sudo apt-get install libprotobuf-dev protobuf-compiler-grpc && sudo apt-get install libgrpc-dev libgrpc++-dev && sudo apt-get install libcpprest-dev

安装好依赖之后,我们就可以对etcd-cpp-apiv3进行安装了

git clone https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3

在这个目录里面执行下面这个命令

mkdir build && cd build &&\
cmake .. -DCMAKE_INSTALL_PREFIX=/usr &&\
make -j$(nproc) && sudo make install

注意我们需要提前安装好CMake

安装完毕了啊!!

2.2.etcd-cpp-apiv3的使用

在 etcd-cpp-apiv3 库中,服务注册与发现功能主要依赖于以下几个核心组件和函数。

2.2.1. 客户端初始化:etcd::Client 构造函数

  • 作用:创建一个与 etcd 服务器通信的客户端对象。
  • 参数:传入 etcd 服务器的地址,例如 "http://127.0.0.1:2379"。
  • 用途:所有后续的 etcd 操作(如写入键值、创建租约、查询等)都需要通过这个客户端对象发起。它封装了与 etcd 的 gRPC 连接。

2.2.2. 租约管理(Lease)

在 etcd 中,租约(Lease) 是一种用于控制键值对生命周期的机制。你可以把它想象成一份“租房合同”,它决定了关联的键值对能“住”在 etcd 里多久。

1. 租约的基本概念

  • 租约有一个固定的有效期(TTL,即生存时间),比如 10 秒、30 秒。这个时间由创建租约时指定。

  • 租约本身是一个独立的对象,创建成功后会得到一个唯一的租约 ID。

  • 租约可以绑定任意多个键值对。一旦绑定,这些键值对的存亡就完全依赖于租约的状态。

2. 租约的生命周期

✅ 租约有效时

只要租约还“活着”(即在有效期内),所有绑定到该租约的键值对都会正常存在,可以被查询、读取、修改。租约的有效性可以通过定期续约(KeepAlive)来维持。续约操作会重置租约的 TTL,相当于告诉 etcd:“这个租约对应的服务还在运行,请延长它的有效期。”

❌ 租约过期时

如果租约到期且没有被续约(例如服务进程崩溃、网络中断导致无法发送心跳),租约就会过期。一旦过期,所有绑定在该租约上的键值对会被 etcd 自动、立即删除,无需人工干预。

🔨 租约被撤销时

除了被动过期,租约也可以被主动撤销(Revoke)。例如,当服务正常停止时,程序可以调用撤销租约的接口,这样所有关联的键值对也会立刻被删除。主动撤销相当于主动退租,比被动过期更快地清理数据。

3. 在服务注册与发现中的应用

让我们用服务注册与发现的场景来理解租约的妙用:

服务注册(服务提供方)

  • 服务启动时,它先向 etcd 申请一个租约(比如 TTL=10 秒)。

  • 然后,它将自身的地址信息(例如 192.168.1.100:8080)作为一个键值对写入 etcd,并将这个键值对绑定到刚才创建的租约上。

  • 接着,服务在后台启动一个心跳机制,定期向 etcd 发送续约请求,不断重置租约的 TTL。只要心跳持续,租约就永远不过期,键值对就一直存在。

服务发现(服务消费方)

  • 其他服务通过查询 etcd 中特定前缀下的键值对,就能获得所有当前在线的服务实例列表。

  • 如果某个服务实例崩溃了,它的心跳停止,租约在 10 秒后自然过期,对应的键值对自动消失。消费方可以通过监听 etcd 的变化(Watch)立刻感知到这一变化,从而将失效的实例从本地列表中移除。

  • 如果服务正常退出,它可以主动撤销自己的租约,让键值对立即消失,通知消费方更快地更新。

那么接下来我们就来讲讲租约常用的3个函数吧

  1. leasegrant(ttl_seconds)
  2. leasekeepalive(ttl_seconds)
  3. leaserevoke(lease_id)

我们一个一个来讲解

leasegrant(ttl_seconds)

  1. 作用:向 etcd 申请一个租约,指定其有效时长(秒)。
  2. 参数:ttl_seconds —— 租约的初始 TTL
  3. 返回值:返回一个 etcd::Response 对象,通过它可以获取租约 ID(lease_id)以及操作状态。
  4. 用途:在服务注册时,先创建一个租约,后面将服务实例的键值对绑定到这个租约上。如果服务进程崩溃或主动下线,租约将无法续约,到期后键值对自动消失,实现服务下线的自动发现。

leasekeepalive(ttl_seconds)

  • 作用:创建租约的同时,立即启动一个后台线程定期向 etcd 发送心跳,以维持租约有效(自动续约)。
  • 参数:ttl_seconds —— 租约的 TTL。
  • 返回值:返回一个 std::shared_ptr<etcd::KeepAlive> 对象,该对象持有租约 ID,并负责后台心跳。
  • 用途:比 leasegrant 更方便,因为不需要手动调用续约接口。服务进程只要保持 KeepAlive 对象存活,租约就不会过期。当对象析构时,租约自动撤销(主动下线)。

leaserevoke(lease_id)

  • 作用:主动撤销一个租约。
  • 参数:lease_id —— 要撤销的租约 ID。
  • 用途:当服务进程正常退出时,可以调用此函数立即删除租约,从而让绑定的键值对立刻消失,避免等待 TTL 超时。在示例中,用户按回车后调用它实现主动注销。

etcd::KeepAlive 类

  • 作用:一个管理租约续约的类,内部启动线程定期发送 KeepAlive 请求。
  • 常用方法:Lease() 返回绑定的租约 ID。
  • 用途:将 KeepAlive 对象保存在全局或长期存在的变量中,即可保持租约一直有效。当对象被销毁(例如程序退出或主动 reset)时,自动撤销租约。

有的人可能好奇leasegrant(ttl_seconds)和leasekeepalive(ttl_seconds)到底有啥区别?

我们可以看看下面这2段代码,它们实现的功能是一模一样的

示例 1:使用 leasegrant 手动管理续约

#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <iostream>
#include <memory>
#include <thread>

int main() {
    etcd::Client etcd("http://127.0.0.1:2379");

    // 1. 使用 leasegrant 创建租约,TTL=10秒
    etcd::Response lease_resp = etcd.leasegrant(10).get();
    if (!lease_resp.is_ok()) {
        std::cerr << "租约创建失败: " << lease_resp.error_message() << std::endl;
        return -1;
    }
    int64_t lease_id = lease_resp.value().lease();  // 手动提取租约ID
    std::cout << "租约创建成功,ID: " << lease_id << std::endl;

    // 2. 手动创建 KeepAlive 对象来续约
    std::unique_ptr<etcd::KeepAlive> keepalive =
        std::make_unique<etcd::KeepAlive>(etcd, lease_id);
    // KeepAlive 对象会在后台自动发送心跳,保持租约有效

    // 3. 模拟运行一段时间
    std::this_thread::sleep_for(std::chrono::seconds(30));

    // 4. 停止续约(keepalive 析构时自动撤销租约)
    keepalive.reset();  // 租约撤销,绑定的键值对将被删除
    std::cout << "已停止续约,租约已撤销" << std::endl;

    return 0;
}

示例 2:使用 leasekeepalive 一步完成创建和自动续约

#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <iostream>
#include <memory>
#include <thread>

int main() {
    etcd::Client etcd("http://127.0.0.1:2379");

    // 1. 使用 leasekeepalive 创建租约并立即启动自动续约
    std::shared_ptr<etcd::KeepAlive> keepalive = etcd.leasekeepalive(10).get();
    // leasekeepalive 返回的 shared_ptr<KeepAlive> 内部已包含租约ID和心跳线程
    int64_t lease_id = keepalive->Lease();  // 直接从 KeepAlive 对象获取租约ID
    std::cout << "租约创建并自动续约已启动,ID: " << lease_id << std::endl;

    // 2. 模拟运行一段时间(无需手动续约)
    std::this_thread::sleep_for(std::chrono::seconds(30));

    // 3. 停止续约(shared_ptr 离开作用域或 reset 时自动撤销租约)
    keepalive.reset();  // 租约撤销
    std::cout << "已停止续约,租约已撤销" << std::endl;

    return 0;
}

这2段代码实现的功能是一模一样的。

2.2.3. 键值操作

put(key, value, lease_id)

  • 作用:向 etcd 写入一个键值对,并可选择绑定到一个租约上。
  • 参数:
  • key:要写入的键(字符串)。
  • value:要写入的值(字符串)。
  • lease_id(可选):绑定的租约 ID。如果提供,该键值对的生命周期与租约绑定。
  • 返回值:返回一个 std::future<etcd::Response>,可调用 .get() 等待结果。
  • 用途:服务注册时,将服务实例的地址(值)写入到以服务名命名的键下,并绑定到之前创建的租约上。这样,实例信息就与租约生命周期挂钩。

ls(prefix)

  • 作用:列出所有以指定前缀开头的键,并返回它们的当前值。
  • 参数:prefix —— 要查找的键前缀。
  • 返回值:etcd::Response 对象,包含匹配的键列表和对应的值。
  • 用途:服务发现时,通过服务名前缀(如 /services/my_service/)一次性获取当前所有在线实例的键值对。这是服务发现的第一步,用于初始化本地实例列表。

话不多说,我们直接看例子好吧

我们这里举一个完整的例子

首先使用put 写入带租约的键值对

#include <iostream>
#include <etcd/Client.hpp>
#include <etcd/Response.hpp>

int main() {
    // 连接 etcd
    etcd::Client etcd("http://127.0.0.1:2379");

    // 1. 创建一个租约,TTL=10秒
    etcd::Response lease_resp = etcd.leasegrant(10).get();//注意这里不会自动续约,想要自动续约就要使用leasekeepalive
    if (!lease_resp.is_ok()) {
        std::cerr << "租约创建失败" << std::endl;
        return -1;
    }
    int64_t lease_id = lease_resp.value().lease();//获取它的租约ID
    std::cout << "租约 ID: " << lease_id << std::endl;

    // 2. 写入键值对,并绑定租约
    std::string key = "/test/mykey";
    std::string value = "hello etcd";
    etcd::Response put_resp = etcd.put(key, value, lease_id).get();
    if (put_resp.is_ok()) {
        std::cout << "写入成功: " << key << " -> " << value << std::endl;
    } else {
        std::cerr << "写入失败: " << put_resp.error_message() << std::endl;
    }

    return 0;
}

说明:

  1. 先通过 leasegrant 创建一个租约,获取租约 ID。
  2. 调用 put 时传入租约 ID,使键值对的生命周期与租约绑定。
  3. 如果程序后续没有续约,租约过期后该键值对会自动删除。

我们使用 ls 列出前缀下的所有键值对

#include <iostream>
#include <etcd/Client.hpp>
#include <etcd/Response.hpp>

int main() {
    // 连接 etcd
    etcd::Client etcd("http://127.0.0.1:2379");

    // 要查询的前缀
    std::string prefix = "/test/";

    // 列出该前缀下的所有键值对
    etcd::Response resp = etcd.ls(prefix).get();
    if (!resp.is_ok()) {
        std::cerr << "查询失败: " << resp.error_message() << std::endl;
        return -1;
    }

    // 输出结果
    if (resp.keys().empty()) {
        std::cout << "前缀 " << prefix << " 下没有键值对" << std::endl;
    } else {
        std::cout << "前缀 " << prefix << " 下的键值对:" << std::endl;
        for (int i = 0; i < resp.keys().size(); ++i) {
            std::cout << "  " << resp.key(i) << " -> " << resp.value(i).as_string() << std::endl;
        }
    }

    return 0;
}

说明:

  • ls 返回匹配前缀的所有键值对。
  • 通过 resp.keys().size() 获取数量,然后遍历 key(i) 和 value(i).as_string() 输出。
  • 常用于服务发现的第一步,获取当前所有在线实例。

CMakeLists.txt

cmake_minimum_required(VERSION 3.10)
project(etcd_examples)

set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

find_package(etcd-cpp-api REQUIRED)
find_package(OpenSSL REQUIRED)
find_package(Threads REQUIRED)

add_executable(put_example put_example.cpp)
add_executable(ls_example ls_example.cpp)

target_link_libraries(put_example etcd-cpp-api OpenSSL::SSL OpenSSL::Crypto Threads::Threads)
target_link_libraries(ls_example etcd-cpp-api OpenSSL::SSL OpenSSL::Crypto Threads::Threads)

我们直接编译运行

10秒之内

10秒后

2.2.4. 监听变化(Watch)

Watcher 是 etcd 提供的一种“监控”机制,可以让你实时感知 etcd 中数据的变化。你可以把它想象成一个监控摄像头——你把摄像头对准某个目录(例如 /services/),一旦这个目录下有任何风吹草动(比如新增文件、修改文件、删除文件),摄像头就会立刻拍下变化并通知你。

具体来说,Watcher 能干什么?

  • 监听某个前缀下的所有键:比如你指定监听 /services/ 这个前缀,那么所有以 /services/ 开头的键(如 /services/user/instance1/services/order/instance2)的变化都会被监控到。

  • 实时接收变化通知:当有新的键被创建、已有键被更新或删除时,Watcher 会立即触发你预先设置好的回调函数,并把变化的信息(比如发生了什么事件、是哪个键、新值是什么、旧值是什么)传给你。

为什么需要 Watcher?

在分布式系统中,服务实例会动态上下线(比如某个节点崩溃、扩容缩容)。如果你只是定期去查询 etcd(轮询),可能会错过一些短暂的变化,或者查询太频繁浪费资源。而 Watcher 让你能够被动等待通知,一旦变化发生,etcd 就会主动告诉你,这样你就能第一时间更新本地缓存或路由表,保证服务调用的正确性。

Watcher 的工作原理

  1. 你创建一个 Watcher 对象,告诉 etcd:“我要监听 /services/ 这个目录下的所有变化,如果有什么动静,就调用这个回调函数。”

  2. etcd 会一直盯着这个目录。

  3. 当该目录下发生PUT事件(新增或更新)或DELETE事件(删除)时,etcd 立即把事件信息推送给你的程序。

  4. 你的回调函数被调用,你可以从中提取出变化的键、新值、旧值等,然后做相应的处理(比如把新实例加入列表,把下线的实例移除)。

回调函数里能拿到什么?

  • 事件类型PUT 表示键被创建或更新;DELETE_ 表示键被删除。

  • :发生变化的键名。

  • 新值:对于 PUT 事件,你可以拿到更新后的值。

  • 旧值:对于 DELETE 事件,你可以拿到删除前的值,这样就知道是哪个实例下线了。


etcd::Watcher 类

作用:监控 etcd 中某个前缀下的键值变化,当有键被创建、更新或删除时,触发回调函数。

etcd::Watcher 类构造函数参数:

  • registry_host:etcd 服务器地址。
  • prefix:要监听的前缀。
  • callback:用户定义的回调函数,接收一个 etcd::Response 对象。
  • recursive:是否递归监听子目录(通常设为 true)。

用途:在服务发现中,获取初始实例列表后,创建一个 Watcher 监听同一个服务前缀。当有新实例注册(PUT 事件)或实例下线(DELETE 事件)时,回调函数被调用,从而实时更新本地服务列表。

话不多说,我们直接看看怎么使用

首先,我们需要先包含头文件

#include <etcd/Watcher.hpp>
#include <etcd/Response.hpp>
#include <iostream>

然后我们需要编写一个回调函数,当有新实例注册(PUT 事件)或实例下线(DELETE 事件)时,回调函数被调用。

回调函数接收一个 etcd::Response const& 参数,通过它获取事件详情。

void watchCallback(etcd::Response const& resp) {
    if (resp.error_code()) {
        std::cerr << "监听错误: " << resp.error_message() << std::endl;
        return;
    }

    // 判断事件类型
    if (resp.event_type() == etcd::Event::EventType::PUT) 
    {
        std::cout << "新增/更新: " << resp.kv().key()
                  << " = " << resp.kv().as_string() << std::endl;
    } 
    else if (resp.event_type() == etcd::Event::EventType::DELETE_) 
    {
        std::cout << "删除: " << resp.kv().key()
                  << " (原值: " << resp.prev_kv().as_string() << ")" << std::endl;
    }
}

然后创建 Watcher 对象

在 main 函数中,实例化 etcd::Watcher:

std::string etcd_url = "http://127.0.0.1:2379";
std::string prefix = "/myservice/";   // 要监听的前缀

etcd::Watcher watcher(etcd_url, prefix, watchCallback, true);
  • 第一个参数:etcd 服务器地址。
  • 第二个参数:监听的前缀(例如 /myservice/)。
  • 第三个参数:回调函数名。
  • 第四个参数:是否递归监听子目录(通常设为 true,表示监听该前缀下的所有键)。

我们需要保持程序运行

Watcher 在后台线程中工作,主线程需要保持存活(例如通过 getchar() 或事件循环)。

std::cout << "开始监听,按回车键退出..." << std::endl;
std::cin.get();

(可选)取消监听

Watcher 析构时会自动取消监听,也可以显式调用 Cancel():

watcher.Cancel();

完整示例

#include <iostream>
#include <etcd/Watcher.hpp>
#include <etcd/Response.hpp>

void watchCallback(etcd::Response const& resp) {
    if (resp.error_code()) {
        std::cerr << "监听错误: " << resp.error_message() << std::endl;
        return;
    }
    if (resp.event_type() == etcd::Event::EventType::PUT) {
        std::cout << "PUT: " << resp.kv().key() << " = " << resp.kv().as_string() << std::endl;
    } else if (resp.event_type() == etcd::Event::EventType::DELETE_) {
        std::cout << "DELETE: " << resp.kv().key() << " (原值: " << resp.prev_kv().as_string() << ")" << std::endl;
    }
}

int main() {
    std::string etcd_url = "http://127.0.0.1:2379";
    std::string prefix = "/demo/";

    etcd::Watcher watcher(etcd_url, prefix, watchCallback, true);
    std::cout << "监听 " << prefix << ",按回车退出..." << std::endl;
    std::cin.get();
    return 0;
}

这个时候我们打开另外一个终端,然后执行下面这个

# 写入(PUT)一个键值对
etcdctl put /demo/name "Alice"

然后我们原来的程序就会变成下面这样子

我们接着执行

# 更新同一个键
etcdctl put /demo/name "Bob"

# 删除一个键
etcdctl del /demo/name

# 写入一个子级键(递归监听会触发)
etcdctl put /demo/user/age "30"

完全没有问题。我们按下回车,就结束了


 

watcher.Wait()方法

watcher.Wait() 是 etcd::Watcher 类提供的一个阻塞等待方法,它的作用是让当前线程暂停执行,直到该 Watcher 对象被取消(例如调用了 Cancel())或者发生无法恢复的错误时才会返回。可以把它理解为一个“等待监听结束”的同步点。

具体行为

当调用 watcher.Wait() 后,当前线程会一直阻塞,直到:

  • 显式调用 watcher.Cancel() 取消监听。
  • Watcher 内部发生致命错误导致监听终止。
  • (某些实现中)析构函数被调用。

返回后,你可以检查状态或清理资源。

与 std::cin.get() 的区别

  • std::cin.get():阻塞等待用户输入,与 Watcher 无关,适合简单演示。
  • watcher.Wait():与 Watcher 生命周期绑定,当监听真正结束时才返回,更符合“等待监听完成”的语义,适合在需要同步等待监听完结的场景中使用(例如单元测试或需要精确控制退出时机的程序)。
etcd::Watcher watcher(etcd_url, prefix, callback, true);
std::cout << "监听中,按 Ctrl+C 结束..." << std::endl;

// 等待监听结束(例如在其他线程中调用 Cancel())
watcher.Wait();

std::cout << "监听已停止" << std::endl;

如果希望用户通过 Ctrl+C 停止程序,可以结合信号处理:在信号处理函数中调用 watcher.Cancel(),这样 Wait() 就会返回,程序自然退出。

注意事项

  • Wait() 通常用于主线程等待后台监听线程结束,而回调函数是在 Watcher 内部线程中执行的,因此不会阻塞回调。
  • 如果从未调用 Cancel(),且监听一直有效,Wait() 会永久阻塞,除非 etcd 连接断开等错误导致监听终止。

我们可以看个例子

 将之前的 std::cin.get() 替换为 watcher.Wait() 后,程序的行为会发生变化:Wait() 会一直阻塞,直到监听被主动取消或发生错误。为了保持“按回车退出”的功能,我们需要结合信号处理或在另一个线程中触发取消。下面是一个修改后的示例,它使用信号处理来响应 Ctrl+C 并调用 Cancel(),从而使 Wait() 返回。

#include <iostream>
#include <etcd/Watcher.hpp>
#include <etcd/Response.hpp>
#include <csignal>
#include <memory>

std::unique_ptr<etcd::Watcher> g_watcher; // 全局指针,便于信号处理中访问

void signal_handler(int) {
    std::cout << "\n收到中断信号,停止监听..." << std::endl;
    if (g_watcher) {
        g_watcher->Cancel(); // 取消监听,使 Wait() 返回
    }
}

void watchCallback(etcd::Response const& resp) {
    if (resp.error_code()) {
        std::cerr << "监听错误: " << resp.error_message() << std::endl;
        return;
    }
    if (resp.event_type() == etcd::Event::EventType::PUT) {
        std::cout << "PUT: " << resp.kv().key() << " = " << resp.kv().as_string() << std::endl;
    } else if (resp.event_type() == etcd::Event::EventType::DELETE_) {
        std::cout << "DELETE: " << resp.kv().key() << " (原值: " << resp.prev_kv().as_string() << ")" << std::endl;
    }
}

int main() {
    // 注册信号处理函数
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);

    std::string etcd_url = "http://127.0.0.1:2379";
    std::string prefix = "/demo/";

    // 创建 Watcher 对象,并保存到全局指针
    g_watcher = std::make_unique<etcd::Watcher>(etcd_url, prefix, watchCallback, true);
    std::cout << "监听 " << prefix << ",按 Ctrl+C 退出..." << std::endl;

    // 等待监听结束(直到 Cancel() 被调用)
    g_watcher->Wait();

    std::cout << "监听已停止" << std::endl;
    return 0;
}

这个时候我们按回车,没有任何反应,我们按下Ctrl+C

这个还是比较好理解的。

2.2.5.使用示例

示例1(test1)

discover_service.cpp

#include <iostream>          // 输入输出流
#include <etcd/Client.hpp>   // etcd 客户端
#include <etcd/Response.hpp> // 响应对象

int main() {
    // 创建 etcd 客户端,连接到本地 etcd 服务器
    etcd::Client etcd("http://127.0.0.1:2379");
    // 服务实例 key 的前缀,用于发现同一服务的所有实例
    const std::string service_prefix = "/services/my_service/";

    // 列出该前缀下的所有 key(相当于目录列表)
    etcd::Response resp = etcd.ls(service_prefix).get(); // 阻塞等待结果
    // 检查操作是否成功
    if (!resp.is_ok()) {
        std::cerr << "发现服务失败: " << resp.error_message() << std::endl;
        return 1;
    }

    // 判断是否有实例
    if (resp.keys().empty()) {
        std::cout << "未在 " << service_prefix << " 下找到服务实例" << std::endl;
    } else {
        std::cout << "发现的服务实例:\n";
        // 遍历返回的键值对并输出
        for (int i = 0; i < resp.keys().size(); ++i) {
            std::cout << "  " << resp.key(i) << " -> " << resp.value(i).as_string() << std::endl;
        }
    }

    return 0;
}

register_service.cpp

#include <iostream>      // 输入输出流
#include <csignal>       // 信号处理
#include <memory>        // 智能指针
#include <etcd/Client.hpp>    // etcd 客户端
#include <etcd/KeepAlive.hpp> // 租约自动续约
#include <etcd/Response.hpp>  // 响应对象

// 全局 KeepAlive 指针,用于在信号处理中停止续约
static std::unique_ptr<etcd::KeepAlive> keepalive;
// 运行标志,控制主循环
static bool running = true;

// 信号处理函数:Ctrl+C 或终止信号时调用
void signal_handler(int) {
    std::cout << "\n收到中断信号,正在关闭..." << std::endl;
    // 重置 keepalive,停止心跳,租约自动撤销
    keepalive.reset();
    // 设置标志退出主循环
    running = false;
}

int main() {
    // 注册信号处理函数
    signal(SIGINT, signal_handler);  // Ctrl+C
    signal(SIGTERM, signal_handler); // 终止信号

    // 创建 etcd 客户端,连接到本地 etcd 服务器(默认端口 2379)
    etcd::Client etcd("http://127.0.0.1:2379");

    // 服务实例的 key 前缀和具体 key
    const std::string service_prefix = "/services/my_service/";
    const std::string instance_id = "instance1";
    const std::string key = service_prefix + instance_id;
    // 服务实例的值,通常为 IP:端口
    const std::string value = "192.168.1.100:8080";
    // 租约有效期(秒),即 etcd 在没有心跳时保留 key 的时间
    const int ttl_seconds = 10;

    // 1. 创建租约
    etcd::Response lease_resp = etcd.leasegrant(ttl_seconds).get(); // 阻塞等待结果
    // 检查租约创建是否成功
    if (!lease_resp.is_ok()) {
        std::cerr << "租约创建失败: " << lease_resp.error_message() << std::endl;
        return 1;
    }
    // 获取租约 ID
    int64_t lease_id = lease_resp.value().lease();
    std::cout << "租约创建成功,ID: " << lease_id << std::endl;

    // 2. 启动 KeepAlive 自动续约(后台线程定期发送心跳)
    keepalive = std::make_unique<etcd::KeepAlive>(etcd, lease_id);

    // 3. 将 key 绑定到租约,写入 etcd
    etcd::Response put_resp = etcd.put(key, value, lease_id).get(); // 使用租约 ID
    // 检查写入是否成功
    if (!put_resp.is_ok()) {
        std::cerr << "写入键值失败: " << put_resp.error_message() << std::endl;
        keepalive.reset(); // 清理租约
        return 1;
    }
    std::cout << "服务已注册: " << key << " -> " << value
              << " (TTL=" << ttl_seconds << "秒,租约ID=" << lease_id << ")" << std::endl;

    // 4. 保持运行,等待信号
    std::cout << "按 Ctrl+C 停止并注销服务" << std::endl;
    while (running) {
        std::this_thread::sleep_for(std::chrono::seconds(1)); // 每秒检查一次标志
    }

    // keepalive 已 reset,租约自动撤销,key 自动从 etcd 删除
    std::cout << "服务已注销" << std::endl;
    return 0;
}

我们编译运行一下

首先我们先运行起来下面这个程序

接着我们多执行几次这个

根据你的输出,可以看到前三次运行发现程序时能正确查询到实例 /services/my_service/instance1 -> 192.168.1.100:8080,但之后实例就消失了。

这是完全正常的行为,原因如下:

  • 租约机制:register_service 程序向 etcd 注册实例时,创建了一个 TTL=10 秒的租约,并通过 KeepAlive 持续发送心跳来维持租约有效。只要注册程序保持运行,实例就会一直存在。
  • 程序退出:当你按下 Ctrl+C 终止 register_service 后,KeepAlive 对象析构,租约被撤销,与之关联的 key 会在几秒内被 etcd 自动删除(通常是在租约到期后,即最多 10 秒)。因此,再次运行发现程序时就会显示无实例。

如何正确测试服务注册与发现

  • 保持注册程序运行:在一个终端启动 ./register_service,它会输出“添加数据成功!”并等待你按回车。不要按回车,让程序一直运行。
  • 在另一个终端反复运行发现程序:每次运行 ./discover_service 都应看到实例信息,因为心跳一直在续约。
  • 停止注册程序:回到注册程序的终端,按回车键(或 Ctrl+C)退出。等待几秒后,再运行发现程序,实例就会消失。

示例2(test2)

discovery.cc

#include <etcd/Client.hpp>   // etcd 客户端
#include <etcd/Watcher.hpp>  // 监听器类
#include <etcd/Response.hpp> // 响应类
#include <iostream>          // 标准输入输出

// 监听回调函数,当监听的目录发生变化时被调用。
// 参数 resp 包含了变化事件的详细信息。
void watcherCallback(etcd::Response const& resp) {
    // 检查是否有错误发生
    if (resp.error_code()) {
        std::cout << "Watcher Error:" << resp.error_code();
        std::cout << "-" << resp.error_message() << std::endl;
        return;
    }

    // 遍历所有事件
    for (auto const& ev : resp.events()) {
        // 从事件中获取事件类型
        if (ev.event_type() == etcd::Event::EventType::PUT) {
            // PUT 事件表示有键值对新增或更新
            // 通过当前值查看新增的是什么值
            std::cout << "服务" << ev.kv().key() << "新增主机:";
            std::cout << ev.kv().as_string() << std::endl;
        } else if (ev.event_type() == etcd::Event::EventType::DELETE_) {
            // DELETE_ 事件表示键值对被删除
            // 需要通过发生事件之前的值来了解哪个值被删除了
            std::cout << "服务" << ev.kv().key() << "下线主机:";
            std::cout << ev.prev_kv().as_string() << std::endl;
        }
    }
}

int main() {
    // etcd 服务器地址
    std::string registry_host = "http://127.0.0.1:2379";
    // 要监听的键前缀(服务目录)
    std::string service_key = "/service/user/instance";

    // 创建 etcd 客户端
    etcd::Client etcd(registry_host);

    // 初次先用 ls 获取所有能够提供指定服务的实例信息
    // ls() 会列出以 service_key 为前缀的所有键
    etcd::Response resp = etcd.ls(service_key).get();
    if (resp.is_ok()) {
        // 遍历并打印所有实例
        for (int i = 0; i < resp.keys().size(); i++) {
            std::cout << resp.key(i) << "=" << resp.value(i).as_string() << std::endl;
        }
    } else {
        // 列出失败时输出错误信息
        std::cout << "Get Service Error:" << resp.error_code();
        std::cout << "-" << resp.error_message() << std::endl;
    }

    // 获取当前实例列表后,创建监听对象监控目录内容变化。
    // 参数依次为:etcd 地址、监听的键前缀、回调函数、是否递归监听(true 表示监听该前缀下所有子键)
    etcd::Watcher watcher(registry_host, service_key, watcherCallback, true);

    // 等待用户输入回车,期间 watcher 会在后台监听变化
    getchar();

    // 取消监听(可选,程序结束时会自动析构)
    watcher.Cancel();

    return 0;
}

registry.cc

#include <etcd/Client.hpp>     // etcd 客户端类
#include <etcd/Response.hpp>   // etcd 响应类
#include <etcd/KeepAlive.hpp>  // 租约自动续约类
#include <thread>              // 线程支持(虽然未直接使用,但 KeepAlive 可能依赖)
#include <iostream>            // 标准输入输出(用于 cout, getchar)

int main() 
{
    // etcd 服务器地址,通常为本地默认端口
    std::string registry_host = "http://127.0.0.1:2379";

    // 创建 etcd 客户端对象,用于与 etcd 交互
    etcd::Client etcd(registry_host);

    // 下面使用的是 leasekeepalive,它会自动创建一个带心跳的租约并持续续约。
    // 调用 leasekeepalive(3) 表示创建一个 TTL=3 秒的租约,并立即开始后台续约。
    // get() 会阻塞直到租约创建成功,返回一个 shared_ptr<KeepAlive> 对象。
    std::shared_ptr<etcd::KeepAlive> keepalive = etcd.leasekeepalive(3).get();
    // 从 KeepAlive 对象中获取实际的租约 ID
    auto lease_id = keepalive->Lease();

    // 服务注册的键名,注意这里使用了固定名称 "/service/user/instance"
    std::string service_key = "/service/user/instance";
    // 当前服务实例的实际地址(IP:端口)
    std::string service_host = "112.23.23.120:9090";
    // 将服务键值对与租约绑定,写入 etcd。
    // put() 返回一个 std::future<Response>,调用 get() 等待结果。
    auto resp_task = etcd.put(service_key, service_host, lease_id);
    auto resp = resp_task.get();

    // 检查写入操作是否成功
    if (resp.is_ok() == false) {
        std::cout << "注册失败: " << resp.error_message() << std::endl;
        return -1;  // 返回错误码
    }
    std::cout << "添加数据成功!" << std::endl;

    // 等待用户输入回车,期间 KeepAlive 会持续在后台发送心跳,保持租约有效。
    getchar();

    // 用户按下回车后,主动撤销租约。
    // 租约撤销后,与之关联的所有键值对会自动从 etcd 中删除。
    etcd.leaserevoke(lease_id);

    // 原代码此处多了一个右括号,已修正删除。
    return 0;
}

首先我们先调用registry

然后我们调用./discoverer

示例3

get.cc

#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Value.hpp>
#include <thread>

void callback(const etcd::Response &resp) {
    if (resp.is_ok() == false) {
        std::cout << "收到一个错误的事件通知:" << resp.error_message() << std::endl;
        return;
    }
    for (auto const& ev : resp.events()) {
        if (ev.event_type() == etcd::Event::EventType::PUT) {
            std::cout << "服务信息发生了改变:\n" ;
            std::cout << "当前的值:" << ev.kv().key() << "-" << ev.kv().as_string() << std::endl;
            std::cout << "原来的值:" << ev.prev_kv().key() << "-" << ev.prev_kv().as_string() << std::endl;
        }else if (ev.event_type() == etcd::Event::EventType::DELETE_) {
            std::cout << "服务信息下线被删除:\n";
            std::cout << "当前的值:" << ev.kv().key() << "-" << ev.kv().as_string() << std::endl;
            std::cout << "原来的值:" << ev.prev_kv().key() << "-" << ev.prev_kv().as_string() << std::endl;
        }
    }
}

int main(int argc, char *argv[])
{
    std::string etcd_host = "http://127.0.0.1:2379";
    //实例化客户端对象
    etcd::Client client(etcd_host);
    //获取指定的键值对信息
    auto resp = client.ls("/service").get();
    if (resp.is_ok() == false) {
        std::cout << "获取键值对数据失败: " << resp.error_message() << std::endl;
        return -1;
    }
    int sz = resp.keys().size();
    for (int i = 0; i < sz; ++i) {
        std::cout << resp.value(i).as_string() << "可以提供" << resp.key(i) << "服务\n";
    }
    //实例化一个键值对事件监控对象
    auto watcher = etcd::Watcher(client, "/service", callback, true);
    watcher.Wait();
    return 0;
}

put.cc

#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <thread>

int main(int argc, char *argv[])
{
    // 定义 etcd 服务端地址(默认本地端口)
    std::string etcd_host = "http://127.0.0.1:2379";
    
    // 实例化 etcd 客户端对象
    etcd::Client client(etcd_host);
    
    // 获取租约保活对象,同时创建一个有效期为 3 秒的租约
    auto keep_alive = client.leasekeepalive(3).get();
    
    // 获取租约 ID,后续用于绑定键值对
    auto lease_id = keep_alive->Lease();
    
    // 向 etcd 写入键值对 /service/user,并关联上面创建的租约(带自动过期)
    auto resp1 = client.put("/service/user", "127.0.0.1:8080", lease_id).get();
    if (resp1.is_ok() == false) {
        std::cout << "新增数据失败:" << resp1.error_message() << std::endl;
        return -1;
    }
    
    // 向 etcd 写入另一个键值对 /service/friend,不关联租约(永久有效)
    auto resp2 = client.put("/service/friend", "127.0.0.1:9090").get();
    if (resp2.is_ok() == false) {
        std::cout << "新增数据失败:" << resp2.error_message() << std::endl;
        return -1;
    }
    
    // 主线程等待 10 秒,让租约过期前观察效果(实际应用中通常持续运行)
    std::this_thread::sleep_for(std::chrono::seconds(10));
    
    return 0;
}

我们直接运行

然后put程序结束的时候,我们后面只能查询到下面这个键了

三.封装服务发现与注册功能

3.1.设计思想

etcd作为一个高可用的分布式键值存储系统,虽然本质上是一个通用的KV存储,而非专门的服务注册中心,但我们可以基于它的强大特性(如租约机制、监听机制)进行二次封装,使其适配服务注册与发现的场景。在封装过程中,我们注重降低模块间的耦合,将etcd的通用能力转化为面向服务治理的专用接口,同时保留其灵活性和可扩展性。

1. 服务注册客户端

服务注册客户端负责将服务实例的信息发布到etcd中,并维护其有效性。我们设计的注册客户端类提供以下核心功能:

  • 服务信息注册:提供一个简洁的接口,允许服务在启动时将其元数据(例如服务名、主机地址、端口等)以键值对的形式写入etcd。写入时,客户端会自动为每个服务实例创建租约(lease),确保数据仅在服务在线期间有效。

  • 保活机制:利用etcd的租约续期能力,客户端内部自动定期向etcd发送心跳,维持服务数据的活性。一旦服务进程退出或网络故障导致心跳停止,etcd的租约到期后会自动删除对应的服务信息,从而实现服务下线的自动清理。

通过这种封装,服务提供方只需调用一次注册接口,即可将服务信息可靠地发布到etcd,无需关心底层的租约管理和续期逻辑。

// 服务注册客户端类
    // 功能:向etcd注册服务实例信息,并通过租约保活机制维护服务在线状态
    class Registry
    {
    public:
        using ptr = std::shared_ptr<Registry>; // 智能指针类型别名

        // 构造函数,初始化etcd客户端并创建租约保活对象
        // host: etcd服务器地址(例如 "http://127.0.0.1:2379")
        Registry(const std::string &host) : _client(std::make_shared<etcd::Client>(host)), // 创建etcd客户端
                                            _keep_alive(_client->leasekeepalive(3).get()), // 创建租约保活对象,租约TTL为3秒,这里采用leasekeepalive,会自动续约
                                            _lease_id(_keep_alive->Lease())
        {
        } // 获取租约ID

        // 析构函数,取消租约保活,释放资源
        ~Registry()
        {
            _keep_alive->Cancel(); // 释放租约
        }

        // 注册服务信息到etcd
        // key: 服务实例的键(通常包含服务名和实例标识,如 "/services/user/192.168.1.1:8080")
        // val: 服务实例的值(如IP地址、端口等元数据)
        // 返回值:注册成功返回true,失败返回false
        bool registry(const std::string &key, const std::string &val)
        {
            // 向etcd写入键值对,并关联租约ID(租约到期后自动删除)
            auto resp = _client->put(key, val, _lease_id).get(); // .get()同步等待结果,只有等到这个键值对真正写入这个etcd服务器后才会自动返回
            if (resp.is_ok() == false)
            {
                LOG_ERROR("注册数据失败:{}", resp.error_message()); // 日志记录错误
                return false;
            }
            return true;
        }

    private:
        std::shared_ptr<etcd::Client> _client;        // etcd客户端对象
        std::shared_ptr<etcd::KeepAlive> _keep_alive; // 租约保活对象,负责自动续期,注意我们的_keep_alive的构造是依赖于_client,所以放在这个位置
        uint64_t _lease_id;                           // 租约ID,用于关联注册的数据,注意我们的_lease_id的构造是依赖于_keep_alive,所以放在这个位置
    };

思想极其的简单,就是往etcd里面写入一个键值对,然后进行保活。

核心功能

  • 服务信息注册:通过 registry(key, val) 方法,将服务实例的标识(如 "/services/user/192.168.1.1:8080")和对应的元数据(如 IP、端口)以键值对形式写入 etcd。写入时关联一个租约 ID,确保数据仅在租约有效期内存在。
  • 租约自动保活:在构造 Registry 对象时,客户端会创建一个 TTL 为 3 秒的租约,并启动一个 etcd::KeepAlive 保活对象。该对象定期向 etcd 发送心跳,自动续租,保证服务在线期间租约始终有效。当服务进程退出或网络异常导致心跳停止时,etcd 侧的租约过期,所有关联的键值对会被自动删除,实现服务下线的优雅清理。

2. 服务发现客户端

服务发现客户端用于从etcd获取服务信息,并实时感知服务列表的变化。我们设计的发现客户端类具备以下特性:

  • 根目录配置:提供设置根目录的接口,允许客户端指定一个基础路径(例如 /services/),后续所有操作均在该目录下进行。这样可以方便地管理多个服务或环境(如开发、测试、生产)。

  • 服务查询与监控:客户端能够获取指定根目录下的所有服务信息,并持续监控该目录下的数据变化。当有新的服务实例注册(新增键值对)或现有实例下线(键值对被删除)时,客户端能实时收到通知。

  • 事件回调机制:客户端开放两个回调注册接口,分别对应服务上线事件和服务下线事件。调用方可以传入自定义的处理函数,当监控到数据新增或删除时,这些函数会被自动触发,从而实现业务逻辑的动态响应(例如更新本地负载均衡列表、刷新缓存等)。

这样的设计使得服务消费者能够以事件驱动的方式感知后端服务的变化,同时通过回调解耦了etcd监听逻辑与业务处理逻辑,提高了代码的可维护性和可测试性。

// 服务发现客户端类
    // 功能:从etcd获取服务列表,并监控服务变化(上线/下线)
    class Discovery
    {
    public:
        using ptr = std::shared_ptr<Discovery>;                               // 智能指针类型别名
        using NotifyCallback = std::function<void(std::string, std::string)>; // 回调函数类型,参数为key和value

        // 构造函数,初始化etcd客户端,执行初始服务发现并启动监控
        // host: etcd服务器地址
        // basedir: 服务信息的根目录(例如 "/services"),所有服务都存储在此目录下
        // put_cb: 服务上线回调函数,当有新服务注册时被调用
        // del_cb: 服务下线回调函数,当有服务注销时被调用
        Discovery(const std::string &host,
                  const std::string &basedir,
                  const NotifyCallback &put_cb,
                  const NotifyCallback &del_cb) : _client(std::make_shared<etcd::Client>(host)), // 创建etcd客户端
                                                  _put_cb(put_cb),                               // 传入自定义回调函数
                                                  _del_cb(del_cb)                                // 传入自定义回调函数
        {                                                                                        // 保存回调函数

            // 第一步:服务发现,先获取当前根目录下已有的所有服务信息
            auto resp = _client->ls(basedir).get(); // 列出目录下所有键值对
            if (resp.is_ok() == false)
            {
                LOG_ERROR("获取服务信息数据失败:{}", resp.error_message()); // 失败记录日志
            }
            int sz = resp.keys().size(); // 获取键的数量
            for (int i = 0; i < sz; ++i)
            {
                if (_put_cb) // 如果我们传入了这个put行为的自定义回调函数,我们就调用这个
                {
                    _put_cb(resp.key(i), resp.value(i).as_string()); // 调用上线回调通知每个已有服务
                }
            }

            // 第二步:启动事件监控,监控根目录下数据的改变(新增、删除)
            _watcher = std::make_shared<etcd::Watcher>(*_client.get(), basedir,
                                                       std::bind(&Discovery::callback, this, std::placeholders::_1), true); // 注册回调函数,递归监控子目录
        }

        // 析构函数,取消监控,释放资源
        ~Discovery()
        {
            _watcher->Cancel(); // 取消监听
        }

    private:
        // 事件回调函数,当监控的目录下发生数据变化时被调用
        // resp: etcd响应,包含事件列表
        void callback(const etcd::Response &resp)
        {
            if (resp.is_ok() == false)
            {
                LOG_ERROR("收到一个错误的事件通知: {}", resp.error_message()); // 错误通知记录日志
                return;
            }
            // 遍历响应中的所有事件
            for (auto const &ev : resp.events())
            {
                if (ev.event_type() == etcd::Event::EventType::PUT)
                {
                    // PUT事件:新增或修改键值对,视为服务上线
                    if (_put_cb)
                    {
                        _put_cb(ev.kv().key(), ev.kv().as_string());
                    }
                    LOG_DEBUG("新增服务:{}-{}", ev.kv().key(), ev.kv().as_string());
                }
                else if (ev.event_type() == etcd::Event::EventType::DELETE_)
                {
                    // DELETE事件:删除键值对,视为服务下线
                    if (_del_cb)
                    {
                        _del_cb(ev.prev_kv().key(), ev.prev_kv().as_string());
                    }
                    LOG_DEBUG("下线服务:{}-{}", ev.prev_kv().key(), ev.prev_kv().as_string());
                }
            }
        }

    private:
        NotifyCallback _put_cb;                  // 服务上线回调,当监控的那个键有了put行为,那么我们就调用这个函数
        NotifyCallback _del_cb;                  // 服务下线回调,当监控的那个键有了del行为,那么我们就调用这个函数
        std::shared_ptr<etcd::Client> _client;   // etcd客户端对象
        std::shared_ptr<etcd::Watcher> _watcher; // 监听器对象,用于监控目录变化
    };

服务发现客户端 Discovery 的设计目标是动态获取 etcd 中注册的服务信息,并实时感知服务实例的上线和下线。其核心逻辑分为两个阶段:初始服务发现和持续事件监控。

1. 初始服务发现

  • 在构造 Discovery 对象时,客户端首先会向 etcd 发起一次查询,列出指定根目录(如 /services)下的所有键值对。
  • 这一步的目的是获取当前已存在的所有服务实例。
  • 对于查询到的每个键值对,客户端会立即调用上层传入的“服务上线回调”(put_cb),将服务信息(key 和 value)通知给使用方。
  • 这样,使用方在启动时就能获得完整的服务列表。

2. 持续事件监控

初始发现完成后,客户端会创建一个 etcd Watcher(观察者),对同一个根目录进行递归监控。Watcher 会长期运行,一旦目录下发生数据变化(如新增键、修改键、删除键),etcd 就会推送事件给客户端。

客户端内部定义了一个回调函数 callback,当 Watcher 接收到事件时,该回调会被触发。回调函数会遍历事件列表,根据事件类型分别处理:

  • PUT 事件:代表有新的键值对写入或已有键被更新。对于服务发现场景,通常视为服务实例上线(或元数据更新)。此时调用 put_cb 通知上层,并记录调试日志。
  • DELETE 事件:代表有键值对被删除,即服务实例下线。此时调用 del_cb 通知上层,并记录调试日志。

通过这种方式,使用方只需提供两个回调函数,就能实时获得服务列表的动态变化。

3. 资源管理

  • Discovery 对象内部持有 etcd 客户端和 Watcher 的智能指针,确保在对象生命周期内网络连接和监控任务持续有效。
  • 析构函数中主动调用 Watcher::Cancel(),停止监控并释放相关资源,避免内存泄漏或无效回调。

整体工作流程

  • 用户创建 Discovery 实例,传入 etcd 地址、服务根目录以及上线/下线回调。
  • 构造函数立即执行初始查询,通过回调通知现有服务。
  • 随后启动 Watcher,开始监听后续变化。
  • 当服务注册或注销时,etcd 推送事件,回调被触发,通过用户提供的回调函数通知业务层。

这种设计将 etcd 的复杂交互(如长轮询、事件解析)封装在类内部,使用者只需关心如何响应服务列表的变化,大大简化了服务发现逻辑。

3.2.代码+测试

etcd.hpp

// 防止头文件重复包含
#pragma once

// 包含etcd客户端相关头文件
#include <etcd/Client.hpp>    // etcd客户端类,提供KV操作接口
#include <etcd/KeepAlive.hpp> // 租约保活管理类
#include <etcd/Response.hpp>  // 响应封装类
#include <etcd/Watcher.hpp>   // 监听器类,用于监控key的变化
#include <etcd/Value.hpp>     // 键值对的值封装
#include <functional>         // 函数对象(回调函数)支持
#include "logger.hpp"         // 日志模块头文件(定义了LOG_*宏)

// 命名空间 IMS,封装即时通讯系统相关的服务注册与发现组件
namespace IMS
{

    // 服务注册客户端类
    // 功能:向etcd注册服务实例信息,并通过租约保活机制维护服务在线状态
    class Registry
    {
    public:
        using ptr = std::shared_ptr<Registry>; // 智能指针类型别名

        // 构造函数,初始化etcd客户端并创建租约保活对象
        // host: etcd服务器地址(例如 "http://127.0.0.1:2379")
        Registry(const std::string &host) : _client(std::make_shared<etcd::Client>(host)), // 创建etcd客户端
                                            _keep_alive(_client->leasekeepalive(3).get()), // 创建租约保活对象,租约TTL为3秒,这里采用leasekeepalive,会自动续约
                                            _lease_id(_keep_alive->Lease())
        {
        } // 获取租约ID

        // 析构函数,取消租约保活,释放资源
        ~Registry()
        {
            _keep_alive->Cancel(); // 释放租约
        }

        // 注册服务信息到etcd
        // key: 服务实例的键(通常包含服务名和实例标识,如 "/services/user/192.168.1.1:8080")
        // val: 服务实例的值(如IP地址、端口等元数据)
        // 返回值:注册成功返回true,失败返回false
        bool registry(const std::string &key, const std::string &val)
        {
            // 向etcd写入键值对,并关联租约ID(租约到期后自动删除)
            auto resp = _client->put(key, val, _lease_id).get(); // .get()同步等待结果,只有等到这个键值对真正写入这个etcd服务器后才会自动返回
            if (resp.is_ok() == false)
            {
                LOG_ERROR("注册数据失败:{}", resp.error_message()); // 日志记录错误
                return false;
            }
            return true;
        }

    private:
        std::shared_ptr<etcd::Client> _client;        // etcd客户端对象
        std::shared_ptr<etcd::KeepAlive> _keep_alive; // 租约保活对象,负责自动续期,注意我们的_keep_alive的构造是依赖于_client,所以放在这个位置
        uint64_t _lease_id;                           // 租约ID,用于关联注册的数据,注意我们的_lease_id的构造是依赖于_keep_alive,所以放在这个位置
    };

    // 服务发现客户端类
    // 功能:从etcd获取服务列表,并监控服务变化(上线/下线)
    class Discovery
    {
    public:
        using ptr = std::shared_ptr<Discovery>;                               // 智能指针类型别名
        using NotifyCallback = std::function<void(std::string, std::string)>; // 回调函数类型,参数为key和value

        // 构造函数,初始化etcd客户端,执行初始服务发现并启动监控
        // host: etcd服务器地址
        // basedir: 服务信息的根目录(例如 "/services"),所有服务都存储在此目录下
        // put_cb: 服务上线回调函数,当有新服务注册时被调用
        // del_cb: 服务下线回调函数,当有服务注销时被调用
        Discovery(const std::string &host,
                  const std::string &basedir,
                  const NotifyCallback &put_cb,
                  const NotifyCallback &del_cb) : _client(std::make_shared<etcd::Client>(host)), // 创建etcd客户端
                                                  _put_cb(put_cb),                               // 传入自定义回调函数
                                                  _del_cb(del_cb)                                // 传入自定义回调函数
        {                                                                                        // 保存回调函数

            // 第一步:服务发现,先获取当前根目录下已有的所有服务信息
            auto resp = _client->ls(basedir).get(); // 列出目录下所有键值对
            if (resp.is_ok() == false)
            {
                LOG_ERROR("获取服务信息数据失败:{}", resp.error_message()); // 失败记录日志
            }
            int sz = resp.keys().size(); // 获取键的数量
            for (int i = 0; i < sz; ++i)
            {
                if (_put_cb) // 如果我们传入了这个put行为的自定义回调函数,我们就调用这个
                {
                    _put_cb(resp.key(i), resp.value(i).as_string()); // 调用上线回调通知每个已有服务
                }
            }

            // 第二步:启动事件监控,监控根目录下数据的改变(新增、删除)
            _watcher = std::make_shared<etcd::Watcher>(*_client.get(), basedir,
                                                       std::bind(&Discovery::callback, this, std::placeholders::_1), true); // 注册回调函数,递归监控子目录
        }

        // 析构函数,取消监控,释放资源
        ~Discovery()
        {
            _watcher->Cancel(); // 取消监听
        }

    private:
        // 事件回调函数,当监控的目录下发生数据变化时被调用
        // resp: etcd响应,包含事件列表
        void callback(const etcd::Response &resp)
        {
            if (resp.is_ok() == false)
            {
                LOG_ERROR("收到一个错误的事件通知: {}", resp.error_message()); // 错误通知记录日志
                return;
            }
            // 遍历响应中的所有事件
            for (auto const &ev : resp.events())
            {
                if (ev.event_type() == etcd::Event::EventType::PUT)
                {
                    // PUT事件:新增或修改键值对,视为服务上线
                    if (_put_cb)
                    {
                        _put_cb(ev.kv().key(), ev.kv().as_string());
                    }
                    LOG_DEBUG("新增服务:{}-{}", ev.kv().key(), ev.kv().as_string());
                }
                else if (ev.event_type() == etcd::Event::EventType::DELETE_)
                {
                    // DELETE事件:删除键值对,视为服务下线
                    if (_del_cb)
                    {
                        _del_cb(ev.prev_kv().key(), ev.prev_kv().as_string());
                    }
                    LOG_DEBUG("下线服务:{}-{}", ev.prev_kv().key(), ev.prev_kv().as_string());
                }
            }
        }

    private:
        NotifyCallback _put_cb;                  // 服务上线回调,当监控的那个键有了put行为,那么我们就调用这个函数
        NotifyCallback _del_cb;                  // 服务下线回调,当监控的那个键有了del行为,那么我们就调用这个函数
        std::shared_ptr<etcd::Client> _client;   // etcd客户端对象
        std::shared_ptr<etcd::Watcher> _watcher; // 监听器对象,用于监控目录变化
    };

} // namespace IMS

测试代码

// 测试文件:test_etcd_client.cpp
#include "../../common/etcd.hpp" // 包含封装的 Registry 和 Discovery 类
#include <thread>
#include <chrono>
#include <iostream>

/*
测试中使用了两个 Registry 对象分别注册两个服务,以模拟不同服务实例。

Discovery 在构造时立即获取当前已有服务(service1)并触发一次上线回调。

后续注册 service2 时,Discovery 监控到 PUT 事件,再次触发上线回调。

当 registry2 析构时,租约取消,etcd 自动删除 service2 的键,触发 DELETE 事件。

最后 registry1 析构,service1 下线(这一步可能看不到了)。
*/

// 测试配置
const std::string ETCD_HOST = "http://127.0.0.1:2379"; // etcd 服务器地址
const std::string BASE_DIR = "/services/test";         // 服务根目录

// 服务上线回调函数
void on_service_put(const std::string &key, const std::string &val)
{
    std::cout << "[回调] 服务上线: " << key << " -> " << val << std::endl;
}

// 服务下线回调函数
void on_service_del(const std::string &key, const std::string &val)
{
    std::cout << "[回调] 服务下线: " << key << " -> " << val << std::endl;
}

int main()
{
    // 1. 初始化日志(调试模式,输出到控制台)
    IMS::init_logger(false, "", 0); // 调试模式,日志级别 trace
    LOG_INFO("测试程序启动,etcd 地址: {}", ETCD_HOST);

    // 2. 创建服务注册客户端,并注册两个服务
    LOG_INFO("=== 步骤1: 创建第一个服务注册客户端,注册服务 service1 ===");
    IMS::Registry::ptr registry1 = std::make_shared<IMS::Registry>(ETCD_HOST); // 构建第一个服务注册客户端
    std::string key1 = BASE_DIR + "/service1";                                 // 键
    std::string val1 = "192.168.1.100:8080";                                   // 值
    if (registry1->registry(key1, val1))                                       // 借助服务注册客户端的接口往etcd服务器注册键值对
    {
        LOG_INFO("服务1注册成功: {} -> {}", key1, val1);
    }
    else
    {
        LOG_ERROR("服务1注册失败");
        return -1;
    }

    // 等待一下,确保数据已写入
    std::this_thread::sleep_for(std::chrono::seconds(1));

    LOG_INFO("=== 步骤2: 创建服务发现客户端,监控 {} 目录 ===", BASE_DIR);
    IMS::Discovery::ptr discovery = std::make_shared<IMS::Discovery>(
        ETCD_HOST, BASE_DIR, on_service_put, on_service_del); // 构建一个服务发现客户端
    LOG_INFO("服务发现客户端已启动,等待事件...");

    // 等待监控建立(Watcher 初始化可能需要一点时间)
    std::this_thread::sleep_for(std::chrono::seconds(2));

    {
        LOG_INFO("=== 步骤3: 第一个服务注册客户端,注册第二个服务 service2 ===");
        IMS::Registry::ptr registry2 = std::make_shared<IMS::Registry>(ETCD_HOST); // 构建第二个服务注册客户端
        std::string key2 = BASE_DIR + "/service2";                                 // 键
        std::string val2 = "192.168.1.101:8080";                                   // 值
        if (registry2->registry(key2, val2))                                       // 借助服务注册客户端的接口往etcd服务器注册键值对
        {
            LOG_INFO("服务2注册成功: {} -> {}", key2, val2);
        }
        else
        {
            LOG_ERROR("服务2注册失败");
        }

        // 等待事件触发
        std::this_thread::sleep_for(std::chrono::seconds(3));

        LOG_INFO("=== 步骤4: registry2 离开作用域时,其析构函数会取消租约,service2数据自动删除 ===");
    }
    std::this_thread::sleep_for(std::chrono::seconds(5)); // 等待下线事件

    // 注意:registry1 将在 main 函数结束时析构,service1 随之自动下线,
    // 但由于程序即将退出,可能无法观察到对应的下线回调。
    LOG_INFO("=== 步骤5: main 函数即将结束,registry1 析构,service1 将自动下线(可能来不及看到回调) ===");
    LOG_INFO("测试结束");
    return 0;
}
  • 测试中使用了两个 Registry 对象分别注册两个服务,以模拟不同服务实例。
  • Discovery 在构造时立即获取当前已有服务(service1)并触发一次上线回调。
  • 后续注册 service2 时,Discovery 监控到 PUT 事件,再次触发上线回调。
  • 当 registry2 析构时,租约取消,etcd 自动删除 service2 的键,触发 DELETE 事件。
  • 最后 registry1 析构,service1 下线(这一步可能看不到)
root@VM-16-14-ubuntu:~/cpp-chatsystem/code/server/example/etcd/build# ./test_etcd_client
[default-logger][11:33:14][382303][info    ][/root/cpp-chatsystem/code/server/example/etcd/test_etcd_client.cpp:39] 测试程序启动,etcd 地址: http://127.0.0.1:2379
[default-logger][11:33:14][382303][info    ][/root/cpp-chatsystem/code/server/example/etcd/test_etcd_client.cpp:42] === 步骤1: 创建第一个服务注册客户端,注册服务 service1 ===
[default-logger][11:33:14][382303][info    ][/root/cpp-chatsystem/code/server/example/etcd/test_etcd_client.cpp:48] 服务1注册成功: /services/test/service1 -> 192.168.1.100:8080
[default-logger][11:33:15][382303][info    ][/root/cpp-chatsystem/code/server/example/etcd/test_etcd_client.cpp:59] === 步骤2: 创建服务发现客户端,监控 /services/test 目录 ===
[回调] 服务上线: /services/test/service1 -> 192.168.1.100:8080
[default-logger][11:33:16][382303][info    ][/root/cpp-chatsystem/code/server/example/etcd/test_etcd_client.cpp:62] 服务发现客户端已启动,等待事件...
[default-logger][11:33:18][382303][info    ][/root/cpp-chatsystem/code/server/example/etcd/test_etcd_client.cpp:68] === 步骤3: 第一个服务注册客户端,注册第二个服务 service2 ===
[回调] 服务上线: /services/test/service2 -> 192.168.1.101:8080
[default-logger][11:33:18][382303][info    ][/root/cpp-chatsystem/code/server/example/etcd/test_etcd_client.cpp:74] 服务2注册成功: /services/test/service2 -> 192.168.1.101:8080
[default-logger][11:33:18][382360][debug   ][/root/cpp-chatsystem/code/server/example/etcd/../../common/etcd.hpp:127] 新增服务:/services/test/service2-192.168.1.101:8080
[default-logger][11:33:21][382303][info    ][/root/cpp-chatsystem/code/server/example/etcd/test_etcd_client.cpp:84] === 步骤4: registry2 离开作用域时,其析构函数会取消租约,service2数据自动删除 ===
[回调] 服务下线: /services/test/service2 -> 192.168.1.101:8080
[default-logger][11:33:23][382360][debug   ][/root/cpp-chatsystem/code/server/example/etcd/../../common/etcd.hpp:136] 下线服务:/services/test/service2-192.168.1.101:8080
[default-logger][11:33:26][382303][info    ][/root/cpp-chatsystem/code/server/example/etcd/test_etcd_client.cpp:90] === 步骤5: main 函数即将结束,registry1 析构,service1 将自动下线(可能来不及看到回调) ===
[default-logger][11:33:26][382303][info    ][/root/cpp-chatsystem/code/server/example/etcd/test_etcd_client.cpp:91] 测试结束
[warn] watcher does't exit normally
root@VM-16-14-ubuntu:~/cpp-chatsystem/code/server/example/etcd/build# 

非常的完美

转载自CSDN-专业IT技术社区

原文链接:https://blog.csdn.net/2301_80224556/article/details/157657699

评论

赞0

评论列表

微信小程序
QQ小程序

关于作者

点赞数:0
关注数:0
粉丝:0
文章:0
关注标签:0
加入于:--