feat: S3 server cache (#1329)

* optimize scheduled deletion

* optimize scheduled deletion

* optimize scheduled deletion

* optimize scheduled deletion

* minio cache

* fix: conflicts

* feat: minio cache

* feat: cache optimize

* feat: cache optimize

* feat: cache optimize

* feat: cache optimize

* feat: cache optimize
This commit is contained in:
chao
2023-11-02 20:40:45 -05:00
committed by GitHub
parent 62e9980f3c
commit cb0bf64435
8 changed files with 386 additions and 159 deletions
+21 -9
View File
@@ -20,6 +20,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"path"
"strings"
"time"
@@ -32,12 +33,16 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/s3"
)
func New(impl s3.Interface) *Controller {
return &Controller{impl: impl}
func New(cache cache.S3Cache, impl s3.Interface) *Controller {
return &Controller{
cache: cache,
impl: impl,
}
}
type Controller struct {
impl s3.Interface
cache cache.S3Cache
impl s3.Interface
}
func (c *Controller) HashPath(md5 string) string {
@@ -69,8 +74,12 @@ func (c *Controller) PartLimit() *s3.PartLimit {
return c.impl.PartLimit()
}
func (c *Controller) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) {
return c.cache.GetKey(ctx, c.impl.Engine(), name)
}
func (c *Controller) GetHashObject(ctx context.Context, hash string) (*s3.ObjectInfo, error) {
return c.impl.StatObject(ctx, c.HashPath(hash))
return c.StatObject(ctx, c.HashPath(hash))
}
func (c *Controller) InitiateUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*InitiateUploadResult, error) {
@@ -94,7 +103,7 @@ func (c *Controller) InitiateUpload(ctx context.Context, hash string, size int64
if maxParts > 0 && partNumber > 0 && partNumber < maxParts {
return nil, errors.New(fmt.Sprintf("too many parts: %d", partNumber))
}
if info, err := c.impl.StatObject(ctx, c.HashPath(hash)); err == nil {
if info, err := c.StatObject(ctx, c.HashPath(hash)); err == nil {
return nil, &HashAlreadyExistsError{Object: info}
} else if !c.impl.IsNotFound(err) {
return nil, err
@@ -168,13 +177,13 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa
fmt.Println("CompleteUpload sum:", hex.EncodeToString(md5Sum[:]), "upload hash:", upload.Hash)
return nil, errors.New("md5 mismatching")
}
if info, err := c.impl.StatObject(ctx, c.HashPath(upload.Hash)); err == nil {
if info, err := c.StatObject(ctx, c.HashPath(upload.Hash)); err == nil {
return &UploadResult{
Key: info.Key,
Size: info.Size,
Hash: info.ETag,
}, nil
} else if !c.impl.IsNotFound(err) {
} else if !c.IsNotFound(err) {
return nil, err
}
cleanObject := make(map[string]struct{})
@@ -200,7 +209,7 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa
}
targetKey = result.Key
case UploadTypePresigned:
uploadInfo, err := c.impl.StatObject(ctx, upload.Key)
uploadInfo, err := c.StatObject(ctx, upload.Key)
if err != nil {
return nil, err
}
@@ -230,6 +239,9 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa
default:
return nil, errors.New("invalid upload id type")
}
if err := c.cache.DelS3Key(c.impl.Engine(), targetKey).ExecDel(ctx); err != nil {
return nil, err
}
return &UploadResult{
Key: targetKey,
Size: upload.Size,
@@ -253,7 +265,7 @@ func (c *Controller) AuthSign(ctx context.Context, uploadID string, partNumbers
}
func (c *Controller) IsNotFound(err error) bool {
return c.impl.IsNotFound(err)
return c.impl.IsNotFound(err) || errs.ErrRecordNotFound.Is(err)
}
func (c *Controller) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (string, error) {