一个应用对外提供RPC服务时,可能存在多端同时调用,这时RPC服务可能出现过载响应慢等问题
这时候可以开启多个RPC服务提供同一个功能实现负载均衡
这时又会产生新的问题,多个服务实现负载均衡的时候,某个服务宕机了无法及时剔除,新增服务无法及时增加到负载中,然后服务注册和服务发现就可以很好的解决这个问题
服务启动后把自己的连接信息储存到ETCD
服务关闭的时候ETCD中的信息也同步删除
在ETCD中取出目前活跃的服务列表
根据负载均衡规则选择其中一个调用
编写服务注册代码registry.go
先定义两个服务结构体,一个储存服务信息,一个封装注册操作
// ServiceInfo 服务信息
type ServiceInfo struct {
Name string // 服务名称
IP string // 服务地址
}
// Service 封装一个服务结构体
type Service struct {
ServiceInfo ServiceInfo // 储存服务信息
ServiceKey string // ETCD的key
ServerError chan error // 使用chan控制状态
leaseId clientV3.LeaseID // ETCD Lease租约
client *clientV3.Client // ETCD 客户端
}
创建租约并储存服务到ETCD并绑定租约通过KeepAlive续约
// keepAlive 创建租约并储存服务到ETCD并绑定租约通过KeepAlive续约
func (service *Service) keepAlive() (<-chan *clientV3.LeaseKeepAliveResponse, error) {
serviceInfo := &service.ServiceInfo
val, _ := json.Marshal(serviceInfo)
// 创建一个租约
resp, err := service.client.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
return nil, err
}
// 储存服务到ETCD并绑定到租约
_, err = service.client.Put(context.TODO(), service.ServiceKey, string(val), clientV3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
return nil, err
}
service.leaseId = resp.ID
// 续约
return service.client.KeepAlive(context.TODO(), resp.ID)
}
创建销毁和关闭服务方法
// Stop 停止服务注册
func (service *Service) Stop() {
service.ServerError <- nil
}
// revoke 销毁租约
func (service *Service) revoke() error {
_, err := service.client.Revoke(context.TODO(), service.leaseId)
if err != nil {
log.Fatal(err)
}
log.Printf("%s:已停止服务\n", service.ServiceKey)
return err
}
调用对应方法进行服务注册
// StartRegistry 开始服务注册
func (service *Service) StartRegistry() (err error) {
// 注册服务绑定租约
ch, err := service.keepAlive()
if err != nil {
log.Fatal(err)
return
}
for {
select {
case err := <-service.ServerError:
// 服务停止
return err
case <-service.client.Ctx().Done():
// 服务停止
return errors.New("服务停止")
case resp, ok := <-ch:
// 监听租约
if !ok {
// chan 无信息,销毁
return service.revoke()
}
log.Printf("服务正常: %s, TTL:%d", service.ServiceKey, resp.TTL)
}
}
return
}
对外提供一个创建注册服务的方法
// NewRegistryService 创建一个注册服务
func NewRegistryService(info ServiceInfo, endpoints []string) (*Service, error) {
client, err := clientV3.New(clientV3.Config{
Endpoints: endpoints, // ETCD服务地址,支持集群多个
DialTimeout: time.Second * 100, // 超时
})
if err != nil {
log.Fatal(err)
return nil, err
}
// 组装服务信息
service := &Service{
ServiceInfo: info,
ServiceKey: info.Name + "/" + info.IP, // 地址+名称生成key防止集群key冲突
client: client,
}
return service, err
}
编写服务发现代码discovery.go
先定义个结构体封装发现操作
// EtcdClient etcd客户端
type EtcdClient struct {
endpoints []string // ETCD服务地址
kv clientV3.KV // ETCD KEY VALUE储存组件
serverName string // 服务名称
serverIndexKey string // 上一次分配服务的数组下标
client *clientV3.Client // ETCD客户端
ctx context.Context // 上下文
lease clientV3.Lease // ETCD租约
leaseID clientV3.LeaseID // ETCD租约ID
}
和ETCD建立连接创建客户端
// connect 建立连接
func (etcdClient *EtcdClient) connect() (err error) {
etcdClient.client, err = clientV3.New(clientV3.Config{
Endpoints: etcdClient.endpoints,
DialTimeout: 10 * time.Second,
})
if err != nil {
return
}
etcdClient.kv = clientV3.NewKV(etcdClient.client)
etcdClient.ctx = context.Background()
return
}
根据前缀读取ETCD里的数据
// getList 根据前缀去etcd获取服务列表
func (etcdClient *EtcdClient) getList(prefix string) ([][]byte, error) {
resp, err := etcdClient.kv.Get(etcdClient.ctx, prefix, clientV3.WithPrefix())
if err != nil {
return nil, err
}
servers := make([][]byte, 0)
for _, value := range resp.Kvs {
if value != nil {
servers = append(servers, value.Value)
}
}
return servers, nil
}
一个简单的实现轮训方式负载均衡的方法和关闭方法
// getServerIndex 轮训方式实现负载均衡
func (etcdClient *EtcdClient) getServerIndex(len int) int {
len = len - 1
serverIndex := 0
defer func() {
etcdClient.client.Put(etcdClient.ctx, etcdClient.serverIndexKey, strconv.Itoa(serverIndex))
}()
// 轮训方式访问
serverIndexRes, err := etcdClient.client.Get(etcdClient.ctx, etcdClient.serverIndexKey)
if err != nil || (serverIndexRes.Count == 0) {
return serverIndex
}
etcdIndex, err := strconv.Atoi(string(serverIndexRes.Kvs[0].Value))
if err != nil {
return serverIndex
}
if etcdIndex < len {
serverIndex = etcdIndex + 1
}
return serverIndex
}
// Close 关闭连接
func (etcdClient *EtcdClient) Close() (err error) {
return etcdClient.client.Close()
}
根据负载均衡方法返回的下标取服务并返回
// GetServer 获取一个活跃的服务并返回
func (etcdClient *EtcdClient) GetServer() ([]byte, error) {
servers, err := etcdClient.getList(etcdClient.serverName)
if err != nil || len(servers) == 0 {
return nil, errors.New("服务已全部停止")
}
index := etcdClient.getServerIndex(len(servers))
return servers[index], nil
}
对外提供一个创建客户端的方法
// NewEtcdClient 创建etcd服务发现客户端
func NewEtcdClient(endpoints []string, serverName string) *EtcdClient {
var client = &EtcdClient{
ctx: context.Background(),
endpoints: endpoints,
serverName: serverName,
serverIndexKey: "index-" + serverName, // 别用serverName开头,不然就冲突和服务冲突了
}
err := client.connect()
if err != nil {
panic(err)
}
return client
}
后续RPC服务和HTTP服务代码来自:一个简单的Golang RPC服务
一个简单的Golang RPC服务
改写原来的RPC服务增加服务注册改端口为3001
func StartServer() {
server := grpc.NewServer()
// 注册 RegisterSpeakServer
goProtocol.RegisterSpeakServer(server, &Speak{})
serverPort := ":3001"
address, err := net.Listen("tcp", serverPort)
if err != nil {
log.Fatalf(err.Error())
return
}
// 启动服务注册
go func() {
service, err := registry.NewRegistryService(registry.ServiceInfo{
Name: "speak",
IP: "127.0.0.1" + serverPort,
}, []string{"127.0.0.1:2379"})
if err != nil {
panic(err)
return
}
err = service.StartRegistry()
if err != nil {
panic(err)
return
}
}()
log.Println("已启动服务端...")
if err := server.Serve(address); err != nil {
panic(err)
}
}
// SayOk 实现了 SpeakServer 接口中定义的 SayOk 方法
func (*Speak) SayOk(ctx context.Context, req *goProtocol.Request) (*goProtocol.Response, error) {
fmt.Println("调用了SayOk1")
return &goProtocol.Response{
Output: "调用了SayOk1:" + req.Input,
}, nil
}
复制一份改为server2.go 改端口为3002并一起启动,控制台可以看到服务注册在不断续约
修改客户端代码增加服务发现功能
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// 连接ETCD
client := discovery.NewEtcdClient([]string{"127.0.0.1:2379"}, "speak")
defer client.Close()
// 服务发现方式获取活跃服务
address, err := client.GetServer()
if err != nil {
fmt.Println(err.Error())
return
}
serverInfo := registry.ServiceInfo{}
// 解析服务信息
err = json.Unmarshal(address, &serverInfo)
if err != nil {
fmt.Println(err.Error())
return
}
// 调用对应的服务
res := StartClient(serverInfo.IP)
fmt.Println(res)
// 返回到浏览器
w.Write([]byte(res))
})
err := http.ListenAndServe(":8080", nil)
if err != nil {
fmt.Println(err.Error())
return
}
}
启动客户端监听8080端口,使用接口调试工具不断请求127.0.0.1:8080
// 会轮训返回不同信息
调用了SayOk1:调用传递参数:127.0.0.1:3001
调用了SayOk2:调用传递参数:127.0.0.1:3002
调用了SayOk1:调用传递参数:127.0.0.1:3001
调用了SayOk2:调用传递参数:127.0.0.1:3002
....
// 停止服务2会不断返回
调用了SayOk1:调用传递参数:127.0.0.1:3001
调用了SayOk1:调用传递参数:127.0.0.1:3001
调用了SayOk1:调用传递参数:127.0.0.1:3001
// 再次开启服务2恢复轮训返回不同信息
.....
使用浏览器访问可能会出现一直请求同一个服务的情况
那是因为浏览器会自动请求一个 /favicon.ico
导致服务请求了两次,负载均衡轮训回去了,实际上是正常的
评论 (0)