Golang+ETCD的服务注册和发现以及RPC服务的负载均衡

Golang+ETCD的服务注册和发现以及RPC服务的负载均衡

远航
2022-11-25 / 0 评论 / 466 阅读 / 正在检测是否收录...



编写服务注册代码registry.go

先定义两个服务结构体,一个储存服务信息,一个封装注册操作
// ServiceInfo 服务信息
type ServiceInfo struct {
    Name string // 服务名称
    IP   string // 服务地址
}

// Service 封装一个服务结构体
type Service struct {
    ServiceInfo ServiceInfo      // 储存服务信息
    ServiceKey  string           // ETCD的key
    ServerError chan error       // 使用chan控制状态
    leaseId     clientV3.LeaseID // ETCD Lease租约
    client      *clientV3.Client // ETCD 客户端
}
创建租约并储存服务到ETCD并绑定租约通过KeepAlive续约
// keepAlive 创建租约并储存服务到ETCD并绑定租约通过KeepAlive续约
func (service *Service) keepAlive() (<-chan *clientV3.LeaseKeepAliveResponse, error) {
    serviceInfo := &service.ServiceInfo
    val, _ := json.Marshal(serviceInfo)

    // 创建一个租约
    resp, err := service.client.Grant(context.TODO(), 5)
    if err != nil {
        log.Fatal(err)
        return nil, err
    }

    // 储存服务到ETCD并绑定到租约
    _, err = service.client.Put(context.TODO(), service.ServiceKey, string(val), clientV3.WithLease(resp.ID))
    if err != nil {
        log.Fatal(err)
        return nil, err
    }
    service.leaseId = resp.ID
    // 续约
    return service.client.KeepAlive(context.TODO(), resp.ID)
}
创建销毁和关闭服务方法
// Stop 停止服务注册
func (service *Service) Stop() {
    service.ServerError <- nil
}

// revoke 销毁租约
func (service *Service) revoke() error {
    _, err := service.client.Revoke(context.TODO(), service.leaseId)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("%s:已停止服务\n", service.ServiceKey)
    return err
}
调用对应方法进行服务注册
// StartRegistry 开始服务注册
func (service *Service) StartRegistry() (err error) {
    // 注册服务绑定租约
    ch, err := service.keepAlive()
    if err != nil {
        log.Fatal(err)
        return
    }
    for {
        select {
        case err := <-service.ServerError:
            // 服务停止
            return err
        case <-service.client.Ctx().Done():
            // 服务停止
            return errors.New("服务停止")
        case resp, ok := <-ch:
            // 监听租约
            if !ok {
                // chan 无信息,销毁
                return service.revoke()
            }
            log.Printf("服务正常: %s, TTL:%d", service.ServiceKey, resp.TTL)
        }
    }

    return
}
对外提供一个创建注册服务的方法
// NewRegistryService 创建一个注册服务
func NewRegistryService(info ServiceInfo, endpoints []string) (*Service, error) {
    client, err := clientV3.New(clientV3.Config{
        Endpoints:   endpoints,         // ETCD服务地址,支持集群多个
        DialTimeout: time.Second * 100, // 超时
    })

    if err != nil {
        log.Fatal(err)
        return nil, err
    }

    // 组装服务信息
    service := &Service{
        ServiceInfo: info,
        ServiceKey:  info.Name + "/" + info.IP, // 地址+名称生成key防止集群key冲突
        client:      client,
    }
    return service, err
}

编写服务发现代码discovery.go

先定义个结构体封装发现操作
// EtcdClient etcd客户端
type EtcdClient struct {
    endpoints      []string         // ETCD服务地址
    kv             clientV3.KV      // ETCD KEY VALUE储存组件
    serverName     string           // 服务名称
    serverIndexKey string           // 上一次分配服务的数组下标
    client         *clientV3.Client // ETCD客户端
    ctx            context.Context  // 上下文
    lease          clientV3.Lease   // ETCD租约
    leaseID        clientV3.LeaseID // ETCD租约ID
}
和ETCD建立连接创建客户端
// connect 建立连接
func (etcdClient *EtcdClient) connect() (err error) {
    etcdClient.client, err = clientV3.New(clientV3.Config{
        Endpoints:   etcdClient.endpoints,
        DialTimeout: 10 * time.Second,
    })
    if err != nil {
        return
    }
    etcdClient.kv = clientV3.NewKV(etcdClient.client)
    etcdClient.ctx = context.Background()
    return
}
根据前缀读取ETCD里的数据
// getList 根据前缀去etcd获取服务列表
func (etcdClient *EtcdClient) getList(prefix string) ([][]byte, error) {
    resp, err := etcdClient.kv.Get(etcdClient.ctx, prefix, clientV3.WithPrefix())
    if err != nil {
        return nil, err
    }
    servers := make([][]byte, 0)
    for _, value := range resp.Kvs {
        if value != nil {
            servers = append(servers, value.Value)
        }
    }
    return servers, nil
}
一个简单的实现轮训方式负载均衡的方法和关闭方法
// getServerIndex 轮训方式实现负载均衡
func (etcdClient *EtcdClient) getServerIndex(len int) int {
    len = len - 1
    serverIndex := 0
    defer func() {
        etcdClient.client.Put(etcdClient.ctx, etcdClient.serverIndexKey, strconv.Itoa(serverIndex))
    }()
    // 轮训方式访问
    serverIndexRes, err := etcdClient.client.Get(etcdClient.ctx, etcdClient.serverIndexKey)
    if err != nil || (serverIndexRes.Count == 0) {
        return serverIndex
    }
    etcdIndex, err := strconv.Atoi(string(serverIndexRes.Kvs[0].Value))
    if err != nil {
        return serverIndex
    }
    if etcdIndex < len {
        serverIndex = etcdIndex + 1
    }
    return serverIndex
}

// Close 关闭连接
func (etcdClient *EtcdClient) Close() (err error) {
    return etcdClient.client.Close()
}
根据负载均衡方法返回的下标取服务并返回
// GetServer 获取一个活跃的服务并返回
func (etcdClient *EtcdClient) GetServer() ([]byte, error) {
    servers, err := etcdClient.getList(etcdClient.serverName)
    if err != nil || len(servers) == 0 {
        return nil, errors.New("服务已全部停止")
    }
    index := etcdClient.getServerIndex(len(servers))
    return servers[index], nil
}
对外提供一个创建客户端的方法
// NewEtcdClient 创建etcd服务发现客户端
func NewEtcdClient(endpoints []string, serverName string) *EtcdClient {
    var client = &EtcdClient{
        ctx:            context.Background(),
        endpoints:      endpoints,
        serverName:     serverName,
        serverIndexKey: "index-" + serverName, // 别用serverName开头,不然就冲突和服务冲突了
    }
    err := client.connect()
    if err != nil {
        panic(err)
    }
    return client
}

后续RPC服务和HTTP服务代码来自:一个简单的Golang RPC服务

一个简单的Golang RPC服务

改写原来的RPC服务增加服务注册改端口为3001
func StartServer() {
    server := grpc.NewServer()
    // 注册 RegisterSpeakServer
    goProtocol.RegisterSpeakServer(server, &Speak{})
    serverPort := ":3001"
    address, err := net.Listen("tcp", serverPort)
    if err != nil {
        log.Fatalf(err.Error())
        return
    }

    // 启动服务注册
    go func() {
        service, err := registry.NewRegistryService(registry.ServiceInfo{
            Name: "speak",
            IP:   "127.0.0.1" + serverPort,
        }, []string{"127.0.0.1:2379"})
        if err != nil {
            panic(err)
            return
        }
        err = service.StartRegistry()
        if err != nil {
            panic(err)
            return
        }
    }()

    log.Println("已启动服务端...")

    if err := server.Serve(address); err != nil {
        panic(err)
    }
}

// SayOk 实现了 SpeakServer 接口中定义的 SayOk 方法
func (*Speak) SayOk(ctx context.Context, req *goProtocol.Request) (*goProtocol.Response, error) {
    fmt.Println("调用了SayOk1")
    return &goProtocol.Response{
        Output: "调用了SayOk1:" + req.Input,
    }, nil
}
复制一份改为server2.go 改端口为3002并一起启动,控制台可以看到服务注册在不断续约

iShot_2022-11-25_16.38.37.png
iShot_2022-11-25_16.38.24.png

修改客户端代码增加服务发现功能
func main() {
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        // 连接ETCD
        client := discovery.NewEtcdClient([]string{"127.0.0.1:2379"}, "speak")
        defer client.Close()
        // 服务发现方式获取活跃服务
        address, err := client.GetServer()
        if err != nil {
            fmt.Println(err.Error())
            return
        }
        serverInfo := registry.ServiceInfo{}
        // 解析服务信息
        err = json.Unmarshal(address, &serverInfo)
        if err != nil {
            fmt.Println(err.Error())
            return
        }
        // 调用对应的服务
        res := StartClient(serverInfo.IP)
        fmt.Println(res)
        // 返回到浏览器
        w.Write([]byte(res))
    })
    err := http.ListenAndServe(":8080", nil)
    if err != nil {
        fmt.Println(err.Error())
        return
    }
}
启动客户端监听8080端口,使用接口调试工具不断请求127.0.0.1:8080
// 会轮训返回不同信息
调用了SayOk1:调用传递参数:127.0.0.1:3001
调用了SayOk2:调用传递参数:127.0.0.1:3002
调用了SayOk1:调用传递参数:127.0.0.1:3001
调用了SayOk2:调用传递参数:127.0.0.1:3002
....
// 停止服务2会不断返回
调用了SayOk1:调用传递参数:127.0.0.1:3001
调用了SayOk1:调用传递参数:127.0.0.1:3001
调用了SayOk1:调用传递参数:127.0.0.1:3001
// 再次开启服务2恢复轮训返回不同信息
.....
使用浏览器访问可能会出现一直请求同一个服务的情况
那是因为浏览器会自动请求一个 /favicon.ico
导致服务请求了两次,负载均衡轮训回去了,实际上是正常的
0

评论 (0)

取消