.NET 云原生架构师训练营(基于 OP Storming 和 Actor 的大型分布式架构二)--学习笔记

时间:2022-12-26 07:09:14
  • 为什么我们用 Orleans
  • Dapr VS Orleans
  • Actor 模型
  • Orleans 的核心概念
  • 结合 OP Storming 的实践

结合 OP Storming 的实践

  • 业务模型
  • 设计模型
  • 代码实现

业务模型

.NET 云原生架构师训练营(基于 OP Storming 和 Actor 的大型分布式架构二)--学习笔记

我们可以把关键对象(职位、客户行为记录、线索)参考为 actor

猎头顾问一边寻找职位,一边寻找候选人,撮合之后匹配成线索,然后推荐候选人到客户公司,进行面试,发放 offer,候选人入职

设计模型

.NET 云原生架构师训练营(基于 OP Storming 和 Actor 的大型分布式架构二)--学习笔记

我们新建职位的时候需要一个参数对象 CreateJobArgument,相当于录入数据

创建了 Job 之后,它有三个行为:浏览、点赞、投递

投递之后会直接产生一个意向的 Thread,可以继续去推进它的状态:推荐 -> 面试 -> offer -> 入职

针对浏览和点赞会产生两种不同的活动记录:ViewActivity 和 StarActivity

代码实现

  • HelloOrleans.Host

HelloOrleans.Host

新建一个空白解决方案 HelloOrleans

创建一个 ASP .NET Core 空项目 HelloOrleans.Host

分别创建 BaseEntity、Job、Thread、Activity 实体

namespace HelloOrleans.Host.Contract.Entity
{
    public class BaseEntity
    {
        public string Identity { get; set; }
    }
}


namespace HelloOrleans.Host.Contract.Entity
{
    public class Job : BaseEntity
    {
        public string Title { get; set; }
        public string Description { get; set; }
        public string Location { get; set; }
    }
}

namespace HelloOrleans.Host.Contract.Entity
{
    public class Thread : BaseEntity
    {
        public string JobId { get; set; }
        public string ContactId { get; set; }
        public EnumThreadStatus Status { get; set; }
    }
}

namespace HelloOrleans.Host.Contract
{
    public enum EnumThreadStatus : int
    {
        Recommend,
        Interview,
        Offer,
        Onboard,
    }
}

namespace HelloOrleans.Host.Contract.Entity
{
    public class Activity : BaseEntity
    {
        public string JobId { get; set; }
        public string ContactId { get; set; }
        public EnumActivityType Type { get; set; }
    }
}

namespace HelloOrleans.Host.Contract
{
    public enum EnumActivityType : int
    {
        View = 1,
        Star = 2,
    }
}

给 Job 添加 View 和 Star 的行为

public async Task View(string contactId)
{

}

public async Task Star(string contactId)
{

}

这里就只差 Grain 的 identity,我们添加 Orleans 的 nuget 包

<PackageReference Include="Microsoft.Orleans.Core" Version="3.6.5" />
<PackageReference Include="Microsoft.Orleans.Server" Version="3.6.5" />
<PackageReference Include="Microsoft.Orleans.CodeGenerator.MSBuild" Version="3.6.5" />
<PackageReference Include="Microsoft.Orleans.OrleansTelemetryConsumers.Linux" Version="3.6.5" />
  • Microsoft.Orleans.Core 是核心
  • Microsoft.Orleans.Server 做 Host 就需要用到它
  • Microsoft.Orleans.CodeGenerator.MSBuild 会在编译的时候帮我们生成客户端或者访问代码
  • Microsoft.Orleans.OrleansTelemetryConsumers.Linux 是监控

安装完后我们就可以继承 Grain 的基类了

using Orleans;

namespace HelloOrleans.Host.Contract.Entity
{
    public class Job : Grain
    {
        public string Title { get; set; }
        public string Description { get; set; }
        public string Location { get; set; }

        public async Task View(string contactId)
        {

        }

        public async Task Star(string contactId)
        {

        }
    }
}

如果我们需要用它来做持久化是有问题的,因为持久化的时候它会序列化我们所有的公有属性,然而在 Grain 里面会有一些公有属性你没有办法给它序列化,所以持久化的时候会遇到一些问题,除非我们把持久化的东西重新写一遍

public abstract class Grain : IAddressable, ILifecycleParticipant<IGrainLifecycle>
{
    public GrainReference GrainReference { get { return Data.GrainReference; } }
    
    /// <summary>
    /// String representation of grain's SiloIdentity including type and primary key.
    /// </summary>
    public string IdentityString
    {
        get { return Identity?.IdentityString ?? string.Empty; }
    }
    
    ...
}

理论上你的状态和行为是可以封装在一起的,这样更符合 OO 的逻辑

我们现在需要分开状态和行为

定义一个 IJobGrain 接口,继承 IGrainWithStringKey,用 string 作为它的 identity 的类型

using Orleans;

namespace HelloOrleans.Host.Contract.Grain
{
    public interface IJobGrain : IGrainWithStringKey
    {
        Task View(string contactId);
    }
}

定义 JobGrain 继承 Grain,实现 IJobGrain 接口

using HelloOrleans.Host.Contract.Entity;
using HelloOrleans.Host.Contract.Grain;
using Orleans;

namespace HelloOrleans.Host.Grain
{
    public class JobGrain : Grain<Job>, IJobGrain
    {
        public Task View(string contactId)
        {
            throw new NotImplementedException();
        }
    }
}

这是使用 DDD 来做的区分开状态和行为,变成贫血模型,是不得已而为之,因为持久化的问题

在 Orleans 的角度而言,它的 Actor 绑定了一个外部的状态,但是实际上我们更希望它们两在一起

它的实体就变成这样

namespace HelloOrleans.Host.Contract.Entity
{
    public class Job
    {
        public string Title { get; set; }
        public string Description { get; set; }
        public string Location { get; set; }
    }
}

Job 不是 Actor 实例,JobGrain 才是 Actor 实例

接下来我们需要做一个 Host 让它跑起来

添加 nuget 包

<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="6.0.0" />

在 Program 中需要通过 WebApplication 的 Builder 配置 Orleans

builder.Host.UseOrleans(silo =>
{
    silo.UseLocalhostClustering();
    silo.AddMemoryGrainStorage("hello-orleans");
});

在 JobGrain 中使用 hello-orleans 这个 Storage 标识一下

[StorageProvider(ProviderName = "hello-orleans")]
public class JobGrain : Grain<Job>, IJobGrain

添加 JobController,这属于前面讲的 silo 内模式,可以直接使用 IGrainFactory,因为这是在同一个项目里

using Microsoft.AspNetCore.Mvc;
using Orleans;

namespace HelloOrleans.Host.Controllers
{
    [Route("job")]
    public class JobController : Controller
    {
        private IGrainFactory _factory;

        public JobController(IGrainFactory grainFactory)
        {
            _factory = grainFactory;
        }
    }
}

添加一个创建方法 CreateAsync,它的入参叫做 CreateJobViewModel,包含我们需要的 Job 的数据

[Route("")]
[HttpPost]
public async Task<IActionResult> CreateAsync([FromBody] CreateJobViewModel model)
{
    var jobId = Guid.NewGuid().ToString();
    var jobGrain = _factory.GetGrain<IJobGrain>(jobId);
}

创建的时候 Grain 是不存在的,必须有 identity,不然 Actor 获取不到,所以需要先 new 一个 identity,就是 jobId

通过 IGrainFactory 获取到 jobGrain 之后我们是无法获取到它的 state,只能看到它的行为,所以我们需要在 Grain 里面添加一个 Create 的方法方便我们调用

using HelloOrleans.Host.Contract.Entity;
using Orleans;

namespace HelloOrleans.Host.Contract.Grain
{
    public interface IJobGrain : IGrainWithStringKey
    {
        Task<Job> Create(Job job);
        Task View(string contactId);
    }
}

所以这个 Create 方法并不是真正的 Create,只是用来设置 state 的对象,再通过 WriteStateAsync 方法保存

using HelloOrleans.Host.Contract.Entity;
using HelloOrleans.Host.Contract.Grain;
using Orleans;
using Orleans.Providers;

namespace HelloOrleans.Host.Grain
{
    [StorageProvider(ProviderName = "hello-orleans")]
    public class JobGrain : Grain<Job>, IJobGrain
    {
        public async Task<Job> Create(Job job)
        {
            job.Identity = this.GetPrimaryKeyString();
            this.State = job;
            await this.WriteStateAsync();
            return this.State;
        }

        public Task View(string contactId)
        {
            throw new NotImplementedException();
        }
    }
}

new 一个 job,调用 Create 方法设置 State,得到一个带 identity 的 job,然后返回 OK

[Route("")]
[HttpPost]
public async Task<IActionResult> CreateAsync([FromBody] CreateJobViewModel model)
{
    var jobId = Guid.NewGuid().ToString();
    var jobGrain = _factory.GetGrain<IJobGrain>(jobId);

    var job = new Job()
    {
        Title = model.Title,
        Description = model.Description,
        Location = model.Location,
    };
    job = await jobGrain.Create(job);
    return Ok(job);
}

因为我们现在采用的是内存级别的 GrainStorage,所以我们没有办法去查看它

我们再加一个 Get 的方法去查询它

[Route("{jobId}")]
[HttpGet]
public async Task<IActionResult> GetAsync(string jobId)
{
    var jobGrain = _factory.GetGrain<IJobGrain>(jobId);
}

这个时候我们需要去 Grain 的接口里面加一个 Get 方法

using HelloOrleans.Host.Contract.Entity;
using Orleans;

namespace HelloOrleans.Host.Contract.Grain
{
    public interface IJobGrain : IGrainWithStringKey
    {
        Task Create(Job job);
        Task<Job> Get();
        Task View(string contactId);
    }
}

Get 方法是不需要传 id 的,因为这个 id 就是 Grain 的 id,你激活的时候就已经有了,直接返回 this.State

using HelloOrleans.Host.Contract.Entity;
using HelloOrleans.Host.Contract.Grain;
using Orleans;
using Orleans.Providers;

namespace HelloOrleans.Host.Grain
{
    [StorageProvider(ProviderName = "hello-orleans")]
    public class JobGrain : Grain<Job>, IJobGrain
    {
        public async Task Create(Job job)
        {
            this.State = job;
            await this.WriteStateAsync();
        }

        public Task<Job> Get()
        {
            return Task.FromResult(this.State);
        }

        public Task View(string contactId)
        {
            throw new NotImplementedException();
        }
    }
}

这个地方所有你的行为都不是直接去查数据库,而是利用这个 State,它不需要你自己去读取,跟 DDD 的 repository 不同

直接通过 Grain 的 Get 方法获取 Job 返回 OK

[Route("{jobId}")]
[HttpGet]
public async Task<IActionResult> GetAsync(string jobId)
{
    var jobGrain = _factory.GetGrain<IJobGrain>(jobId);
    return Ok(await jobGrain.Get());
}

这里我们可以再加点校验逻辑

[Route("{jobId}")]
[HttpGet]
public async Task<IActionResult> GetAsync(string jobId)
{
    if (string.IsNullOrEmpty(jobId))
    {
        throw new ArgumentNullException(nameof(jobId));
    }

    var jobGrain = _factory.GetGrain<IJobGrain>(jobId);
    return Ok(await jobGrain.Get());
}

要注意如果你传入的 jobId 是不存在的,因为不管你传什么,只要是一个合法的字符串,并且不重复,它都会帮你去激活,只不过在于它是否做持久化而已,如果你随便传了一个 jobId,这个时候不是调了 Get 方法,它可能也会返回给你一个空的 state,所以这个 jobId 没有这种很强的合法性的约束,在调 Get 的时候要特别的注意,不管是 Create 还是 Get,其实都是调用了 GetGrain,传了一个 identity 进去,这样的一个行为

在 Program 中添加 Controller 的配置

using Orleans.Hosting;

var builder = WebApplication.CreateBuilder(args);

builder.Host.UseOrleans(silo =>
{
    silo.UseLocalhostClustering();
    silo.AddMemoryGrainStorage("hello-orleans");
});
builder.Services.AddControllers();

var app = builder.Build();
app.UseRouting();
app.UseEndpoints(endpoints =>
{
    endpoints.MapControllers();
});

app.MapGet("/", () => "Hello World!");

app.Run();

我们启动项目测试一下

Create 方法入参

{
	"title": "第一个职位",
	"description": "第一个职位"
}

.NET 云原生架构师训练营(基于 OP Storming 和 Actor 的大型分布式架构二)--学习笔记

可以看到方法调用成功,返回的 job 里面包含了 identity

接着我们使用 Create 方法返回的 identity 作为入参调用 Get 方法

.NET 云原生架构师训练营(基于 OP Storming 和 Actor 的大型分布式架构二)--学习笔记

可以看到方法调用成功,返回同一个 job

这种基于内存的存储就很适合用来做单元测试

.NET 云原生架构师训练营(基于 OP Storming 和 Actor 的大型分布式架构二)--学习笔记

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。

欢迎转载、使用、重新发布,但务必保留文章署名 郑子铭 (包含链接: http://www.cnblogs.com/MingsonZheng/ ),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。

如有任何疑问,请与我联系 (MingsonZheng@outlook.com) 。