本篇参考:https://developer.salesforce.com/blogs/2018/07/which-streaming-event-do-i-use.html
https://trailhead.salesforce.com/content/learn/modules/platform_events_basics
我们在之前的篇章中讲过 Streaming API的使用,可以参看:salesforce零基础学习(八十五)streaming api 简单使用(接近实时获取你需要跟踪的数据的更新消息状态)
今天讲的是另外一个针对消息发布订阅的实现,即 Platform Event。Platform Event的详细描述可以参看上面的trailhead练习,如何选取streaming event可以参看第一个链接。接下来通过三步来讲 Platform Event。
一. 什么是Platform Event
Platform Event是一个基于发布订阅的消息管理架构。通过Platform Event,你可以监听你的系统数据并且满足某种条件下,和自己或者其他外部系统进行交流通讯。比如salesforce系统的Account数据在其他的外部系统有备份。当 Account的Owner或者Account类型有改变以后,需要外部关联的系统同时知道当前的数据进行了改变并作出相应的处理。我们以前针对此种case可能通过callout,将我们的数据变化通过http方式发布到对端系统,然后对端系统进行处理以后返回response告诉salesforce系统处理完成。此种方式可能有以下的问题:
- 当数据量大的时候,因为salesforce有callout的各种limitation,容易有government limitation的风险;
- 当对端系统迁移或者需要增加对端系统时,可能需要有额外的effort进行处理。
Platform Event是基于发布订阅模式,即salesforce发布了消息以后,所有他的订阅者理论上都有权力去订阅到发布的数据。如果有新增的订阅者,我们只需要配置相关的订阅端即可。需要了解的是,使用Platform Event不仅可以和内部系统进行发布订阅,也可以和外部系统进行连携。Platform Event有几个名词需要了解。
- Event:在业务流程中有意义的状态更改,比如account change owner
- Event message:Event发生时需要发送的消息。通常我们需要创建 PlatForm Event来进行对应,这个后期讲
- Event producer:消息的发布者
- Event channel:消息发布者发布消息的渠道,这里我们用到的就是Event Bus
- Event consumer:从渠道中接受消息的订阅者。
所以我们将操作连接起来可以理解成:当salesforce某些有意义的业务数据变化以后,发布者通过Event Bus这个渠道进行了数据发布,将数据广播出去,订阅者通过EventBus进行数据的接收即完成了数据的发布以及订阅。数据的发布和订阅可以有多种方式实现,篇中的头部URL展示了Platform Event的开发文档,包含了全部的操作手册,本篇只是以其中的trigger方式进行扫盲,详情操作还请查看官方这个API文档。
二. PlatForm发布
1)创建PlatForm Event:在Set Up处搜索platform event,在Integrations下面点击Platform Events便可以看到系统配置的Platform Event信息,点击New Platform Event即可新建一条我们需要的PlatForm Event
2)当我们创建以后,可以选择Publish Behavior,salesforce提供了两个值可供选择: Publish After Commit / Publish Immediately。这两种有很大的区别:
- Publish Immediately当对一个platform event列表进行publish时,即使有一些失败,成功的那些也会继续进行发布,即保存时allOrNone属性会被自动忽略;
- Publish Immediately如果设置了此种模式, setSavePoint以及rollback不支持,即发送了就发送了,没有某种case回滚模式;反之Publish After Commit可以有回滚操作。
Event Type选择的值为High Volume,salesforce可以将发布的广播数据保存三天。
3)当我们创建完以后,我们会发现API Name是__e结尾的。针对Platform Event是以这种特殊的表进行存储,当然里面的数据不支持SOQL或者apex方式去搜索出来,订阅者只能通过其他的方式去查询。标准的字段区域我们发现有一个 Reply Id。这个字段是一个非常重要的字段。有什么作用呢?
每一个Event Message都会包含一个 Reply Id,当广播传递给订阅者时由系统填充,代表着当前的事件在时间流中的位置(index)。对于连续的事件,Reply Id不一定是连续的,所以不要考虑使用+1的方式去找到下一个 ReplyId.当订阅者因为某些原因丢失了接受的订阅的信息,可以通过Reply Id去找回,但是因为 Publish Event只保留3天,所以需要在3天以内去完成。
同时我们也可以看到,下方区域还包含Trigger以及Subscription,这意味着Publish Event可以进行trigger操作,但是只能监听trigger.afterInsert,其他的不支持。我们也可以进行创建字段操作去创建我们需要的类型以及变量数据。
4)当我们点击Custom Fields New按钮以后,我们可以看到包含以下的字段类型可供选择。按照需求选择我们需要的字段类型创建字段即可。
如图我们创建了三个字段,和自定义字段类似,后缀也是__c,所以很好理解。
我们的模拟需求外部系统有备份salesforce的Account数据,当Account的industry变化以后,会发一个Task告诉Owner,同时将变化的数据告诉外部连携系统。Publish Platform可以有很多种方式,可以通过apex / process builder / flow / rest api(通常demo中会使用workbench来操作)。我们的实现为使用apex trigger来实现。
trigger中核心的一个语句就是EventBus.publish了,和Datebase.insert一样,可以传递一个list或者一个item都可以。以下的代码便可以实现了一个Platform Event的发布广播操作。
trigger AccountTrigger on Account(after update) {
List<Account__e> needPublishAccountList = new List<Account__e>();
for(Account accountItem : (List<Account>)Trigger.new) {
Account oldAccount = Trigger.oldMap.get(accountItem.Id);
if(oldAccount != null && oldAccount.Industry != accountItem.Industry) {
Account__e accountPublishItem = new Account__e();
accountPublishItem.Account_Id__c = accountItem.Id;
accountPublishItem.After_Industry__c = accountItem.Industry;
accountPublishItem.Before_Industry__c = oldAccount.Industry;
needPublishAccountList.add(accountPublishItem);
}
} if(!needPublishAccountList.isEmpty()) {
List<Database.SaveResult> results = EventBus.publish(needPublishAccountList);
for (Database.SaveResult sr : results) {
if (sr.isSuccess()) {
System.debug('Successfully published event.');
} else {
for(Database.Error err : sr.getErrors()) {
System.debug('Error returned: ' +
err.getStatusCode() +
' - ' +
err.getMessage());
}
}
}
} }
三. PlatForm Event的订阅
既然Event已经发布了,我们需要知道我们写的代码是否正确,订阅者是否可以收到信息。上面也说过通过SOQL没法搜索到Platform Event的信息,那么我们应该如何去查询或者操作去了解是否发布成功呢?官方提供了特别多的订阅方式。比如我们可以通过apex trigger / lightning component / lightning web component / process builder / CometD(针对外部系统连携可以通过轮询去订阅)
所以我们使用哪种方式的订阅要取决于我们项目的需要,如果订阅在内部系统执行,作为公用操作可以使用trigger。如果我们需要在程序中监听,可以使用lightning aura或者lwc去监听。并进行页面实时变更操作。如果外部连携,可以考虑使用CometD进行连携。具体的使用方式可以查看上面的官方文档链接,本篇demo主要通过trigger/去实现订阅的测试。
AccountEventTrigger.trigger:只能使用after insert,用于广播以后针对owner创建一个task信息。
trigger AccountEventTrigger on Account__e (after insert) {
List<Task> taskList = new List<Task>();
for (Account__e accountEvent : Trigger.New) {
Task taskItem = new Task();
taskItem.Subject = 'Industry change reminder';
taskItem.Description = 'old industry : ' + accountEvent.Before_Industry__c + ' new industry : ' + accountEvent.After_Industry__c;
taskItem.ActivityDate = System.today();
taskItem.Priority = 'Medium';
taskItem.OwnerId = UserInfo.getUserId();
taskList.add(taskItem);
}
insert taskList;
}
我们在订阅处便可以看到trigger作为订阅者实现了订阅。
总结:Platform Event在和内部外部系统信息交互有着很好用的效果,本篇只是简单的进行功能扫盲,详情使用自行查看开发文档。