Oracle Coherence中文教程二十一:使用缓存事件

时间:2020-12-12 07:39:37

使用缓存事件

Coherence提供缓存使用JavaBean事件模型的事件。收到你的需要,你需要他们的地方,不管变化实际上是发生在集群中的事件,这是非常简单的。熟悉JavaBean模型的开发应与事件工作有没有困难,即使在复杂的群集。

本章包含以下各节:

    监听器接口和事件对象
    了解事件担保
    支持活动的缓存和类
    注册的所有活动
    使用内部类作为MapListener
    配置一个MapListener一个Cache
    具体身份签约活动
    过滤事件
    “精简版的活动
    高级:听查询
    高级:合成事件
    高级:背胶地图事件
    高级:同步事件监听器

21.1监听器接口和事件对象

JavaBeans事件模型中,有一个EventListener接口,所有的听众必须延长。 , Coherence提供一个MapListener接口,它允许接收事件的应用程序逻辑连贯性的高速缓存中的数据时,添加,修改或删除。

21-1说明的摘录从com.tangosol.util.MapListener API 

21-1摘自的MapListener API

public interface MapListener

        extends EventListener

    {

    /**

    * Invoked when a map entry has been inserted.

    *

    * @param evt  the MapEvent carrying the insert information

    */

    public void entryInserted(MapEvent evt);

    /**

    * Invoked when a map entry has been updated.

    *

    * @param evt  the MapEvent carrying the update information

    */

    public void entryUpdated(MapEvent evt);

    /**

    * Invoked when a map entry has been removed.

    *

    * @param evt  the MapEvent carrying the delete information

    */

    public void entryDeleted(MapEvent evt);

    }

实现了的MapListener界面的应用程序对象可以报名参加任何连贯的缓存或类实现ObservableMap接口的事件,只需通过一个实例应用程序的MapListener的实施到addMapListener ()方法。

MapEvent对象传递给MapListener的进行所有必要的有关事件的信息,已经发生,包括对引发事件源( ObservableMap ) ,身份(键) ,该事件涉及到什么样的行动是反对该标识(插入,更新或删除) ,旧值和新的值是什么:

21-2说明的摘录从com.tangosol.util.MapEvent API 

21-2摘自的MapEvent API

public class MapEvent

        extends EventObject

    {

    /**

    * Return an ObservableMap object on which this event has actually

    * occurred.

    *

    * @return an ObservableMap object

    */

    public ObservableMap getMap()

    /**

    * Return this event's id. The event id is an ENTRY_*

    * enumerated constants.

    *

    * @return an id

    */

    public int getId()

    /**

    * Return a key associated with this event.

    *

    * @return a key

    */

    public Object getKey()

    /**

    * Return an old value associated with this event.

    * <p>

    * The old value represents a value deleted from or updated in a map.

    * It is always null for "insert" notifications.

    *

    * @return an old value

    */

    public Object getOldValue()

    /**

    * Return a new value associated with this event.

    * <p>

    * The new value represents a new value inserted into or updated in

    * a map. It is always null for "delete" notifications.

    *

    * @return a new value

    */

    public Object getNewValue()

    // ----- Object methods -------------------------------------------------

    /**

    * Return a String representation of this MapEvent object.

    *

    * @return a String representation of this MapEvent object

    */

    public String toString()

    // ----- constants ------------------------------------------------------

    /**

    * This event indicates that an entry has been added to the map.

    */

    public static final int ENTRY_INSERTED = 1;

    /**

    * This event indicates that an entry has been updated in the map.

    */

    public static final int ENTRY_UPDATED  = 2;

    /**

    * This event indicates that an entry has been removed from the map.

    */

    public static final int ENTRY_DELETED  = 3;

    }

21.2了解事件担保

分区的高速缓存服务保证,正常情况下,一个事件只传递一次。不过,也有两种情况,可能会打破这种担保:

    灾难性的群集故障造成的数据丢失(例如,两台机器,保存数据的同时崩溃) 。在这种情况下,的PARTITION_LOST事件发射到所有的注册PartitionListener实例在服务器端。

    客户端断开连接。在这种情况下,的MEMBER_LEFT事件被发射到在客户端上的所有的注册MemberListener实例。

21.3 支持时间的CachesClasses

所有Coherence缓存实现ObservableMap ;事实上, NamedCache的所实作所有相干缓存的扩展接口的ObservableMap接口。这意味着应用程序可以注册接收事件从任何缓存,该缓存是否是local, partitioned, near, replicated, using read-through, write-through, write-behind, overflow, disk storage,等等。

注意事项:
无论高速缓存拓扑结构的服务器的数量,并且即使修改由其他服务器,事件被传递到该应用程序的侦听器。

除了Coherence高速缓存(得到这些对象通过相干缓存工厂) ,其他几类相干还实现了ObservableMap的接口:

    ObservableHashMap
    LocalCache
    OverflowMap
    NearCache
    ReadWriteBackingMap
    AbstractSerializationCache , SerializationCache , SerializationPagedCache
    WrapperObservableMap , WrapperConcurrentMap , WrapperNamedCache

对于公布的实现类的完整列表,请参阅的相干的Javadoc ObservableMap 

21.4签署的所有事件

要注册事件,简单地传递一个对象来实现的MapListener接口的addMapListenerObservableMap方法。的addMapListener方法中示出在实施例21-3中。

21-3方法上ObservableMap API

public void addMapListener(MapListener listener);

public void addMapListener(MapListener listener, Object oKey, boolean fLite);

public void addMapListener(MapListener listener, Filter filter, boolean fLite);

让我们创建一个例子MapListener实现。例21-4给出了一个示例MapListener实现打印每一个事件,因为它收到。

示例21-4示例MapListener实施

/**

* A MapListener implementation that prints each event as it receives

* them.

*/

public static class EventPrinter

        extends Base

        implements MapListener

    {

    public void entryInserted(MapEvent evt)

        {

        out(evt);

        }

    public void entryUpdated(MapEvent evt)

        {

        out(evt);

        }

    public void entryDeleted(MapEvent evt)

        {

        out(evt);

        }

    }

使用本实施方案,打印所有事件从任何给定的缓存(因为所有的缓存实现ObservableMap接口)是非常简单的:

cache.addMapListener(new EventPrinter());

当然,可以以后删除监听器,它是必要的持有的引用听者的:

21-5控股的参考监听

Listener listener = new EventPrinter();

cache.addMapListener(listener);

m_listener = listener; // store the listener in a field


后来,删除监听器:

21-6删除监听器

Listener listener = m_listener;

if (listener != null)

    {

    cache.removeMapListener(listener);

    m_listener = null; // clean up the listener field

    }

每个addMapListener方法上ObservableMap接口有相应的removeMapListener方法。要删除监听器,使用方法,对应的的addMapListener方法被用来添加监听removeMapListener 

21.5使用内部类作为MapListener

当创建一个的内部类作为MapListener使用,或当执行一个MapListener ,只听一或两种类型的事件(插入,更新或删除) ,你可以使用基类, AbstractMapListener 。例如,例21-7匿名内部类只打印出的插入事件的缓存。

21-7内部类只打印缓存中插入事件

cache.addMapListener(new AbstractMapListener()

    {

    public void entryInserted(MapEvent evt)

        {

        out(evt);

        }

    });

另一个有用的基类,用于创建MapListener的是的MultiplexingMapListener ,这所有的事件路由到一个单一的处理方法。这一类可以简化EventPrinter的例子,例21-8所示的代码。由于只有一个方法必须实现捕捉到的所有事件, MultiplexingMapListener也可以是非常有用的,当创建内部类作为MapListener使用的。

21-8路由到一个单一的方法处理所有活动

public static class EventPrinter

        extends MultiplexingMapListener

    {

    public void onMapEvent(MapEvent evt)

        {

        out(evt);

        }

    }

21.6 配置MapListener缓存

如果监听器应该总是​​在一个特定的缓存中,然后将其放置到缓存配置自动添加监听器配置缓存时使用的<listener>元素与连贯。

21.7签署具体身份的事件

签署了针对特定身份(键)发生的事件一样简单。例如,要打印所有发生的事件,对整数键

cache.addMapListener(new EventPrinter(), new Integer(5), false);

因此,例21-9中的代码只会触发一个事件整数键5时插入或更新:

21-9触发一个事件,当一个特定的整数键插入或更新

for (int i = 0; i < 10; ++i)

    {

    Integer key   = new Integer(i);

    String  value = "test value for key " + i;

    cache.put(key, value);

}

21.8过滤事件

听特定的键类似,它是可以收听特定的事件。例21-10一个一个过滤器,允许监听器只接收删除事件侦听器添加到缓存中。

示例21-10删除的事件添加一个监听器,过滤器

/ /过滤器必须使用分区缓存
/ /序列化,外部化或ExternalizableLite

public class DeletedFilter

        implements Filter, Serializable

    {

    public boolean evaluate(Object o)

        {

        MapEvent evt = (MapEvent) o;

        return evt.getId() == MapEvent.ENTRY_DELETED;

        }

    }

cache.addMapListener(new EventPrinter(), new DeletedFilter(), false);

注意事项:
事件过滤与过滤缓存的数据:

当建立一个过滤器,用于查询,传递给evaluate方法的筛选对象是一个值从缓存中,或者 如果过滤器实现的EntryFilter接口 从缓存整个Map.Entry的。当建立一个过滤器过滤事件为MapListener ,对象传递给evaluate方法的过滤器是类型MapEvent 

请参见“Advanced: Listening to Querie,如何使用查询过滤器听缓存事件的更多信息。

如果您进行以下的调用序列:

cache.put("hello", "world");

cache.put("hello", "again");

cache.remove("hello");

其结果将是:

CacheEvent{LocalCache deleted: key=hello, value=again}

欲了解更多信息,请参阅:"Advanced: Listening to Queries".

21.9 "Lite" 事件

默认情况下, Coherence提供旧的和新值作为事件的一部分。请看下面的例子:

示例21-11插入,更新和删除值从Cache

MapListener listener = new MultiplexingMapListener()

    {

    public void onMapEvent(MapEvent evt)

        {

        out("event has occurred: " + evt);

        out("(the wire-size of the event would have been "

            + ExternalizableHelper.toBinary(evt).length()

            + " bytes.)");

        }

    };

cache.addMapListener(listener);

// insert a 1KB value

cache.put("test", new byte[1024]);

// update with a 2KB value

cache.put("test", new byte[2048]);

// remove the 2KB value

cache.remove("test");

从运行试验,图示的实施例21-12中,示出的输出的第一个事件进行的1KB插入值,第二个事件进行更换的的1KB值和新的2KB值,第三个事件进行删除2KB值。

21-12样品输出
event has occurred: CacheEvent{LocalCache added: key=test, value=[B@a470b8}

(the wire-size of the event would have been 1283 bytes.)

event has occurred: CacheEvent{LocalCache updated: key=test, old value=[B@a470b8, new value=[B@1c6f579}

(the wire-size of the event would have been 3340 bytes.)

event has occurred: CacheEvent{LocalCache deleted: key=test, value=[B@1c6f579}

(the wire-size of the event would have been 2307 bytes.)

当一个应用程序不需要的旧的和新的价值包含在事件,它可以表明只要求精简版的事件。当添加一个监听器,你可以要求建兴事件通过使用addMapListener方法,需要一个额外的布尔弗莱特参数。在示例21-11 ,唯一的变化将是:

cache.addMapListener(listener, (Filter) null, true);

注意事项:
显然,一个建兴事件的旧值和新值可以为null。然而,即使你要求建兴事件,旧的和新的价值可能被包括在内,如果没有额外成本产生并将事件发送。换句话说,要求一个MapListener接收建兴事件的仅仅是一个提示系统MapListener不知道旧的和新的价值观的事件。

21.10高级:监听查询

所有Coherence缓存支持任何标准查询。当一个应用程序的高速缓存中的数据的查询,其结果是一个时间点的快照,无论是作为一组身份的keySet或身份/值对的一组(的entrySet )的。确定结果集的内容的机制被称为过滤,它允许应用程序开发人员构建任意复杂的查询,使用了一组丰富的现成的过滤器(例如,等于,小于一样,之间,等等) ,或提供他们自己的定制的过滤器(例如, XPath)的。

用于查询的高速缓存的相同的过滤器,可以收听来自一个高速缓存的事件。例如,在一个交易系统是可能查询所有开放Order对象为特定的交易者:

21-13从高速缓存中的事件监听

NamedCache mapTrades = ...

Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid),

                              new EqualsFilter("getStatus", Status.OPEN));

Set setOpenTrades = mapTrades.entrySet(filter);

新行业被打开由该交易商,贸易商,关闭或重新分配,或从另一贸易商要接收通知,应用程序可以使用相同的过滤器:

21-14对象上的事件监听

// receive events for all trade IDs that this trader is interested in

mapTrades.addMapListener(listener, new MapEventFilter(filter), true);

MapEventFilter转换查询过滤器事件过滤器。

MapEventFilter有一些非常强大的选项,允许一个应用程序的监听器只接收事件,在于它是专门感兴趣,更重要的是可扩展性和性能,只有所需的事件必须要在网络上沟通,和他们只传达到服务器和客户表示有兴趣在这些特定的事件。例21-15给出了这些场景。

21-15使用MapEventFilter筛选各种活动

// receive all events for all trades that this trader is interested in

nMask = MapEventFilter.E_ALL;

mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

// receive events for all this trader's trades that are closed or

// re-assigned to a different trader

nMask = MapEventFilter.E_UPDATED_LEFT | MapEventFilter.E_DELETED;

mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

// receive events for all trades as they are assigned to this trader

nMask = MapEventFilter.E_INSERTED | MapEventFilter.E_UPDATED_ENTERED;

mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

// receive events only fornew trades assigned to this trader

nMask = MapEventFilter.E_INSERTED;

mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

有关支持的各种选项的更多信息,请参阅的API文档MapEventFilter 

21.10.1 过滤事件与过滤缓存的数据

当建立一个用于查询筛选,传递给evaluate方法的筛选对象是从缓存中的值,或如果过滤器实现从缓存的EntryFilter界面,整个的Map.Entry 。当建立一个筛选过滤事件为MapListener ,传递给evaluate方法的筛选对象是类型MapEvent 

MapEventFilter转换过滤器是用来做查询到一个过滤器,用于过滤为MapListener事件。换句话说,构造一个过滤器,查询缓存MapEventFilter ,导致MapEventFilter的是一个过滤器,将它们转换成查询过滤的对象希望评估MapEvent对象。

21.11高级:合成事件

活动的变化通常反映被缓存。例如,一台服务器上修改一个条目在高速缓存中,而另一台服务器上添加几个项目的高速缓存,而第三个服务器中删除一个项目从相同的缓存,所有集群中的每个服务器上,而50线程访问数据从相同的高速缓存!所有的修改动作产生群集内的任何服务器可以选择接收的事件。我们是指客户端的行动,事件被派往客户的这些行动,即使在这种情况下, 客户实际上是服务器。这是一个自然的概念,一个真正的对等体系结构,如一个连贯性集群:每一个同行是一个客户端和一个服务器,从同行的两个消费服务和提供服务,其同行。在一个典型的Java企业应用,一个同行是一个应用服务器实例作为一个容器的应用程序,和客户端的那部分应用程序直接访问和修改缓存和听取事件缓存。

有些事件来自内缓存本身。例子有很多,但最常见的情况是:

    当项目自动从缓存过期;

    当项目被逐出最大的高速缓存大小的缓存,因为已达到;

    当条目被透明地添加到高速缓存为读通操作的结果;

    当在高速缓存中的条目,透明地更新作为一个预读刷新预操作的结果。

每一个都代表修改,但修改代表从一个高速缓存内的自然(通常是自动的)操作。这些事件被称为合成事件。

在必要时,应用程序可以简单地通过区分客户端的诱导和合成的事件,要求事件,如果它是合成的。此信息进行的一个亚类的MapEvent ,称为CacheEvent的。使用在以前的EventPrinter例如,它有可能只打印合成事件:

示例21-16确定合成事件

public static class EventPrinter

        extends MultiplexingMapListener

    {

    public void onMapEvent(MapEvent evt)

        {

        if (evt instanceof CacheEvent && ((CacheEvent) evt).isSynthetic())

            {

            out(evt);

            )

        }

    }

有关此功能的更多信息,请参阅的API文档CacheEvent 

21.12 Advanced: Backing Map 事件

虽然它有可能听连贯性高速缓存,其中每个提出分布式,分区,复制,近缓存,连续查询, read-through/write-through和 write-behind 背后的本地视图事件,它是窗帘后面偷看,也可以这么说。

对于一些高级的用例中,它可能是必要的“listen to”“map”后面的“service” 。在分布式环境中管理数据的复制,分区和其他方法都是配送服务。服务仍然有东西在其中实际管理数据,那东西被称为“backing map” 。

可以配置支持映射。如果一个特定的缓存中的所有数据应以对象的形式保存在堆中,然后用一个无限的,非届满LocalCache ( ,或SafeHashMap如果不需要统计) 。如果只有一小的项目数量应该保持在内存中,使用一个LocalCache 。如果要需求​​从数据库读取数据,然后使用一个ReadWriteBackingMap知道如何来读取和写入到应用程序的DAO实现,并反过来给ReadWriteBackingMapSafeHashMap或一个LocalCache来存储它的一个后盾图数据英寸

一些支持映射观察。 ,来自这些支持地图的事件通常不会有直接利害关系的应用程序。相反,连贯性,将其转换成所必须采取的行动(连贯性) ,以保持数据同步,并妥善备份,这也意味着他们在适当的时候到应用听众的要求,提供整个集群的聚集事件。例如,如果分区的高速缓存有LocalCache为依托地图,本地缓存过期的条目,该事件导致相干到期该项目的所有备份副本。此外,任何听众已注册分区的高速缓存,如果该事件与事件过滤器匹配,那么该事件传递给那些听众的服务器上注册的那些听众。

在一些先进的使用情况下,应用程序必须处理事件数据所在的服务器正在维护,它必须这样做,实际上是管理的数据结构(背​​地图) 。在这些情况下,如果底层映射是一个可以观察地图,听者可以配置后盾地图上可以编程方式添加到支持地图。 (如果支持地图无法观测,它可以观察到包装在WrapperObservableMap 。 )

每一个backing地图事件被调度一次且仅一次。然而,事件可以产生多个后备地图从单一的放。例如,如果从安放的条目被重新分配,然后分发创建的事件(从原始节点中删除,并插入一个新的节点) 。在这种情况下,支持地图监听器被调用多次的单放。

最后,备份地图听众是始终是同步的,他们做修改操作,而同步显示器为后盾地图存放在一个线程被解雇。事件往往时间内部backingmap听众,不立即处理,但排队和处理后异步。

有关此功能的更多信息,请参阅的API文档BackingMapManager 

21.12.1从分布式缓存生产可读备份MapListener的活动

返回后盾MapListener事件可读的Java格式复制缓存。然而,支持分布式缓存返回的MapListener事件是内部一致性格式。连贯性孵化器公共项目提供了一个AbstractMultiplexingBackingMapListener类使您获得可读支持MapListener事件,从分布式缓存。见http://coherence.oracle.com/display/INCUBATOR/Coherence+Common下载连贯性的公共库。

要可读的支持MapListener事件从分布式缓存:

实施的AbstractMultiplexingBackingMapListener类。

    注册的<listener>部分后备缓存的配置文件中的地图计划的实施。

    启动缓存服务器应用程序文件和客户端的的cacheconfig Java属性文件:

    Dtangosol.coherence.cacheconfig =“缓存-config.xml

AbstractMultiplexingBackingMapListener类提供onBackingMapEvent的方法,你可以覆盖到指定如何你想返回的事件。

下面的列表的VerboseBackingMapListener类是的示例实现AbstractMultiplexingBackingMapListener 。的onBackingMapEvent方法一直缠身,将结果发送到标准输出。

例如21-17 AbstractMultiplexingBackingMapListener实现

import com.tangosol.net.BackingMapManagerContext;

import com.tangosol.util.MapEvent;

public class VerboseBackingMapListener extends AbstractMultiplexingBackingMapListener {

        public VerboseBackingMapListener(BackingMapManagerContext context) {

                super(context);

        }

        

        @Override

        protected void onBackingMapEvent(MapEvent mapEvent, Cause cause) {

                

                System.out.printf("Thread: %s Cause: %s Event: %s\n", Thread.currentThread().getName(), cause, mapEvent);

                

                try {

                        Thread.currentThread().sleep(5000);

                } catch (InterruptedException e) {

                        // add Auto-generated catch block

                 e.printStackTrace();

                }

                

        }

}

21-18分布式计划指定详细的备份地图监听

<distributed-scheme>

   <scheme-name>my-dist-scheme</scheme-name>

   <service-name>DistributedCache</service-name>

      <backing-map-scheme>

         <read-write-backing-map-scheme>

            <internal-cache-scheme>

               <local-scheme>

                  <high-units>0</high-units>

                  <expiry-delay>0</expiry-delay>

               </local-scheme>

            </internal-cache-scheme>

            <cachestore-scheme>

               <class-scheme>

                  <class-name>CustomCacheStore</class-name>

                  <init-params>

                     <init-param>

                        <param-type>java.lang.String</param-type>

                        <param-value>{cache-name}</param-value>

                     </init-param>

                  </init-params>

               </class-scheme>

            </cachestore-scheme>

            <listener> 

               <class-scheme>

                  <class-name>VerboseBackingMapListener</class-name>

                     <init-params>

                        <init-param>

                           <param-type>com.tangosol.net.BackingMapManagerContext

                           </param-type>

                           <param-value>{manager-context}</param-value>

                        </init-param>

                     </init-params>

                  </class-scheme>

            </listener> 

         </read-write-backing-map-scheme>                                            

      </backing-map-scheme>

   <autostart>true</autostart>

</distributed-scheme>

21.13高级:同步事件监听器

一些事件,使异步传递不中断应用听众的缓存服务生成的事件。在一些罕见的情况下,异步传送可以导致持续经营的结果相比,模糊事件的排序。为了保证缓存API操作和事件是有序的,如果本地群集系统视图是单线程,一个MapListener必须实现标记接口SynchronousListener 

连贯性本身的一个例子使用同步听众是近缓存,它可以使用本地缓存的数据(切腹” )无效的事件。

有关此功能的更多信息,请参阅的API文档MapListenerSupport.SynchronousListener