Hangfire只允许同时运行同一个任务

时间:2024-01-27 17:15:42

Hangfire有个机制可以确保所有任务都会被执行,如果当服务器停机了一段时间重新启动时,在此期间的周期任务会几乎同时执行。而大部分时候,我们希望同个周期任务每段时间只运行一个就行了。

或者是如果周期任务设置得过于频繁,当之前的任务还没执行完,我们也不希望继续添加周期任务进队列去排队执行。

Hangfire有提供一个扩展https://docs.hangfire.io/en/latest/background-processing/throttling.html 

同个DisableConcurrentExecution我们可以限制同一个任务每次只会执行一个,但是如果有任务正在执行,这时候又有新任务过来,新任务并不会被删除而是处于排队状态,等待前面的任务执行完。

 

 

而且,如果我们的任务用了同一个方法作为入口时(或者说我们需要根据方法的参数来确定是否为同一个任务),此时这个控制就不适用了。

参考了https://gist.github.com/sbosell/3831f5bb893b20e82c72467baf8aefea,我们可以用过滤器来实现,将运行期间进来的任务给取消掉。

代码的具体实现为:

 1     /// <summary>
 2     /// 禁用多个排队项目
 3     /// <remarks>同个任务取消并行执行,期间进来的任务不会等待,会被取消</remarks>
 4     /// </summary>
 5     public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IServerFilter
 6     {
 7         private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);
 8         private static readonly TimeSpan FingerprintTimeout = TimeSpan.FromHours(4);//任务执行超时时间
 9 
10         public void OnCreating(CreatingContext filterContext)
11         {
12             var recurringJobId = filterContext.GetJobParameter<string>("RecurringJobId");
13             if (!string.IsNullOrEmpty(recurringJobId)&&!AddFingerprintIfNotExists(filterContext.Connection, recurringJobId))
14             {
15                 filterContext.Canceled = true;
16             }
17         }
18 
19         public void OnPerformed(PerformedContext filterContext)
20         {
21             var recurringJobId = filterContext.GetJobParameter<string>("RecurringJobId");
22             if (!string.IsNullOrEmpty(recurringJobId))
23             {
24                 RemoveFingerprint(filterContext.Connection, recurringJobId);
25             }
26         }
27 
28         private static bool AddFingerprintIfNotExists(IStorageConnection connection, string recurringJobId)
29         {
30             using (connection.AcquireDistributedLock(GetFingerprintLockKey(recurringJobId), LockTimeout))
31             {
32                 var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(recurringJobId));
33 
34                 if (fingerprint != null &&
35                     fingerprint.ContainsKey("Timestamp") &&
36                     DateTimeOffset.TryParse(fingerprint["Timestamp"], null, DateTimeStyles.RoundtripKind, out var timestamp) &&
37                     DateTimeOffset.UtcNow <= timestamp.Add(FingerprintTimeout))
38                 {
39                     // 有任务还未执行完,并且没有超时
40                     return false;
41                 }
42 
43                 // 没有任务执行,或者该任务已超时
44                 connection.SetRangeInHash(GetFingerprintKey(recurringJobId), new Dictionary<string, string>
45             {
46                 { "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
47             });
48 
49                 return true;
50             }
51         }
View Code

 

在OnCreating方法中,我们读取RecurringJobId的值,获取周期任务的id(同样的id代表同一个周期任务),然后以这个id为key去设置一个超时。如果在此期间,拿到了一个key的值,以及设置的时间还未超时的话,我们通过设置filterContext.Canceled = true取消掉此任务。

 

 使用connection.AcquireDistributedLock在设置键值时添加分布式锁,使用connection.SetRangeInHash键RecurringJobId作为key,当前时间作为值保存。以此来确保在FingerprintTimeout的超时时间内,同个RecurringJobId的任务只能创建一个。或者等任务执行完后在OnPerformed方法中释放掉这个键值。

在OnPerformed方法中,将我们在创建方法中设置的RecurringJobId key和对应的时间给删除,这样OnCreating可以继续创建同一个RecurringJobId 的任务。

 

或者是普通触发的任务,这时候没有RecurringJobId 我们希望可以同个参数来控制,同样的参数不能同时执行。我们可以通过这个方法来生成相应的key

 1         private static string GetFingerprint(Job job)
 2         {
 3             var parameters = string.Empty;
 4             if (job?.Arguments != null)
 5             {
 6                 parameters = string.Join(".", job.Arguments);
 7             }
 8             if (job?.Type == null || job.Method == null)
 9             {
10                 return string.Empty;
11             }
12             var payload = $"{job.Type.FullName}.{job.Method.Name}.{parameters}";
13             var hash = SHA256.Create().ComputeHash(System.Text.Encoding.UTF8.GetBytes(payload));
14             var fingerprint = Convert.ToBase64String(hash);
15             return fingerprint;
16         }    
View Code

这样我们就能确保我们希望的同一个任务不会同时在执行,而且周期任务也不会继续在队列中排队