This commit is contained in:
withchao
2023-03-30 14:19:58 +08:00
parent 8843933d45
commit ee712593d1
5 changed files with 256 additions and 155 deletions
+33 -7
View File
@@ -2,6 +2,7 @@ package controller
import "C"
import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
@@ -16,6 +17,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/third"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/google/uuid"
"io"
"path"
"strconv"
"time"
@@ -25,6 +27,7 @@ const (
hashPrefix = "hash"
tempPrefix = "temp"
fragmentPrefix = "fragment_"
urlsName = "urls.json"
)
type S3Database interface {
@@ -138,9 +141,9 @@ func (c *s3Database) ApplyPut(ctx context.Context, req *third.ApplyPutReq) (*thi
return nil, err
}
// 新上传
var pack int
var fragmentNum int
const effective = time.Hour * 24 * 2
req.FragmentSize, pack = c.getFragmentNum(req.FragmentSize, req.Size)
req.FragmentSize, fragmentNum = c.getFragmentNum(req.FragmentSize, req.Size)
put := relation.ObjectPutModel{
PutID: c.UUID(),
Hash: req.Hash,
@@ -152,8 +155,8 @@ func (c *s3Database) ApplyPut(ctx context.Context, req *third.ApplyPutReq) (*thi
EffectiveTime: time.Now().Add(effective),
}
put.Path = path.Join(tempPrefix, c.today(), req.Hash, put.PutID)
putURLs := make([]string, 0, pack)
for i := 0; i < pack; i++ {
putURLs := make([]string, 0, fragmentNum)
for i := 0; i < fragmentNum; i++ {
url, err := c.obj.PresignedPutURL(ctx, &obj.ApplyPutArgs{
Bucket: c.obj.TempBucket(),
Name: path.Join(put.Path, c.fragmentName(i)),
@@ -171,6 +174,10 @@ func (c *s3Database) ApplyPut(ctx context.Context, req *third.ApplyPutReq) (*thi
}
t := md5.Sum(urlsJsonData)
put.PutURLsHash = hex.EncodeToString(t[:])
_, err = c.obj.PutObject(ctx, &obj.BucketObject{Bucket: c.obj.TempBucket(), Name: path.Join(put.Path, urlsName)}, bytes.NewReader(urlsJsonData), int64(len(urlsJsonData)))
if err != nil {
return nil, err
}
put.CreateTime = time.Now()
if err := c.put.Create(ctx, []*relation.ObjectPutModel{&put}); err != nil {
return nil, err
@@ -191,9 +198,28 @@ func (c *s3Database) GetPut(ctx context.Context, req *third.GetPutReq) (*third.G
if up.Complete {
return nil, errors.New("up completed")
}
_, pack := c.getFragmentNum(up.FragmentSize, up.ObjectSize)
fragments := make([]*third.GetPutFragment, pack)
for i := 0; i < pack; i++ {
reader, err := c.obj.GetObject(ctx, &obj.BucketObject{Bucket: c.obj.Name(), Name: path.Join(up.Path, urlsName)})
if err != nil {
return nil, err
}
urlsData, err := io.ReadAll(reader)
if err != nil {
return nil, err
}
t := md5.Sum(urlsData)
if h := hex.EncodeToString(t[:]); h != up.PutURLsHash {
return nil, fmt.Errorf("invalid put urls hash %s %s", h, up.PutURLsHash)
}
var urls []string
if err := json.Unmarshal(urlsData, &urls); err != nil {
return nil, err
}
_, fragmentNum := c.getFragmentNum(up.FragmentSize, up.ObjectSize)
if len(urls) != fragmentNum {
return nil, fmt.Errorf("invalid urls length %d fragment %d", len(urls), fragmentNum)
}
fragments := make([]*third.GetPutFragment, fragmentNum)
for i := 0; i < fragmentNum; i++ {
name := path.Join(up.Path, c.fragmentName(i))
o, err := c.obj.GetObjectInfo(ctx, &obj.BucketObject{
Bucket: c.obj.TempBucket(),
+24
View File
@@ -9,6 +9,7 @@ import (
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/s3utils"
"io"
"net/http"
"net/url"
"time"
@@ -196,6 +197,29 @@ func (m *minioImpl) IsNotFound(err error) bool {
}
}
func (m *minioImpl) PutObject(ctx context.Context, info *BucketObject, reader io.Reader, size int64) (*ObjectInfo, error) {
update, err := m.client.PutObject(ctx, info.Bucket, info.Name, reader, size, minio.PutObjectOptions{})
if err != nil {
return nil, err
}
return &ObjectInfo{
Size: update.Size,
Hash: update.ETag,
}, nil
}
func (m *minioImpl) GetObject(ctx context.Context, info *BucketObject) (SizeReader, error) {
object, err := m.client.GetObject(ctx, info.Bucket, info.Name, minio.GetObjectOptions{})
if err != nil {
return nil, err
}
stat, err := object.Stat()
if err != nil {
return nil, err
}
return NewSizeReader(object, stat.Size), nil
}
func (m *minioImpl) CheckName(name string) error {
return s3utils.CheckValidObjectName(name)
}
+29
View File
@@ -2,6 +2,7 @@ package obj
import (
"context"
"io"
"net/http"
"time"
)
@@ -29,6 +30,30 @@ type ObjectInfo struct {
Hash string
}
type SizeReader interface {
io.ReadCloser
Size() int64
}
func NewSizeReader(r io.ReadCloser, size int64) SizeReader {
if r == nil {
return nil
}
return &sizeReader{
size: size,
ReadCloser: r,
}
}
type sizeReader struct {
size int64
io.ReadCloser
}
func (r *sizeReader) Size() int64 {
return r.size
}
type Interface interface {
// Name 存储名字
Name() string
@@ -58,4 +83,8 @@ type Interface interface {
IsNotFound(err error) bool
// CheckName 检查名字是否可用
CheckName(name string) error
// PutObject 上传文件
PutObject(ctx context.Context, info *BucketObject, reader io.Reader, size int64) (*ObjectInfo, error)
// GetObject 下载文件
GetObject(ctx context.Context, info *BucketObject) (SizeReader, error)
}