Merge branch 'errcode' of github.com:OpenIMSDK/Open-IM-Server into errcode

This commit is contained in:
wangchuxiao
2023-03-15 15:27:01 +08:00
16 changed files with 408 additions and 162 deletions
+37 -8
View File
@@ -2,9 +2,11 @@ package controller
import "C"
import (
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/db/obj"
"OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/log"
"OpenIM/pkg/errs"
"OpenIM/pkg/proto/third"
"OpenIM/pkg/utils"
"context"
@@ -18,10 +20,17 @@ import (
"time"
)
const (
hashPrefix = "hash"
tempPrefix = "temp"
fragmentPrefix = "fragment_"
)
type S3Database interface {
ApplyPut(ctx context.Context, req *third.ApplyPutReq) (*third.ApplyPutResp, error)
GetPut(ctx context.Context, req *third.GetPutReq) (*third.GetPutResp, error)
ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (*third.ConfirmPutResp, error)
GetUrl(ctx context.Context, req *third.GetUrlReq) (*third.GetUrlResp, error)
CleanExpirationObject(ctx context.Context, t time.Time)
}
@@ -48,7 +57,7 @@ func (c *s3Database) today() string {
// fragmentName 根据序号生成文件名
func (c *s3Database) fragmentName(index int) string {
return "fragment_" + strconv.Itoa(index+1)
return fragmentPrefix + strconv.Itoa(index+1)
}
// getFragmentNum 获取分片大小和分片数量
@@ -76,16 +85,13 @@ func (c *s3Database) CheckHash(hash string) error {
return err
}
if len(val) != md5.Size {
return errors.New("hash value error")
return errs.ErrArgs.Wrap("invalid hash")
}
return nil
}
func (c *s3Database) urlName(name string) string {
if name[0] != '/' {
name = "/" + name
}
return "http://127.0.0.1:8080" + name
return config.Config.Object.ApiURL + name
}
func (c *s3Database) UUID() string {
@@ -93,7 +99,7 @@ func (c *s3Database) UUID() string {
}
func (c *s3Database) HashName(hash string) string {
return path.Join("hash", c.today(), c.UUID())
return path.Join(hashPrefix, hash+"_"+c.today()+"_"+c.UUID())
}
func (c *s3Database) isNotFound(err error) bool {
@@ -141,7 +147,7 @@ func (c *s3Database) ApplyPut(ctx context.Context, req *third.ApplyPutReq) (*thi
ExpirationTime: expirationTime,
EffectiveTime: time.Now().Add(effective),
}
put.Path = path.Join("upload", c.today(), req.Hash, put.PutID)
put.Path = path.Join(tempPrefix, c.today(), req.Hash, put.PutID)
putURLs := make([]string, 0, pack)
for i := 0; i < pack; i++ {
url, err := c.obj.PresignedPutURL(ctx, &obj.ApplyPutArgs{
@@ -356,6 +362,29 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (
}, nil
}
func (c *s3Database) GetUrl(ctx context.Context, req *third.GetUrlReq) (*third.GetUrlResp, error) {
info, err := c.info.Take(ctx, req.Name)
if err != nil {
return nil, err
}
if info.ExpirationTime != nil && info.ExpirationTime.Before(time.Now()) {
return nil, errs.ErrRecordNotFound.Wrap("object expired")
}
hash, err := c.hash.Take(ctx, info.Hash, c.obj.Name())
if err != nil {
return nil, err
}
u, err := c.obj.GetURL(ctx, hash.Bucket, hash.Name, time.Duration(req.Expires)*time.Millisecond)
if err != nil {
return nil, err
}
return &third.GetUrlResp{
Url: u,
Size: hash.Size,
Hash: hash.Hash,
}, nil
}
func (c *s3Database) CleanExpirationObject(ctx context.Context, t time.Time) {
// 清理上传产生的临时文件
c.cleanPutTemp(ctx, t, 10)
+53 -46
View File
@@ -1,52 +1,66 @@
package obj
import (
"OpenIM/pkg/common/config"
"OpenIM/pkg/utils"
"context"
"errors"
"fmt"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/s3utils"
"net/http"
"net/url"
"time"
)
func NewMinioClient() {
}
func NewMinioInterface() (Interface, error) {
//client, err := minio.New("127.0.0.1:9000", &minio.Options{
// Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
// Secure: false,
//})
// todo 初始化连接和桶
return &minioImpl{}, nil
conf := config.Config.Object.Minio
u, err := url.Parse(conf.Endpoint)
if err != nil {
return nil, fmt.Errorf("minio endpoint parse %w", err)
}
if u.Scheme != "http" && u.Scheme != "https" {
return nil, fmt.Errorf("invalid minio endpoint scheme %s", u.Scheme)
}
client, err := minio.New(u.Host, &minio.Options{
Creds: credentials.NewStaticV4(conf.AccessKeyID, conf.SecretAccessKey, ""),
Secure: u.Scheme == "https",
})
if err != nil {
return nil, fmt.Errorf("minio new client %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
defer cancel()
for _, bucket := range utils.Distinct([]string{conf.TempBucket, conf.DataBucket}) {
exists, err := client.BucketExists(ctx, bucket)
if err != nil {
return nil, fmt.Errorf("minio bucket %s exists %w", bucket, err)
}
if exists {
continue
}
opt := minio.MakeBucketOptions{
Region: conf.Location,
ObjectLocking: conf.IsDistributedMod,
}
if err := client.MakeBucket(ctx, conf.TempBucket, opt); err != nil {
return nil, fmt.Errorf("minio make bucket %s %w", bucket, err)
}
}
return &minioImpl{
client: client,
tempBucket: conf.TempBucket,
dataBucket: conf.DataBucket,
}, nil
}
type minioImpl struct {
tempBucket string // 上传桶
permanentBucket string // 永久桶
clearBucket string // 自动清理桶
urlstr string // 访问地址
client *minio.Client
tempBucket string // 上传桶
dataBucket string // 永久桶
urlstr string // 访问地址
client *minio.Client
}
//func (m *minioImpl) Init() error {
// client, err := minio.New("127.0.0.1:9000", &minio.Options{
// Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
// Secure: false,
// })
// if err != nil {
// return fmt.Errorf("minio client error: %w", err)
// }
// m.urlstr = "http://127.0.0.1:9000"
// m.client = client
// m.tempBucket = "temp"
// m.permanentBucket = "permanent"
// m.clearBucket = "clear"
// return nil
//}
func (m *minioImpl) Name() string {
return "minio"
}
@@ -63,26 +77,20 @@ func (m *minioImpl) MinExpirationTime() time.Duration {
return time.Hour * 24
}
func (m *minioImpl) AppendHeader() http.Header {
return map[string][]string{
"x-amz-object-append": {"true"},
}
}
func (m *minioImpl) TempBucket() string {
return m.tempBucket
}
func (m *minioImpl) DataBucket() string {
return m.permanentBucket
return m.dataBucket
}
func (m *minioImpl) ClearBucket() string {
return m.clearBucket
}
func (m *minioImpl) GetURL(bucket string, name string) string {
return fmt.Sprintf("%s/%s/%s", m.urlstr, bucket, name)
func (m *minioImpl) GetURL(ctx context.Context, bucket string, name string, expires time.Duration) (string, error) {
u, err := m.client.PresignedGetObject(ctx, bucket, name, expires, nil)
if err != nil {
return "", err
}
return u.String(), nil
}
func (m *minioImpl) PresignedPutURL(ctx context.Context, args *ApplyPutArgs) (string, error) {
@@ -111,7 +119,6 @@ func (m *minioImpl) GetObjectInfo(ctx context.Context, args *BucketObject) (*Obj
return nil, err
}
return &ObjectInfo{
URL: m.GetURL(args.Bucket, args.Name),
Size: info.Size,
Hash: info.ETag,
}, nil
+4 -5
View File
@@ -20,10 +20,8 @@ type ApplyPutArgs struct {
}
type ObjectInfo struct {
URL string
Size int64
Hash string
Expiration time.Time
Size int64
Hash string
}
type Interface interface {
@@ -40,7 +38,8 @@ type Interface interface {
// DataBucket 永久存储的桶名
DataBucket() string
// GetURL 通过桶名和对象名返回URL
GetURL(bucket string, name string) string
//GetURL(bucket string, name string) string
GetURL(ctx context.Context, bucket string, name string, expires time.Duration) (string, error)
// PresignedPutURL 申请上传,返回PUT的上传地址
PresignedPutURL(ctx context.Context, args *ApplyPutArgs) (string, error)
// GetObjectInfo 获取对象信息
+3 -1
View File
@@ -2,6 +2,7 @@ package relation
import (
"OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/log"
"OpenIM/pkg/utils"
"context"
"gorm.io/gorm"
@@ -32,7 +33,8 @@ func (u *UserGorm) Update(ctx context.Context, users []*relation.UserModel) (err
// 获取指定用户信息 不存在,也不返回错误
func (u *UserGorm) Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) {
err = utils.Wrap(u.DB.Debug().Where("user_id in ?", userIDs).Find(&users).Error, "")
log.ZDebug(ctx, "Find args", "userIDs", userIDs)
err = utils.Wrap(u.DB.Where("user_id in ?", userIDs).Find(&users).Error, "")
return users, err
}