mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-18 15:59:01 +08:00
group db tx
This commit is contained in:
@@ -69,10 +69,10 @@ func KickGroupMember(c *gin.Context) {
|
||||
// 默认 全部自动
|
||||
NewRpc(NewApiBind[apistruct.KickGroupMemberReq, apistruct.KickGroupMemberResp](c), "", group.NewGroupClient, group.GroupClient.KickGroupMember).Execute()
|
||||
// 可以自定义编辑请求和响应
|
||||
a := NewRpc(NewApiBind[api_struct.KickGroupMemberReq, api_struct.KickGroupMemberResp](c), "", group.NewGroupClient, group.GroupClient.KickGroupMember)
|
||||
a.Before(func(apiReq *api_struct.KickGroupMemberReq, rpcReq *group.KickGroupMemberReq, bind func() error) error {
|
||||
a := NewRpc(NewApiBind[apistruct.KickGroupMemberReq, apistruct.KickGroupMemberResp](c), "", group.NewGroupClient, group.GroupClient.KickGroupMember)
|
||||
a.Before(func(apiReq *apistruct.KickGroupMemberReq, rpcReq *group.KickGroupMemberReq, bind func() error) error {
|
||||
return bind()
|
||||
}).After(func(rpcResp *group.KickGroupMemberResp, apiResp *api_struct.KickGroupMemberResp, bind func() error) error {
|
||||
}).After(func(rpcResp *group.KickGroupMemberResp, apiResp *apistruct.KickGroupMemberResp, bind func() error) error {
|
||||
return bind()
|
||||
}).Execute()
|
||||
}
|
||||
|
||||
@@ -12,10 +12,6 @@ import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
//func GetUsersInfo(ctx context.Context, args ...interface{}) ([]*sdkws.UserInfo, error) {
|
||||
// return nil, errors.New("TODO:GetUserInfo")
|
||||
//}
|
||||
|
||||
func NewUserCheck(zk discoveryRegistry.SvcDiscoveryRegistry) *UserCheck {
|
||||
return &UserCheck{
|
||||
zk: zk,
|
||||
|
||||
@@ -3,6 +3,7 @@ package group
|
||||
import (
|
||||
"Open_IM/internal/common/check"
|
||||
"Open_IM/internal/common/notification"
|
||||
"Open_IM/internal/tx"
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/db/cache"
|
||||
"Open_IM/pkg/common/db/controller"
|
||||
@@ -44,7 +45,15 @@ func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
||||
return err
|
||||
}
|
||||
pbGroup.RegisterGroupServer(server, &groupServer{
|
||||
GroupInterface: controller.NewGroupInterface(controller.NewGroupDatabase(db, redis.GetClient(), mongo.GetClient())),
|
||||
GroupInterface: controller.NewGroupController(
|
||||
relation.NewGroupDB(db),
|
||||
relation.NewGroupMemberDB(db),
|
||||
relation.NewGroupRequest(db),
|
||||
tx.NewGorm(db),
|
||||
tx.NewMongo(mongo.GetClient()),
|
||||
unrelation.NewSuperGroupMongoDriver(mongo.GetClient()),
|
||||
redis.GetClient(),
|
||||
),
|
||||
UserCheck: check.NewUserCheck(client),
|
||||
ConversationChecker: check.NewConversationChecker(client),
|
||||
})
|
||||
@@ -52,7 +61,7 @@ func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
|
||||
}
|
||||
|
||||
type groupServer struct {
|
||||
GroupInterface controller.GroupInterface
|
||||
GroupInterface controller.GroupController
|
||||
UserCheck *check.UserCheck
|
||||
Notification *notification.Check
|
||||
ConversationChecker *check.ConversationChecker
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
package tx
|
||||
|
||||
import (
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
func NewGorm(db *gorm.DB) Tx {
|
||||
return &_Gorm{tx: db}
|
||||
}
|
||||
|
||||
type _Gorm struct {
|
||||
tx *gorm.DB
|
||||
}
|
||||
|
||||
func (g *_Gorm) Transaction(fn func(tx any) error) error {
|
||||
return g.tx.Transaction(func(tx *gorm.DB) error {
|
||||
return fn(tx)
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package tx
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/utils"
|
||||
"context"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
)
|
||||
|
||||
func NewMongo(client *mongo.Client) CtxTx {
|
||||
return &_Mongo{
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
type _Mongo struct {
|
||||
client *mongo.Client
|
||||
}
|
||||
|
||||
func (m *_Mongo) Transaction(ctx context.Context, fn func(ctx context.Context) error) error {
|
||||
sess, err := m.client.StartSession()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sCtx := mongo.NewSessionContext(ctx, sess)
|
||||
defer sess.EndSession(sCtx)
|
||||
if err := fn(sCtx); err != nil {
|
||||
_ = sess.AbortTransaction(sCtx)
|
||||
return err
|
||||
}
|
||||
return utils.Wrap(sess.CommitTransaction(sCtx), "")
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
package tx
|
||||
|
||||
import "context"
|
||||
|
||||
type Tx interface {
|
||||
Transaction(fn func(tx any) error) error
|
||||
}
|
||||
|
||||
type CtxTx interface {
|
||||
Transaction(ctx context.Context, fn func(ctx context.Context) error) error
|
||||
}
|
||||
Reference in New Issue
Block a user