This commit is contained in:
wangchuxiao
2023-03-14 18:47:40 +08:00
parent 1d04b6c924
commit 104b5748b8
31 changed files with 57 additions and 638 deletions
+18 -17
View File
@@ -4,6 +4,7 @@ import "C"
import (
"OpenIM/pkg/common/db/obj"
"OpenIM/pkg/common/db/table/relation"
"OpenIM/pkg/common/log"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/proto/third"
"OpenIM/pkg/utils"
@@ -214,9 +215,9 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (
defer func() {
if _err == nil {
// 清理上传的碎片
err := c.obj.DeleteObjet(ctx, &obj.BucketObject{Bucket: c.obj.TempBucket(), Name: put.Path})
err := c.obj.DeleteObject(ctx, &obj.BucketObject{Bucket: c.obj.TempBucket(), Name: put.Path})
if err != nil {
tracelog.SetCtxWarn(ctx, "DeleteObjet", err, "Bucket", c.obj.TempBucket(), "Path", put.Path)
log.ZError(ctx, "deleteObject failed", err, "Bucket", c.obj.TempBucket(), "Path", put.Path)
}
}
}()
@@ -241,12 +242,12 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (
return nil, err
}
defer func() {
err := c.obj.DeleteObjet(ctx, &obj.BucketObject{
err := c.obj.DeleteObject(ctx, &obj.BucketObject{
Bucket: c.obj.TempBucket(),
Name: put.Path,
})
if err != nil {
tracelog.SetCtxWarn(ctx, "DeleteObjet", err, "Bucket", c.obj.TempBucket(), "Path", put.Path)
log.ZError(ctx, "DeleteObject", err, "Bucket", c.obj.TempBucket(), "Path", put.Path)
}
}()
// 服务端已存在
@@ -297,7 +298,7 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (
if put.Hash != o.Hash {
return nil, fmt.Errorf("hash mismatching should %s reality %s", put.Hash, o.Hash)
}
if err := c.obj.CopyObjet(ctx, &src[0], dst); err != nil {
if err := c.obj.CopyObject(ctx, &src[0], dst); err != nil {
return nil, err
}
} else {
@@ -306,8 +307,8 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (
Name: path.Join(put.Path, "merge_"+c.UUID()),
}
defer func() { // 清理合成的文件
if err := c.obj.DeleteObjet(ctx, tempBucket); err != nil {
tracelog.SetCtxWarn(ctx, "DeleteObjet", err, "Bucket", tempBucket.Bucket, "Path", tempBucket.Name)
if err := c.obj.DeleteObject(ctx, tempBucket); err != nil {
log.ZError(ctx, "DeleteObject", err, "Bucket", tempBucket.Bucket, "Path", tempBucket.Name)
}
}()
err := c.obj.ComposeObject(ctx, src, tempBucket)
@@ -324,7 +325,7 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (
if put.Hash != info.Hash {
return nil, fmt.Errorf("hash mismatch should %s reality %s", put.Hash, info.Hash)
}
if err := c.obj.CopyObjet(ctx, tempBucket, dst); err != nil {
if err := c.obj.CopyObject(ctx, tempBucket, dst); err != nil {
return nil, err
}
}
@@ -349,7 +350,7 @@ func (c *s3Database) ConfirmPut(ctx context.Context, req *third.ConfirmPutReq) (
return nil, err
}
if err := c.put.SetCompleted(ctx, put.PutID); err != nil {
tracelog.SetCtxWarn(ctx, "SetCompleted", err, "PutID", put.PutID)
log.ZError(ctx, "SetCompleted", err, "PutID", put.PutID)
}
return &third.ConfirmPutResp{
Url: c.urlName(o.Name),
@@ -369,23 +370,23 @@ func (c *s3Database) cleanPutTemp(ctx context.Context, t time.Time, num int) {
for {
puts, err := c.put.FindExpirationPut(ctx, t, num)
if err != nil {
tracelog.SetCtxWarn(ctx, "FindExpirationPut", err, "Time", t, "Num", num)
log.ZError(ctx, "FindExpirationPut", err, "Time", t, "Num", num)
return
}
if len(puts) == 0 {
return
}
for _, put := range puts {
err := c.obj.DeleteObjet(ctx, &obj.BucketObject{Bucket: c.obj.TempBucket(), Name: put.Path})
err := c.obj.DeleteObject(ctx, &obj.BucketObject{Bucket: c.obj.TempBucket(), Name: put.Path})
if err != nil {
tracelog.SetCtxWarn(ctx, "DeleteObjet", err, "Bucket", c.obj.TempBucket(), "Path", put.Path)
log.ZError(ctx, "DeleteObject", err, "Bucket", c.obj.TempBucket(), "Path", put.Path)
return
}
}
ids := utils.Slice(puts, func(e *relation.ObjectPutModel) string { return e.PutID })
err = c.put.DelPut(ctx, ids)
if err != nil {
tracelog.SetCtxWarn(ctx, "DelPut", err, "PutID", ids)
log.ZError(ctx, "DelPut", err, "PutID", ids)
return
}
}
@@ -394,7 +395,7 @@ func (c *s3Database) cleanPutTemp(ctx context.Context, t time.Time, num int) {
func (c *s3Database) cleanExpirationObject(ctx context.Context, t time.Time) {
err := c.info.DeleteExpiration(ctx, t)
if err != nil {
tracelog.SetCtxWarn(ctx, "DeleteExpiration", err, "Time", t)
log.ZError(ctx, "DeleteExpiration", err, "Time", t)
}
}
@@ -402,7 +403,7 @@ func (c *s3Database) clearNoCitation(ctx context.Context, engine string, limit i
for {
list, err := c.hash.DeleteNoCitation(ctx, engine, limit)
if err != nil {
tracelog.SetCtxWarn(ctx, "DeleteNoCitation", err, "Engine", engine, "Limit", limit)
log.ZError(ctx, "DeleteNoCitation", err, "Engine", engine, "Limit", limit)
return
}
if len(list) == 0 {
@@ -410,10 +411,10 @@ func (c *s3Database) clearNoCitation(ctx context.Context, engine string, limit i
}
var hasErr bool
for _, h := range list {
err := c.obj.DeleteObjet(ctx, &obj.BucketObject{Bucket: h.Bucket, Name: h.Name})
err := c.obj.DeleteObject(ctx, &obj.BucketObject{Bucket: h.Bucket, Name: h.Name})
if err != nil {
hasErr = true
tracelog.SetCtxWarn(ctx, "DeleteObjet", err, "Bucket", h.Bucket, "Path", h.Name)
log.ZError(ctx, "DeleteObject", err, "Bucket", h.Bucket, "Path", h.Name)
continue
}
}