This commit is contained in:
withchao
2023-03-15 11:24:59 +08:00
parent 316fa10257
commit e9865539de
7 changed files with 144 additions and 140 deletions
+7 -10
View File
@@ -92,16 +92,13 @@ type config struct {
OssRoleArn string `yaml:"OssRoleArn"`
}
Minio struct {
Bucket string `yaml:"bucket"`
AppBucket string `yaml:"appBucket"`
Location string `yaml:"location"`
Endpoint string `yaml:"endpoint"`
AccessKeyID string `yaml:"accessKeyID"`
SecretAccessKey string `yaml:"secretAccessKey"`
EndpointInner string `yaml:"endpointInner"`
EndpointInnerEnable bool `yaml:"endpointInnerEnable"`
StorageTime int `yaml:"storageTime"`
IsDistributedMod bool `yaml:"isDistributedMod"`
TempBucket string `yaml:"tempBucket"`
DataBucket string `yaml:"dataBucket"`
Location string `yaml:"location"`
Endpoint string `yaml:"endpoint"`
AccessKeyID string `yaml:"accessKeyID"`
SecretAccessKey string `yaml:"secretAccessKey"`
IsDistributedMod bool `yaml:"isDistributedMod"`
} `yaml:"minio"`
Aws struct {
AccessKeyID string `yaml:"accessKeyID"`
+15 -10
View File
@@ -20,6 +20,12 @@ import (
"time"
)
const (
hashPrefix = "hash"
tempPrefix = "temp"
fragmentPrefix = "fragment_"
)
type S3Database interface {
ApplyPut(ctx context.Context, req *third.ApplyPutReq) (*third.ApplyPutResp, error)
GetPut(ctx context.Context, req *third.GetPutReq) (*third.GetPutResp, error)
@@ -51,7 +57,7 @@ func (c *s3Database) today() string {
// fragmentName 根据序号生成文件名
func (c *s3Database) fragmentName(index int) string {
return "fragment_" + strconv.Itoa(index+1)
return fragmentPrefix + strconv.Itoa(index+1)
}
// getFragmentNum 获取分片大小和分片数量
@@ -79,17 +85,12 @@ func (c *s3Database) CheckHash(hash string) error {
return err
}
if len(val) != md5.Size {
return errors.New("hash value error")
return errs.ErrArgs.Wrap("invalid hash")
}
return nil
}
func (c *s3Database) urlName(name string) string {
//if name[0] != '/' {
// name = "/" + name
//}
//config.Config.Credential.ObjectURL + name
//return "http://127.0.0.1:8080" + name
return config.Config.Object.ApiURL + name
}
@@ -98,7 +99,7 @@ func (c *s3Database) UUID() string {
}
func (c *s3Database) HashName(hash string) string {
return path.Join("hash", c.today(), c.UUID())
return path.Join(hashPrefix, hash+"_"+c.today()+"_"+c.UUID())
}
func (c *s3Database) isNotFound(err error) bool {
@@ -146,7 +147,7 @@ func (c *s3Database) ApplyPut(ctx context.Context, req *third.ApplyPutReq) (*thi
ExpirationTime: expirationTime,
EffectiveTime: time.Now().Add(effective),
}
put.Path = path.Join("upload", c.today(), req.Hash, put.PutID)
put.Path = path.Join(tempPrefix, c.today(), req.Hash, put.PutID)
putURLs := make([]string, 0, pack)
for i := 0; i < pack; i++ {
url, err := c.obj.PresignedPutURL(ctx, &obj.ApplyPutArgs{
@@ -373,8 +374,12 @@ func (c *s3Database) GetUrl(ctx context.Context, req *third.GetUrlReq) (*third.G
if err != nil {
return nil, err
}
u, err := c.obj.GetURL(ctx, hash.Bucket, hash.Name, time.Duration(req.Expires)*time.Millisecond)
if err != nil {
return nil, err
}
return &third.GetUrlResp{
Url: c.obj.GetURL(hash.Bucket, hash.Name),
Url: u,
Size: hash.Size,
Hash: hash.Hash,
}, nil
+34 -47
View File
@@ -2,25 +2,18 @@ package obj
import (
"OpenIM/pkg/common/config"
"OpenIM/pkg/utils"
"context"
"errors"
"fmt"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/s3utils"
"net/http"
"net/url"
"time"
)
func NewMinioClient() {
}
func NewMinioInterface() (Interface, error) {
if true {
return &minioImpl{}, nil // todo
}
conf := config.Config.Object.Minio
u, err := url.Parse(conf.Endpoint)
if err != nil {
@@ -31,42 +24,43 @@ func NewMinioInterface() (Interface, error) {
}
client, err := minio.New(u.Host, &minio.Options{
Creds: credentials.NewStaticV4(conf.AccessKeyID, conf.SecretAccessKey, ""),
Secure: false,
Secure: u.Scheme == "https",
})
if err != nil {
return nil, fmt.Errorf("minio new client %w", err)
}
// todo 初始化连接和桶
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
defer cancel()
for _, bucket := range utils.Distinct([]string{conf.TempBucket, conf.DataBucket}) {
exists, err := client.BucketExists(ctx, bucket)
if err != nil {
return nil, fmt.Errorf("minio bucket %s exists %w", bucket, err)
}
if exists {
continue
}
opt := minio.MakeBucketOptions{
Region: conf.Location,
ObjectLocking: conf.IsDistributedMod,
}
if err := client.MakeBucket(ctx, conf.TempBucket, opt); err != nil {
return nil, fmt.Errorf("minio make bucket %s %w", bucket, err)
}
}
return &minioImpl{
client: client,
//tempBucket: conf.Bucket,
client: client,
tempBucket: conf.TempBucket,
dataBucket: conf.DataBucket,
}, nil
}
type minioImpl struct {
tempBucket string // 上传桶
permanentBucket string // 永久桶
clearBucket string // 自动清理桶
urlstr string // 访问地址
client *minio.Client
tempBucket string // 上传桶
dataBucket string // 永久桶
urlstr string // 访问地址
client *minio.Client
}
//func (m *minioImpl) Init() error {
// client, err := minio.New("127.0.0.1:9000", &minio.Options{
// Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
// Secure: false,
// })
// if err != nil {
// return fmt.Errorf("minio client error: %w", err)
// }
// m.urlstr = "http://127.0.0.1:9000"
// m.client = client
// m.tempBucket = "temp"
// m.permanentBucket = "permanent"
// m.clearBucket = "clear"
// return nil
//}
func (m *minioImpl) Name() string {
return "minio"
}
@@ -83,26 +77,20 @@ func (m *minioImpl) MinExpirationTime() time.Duration {
return time.Hour * 24
}
func (m *minioImpl) AppendHeader() http.Header {
return map[string][]string{
"x-amz-object-append": {"true"},
}
}
func (m *minioImpl) TempBucket() string {
return m.tempBucket
}
func (m *minioImpl) DataBucket() string {
return m.permanentBucket
return m.dataBucket
}
func (m *minioImpl) ClearBucket() string {
return m.clearBucket
}
func (m *minioImpl) GetURL(bucket string, name string) string {
return fmt.Sprintf("%s/%s/%s", m.urlstr, bucket, name)
func (m *minioImpl) GetURL(ctx context.Context, bucket string, name string, expires time.Duration) (string, error) {
u, err := m.client.PresignedGetObject(ctx, bucket, name, expires, nil)
if err != nil {
return "", err
}
return u.String(), nil
}
func (m *minioImpl) PresignedPutURL(ctx context.Context, args *ApplyPutArgs) (string, error) {
@@ -131,7 +119,6 @@ func (m *minioImpl) GetObjectInfo(ctx context.Context, args *BucketObject) (*Obj
return nil, err
}
return &ObjectInfo{
URL: m.GetURL(args.Bucket, args.Name),
Size: info.Size,
Hash: info.ETag,
}, nil
+4 -5
View File
@@ -20,10 +20,8 @@ type ApplyPutArgs struct {
}
type ObjectInfo struct {
URL string
Size int64
Hash string
Expiration time.Time
Size int64
Hash string
}
type Interface interface {
@@ -40,7 +38,8 @@ type Interface interface {
// DataBucket 永久存储的桶名
DataBucket() string
// GetURL 通过桶名和对象名返回URL
GetURL(bucket string, name string) string
//GetURL(bucket string, name string) string
GetURL(ctx context.Context, bucket string, name string, expires time.Duration) (string, error)
// PresignedPutURL 申请上传,返回PUT的上传地址
PresignedPutURL(ctx context.Context, args *ApplyPutArgs) (string, error)
// GetObjectInfo 获取对象信息