大家好,我是失业在家,正在找工作的博主Jerry,找工作之余,总结和整理以前的项目经验,动手写了个洋葱架构(整洁架构)示例解决方案 OnionArch。其目的是为了更好的实现基于DDD(领域驱动分析)和命令查询职责分离(CQRS)的洋葱架构。
OnionArch 是用来实现单个微服务的。它提供了Grpc接口和Dapr Side Car进行交互,通过Dapr来实现微服务之间的接口调用、事件发布订阅等微服务特性。但是,Dapr官方文档上只有Go语言的Grpc的微服务调用示例,没有事件发布和订阅示例,更没有基于Grpc通讯用.Net实现的事件订阅和发布示例。
一、实现目标
为了方便大家写代码,本文旨在介绍如何通过Dapr实现.Net Grpc服务之间的发布和订阅,并采用与WebApi类似的事件订阅方式。
如果是Dapr Side Car通过Web Api和微服务引用交互,在WebApi中实现事件订阅非常简单,只要在Action 上增加“[Topic("pubsub", "TestTopic")]” Attribute即可,可如果Dapr是通过Grpc和Grpc服务交互就不能这样写了。
为了保持WebApi和Grpc事件订阅代码的一致性,本文就是要在Grpc通讯的情况下实现如下写法来订阅并处理事件。
[Topic("pubsub", "TestTopic")]
public override Task<HelloReply> TestTopicEvent(TestTopicEventRequest request, ServerCallContext context)
{
string message = "TestTopicEvent" + request.EventData.Name;
Console.WriteLine(message);
return Task.FromResult(new HelloReply
{
Message = message
});
}
二、实现方案
Dapr实现.Net Grpc服务之间的发布和订阅,根据官方文档,需要重写AppCallback.AppCallbackBase Grpc类的ListTopicSubscriptions方法和OnTopicEvent方法,ListTopicSubscriptions是给Dapr调用获取该微服务已订阅的事件,OnTopicEvent给Dapr调用以触发事件到达处理逻辑。但是这样就需要在AppCallback.AppCallbackBase实现类中硬编码已订阅的事件和事件处理逻辑。显然不符合我们的实现目标。
参考Dapr SDK中关于WebApi 订阅查询接口“http://localhost:<appPort>/dapr/subscribe”的实现代码,可以在AppCallback.AppCallbackBase实现类的ListTopicSubscriptions方法中,采用相同的方式,在Grpc方法中查询Topic Attribute的方式来搜索已订阅的事件。这样就不用在ListTopicSubscriptions中硬编码已订阅的事件了。
为了避免在OnTopicEvent方法中应编码事件处理逻辑,就需要在接收到事件触发后动态调用Grpc方法。理论上,只要有proto文件就可以动态调用Grpc方法,而proto文件本来就在项目中。但是,我没找到.Net动态调用Grpc方法的相关资料,不知道大家有没有?
我这里采用了另一种方式,根据我上一篇关于.Net 7.0 RC gRPC JSON 转码为 Swagger/OpenAPI文档。Grpc方法可以增加一个转码为Json的WebApi调用。这样就可以在OnTopicEvent方法中接收到事件触发后,通过HttpClient post到对应的WebApi地址,曲线实现动态调用Grpc方法。是不是有点脱裤子放屁的感觉?
三、代码实现
我的解决方案如下,GrpcServiceA发布事件,GrpcServiceB接收事件并处理。
实现事件发布
GrpcServiceA发布事件比较简单,和WebApi的方式是一样一样的。
public async override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
{
//await _daprClient.SaveStateAsync("statestore", "testKey", request.Name);
EventData eventData = new EventData() { Id = 6, Name = request.Name, Description = "Looking for a job" };
await _daprClient.PublishEventAsync<EventData>("pubsub", "TestTopic", eventData);
return new HelloReply
{
Message = "Hello" + request.Name
};
}
_daprClient怎么来的?我参考Dapr .Net SDK的代码,给IGrpcServerBuilder 增加了扩展方法:
public static IGrpcServerBuilder AddDapr(this IGrpcServerBuilder builder, Action<DaprClientBuilder> configureClient = null)
{
if (builder is null)
{
throw new ArgumentNullException(nameof(builder));
}
// This pattern prevents registering services multiple times in the case AddDapr is called
// by non-user-code.
if (builder.Services.Any(s => s.ImplementationType == typeof(DaprMvcMarkerService)))
{
return builder;
}
builder.Services.AddDaprClient(configureClient);
builder.Services.AddSingleton<DaprMvcMarkerService>();
return builder;
}
private class DaprMvcMarkerService
{
}
View Code
然后就可以这样把DaprClient依赖注入到服务中。
builder.Services.AddGrpc().AddJsonTranscoding().AddDapr();
实现事件订阅
根据上述实现方案,GrpcServiceB接收事件并处理有点复杂,参考我Grpc接口转码Json的的内容,在要接收事件的Grpc方法上增加转码Json WebApi 配置。
rpc TestTopicEvent (TestTopicEventRequest) returns (HelloReply){
option (google.api.http) = {
post: "/v1/greeter/testtopicevent",
body: "eventData"
};
}
增加google.api.http选项,,可以通过post eventData 数据到地址“/v1/greeter/testtopicevent”调用该Grpc方法。然后实现该Grpc接口。
[Topic("pubsub", "TestTopic")]
public override Task<HelloReply> TestTopicEvent(TestTopicEventRequest request, ServerCallContext context)
{
string message = "TestTopicEvent" + request.EventData.Name;
Console.WriteLine(message);
return Task.FromResult(new HelloReply
{
Message = message
});
}
我重用了Dapr .Net SDK 的Topic Attribute来标记该Grpc的实现接口,这样就可以搜索所有带Topic Attribute的Grpc方法来获取已经订阅的事件。
接下来才是重头戏,重写AppCallback.AppCallbackBase Grpc接口类的ListTopicSubscriptions方法和OnTopicEvent方法
public async override Task<ListTopicSubscriptionsResponse> ListTopicSubscriptions(Empty request, ServerCallContext context)
{
var result = new ListTopicSubscriptionsResponse();
var subcriptions = _endpointDataSource.GetDaprSubscriptions(_loggerFactory);
foreach (var subscription in subcriptions)
{
TopicSubscription subscr = new TopicSubscription()
{
PubsubName = subscription.PubsubName,
Topic = subscription.Topic,
Routes = new TopicRoutes()
};
subscr.Routes.Default = subscription.Route;
result.Subscriptions.Add(subscr);
}
return result;
}
该方法返回所有已订阅的事件和对应的WebApi Url,将事件对应的WebApi地址放入subscr.Routes.Default中。
其中_endpointDataSource.GetDaprSubscriptions 方法参考了Dapr .Net SDK的实现。
public static List<Subscription> GetDaprSubscriptions(this EndpointDataSource dataSource, ILoggerFactory loggerFactory, SubscribeOptions options = null)
{
var logger = loggerFactory.CreateLogger("DaprTopicSubscription");
var subscriptions = dataSource.Endpoints
.OfType<RouteEndpoint>()
.Where(e => e.Metadata.GetOrderedMetadata<ITopicMetadata>().Any(t => t.Name != null)) // only endpoints which have TopicAttribute with not null Name.
.SelectMany(e =>
{
var topicMetadata = e.Metadata.GetOrderedMetadata<ITopicMetadata>();
var originalTopicMetadata = e.Metadata.GetOrderedMetadata<IOriginalTopicMetadata>();
var subs = new List<(string PubsubName, string Name, string DeadLetterTopic, bool? EnableRawPayload, string Match, int Priority, Dictionary<string, string[]> OriginalTopicMetadata, string MetadataSeparator, RoutePattern RoutePattern)>();
for (int i = 0; i < topicMetadata.Count(); i++)
{
subs.Add((topicMetadata[i].PubsubName,
topicMetadata[i].Name,
(topicMetadata[i] as IDeadLetterTopicMetadata)?.DeadLetterTopic,
(topicMetadata[i] as IRawTopicMetadata)?.EnableRawPayload,
topicMetadata[i].Match,
topicMetadata[i].Priority,
originalTopicMetadata.Where(m => (topicMetadata[i] as IOwnedOriginalTopicMetadata)?.OwnedMetadatas?.Any(o => o.Equals(m.Id)) == true || string.IsNullOrEmpty(m.Id))
.GroupBy(c => c.Name)
.ToDictionary(m => m.Key, m => m.Select(c => c.Value).Distinct().ToArray()),
(topicMetadata[i] as IOwnedOriginalTopicMetadata)?.MetadataSeparator,
e.RoutePattern));
}
return subs;
})
.Distinct()
.GroupBy(e => new { e.PubsubName, e.Name })
.Select(e => e.OrderBy(e => e.Priority))
.Select(e =>
{
var first = e.First();
var rawPayload = e.Any(e => e.EnableRawPayload.GetValueOrDefault());
var metadataSeparator = e.FirstOrDefault(e => !string.IsNullOrEmpty(e.MetadataSeparator)).MetadataSeparator ?? ",";
var rules = e.Where(e => !string.IsNullOrEmpty(e.Match)).ToList();
var defaultRoutes = e.Where(e => string.IsNullOrEmpty(e.Match)).Select(e => RoutePatternToString(e.RoutePattern)).ToList();
//var defaultRoute = defaultRoutes.FirstOrDefault();
var defaultRoute = defaultRoutes.LastOrDefault();
//multiple identical names. use comma separation.
var metadata = new Metadata(e.SelectMany(c => c.OriginalTopicMetadata).GroupBy(c => c.Key).ToDictionary(c => c.Key, c => string.Join(metadataSeparator, c.SelectMany(c => c.Value).Distinct())));
if (rawPayload || options?.EnableRawPayload is true)
{
metadata.Add(Metadata.RawPayload, "true");
}
if (logger != null)
{
if (defaultRoutes.Count > 1)
{
logger.LogError("A default subscription to topic {name} on pubsub {pubsub} already exists.", first.Name, first.PubsubName);
}
var duplicatePriorities = rules.GroupBy(e => e.Priority)
.Where(g => g.Count() > 1)
.ToDictionary(x => x.Key, y => y.Count());
foreach (var entry in duplicatePriorities)
{
logger.LogError("A subscription to topic {name} on pubsub {pubsub} has duplicate priorities for {priority}: found {count} occurrences.", first.Name, first.PubsubName, entry.Key, entry.Value);
}
}
var subscription = new Subscription()
{
Topic = first.Name,
PubsubName = first.PubsubName,
Metadata = metadata.Count > 0 ? metadata : null,
};
if (first.DeadLetterTopic != null)
{
subscription.DeadLetterTopic = first.DeadLetterTopic;
}
// Use the V2 routing rules structure
if (rules.Count > 0)
{
subscription.Routes = new Routes
{
Rules = rules.Select(e => new Rule
{
Match = e.Match,
Path = RoutePatternToString(e.RoutePattern),
}).ToList(),
Default = defaultRoute,
};
}
// Use the V1 structure for backward compatibility.
else
{
subscription.Route = defaultRoute;
}
return subscription;
})
.OrderBy(e => (e.PubsubName, e.Topic));
return subscriptions.ToList();
}
private static string RoutePatternToString(RoutePattern routePattern)
{
return string.Join("/", routePattern.PathSegments
.Select(segment => string.Concat(segment.Parts.Cast<RoutePatternLiteralPart>()
.Select(part => part.Content))));
}
注意标红的哪一行是我唯一改动的地方,因为Grpc接口增加了Web Api配置后会返回两个Route,一个是原始Grpc的,一个是WebApi的,我们需要后面那个。
接着重写OnTopicEvent方法
public async override Task<TopicEventResponse> OnTopicEvent(TopicEventRequest request, ServerCallContext context)
{
TopicEventResponse topicResponse = new TopicEventResponse();
string payloadString = request.Data.ToStringUtf8();
Console.WriteLine("OnTopicEvent Data:" + payloadString);
HttpContent postContent = new StringContent(payloadString, new MediaTypeWithQualityHeaderValue("application/json"));
var response = await _httpClient4TopicEvent.PostAsync("http://" + context.Host + "/" + request.Path, postContent);
string responseContent = await response.Content.ReadAsStringAsync();
Console.WriteLine(responseContent);
if (response.IsSuccessStatusCode)
{
Console.WriteLine("OnTopicEvent Invoke Success.");
topicResponse.Status = TopicEventResponseStatus.Success;
}
else
{
Console.WriteLine("OnTopicEvent Invoke Error.");
topicResponse.Status = TopicEventResponseStatus.Drop;
}
return topicResponse;
}
这里简单处理了事件触发的返回参数TopicEventResponse ,未处理重试的情况。request.path是在ListTopicSubscriptions方法中返回给Dapr的事件对应的WebApi调用地址。
参数_httpClient4TopicEvent是这样注入的:
builder.Services.AddHttpClient("HttpClient4TopicEvent", httpClient =>
{
httpClient.DefaultRequestVersion = HttpVersion.Version20;
httpClient.DefaultVersionPolicy = HttpVersionPolicy.RequestVersionOrHigher;
httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
}
);
因为Grpc是基于Http2.0及以上版本的,所以需要修改HtttpClient默认配置,不然无法访基于Http2.0的WebApi。
然后将我们的AppCallback.AppCallbackBase实现类DaprAppCallbackService Map到GrpcService即可。
app.MapGrpcService<DaprAppCallbackService>();
四、实现效果
分别通过Dapr运行ServiceA和ServiceB微服务,注意指定--app-protocol协议为Grpc,我这里还使用了.Net 热重载技术。
dapr run --app-protocol grpc --app-id serviceA --app-port 5002 --dapr-grpc-port 50002 -- dotnet watch run --launch-profile https
dapr run --app-protocol grpc --app-id serviceB --app-port 5003 --dapr-grpc-port 50003 -- dotnet watch run --launch-profile https
在ServiceA中发布事件
在ServiceB中查看已订阅的事件和接收到的事件触发
五、找工作
▪ 博主有15年以上的软件技术实施经验(Technical Leader),专注于微服务(Dapr)和云原生(K8s)软件架构设计、专注于 .Net Core\Java开发和Devops构建发布。
▪ 博主10年以上的软件交付管理经验(Project Manager & Product Ower),致力于敏捷(Scrum)项目管理、软件产品业务需求分析和原型设计。
▪ 博主熟练配置和使用 Microsoft Azure云。
▪ 博主为人诚恳,积极乐观,工作认真负责。
我家在广州,也可以去深圳工作。做架构师、产品经理、项目经理都可以。有工作机会推荐的朋友可以加我微信 15920128707,微信名字叫Jerry。
本文源代码在这里:iamxiaozhuang/TestDaprGrpcSubscripber (github.com) 大家可以随便取用。