mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-10 12:05:58 +08:00
feat: support distributed lock in crontask. (#3401)
* feat: support distributed lock in crontask. * remove space. * remove comment. * remove log. * Update logic. * Update contents.
This commit is contained in:
@@ -58,6 +58,11 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp
|
||||
cm.Watch(ctx)
|
||||
}
|
||||
|
||||
locker, err := NewEtcdLocker(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
srv := &cronServer{
|
||||
ctx: ctx,
|
||||
config: conf,
|
||||
@@ -65,6 +70,7 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp
|
||||
msgClient: msg.NewMsgClient(msgConn),
|
||||
conversationClient: pbconversation.NewConversationClient(conversationConn),
|
||||
thirdClient: third.NewThirdClient(thirdConn),
|
||||
locker: locker,
|
||||
}
|
||||
|
||||
if err := srv.registerClearS3(); err != nil {
|
||||
@@ -81,6 +87,8 @@ func Start(ctx context.Context, conf *Config, client discovery.Conn, service grp
|
||||
log.ZDebug(ctx, "cron task server is running")
|
||||
<-ctx.Done()
|
||||
log.ZDebug(ctx, "cron task server is shutting down")
|
||||
srv.cron.Stop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -91,6 +99,7 @@ type cronServer struct {
|
||||
msgClient msg.MsgClient
|
||||
conversationClient pbconversation.ConversationClient
|
||||
thirdClient third.ThirdClient
|
||||
locker *EtcdLocker
|
||||
}
|
||||
|
||||
func (c *cronServer) registerClearS3() error {
|
||||
@@ -98,7 +107,9 @@ func (c *cronServer) registerClearS3() error {
|
||||
log.ZInfo(c.ctx, "disable scheduled cleanup of s3", "fileExpireTime", c.config.CronTask.FileExpireTime, "deleteObjectType", c.config.CronTask.DeleteObjectType)
|
||||
return nil
|
||||
}
|
||||
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearS3)
|
||||
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
|
||||
c.locker.ExecuteWithLock(c.ctx, "clearS3", c.clearS3)
|
||||
})
|
||||
return errs.WrapMsg(err, "failed to register clear s3 cron task")
|
||||
}
|
||||
|
||||
@@ -107,11 +118,15 @@ func (c *cronServer) registerDeleteMsg() error {
|
||||
log.ZInfo(c.ctx, "disable scheduled cleanup of chat records", "retainChatRecords", c.config.CronTask.RetainChatRecords)
|
||||
return nil
|
||||
}
|
||||
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.deleteMsg)
|
||||
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
|
||||
c.locker.ExecuteWithLock(c.ctx, "deleteMsg", c.deleteMsg)
|
||||
})
|
||||
return errs.WrapMsg(err, "failed to register delete msg cron task")
|
||||
}
|
||||
|
||||
func (c *cronServer) registerClearUserMsg() error {
|
||||
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearUserMsg)
|
||||
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, func() {
|
||||
c.locker.ExecuteWithLock(c.ctx, "clearUserMsg", c.clearUserMsg)
|
||||
})
|
||||
return errs.WrapMsg(err, "failed to register clear user msg cron task")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user