为什么需要异步通信?
在我们预设定的接口中,我们需要完成一个重置密码的功能。基本流程为,用户提交需要重置密码的邮箱,系统接收到后向邮箱发送一则消息,用户点击邮箱中带有加密信息的邮件再次向系统发起请求,系统通过验证后重置用户的密码。在这一个流程当中,发送邮件是一个耗时操作,如果采用同步的方式,一方面这会导致大量的请求浪费(因为要监听状态需要发起轮询请求),另一方面会导致接口数量不断增长变得臃肿,另外,对一些耗时操作同步请求会影响用户体验。基于上面的种种原因,我们有必要为系统接入基于事件异步通信,这样不仅为系统带来解耦,同时可以基于消息队列进行多个订阅处理,从而提高系统的运行效率。在go-micro中,我们可以通过broker组件来实现上述的异步通信。这里我们选择go-micro插件支持rabbitmq作为broker的驱动。
docker-compose安装rabbitmq
.env中添加配置信息
...#设置rabbitmq镜像版本RABBITMQ_VERSION=3.8.3-management#rabbitmq默认用户名称RABBITMQ_USER=root#rabbitmq默认密码RABBTIMQ_PASSWORD=root...
修改docker-compose.yaml
micro-rabbitmq:image: rabbitmq:${RABBITMQ_VERSION}restart: alwaysports:- 15672:15672- 5672:5672environment:- RABBITMQ_DEFAULT_USER=${RABBITMQ_USER}- RABBITMQ_DEFAULT_PASS=${RABBTIMQ_PASSWORD}networks:- micro-network
检查rabbitmq是否正常运行
检查容器是否正常运行
访问rabbitmq可视化管理界面
打开http://127.0.0.1:15672输入配置的用户名密码
编写重置密码服务
创建重置密码记录模型
touch pkg/model/password.go
package modelimport (db "github.com/869413421/micro-service/common/pkg/db")// PasswordReset 重置密码模型type PasswordReset struct {db.BaseModelToken string `gorm:"column:token;type:varchar(255) not null;index" `Email string `gorm:"column:email;type:varchar(255) not null;index" valid:"email"`Verify int8 `gorm:"column:verify;type:tinyint(1);not null;default:0"`}// Store 创建重置记录func (model *PasswordReset) Store() (err error) {result := db.GetDB().Create(&model)err = result.Errorif err != nil {return err}return nil}// Delete 删除数据库重置记录func (model *PasswordReset) Delete() (rowsAffected int64, err error) {result := db.GetDB().Delete(&model)err = result.Errorif err != nil {return 0, err}rowsAffected = result.RowsAffectedreturn rowsAffected, nil}// Update 更新数据库重置记录func (model *PasswordReset) Update() (rowsAffected int64, err error) {result := db.GetDB().Save(&model)err = result.Errorif err != nil {return 0, err}rowsAffected = result.RowsAffectedreturn rowsAffected, nil}
创建重置密码模型数据库交互层
touch pkg/repo/password.go
package repoimport (db "github.com/869413421/micro-service/common/pkg/db""github.com/869413421/micro-service/user/pkg/model""gorm.io/gorm")//PasswordRestRepositoryInterface 重置记录操作接口type PasswordRestRepositoryInterface interface {GetByEmail(email string) (*model.PasswordReset, error)GetByToken(token string) (*model.PasswordReset, error)}//PasswordResetRepository 重置记录操作仓库type PasswordResetRepository struct {DB *gorm.DB}// NewPasswordResetRepository 新建仓库func NewPasswordResetRepository() PasswordRestRepositoryInterface {return &PasswordResetRepository{DB: db.GetDB()}}// GetByEmail 根据邮件获取func (repo *PasswordResetRepository) GetByEmail(email string) (*model.PasswordReset, error) {passwordReset := &model.PasswordReset{}err := repo.DB.Where("email = ?", email).First(&passwordReset).Errorif err != nil {return nil, err}return passwordReset, nil}// GetByToken 根据token获取func (repo *PasswordResetRepository) GetByToken(token string) (*model.PasswordReset, error) {passwordReset := &model.PasswordReset{}err := repo.DB.Where("token = ?", token).First(&passwordReset).Errorif err != nil {return nil, err}return passwordReset, nil}
定义protobuf
在proto/user/user.proto添加相应的定义
...service UserService {rpc Pagination(PaginationRequest) returns(PaginationResponse){}rpc Get(GetRequest) returns(UserResponse){}rpc Create(CreateRequest) returns(UserResponse){}rpc Update(UpdateRequest) returns(UserResponse){}rpc Delete(DeleteRequest) returns(UserResponse){}rpc Auth(AuthRequest) returns(TokenResponse){}rpc ValidateToken(TokenRequest) returns(TokenResponse){}rpc CreatePasswordReset(CreatePasswordResetRequest) returns(PasswordReset){}rpc ResetPassword(ResetPasswordRequest) returns(ResetPasswordResponse){}}......// PasswordReset 重置密码记录message PasswordReset{uint64 id = 1;string email = 2;string token = 3;string create_at = 4;}// CreatePasswordResetRequest 创建重置密码请求message CreatePasswordResetRequest{string email = 1;}// ResetPasswordRequest 重置密码请求message ResetPasswordRequest{string token = 1 ;}// ResetPasswordResponse 重置密码相应message ResetPasswordResponse{bool resetSuccess = 1;string newPassword = 2;}...
生成代码
make proto
编写密码重置业务代码
编写一个生成随机字符串方法,用于生成用户新密码
打开common项目
mkdir pkg/stringtouch pkg/string/string.go
package stringimport ("math/rand""time")// RandString 生成随机字符串func RandString(len int) string {r := rand.New(rand.NewSource(time.Now().UnixNano()))bytes := make([]byte, len)for i := 0; i < len; i++ {b := r.Intn(26) + 65bytes[i] = byte(b)}return string(bytes)}
使用事务进行密码重置
touch service/password.go
package serviceimport ("errors""github.com/869413421/micro-service/common/pkg/db"string2 "github.com/869413421/micro-service/common/pkg/string""github.com/869413421/micro-service/user/pkg/repo""gorm.io/gorm""time")// PasswordResetServiceInterface 重置密码业务接口type PasswordResetServiceInterface interface {Reset(token string) (string, error)}// PasswordResetService 重置密码业务type PasswordResetService struct {UserRepo repo.UserRepositoryInterfacePasswordResetRepo repo.PasswordRestRepositoryInterface}// NewPasswordResetService 创建业务层func NewPasswordResetService() PasswordResetServiceInterface {return &PasswordResetService{UserRepo: repo.NewUserRepository(),PasswordResetRepo: repo.NewPasswordResetRepository(),}}// Reset 重置密码,返回新的密码func (srv *PasswordResetService) Reset(token string) (string, error) {//1.根据token获取密码重置记录passwordReset, err := srv.PasswordResetRepo.GetByToken(token)if err != nil {return "", err}//2.比较时间,查看邮件是否已经超时或已验证if passwordReset.Verify == 1 {return "", errors.New("the record has been verified")}d, _ := time.ParseDuration("+5m")expTime := passwordReset.CreatedAt.Add(d)if time.Now().After(expTime) {return "", errors.New("verify that the message has expired")}//3.获取用户更新密码(使用事務)newPassword := string2.RandString(8)db := db.GetDB()err = db.Transaction(func(tx *gorm.DB) error {user, err := srv.UserRepo.GetByEmail(passwordReset.Email)if err != nil {return err}user.Password = newPasswordresult := tx.Debug().Save(&user)err = result.Errorif err != nil {return err}rowsAffected := result.RowsAffectedif rowsAffected == 0 {return errors.New("update password fail")}//4.更新重置记录passwordReset.Verify = 1result = tx.Debug().Save(&passwordReset)err = result.Errorif err != nil {return err}rowsAffected = result.RowsAffectedif rowsAffected == 0 {return errors.New("update password fail")}return nil})if err != nil {return "", err}return newPassword, nil}
编写服务处理器代码
修改初始化依赖
...//UserServiceHandler 用户服务处理器type UserServiceHandler struct {UserRepo repo.UserRepositoryInterfaceTokenService service.AuthblePasswordService service.PasswordResetServiceInterface}// NewUserServiceHandler 用户服务初始化func NewUserServiceHandler() *UserServiceHandler {return &UserServiceHandler{UserRepo: repo.NewUserRepository(),TokenService: service.NewTokenService(),PasswordService: service.NewPasswordResetService(),}}...
增加创建重置记录,重置方法
...// CreatePasswordReset 创建密码重置记录func (srv *UserServiceHandler) CreatePasswordReset(ctx context.Context, req *pb.CreatePasswordResetRequest, rsp *pb.PasswordReset) error {//1.获取提交邮箱,检查用户是否存在_, err := srv.UserRepo.GetByEmail(req.Email)if err != nil {return errors.NotFound("User.CreatePasswordReset.GetByEmail.Error", "user not found ,check you email")}passwordReset := &model.PasswordReset{}types.Fill(passwordReset, req)//2.生成md5保存到数据库passwordReset.Token = password.Md5Str(req.Email + time.Now().String())err = passwordReset.Store()if err != nil {return err}//3.返回响应信息types.Fill(rsp, passwordReset)return nil}// ResetPassword 重置密码func (srv *UserServiceHandler) ResetPassword(ctx context.Context, req *pb.ResetPasswordRequest, rsp *pb.ResetPasswordResponse) error {//1.执行重置逻辑newPassword, err := srv.PasswordService.Reset(req.Token)if err != nil {return err}//2.返回新密码rsp.ResetSuccess = truersp.NewPassword = newPasswordreturn nil}...
接入borker,编写订阅发布业务代码
上面的代码已经完成了创建密码重置记录以及密码重置等功能,但中间基于异步通信的发布消息,订阅消费(发送邮件)代码还没有实现。
使用rabbitmq作为go-microborker组件
修改docker-compose.yaml
通过环境变量为go-micro指定borker
...micro-user-service:depends_on: # 启动依赖,需要等etcd集群启动后才启动当前容器- etcd1- etcd2- etcd3- micro-user-dbbuild: ./user # dockerfile所在目录environment:TZ: ${TZ}MICRO_SERVER_ADDRESS: ":9091" # 服务端口MICRO_REGISTRY: "etcd" # 注册中心类型MICRO_REGISTRY_ADDRESS: "etcd1:2379,etcd2:2379,etcd3:2379" # 注册中心集群地址DB_HOST: "micro-user-db:3306"DB_DATABASE: ${USER_DB_DATABASE}DB_USER: ${USER_DB_USER}DB_PASSWORD: ${USER_DB_PASSWORD}DB_MAX_CONNECTIONS: ${USER_DB_MAX_CONNECTIONS}DB_MAX_IDE_CONNECTIONS: ${USER_DB_MAX_IDE_CONNECTIONS}DB_CONNECTIONS_MAX_LIFE_TIME: ${USER_DB_CONNECTIONS_MAX_LIFE_TIME}MICRO_BROKER: "rabbitmq"MICRO_BROKER_ADDRESS: "amqp://${RABBITMQ_USER}:${RABBTIMQ_PASSWORD}@micro-rabbitmq:5672"restart: alwaysports:- 9092:9091volumes:- ./user:/appnetworks:- micro-network...
获取rabbitmq插件包
go get -u github.com/micro/go-plugins/broker/rabbitmq/v2
修改plugin.go
package mainimport (// rabbitmq broker_ "github.com/micro/go-plugins/broker/rabbitmq/v2")
修改makefile
....PHONY: buildbuild: protoCGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-w' -i -o micro-user-service ./main.go ./plugin.go...
通过上述步骤,基于插件机制使rabbitmq成为go-microborker组件的默认驱动。
封装container,Json包
考虑到整个程序的生命周期中,我们有许多对象需要全局使用,我们定义一个容器包将对象存储到当中,在需要时再从容器中取出使用。
打开common项目
mkdir pkg/containertouch pkg/container/service.go
package containerimport ("github.com/micro/go-micro/v2""github.com/micro/go-micro/v2/broker")var service micro.Service// SetService 设置服务实例func SetService(srv micro.Service) {service = srv}// GetService 返回服务实例func GetService() micro.Service {return service}// GetServiceBroker 返回服务Broker实例func GetServiceBroker() broker.Broker {return service.Options().Broker}
mkdir pkg/encodertouch pkg/encoder/encoder.go
package encoderimport jsoniter "github.com/json-iterator/go"var JsonHandler jsoniter.APIfunc init() {JsonHandler = jsoniter.ConfigCompatibleWithStandardLibrary}
基于事件编写发布代码
touch pkg/model/password_hooks.go
package modelimport ("fmt""github.com/869413421/micro-service/common/pkg/container""github.com/869413421/micro-service/common/pkg/encoder""github.com/micro/go-micro/v2/broker""gorm.io/gorm")var createTopic = "create.password.reset"// AfterCreate 创建成功后func (model *PasswordReset) AfterCreate(tx *gorm.DB) (err error) {if model.Email != "" {err := pushCreateEvent(model)if err != nil {return err}}return nil}// pushCreateEvent 推送创建消息func pushCreateEvent(model *PasswordReset) error {//1.获取发布连接publisher := container.GetServiceBroker()//2.构建broker消息body, err := encoder.JsonHandler.Marshal(model)if err != nil {return err}msg := &broker.Message{Header: map[string]string{"email": model.Email,},Body: body,}//3.发送消息到mqerr = publisher.Publish(createTopic, msg)if err != nil {return err} else {fmt.Println("[pub] pubbed message:", string(msg.Body))}return nil}
在创建密码重置记录成功后出发了模型事件,这时候将消息推送到rabbitmq,完成消息发布流程
编写订阅代码
touch subscriber/password.go
package subscriberimport ("fmt""github.com/micro/go-micro/v2/broker")// 重置密码事件const createPasswordResetTopic = "create.password.reset"// EventSubscriberInterface 事件订阅者启动器接口type EventSubscriberInterface interface {Subscriber() error}// EventSubscriber 事件订阅者启动器type EventSubscriber struct {Broker broker.Broker}// NewEventSubscriber 创建事件订阅启动器func NewEventSubscriber(brk broker.Broker) EventSubscriberInterface {return &EventSubscriber{Broker: brk}}// Subscriber 启动订阅func (subscriber EventSubscriber) Subscriber() error {//1.重置密码事件订阅_, err := subscriber.Broker.Subscribe(createPasswordResetTopic, func(event broker.Event) error {// TODO 发送邮件fmt.Println("[sub] received message:", string(event.Message().Body), "header", event.Message().Header)return nil}, broker.Queue(createPasswordResetTopic), broker.DisableAutoAck())if err != nil {return err}return nil}
修改main.go启动订阅
package mainimport ("github.com/869413421/micro-service/common/pkg/container""github.com/869413421/micro-service/common/pkg/db""github.com/869413421/micro-service/user/handler""github.com/869413421/micro-service/user/pkg/model""github.com/869413421/micro-service/user/subscriber""github.com/micro/go-micro/v2"log "github.com/micro/go-micro/v2/logger"proto "github.com/869413421/micro-service/user/proto/user")func main() {// 1.准备数据库连接,并且执行数据库迁移db := db.GetDB()db.AutoMigrate(&model.User{})db.AutoMigrate(&model.PasswordReset{})// 2.创建服务service := micro.NewService(micro.Name("micro.service.user"),micro.Version("v1"),)// 3.初始化服务,全局化service对象service.Init()container.SetService(service)// 4.初始化borkerbrk := service.Options().Brokerdefer brk.Disconnect()err := brk.Init()if err != nil {log.Fatal(err)return}err = brk.Connect()if err != nil {log.Fatal("connection broker error:", err)return}// 5.订阅事件eventSubscriber := subscriber.NewEventSubscriber(brk)err = eventSubscriber.Subscriber()if err != nil {log.Fatal("subscriber broker error:", err)return}// 6.注册服务处理器proto.RegisterUserServiceHandler(service.Server(), handler.NewUserServiceHandler())// 7.运行服务if err := service.Run(); err != nil {log.Fatal(err)}}
测试重置密码相关接口
编译代码,然后重启容器
make builddocker-compose up -d micro-user-service
调用了创建重置密码的记录后我们可以看到发布订阅代码中打印的相关信息都显示了
拿到日志中的token调用重置接口
接口返回新生成的密码。
