ABP Framework-BackgroundJob源码解析
目录
https://abp.io/docs/6.0/Background-Jobs
Version
6.0.3
Package
Volo.Abp.BackgroundJobs.HangFire //依赖Hangfire和Volo.Abp.BackgroundJobs.Abstractions
Volo.Abp.BackgroundJobs.Quartz //依赖Quartz和Volo.Abp.BackgroundJobs.Abstractions
Volo.Abp.BackgroundJobs.RabbitMQ //依赖RabbitMQ和Volo.Abp.BackgroundJobs.Abstractions
Volo.Abp.BackgroundJobs //依赖BackgroundWorker和Volo.Abp.BackgroundJobs.Abstractions,与上第三方组件平级
Volo.Abp.BackgroundJobs.Abstractions
//独立模块
Volo.Abp.BackgroundJobs.Domain
Volo.Abp.BackgroundJobs.Domain.Shared
Volo.Abp.BackgroundJobs.EntityFrameworkCore
Volo.Abp.BackgroundJobs.MongoDB
BackgroundJob
JobArgs
作为最小执行单元,使用时只需继承BackgroundJob或AsyncBackgroundJob。
public class EmailSendingJob : AsyncBackgroundJob<EmailSendingArgs>, ITransientDependency
{
    private readonly IEmailSender _emailSender;
    public EmailSendingJob(IEmailSender emailSender)
    {
        _emailSender = emailSender;
    }
    public override async Task ExecuteAsync(EmailSendingArgs args)
    {
        await _emailSender.SendAsync(args.EmailAddress, args.Subject, args.Body);
    }
}
前提是需要一个JobArgs,来传递Job执行需要的一些参数。
public class EmailSendingArgs
{
    public string EmailAddress { get; set; }
    public string Subject { get; set; }
    public string Body { get; set; }
}
文档中提到BackgroundJobName,在代码中体现为BackgroundJobNameAttribute。
 实则是在当使用了RabbitMQ或者类似的第三方组件时才需要拿到名字作为队列名,如果在Hangfire或者Quartz或者ABP自身的BackgroundJobWorker管理,都无需要使用到BackgroundJobNameAttribute。当然如果加上了,也没有影响,只是加入Job时,会以加上的名字作为Job名,加入到执行队列中。以DefaultBackgroundJobManager为例
实则是在当使用了RabbitMQ或者类似的第三方组件时才需要拿到名字作为队列名,如果在Hangfire或者Quartz或者ABP自身的BackgroundJobWorker管理,都无需要使用到BackgroundJobNameAttribute。当然如果加上了,也没有影响,只是加入Job时,会以加上的名字作为Job名,加入到执行队列中。以DefaultBackgroundJobManager为例
[Dependency(ReplaceServices = true)]
public class DefaultBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
    //...
    public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
    {
        var jobName = BackgroundJobNameAttribute.GetName<TArgs>();
        var jobId = await EnqueueAsync(jobName, args, priority, delay);
        return jobId.ToString();
    }
    //...
}
当取JobArgs的名字时,优先从Attribute中取,如果没有则使用类名
public class BackgroundJobNameAttribute : Attribute, IBackgroundJobNameProvider
{
    //...
    
    public static string GetName([NotNull] Type jobArgsType)
    {
        Check.NotNull(jobArgsType, nameof(jobArgsType));
        return jobArgsType.GetCustomAttributes(true)
                   .OfType<IBackgroundJobNameProvider>()
                   .FirstOrDefault()?.Name
               ?? jobArgsType.FullName;
    }
}
BackgroundJob
回归到最小执行单元BackgroundJob,ABP中提供了两类抽象接口与抽象实现
 BackgroundJob中并没有什么实质性内容,仅作为抽象封装。
BackgroundJob中并没有什么实质性内容,仅作为抽象封装。
public abstract class AsyncBackgroundJob<TArgs> : IAsyncBackgroundJob<TArgs>
{
    public ILogger<AsyncBackgroundJob<TArgs>> Logger { get; set; }
    protected AsyncBackgroundJob()
    {
        Logger = NullLogger<AsyncBackgroundJob<TArgs>>.Instance;
    }
    public abstract Task ExecuteAsync(TArgs args);
}
BackgroundJob注册
项目启动时,会扫描到所有继承自IBackgroundJob和IAsyncBackgroundJob的实现完成注册并将注册内容存储到AbpBackgroundJobOptions中。
[DependsOn(typeof(AbpJsonModule))]
public class AbpBackgroundJobsAbstractionsModule : AbpModule
{
    public override void PreConfigureServices(ServiceConfigurationContext context)
    {
        RegisterJobs(context.Services);
    }
    private static void RegisterJobs(IServiceCollection services)
    {
        var jobTypes = new List<Type>();
        
        // 扫描实现加入到JobTypes中
        services.OnRegistred(context =>
        {
            if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IBackgroundJob<>)) ||
                ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IAsyncBackgroundJob<>)))
            {
                jobTypes.Add(context.ImplementationType);
            }
        });
        
        // 扫描到的结果转换存储到AbpBackgroundJobOptions中
        services.Configure<AbpBackgroundJobOptions>(options =>
        {
            foreach (var jobType in jobTypes)
            {
                options.AddJob(jobType);
            }
        });
    }
}
在Options中,一个属性是控制整个BackgroundJob是否启用,另外的则是使用字典存储所有注册的Job,简化代码如下。
public class AbpBackgroundJobOptions
{
    private readonly Dictionary<Type, BackgroundJobConfiguration> _jobConfigurationsByArgsType;
    public bool IsJobExecutionEnabled { get; set; } = true;
    public BackgroundJobConfiguration GetJob(Type argsType)
    {
        return _jobConfigurationsByArgsType.GetOrDefault(argsType);
    }
    public void AddJob(BackgroundJobConfiguration jobConfiguration)
    {
        _jobConfigurationsByArgsType[jobConfiguration.ArgsType] = jobConfiguration;
    }
}
此处依赖BackgroundJobConfiguration类,该类仅作为一个记录中转,记录Job本身的类型,JobArgs类型,Job名,对这三者信息的封装。
public class BackgroundJobConfiguration
{
    public Type ArgsType { get; }
    public Type JobType { get; }
    public string JobName { get; }
    public BackgroundJobConfiguration(Type jobType)
    {
        JobType = jobType;
        //从JobType中分析出JobArgs的类型,取其泛型参数具体类型
        ArgsType = BackgroundJobArgsHelper.GetJobArgsType(jobType);
        JobName = BackgroundJobNameAttribute.GetName(ArgsType);
    }
}
BackgroundJobExecuter
BackgroundJob类型上分为了两类,ABP代码中并不直接对这两类Job分别调用,而是再包一层,对这两类的调用抽象,得到一个唯一的操作入口。
 ExecuteAsync方法简要如下代码,JobExecutionContext承担要执行的Job信息的承载,和BackgroundJobConfiguration的职责类似。其中完成按类别调用实际的Job,该方法支持override,如果想要自定义一个BackgroundJob类型,可以在此基础上参考。上层的Hangfire/Quartz/RabbitMQ/ABP-BackgroundJobWorker等都是基于IBackgroundJobExecuter来执行,因此此处扩展并不会对上层产生影响。
ExecuteAsync方法简要如下代码,JobExecutionContext承担要执行的Job信息的承载,和BackgroundJobConfiguration的职责类似。其中完成按类别调用实际的Job,该方法支持override,如果想要自定义一个BackgroundJob类型,可以在此基础上参考。上层的Hangfire/Quartz/RabbitMQ/ABP-BackgroundJobWorker等都是基于IBackgroundJobExecuter来执行,因此此处扩展并不会对上层产生影响。
public class BackgroundJobExecuter : IBackgroundJobExecuter, ITransientDependency
{
    //...
    
    public virtual async Task ExecuteAsync(JobExecutionContext context)
    {
        // 从DI中获得Job实例
        var job = context.ServiceProvider.GetService(context.JobType);
        // 按照继承接口类型找到执行方法
        var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute)) ??
                               context.JobType.GetMethod(nameof(IAsyncBackgroundJob<object>.ExecuteAsync));
        //...
        try
        {
            // 分类执行对应方法
            if (jobExecuteMethod.Name == nameof(IAsyncBackgroundJob<object>.ExecuteAsync))
            {
                await ((Task)jobExecuteMethod.Invoke(job, new[] { context.JobArgs }));
            }
            else
            {
                jobExecuteMethod.Invoke(job, new[] { context.JobArgs });
            }
        }
        catch (Exception ex)
        {
            //...
            // 异常转换为BackgroundJob独有异常,便于捕获方便后续Job重试
            throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex)
            {
                JobType = context.JobType.AssemblyQualifiedName,
                JobArgs = context.JobArgs
            };
        }
    }
}
对于上层调用方(Hangfire/Quartz/RabbitMQ/ABP-BackgroundJob)对BackgroundJobExecuter的具体使用,跳转到下下小节中提及。
BackgroundJobManager
BackgroundJob定义好了,在业务编排中期望在某些点加入一个Job,以便于在特定时间点执行Job。ABP封装了IBackgroundJobManager,用来管理Job入队编排操作。
 在编排时Job加入到执行队列(默认是入库)。在加入时可以指定期望的Job,执行时间,执行优先级等参数。
在编排时Job加入到执行队列(默认是入库)。在加入时可以指定期望的Job,执行时间,执行优先级等参数。
 此处以ABP自身的BackgroundJobManager和基于Hangfire实现的 BackgroundJobManager解析。
此处以ABP自身的BackgroundJobManager和基于Hangfire实现的 BackgroundJobManager解析。
DefaultBackgroundJobManager
ABP实现了默认的BackgroundJobManager(位于Volo.Abp.BackgroundJobs包),将待执行Job信息存储到库中。再由BackgroundJobWorker捞起符合条件的Job执行。
 其中数据来源依赖于IBackgroudJobStore,借助仓储从数据库存入Job数据,并返回JobId,方便于一些场景下跟踪Job执行进度。
其中数据来源依赖于IBackgroudJobStore,借助仓储从数据库存入Job数据,并返回JobId,方便于一些场景下跟踪Job执行进度。
[Dependency(ReplaceServices = true)]
public class DefaultBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
    //...
    public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
    {
        var jobName = BackgroundJobNameAttribute.GetName<TArgs>();
        var jobId = await EnqueueAsync(jobName, args, priority, delay);
        return jobId.ToString();
    }
    protected virtual async Task<Guid> EnqueueAsync(string jobName, object args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
    {
        var jobInfo = new BackgroundJobInfo
        {
            Id = GuidGenerator.Create(),
            JobName = jobName,
            JobArgs = Serializer.Serialize(args),
            Priority = priority,
            CreationTime = Clock.Now,
            NextTryTime = Clock.Now
        };
        if (delay.HasValue)
        {
            jobInfo.NextTryTime = Clock.Now.Add(delay.Value);
        }
         
        //保存JobInfo
        await Store.InsertAsync(jobInfo);
        return jobInfo.Id;
    }
}
HangfireBackgroundJobManager
位于Volo.Abp.BackgroundJobs.HangFire包中,依赖Hangfire来管理触发时机和调用。
[Dependency(ReplaceServices = true)]
public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
    public virtual Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
    {
        return Task.FromResult(delay.HasValue
            ? BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(
                adapter => adapter.ExecuteAsync(args),
                delay.Value
            )
            : BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(
                adapter => adapter.ExecuteAsync(args)
            ));
    }
}
该部分类图简要如下
 将期望执行的Job及参数移交到Hangfire的BackgroundJob类中,该类负责管理时间和触发时机,而具体的执行动作由HangfireBackgroundJobExecutionAdapter来执行,该类承担是否能够执行Job以及调用IBackgroundJobExecuter发起执行。
将期望执行的Job及参数移交到Hangfire的BackgroundJob类中,该类负责管理时间和触发时机,而具体的执行动作由HangfireBackgroundJobExecutionAdapter来执行,该类承担是否能够执行Job以及调用IBackgroundJobExecuter发起执行。
HangfireBackgroundJobManager->Hangfire.BackgroundJob->HangfireBackgroundJobExecutionAdapter->IBackgroundJobExecuter->BackgroundJob
对于Quartz和RabbitMQ流程上类似都是将Job移交到第三方组件管理,但具体的执行又会被回调到IBackgroundJobExecuter中。
AbpBackgroundJobsHangfireModule
值得注意,在AbpBackgroundJobsHangfireModule中也有一个类似HangfireBackgroundWorker一样创建null的BackgroundJobServer的机制,保持所有的Job都可以注册,编排期望的Job,但是在执行时都受阻,并不会真正的执行。
public class AbpBackgroundJobsHangfireModule : AbpModule
{
    public override void OnPreApplicationInitialization(ApplicationInitializationContext context)
    {
        // BackgroundJob开关关闭后,默认使用一个null的Hangfire.BackgroundJobServer
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value;
        if (!options.IsJobExecutionEnabled)
        {
            var hangfireOptions = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
            hangfireOptions.BackgroundJobServerFactory = CreateOnlyEnqueueJobServer;
        }
    }
    //当开关关闭时,返回null
    private BackgroundJobServer CreateOnlyEnqueueJobServer(IServiceProvider serviceProvider)
    {
        serviceProvider.GetRequiredService<JobStorage>();
        return null;
    }
}
BackgroundJob触发执行
当期望执行时间到达,需要借助一些方式收集到符合条件的Job并执行,使用不同的第三方组件有不同的管理调度策略,默认ABP的是BackgroundJobWorker,对于Hangfire的则是Hangfire.Background来管理,Quartz使用Schedule,RabbitMQ则依靠延时队列,但最终都是调用IBackgroundJobExecuter来执行Job。此处解析ABP的默认BackgroundJobWorker和Hangfire回调过程。

BackgroundJobWorker
对应于ABP的DefaultBackgroundJobManager,ABP提供了BackgroundJobManager,位于Volo.Abp.BackgroundJobs包中,基于BackgroundWorker,周期性的获取符合执行条件的Jobs。
 在AbpBackgroundJobWorkerOptions中默认的执行周期JobPollPeriod为5秒,如有需求可以更改值。
在AbpBackgroundJobWorkerOptions中默认的执行周期JobPollPeriod为5秒,如有需求可以更改值。
[DependsOn(typeof(AbpBackgroundJobsModule))]
public class MyModule : AbpModule
{
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        Configure<AbpBackgroundJobWorkerOptions>(options =>
        {
            options.DefaultTimeout = 864000; //10 days (as seconds)
        });
    }
}
在BackgroundJobWorker中,通过DI获取到Store,查询到符合条件的Jobs,转交给BackgroundJobExecuter执行,如有失败,则换时间重试。执行顺利,则由Store再移除Job记录。
public class BackgroundJobWorker : AsyncPeriodicBackgroundWorkerBase, IBackgroundJobWorker
{
    protected AbpBackgroundJobOptions JobOptions { get; }
    protected AbpBackgroundJobWorkerOptions WorkerOptions { get; }
    protected IAbpDistributedLock DistributedLock { get; }
    //...
    
    protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
    {
        //...
        var store = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobStore>();
        var waitingJobs = await store.GetWaitingJobsAsync(WorkerOptions.MaxJobFetchCount);
        var jobExecuter = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobExecuter>();
        var clock = workerContext.ServiceProvider.GetRequiredService<IClock>();
        var serializer = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobSerializer>();
        foreach (var jobInfo in waitingJobs)
        {
            jobInfo.TryCount++;
            jobInfo.LastTryTime = clock.Now;
            var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
            var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
            var context = new JobExecutionContext(
                workerContext.ServiceProvider,
                jobConfiguration.JobType,
                jobArgs);
            await jobExecuter.ExecuteAsync(context);
            await store.DeleteAsync(jobInfo.Id);
        }
        //...
    }
    //...
}
在模块中,当BackgroundJob开关开启了,默认注册IBackgroundJobWorker。如果想要自定义,可以替换BackgroundJobWorker,再加入自定义的BackgroundJobWorker,但不用在注册。
public class AbpBackgroundJobsModule : AbpModule
{
    public async override Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value;
        if (options.IsJobExecutionEnabled)
        {
            await context.AddBackgroundWorkerAsync<IBackgroundJobWorker>();
        }
    }
     //...
}
HangfireJobWorker
ABP中并无HangfireJobWorker,仅仅是我抽离出的一个概念,对应平级的BackgroundJobWorker,对于Hangfire其自身的BackgroundJob管理着所有的Job执行,如有符合条件的,Hangfire内部会筛选找到并调用执行,仍在HangfireBackgroundJobManager中,BackgroundJob.Schedule处的回调便是Hangfire中有符合条件的Job后的执行动作对应于BackgroundJobWorker的DoWork。
public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
    public virtual Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal,
        TimeSpan? delay = null)
    {
        return Task.FromResult(delay.HasValue
            ? BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(
                adapter => adapter.ExecuteAsync(args),
                delay.Value
            )
            : BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(
                adapter => adapter.ExecuteAsync(args)
            ));
    }
}
对应于Quartz和RabbitMQ过程类似,此处不再提及。
扩展
BackgroundJob中提供了很多扩展点
- 可以参照IBackgroundJob或IAsyncBackgroundJob实现自定义的BackgroundJob,但需要同样扩展IBackgroundJobExecuter的实现以支持自定义的BackgroundJob类型。 
- 对应于IBackgroundJobManager可以实现自定义的BackgroundJobManager,可以参照已有的实现,来管理入队策略或更换存储源来保存期望执行的Job信息。 
- 对应于IBackgroundJobWorker,同样可以参照BackgroundJobWorker,可以实现自定义的BackgroundJobWorker。 
- 对于IBackgroundJobStore,默认提供了内存中和保存到数据库中两种,也可以进行扩展,实现想要的保存方式,比如保存到Redis,文件等场景。 
2024-04-05,望技术有成后能回来看见自己的脚步。