mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-04 17:15:58 +08:00
s3 minio, cos, oss support
This commit is contained in:
+119
-20
@@ -2,36 +2,135 @@ package third
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3/cont"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/third"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (t *thirdServer) ApplyPut(ctx context.Context, req *third.ApplyPutReq) (*third.ApplyPutResp, error) {
|
||||
return t.s3dataBase.ApplyPut(ctx, req)
|
||||
func (t *thirdServer) PartSize(ctx context.Context, req *third.PartSizeReq) (*third.PartSizeResp, error) {
|
||||
size, err := t.s3dataBase.PartSize(ctx, req.Size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &third.PartSizeResp{Size: size}, nil
|
||||
}
|
||||
|
||||
func (t *thirdServer) GetPut(ctx context.Context, req *third.GetPutReq) (*third.GetPutResp, error) {
|
||||
return t.s3dataBase.GetPut(ctx, req)
|
||||
}
|
||||
|
||||
func (t *thirdServer) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (*third.ConfirmPutResp, error) {
|
||||
return t.s3dataBase.ConfirmPut(ctx, req)
|
||||
}
|
||||
|
||||
func (t *thirdServer) GetUrl(ctx context.Context, req *third.GetUrlReq) (*third.GetUrlResp, error) {
|
||||
if req.Expires <= 0 {
|
||||
if err := tokenverify.CheckAdmin(ctx); err != nil {
|
||||
return nil, err
|
||||
func (t *thirdServer) InitiateMultipartUpload(ctx context.Context, req *third.InitiateMultipartUploadReq) (*third.InitiateMultipartUploadResp, error) {
|
||||
if err := checkUploadName(ctx, req.Name); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result, err := t.s3dataBase.InitiateMultipartUpload(ctx, req.Hash, req.Size, time.Hour*24, int(req.MaxParts))
|
||||
if err != nil {
|
||||
if haErr, ok := errs.Unwrap(err).(*cont.HashAlreadyExistsError); ok {
|
||||
obj := &relation.ObjectModel{
|
||||
Name: req.Name,
|
||||
UserID: mcontext.GetOpUserID(ctx),
|
||||
Hash: req.Hash,
|
||||
Key: haErr.Object.Key,
|
||||
Size: haErr.Object.Size,
|
||||
ContentType: req.ContentType,
|
||||
Cause: req.Cause,
|
||||
CreateTime: time.Now(),
|
||||
}
|
||||
if err := t.s3dataBase.SetObject(ctx, obj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &third.InitiateMultipartUploadResp{
|
||||
Url: t.apiAddress(obj.Key),
|
||||
}, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
var sign *third.AuthSignParts
|
||||
if result.Sign != nil && len(result.Sign.Parts) > 0 {
|
||||
sign = &third.AuthSignParts{
|
||||
Url: result.Sign.URL,
|
||||
Query: toPbMapArray(result.Sign.Query),
|
||||
Header: toPbMapArray(result.Sign.Header),
|
||||
Parts: make([]*third.SignPart, len(result.Sign.Parts)),
|
||||
}
|
||||
for i, part := range result.Sign.Parts {
|
||||
sign.Parts[i] = &third.SignPart{
|
||||
PartNumber: int32(part.PartNumber),
|
||||
Url: part.URL,
|
||||
Query: toPbMapArray(part.Query),
|
||||
Header: toPbMapArray(part.Header),
|
||||
}
|
||||
}
|
||||
}
|
||||
return t.s3dataBase.GetUrl(ctx, req)
|
||||
return &third.InitiateMultipartUploadResp{
|
||||
Upload: &third.UploadInfo{
|
||||
UploadID: result.UploadID,
|
||||
PartSize: result.PartSize,
|
||||
Sign: sign,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (t *thirdServer) GetHashInfo(ctx context.Context, req *third.GetHashInfoReq) (*third.GetHashInfoResp, error) {
|
||||
return t.s3dataBase.GetHashInfo(ctx, req)
|
||||
func (t *thirdServer) AuthSign(ctx context.Context, req *third.AuthSignReq) (*third.AuthSignResp, error) {
|
||||
partNumbers := utils.Slice(req.PartNumbers, func(partNumber int32) int { return int(partNumber) })
|
||||
result, err := t.s3dataBase.AuthSign(ctx, req.UploadID, partNumbers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp := &third.AuthSignResp{
|
||||
Url: result.URL,
|
||||
Query: toPbMapArray(result.Query),
|
||||
Header: toPbMapArray(result.Header),
|
||||
Parts: make([]*third.SignPart, len(result.Parts)),
|
||||
}
|
||||
for i, part := range result.Parts {
|
||||
resp.Parts[i] = &third.SignPart{
|
||||
PartNumber: int32(part.PartNumber),
|
||||
Url: part.URL,
|
||||
Query: toPbMapArray(part.Query),
|
||||
Header: toPbMapArray(part.Header),
|
||||
}
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (t *thirdServer) CleanObject(ctx context.Context, now time.Time) {
|
||||
t.s3dataBase.CleanExpirationObject(ctx, now)
|
||||
func (t *thirdServer) CompleteMultipartUpload(ctx context.Context, req *third.CompleteMultipartUploadReq) (*third.CompleteMultipartUploadResp, error) {
|
||||
if err := checkUploadName(ctx, req.Name); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result, err := t.s3dataBase.CompleteMultipartUpload(ctx, req.UploadID, req.Parts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
obj := &relation.ObjectModel{
|
||||
Name: req.Name,
|
||||
UserID: mcontext.GetOpUserID(ctx),
|
||||
Hash: result.Hash,
|
||||
Key: result.Key,
|
||||
Size: result.Size,
|
||||
ContentType: req.ContentType,
|
||||
Cause: req.Cause,
|
||||
CreateTime: time.Now(),
|
||||
}
|
||||
if err := t.s3dataBase.SetObject(ctx, obj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &third.CompleteMultipartUploadResp{
|
||||
Url: t.apiAddress(obj.Key),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (t *thirdServer) AccessURL(ctx context.Context, req *third.AccessURLReq) (*third.AccessURLResp, error) {
|
||||
expireTime, rawURL, err := t.s3dataBase.AccessURL(ctx, req.Name, t.defaultExpire)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &third.AccessURLResp{
|
||||
Url: rawURL,
|
||||
ExpireTime: expireTime.UnixMilli(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (t *thirdServer) apiAddress(name string) string {
|
||||
return t.apiURL + name
|
||||
}
|
||||
|
||||
+36
-16
@@ -2,12 +2,17 @@ package third
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3/cos"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3/minio"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/s3/oss"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/obj"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
|
||||
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
||||
@@ -17,44 +22,59 @@ import (
|
||||
)
|
||||
|
||||
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
u, err := url.Parse(config.Config.Object.ApiURL)
|
||||
if err != nil {
|
||||
apiURL := config.Config.Object.ApiURL
|
||||
if apiURL == "" {
|
||||
return fmt.Errorf("api url is empty")
|
||||
}
|
||||
if _, err := url.Parse(config.Config.Object.ApiURL); err != nil {
|
||||
return err
|
||||
}
|
||||
if apiURL[len(apiURL)-1] != '/' {
|
||||
apiURL += "/"
|
||||
}
|
||||
rdb, err := cache.NewRedis()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// 根据配置文件策略选择 oss 方式
|
||||
enable := config.Config.Object.Enable
|
||||
var o obj.Interface
|
||||
if enable == "minio" {
|
||||
o, err = obj.NewMinioInterface()
|
||||
} else if enable == "tencent" {
|
||||
o, err = obj.NewCosClient()
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db, err := relation.NewGormDB()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := db.AutoMigrate(&relationTb.ObjectHashModel{}, &relationTb.ObjectInfoModel{}, &relationTb.ObjectPutModel{}); err != nil {
|
||||
if err := db.AutoMigrate(&relationTb.ObjectModel{}); err != nil {
|
||||
return err
|
||||
}
|
||||
// 根据配置文件策略选择 oss 方式
|
||||
enable := config.Config.Object.Enable
|
||||
var o s3.Interface
|
||||
switch config.Config.Object.Enable {
|
||||
case "minio":
|
||||
o, err = minio.NewMinio()
|
||||
case "cos":
|
||||
o, err = cos.NewCos()
|
||||
case "oss":
|
||||
o, err = oss.NewOSS()
|
||||
default:
|
||||
err = fmt.Errorf("invalid object enable: %s", enable)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
third.RegisterThirdServer(server, &thirdServer{
|
||||
apiURL: apiURL,
|
||||
thirdDatabase: controller.NewThirdDatabase(cache.NewMsgCacheModel(rdb)),
|
||||
userRpcClient: rpcclient.NewUserRpcClient(client),
|
||||
s3dataBase: controller.NewS3Database(o, relation.NewObjectHash(db), relation.NewObjectInfo(db), relation.NewObjectPut(db), u),
|
||||
s3dataBase: controller.NewS3Database(o, relation.NewObjectInfo(db)),
|
||||
defaultExpire: time.Hour * 24 * 7,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
type thirdServer struct {
|
||||
apiURL string
|
||||
thirdDatabase controller.ThirdDatabase
|
||||
s3dataBase controller.S3Database
|
||||
userRpcClient rpcclient.UserRpcClient
|
||||
defaultExpire time.Duration
|
||||
}
|
||||
|
||||
func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) {
|
||||
|
||||
@@ -0,0 +1,62 @@
|
||||
package third
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/third"
|
||||
"strings"
|
||||
"unicode/utf8"
|
||||
)
|
||||
|
||||
func toPbMapArray(m map[string][]string) map[string]*third.MapValues {
|
||||
res := make(map[string]*third.MapValues)
|
||||
for key := range m {
|
||||
res[key] = &third.MapValues{
|
||||
Values: m[key],
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func checkUploadName(ctx context.Context, name string) error {
|
||||
if name == "" {
|
||||
return errs.ErrArgs.Wrap("name is empty")
|
||||
}
|
||||
if name[0] == '/' {
|
||||
return errs.ErrArgs.Wrap("name cannot start with `/`")
|
||||
}
|
||||
if err := checkValidObjectName(name); err != nil {
|
||||
return errs.ErrArgs.Wrap(err.Error())
|
||||
}
|
||||
opUserID := mcontext.GetOpUserID(ctx)
|
||||
if opUserID == "" {
|
||||
return errs.ErrNoPermission.Wrap("opUserID is empty")
|
||||
}
|
||||
if !tokenverify.IsManagerUserID(opUserID) {
|
||||
if !strings.HasPrefix(name, opUserID+"_") {
|
||||
return errs.ErrNoPermission.Wrap(fmt.Sprintf("name must start with `%s_`", opUserID))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkValidObjectNamePrefix(objectName string) error {
|
||||
if len(objectName) > 1024 {
|
||||
return errors.New("object name cannot be longer than 1024 characters")
|
||||
}
|
||||
if !utf8.ValidString(objectName) {
|
||||
return errors.New("object name with non UTF-8 strings are not supported")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkValidObjectName(objectName string) error {
|
||||
if strings.TrimSpace(objectName) == "" {
|
||||
return errors.New("object name cannot be empty")
|
||||
}
|
||||
return checkValidObjectNamePrefix(objectName)
|
||||
}
|
||||
Reference in New Issue
Block a user