Oracle Coherence中文教程二十四:在高速缓存中的数据处理

时间:2020-12-08 07:40:36

在高速缓存中的数据处理

coherence提供了理想的基础设施建设数据网格服务和客户端和基于服务器的应用程序使用数据网格。在一个基本的层面上,相干可以在大量的服务器在网格管理一个巨大的数据量,它可以提供接近零延迟访问该数据,它支持跨数据的并行查询中的map-reduce方式;它支持数据库和EIS系统,作为该数据的记录系统的集成。此外,Coherence提供一些服务,这是建立有效的数据网格的理想选择。

本章包含以下各节:

    有针对性的执行
    并行执行
    基于查询的执行
    数据网格范围内执行
    代理针对性,并行和基于查询的执行
    数据网格聚合
    基于节点的执行
    工作管理

24.1有针对性的执行

Coherence提供执行能力对一个条目的代理人在任何地图数据网格管理的数据:

map.invoke(key, agent);

在分区的数据的情况下,代理上的网格节点拥有数据执行对执行。该网格节点上发生的排队,并发管理,执行代理,由代理的数据访问,和数据修改代理。 (同步所得到的备份数据的修改,如果有的话,需要额外的网络流量。 )对于很多处理的目的,它是更有效的移动代理程序的序列化形式(通常只有几百个字节,最多)比处理分布式并发控制,一致性和数据更新。

对于请求/响应处理,代理返回的结果:

Object oResult = map.invoke(key, agent);

换句话说,作为一个数据网格的连贯性决定位置执行基于数据拓扑结构的配置代理,移动代理,执行代理(自动处理并发控制的项目,同时执行代理) ,备份如有修改,并返回结果。

24.2并行执行

Coherence提供的map-reduce允许代理执行并行跨越所有节点在网格中的条目对收集的功能。并行执行允许平衡跨网格工作要处理大量的数据。 invokeAll是方法的使用方式如下:
map.invokeAll(collectionKeys, agent);

对于请求/响应处理,代理处理每个键返回一个结果:

Map mapResults = map.invokeAll(collectionKeys, agent);

相干决定执行代理程序的基础上的配置数据拓扑的最佳位置,移动代理,执行代理(自动处理并发控制项 ,同时执行代理) ,备份修改(如有的话) ,并返回聚结的结果。请参阅数据网格聚合的说明,对结果集进行聚集。

24.3基于查询的执行

Coherence支持查询的能力在整个数据网格。例如,在一个交易系统是可能查询所有开放Order对象为特定的交易者:

24-1查询整个数据网格

NamedCache map    = CacheFactory.getCache("trades");

Filter     filter = new AndFilter(new EqualsFilter("getTrader", traderid),

                                  new EqualsFilter("getStatus", Status.OPEN));

Set setOpenTradeIds = mapTrades.keySet(filter);

Coherence提供此功能相结合,在数据网格中的并行执行,执行能力的代理人对查询。在上一节中,执行并行发生的,而不是返回的身份或与查询匹配的条目,相干执行代理对条目:

map.invokeAll(filter, agent);

对于请求/响应处理,代理处理每个键返回一个结果:

Map mapResults = map.invokeAll(filter, agent);

换句话说,连贯性结合的并行查询,及其并行执行对数据网格中,实现基于查询的代理调用。

24.4数据网格范围内执行

 AlwaysFilter (或空)invokeAll是方法的一个实例传递导致通过代理执行对InvocableMap中的所有条目:

map.invokeAll((Filter) null, agent);

与其他类型的代理调用,请求/响应处理支持

Map mapResults = map.invokeAll((Filter) null, agent);

应用程序可以处理所有的数据分布在一个特定的地图数据网格,一个单一的代码行。

24.5针对性,并行和基于查询的执行代理

代理实现EntryProcessor的接口,通常是通过扩展AbstractProcessor类。
有几个代理与连贯性,包括:

    AbstractProcessor - 一个抽象基类,用于建设EntryProcessor

    ExtractorProcessor - 提取并返回一个值(如属性值)从一个对象存储在InvocableMap   的

    CompositeProcessor - 捆绑了收集EntryProcessor对象对同一条目顺序调用

    ConditionalProcessor - 有条件调用EntryProcessor ,如果过滤器对进入到进程的计算结 果为true

    PropertyProcessor - 一个抽象基类的实现依赖于一个PropertyManipulator EntryProcessor

    NumberIncrementor - 原始积分类型的任何财产,和字节,短,整型,长,浮动,双,  BigInteger的前或后递增的BigDecimal

    NumberMultiplier - 原始积分类型的任何财产,和字节,短,整型,长,浮动,双,的   BigInteger , BigDecimal的,相乘,并返回以前的或新的价值

EntryProcessor接口(皆为InvocableMap接口)只包含两个方法:

示例24-2方法在EntryProcessor接口

/**

* An invocable agent that operates against the Entry objects within a

* Map.

*/

public interface EntryProcessor

        extends Serializable

    {

    /**

    * Process a Map Entry.

    *

    * @param entry  the Entry to process

    *

    * @return the result of the processing, if any

    */

    public Object process(Entry entry);

    /**

    * Process a Set of InvocableMap Entry objects. This method is

    * semantically equivalent to:

    * <pre>

    *   Map mapResults = new ListMap();

    *   for (Iterator iter = setEntries.iterator(); iter.hasNext(); )

    *       {

    *       Entry entry = (Entry) iter.next();

    *       mapResults.put(entry.getKey(), process(entry));

    *       }

    *   return mapResults;

    * </pre>

    *

    * @param setEntries  a read-only Set of InvocableMap Entry objects to

    *                    process

    *

    * @return a Map containing the results of the processing, up to one

    *         entry for each InvocableMap Entry that was processed, keyed

    *         by the keys of the Map that were processed, with a

    *         corresponding value being the result of the processing for

    *         each key

    */

    public Map processAll(Set setEntries);

    }


AbstractProcessor实现processAll的方法,如在前面的例子所述。 )

InvocableMap.Entry被传递到EntryProcessor的是一个扩展的的Map.Entry接口,允许一个EntryProcessor实现获得必要的信息条目,并作出必要的修改,尽可能最有效的方式:

24-3 InvocableMap.Entry API

/**

* An InvocableMap Entry contains additional information and exposes

* additional operations that the basic Map Entry does not. It allows

* non-existent entries to be represented, thus allowing their optional

* creation. It allows existent entries to be removed from the Map. It

* supports several optimizations that can ultimately be mapped

* through to indexes and other data structures of the underlying Map.

*/

public interface Entry

        extends Map.Entry

    {

    // ----- Map Entry interface ------------------------------------

    /**

    * Return the key corresponding to this entry. The resultant key does

    * not necessarily exist within the containing Map, which is to say

    * that <tt>InvocableMap.this.containsKey(getKey)</tt> could return

    * false. To test for the presence of this key within the Map, use

    * {@link #isPresent}, and to create the entry for the key, use

    * {@link #setValue}.

     *

    * @return the key corresponding to this entry; may be null if the

    *         underlying Map supports null keys

    */

    public Object getKey();

    /**

    * Return the value corresponding to this entry. If the entry does

    * not exist, then the value is null. To differentiate between

    * a null value and a non-existent entry, use {@link #isPresent}.

    * <p/>

    * <b>Note:</b> any modifications to the value retrieved using this

    * method are not guaranteed to persist unless followed by a

    * {@link #setValue} or {@link #update} call.

    *

    * @return the value corresponding to this entry; may be null if the

    *         value is null or if the Entry does not exist in the Map

    */

    public Object getValue();

    /**

    * Store the value corresponding to this entry. If the entry does

    * not exist, then the entry is created by invoking this method,

    * even with a null value (assuming the Map supports null values).

    *

    * @param oValue  the new value for this Entry

    *

    * @return the previous value of this Entry, or null if the Entry did

    *         not exist

    */

    public Object setValue(Object oValue);

    // ----- InvocableMap Entry interface ---------------------------

    /**

    * Store the value corresponding to this entry. If the entry does

    * not exist, then the entry is created by invoking this method,

    * even with a null value (assuming the Map supports null values).

    * <p/>

    * Unlike the other form of {@link #setValue(Object) setValue}, this

    * form does not return the previous value, and consequently may be

    * significantly less expensive (in terms of cost of execution) for

    * certain Map implementations.

    *

    * @param oValue      the new value for this Entry

    * @param fSynthetic  pass true only if the insertion into or

    *                    modification of the Map should be treated as a

    *                    synthetic event

    */

    public void setValue(Object oValue, boolean fSynthetic);

    /**

    * Extract a value out of the Entry's value. Calling this method is

    * semantically equivalent to

    * <tt>extractor.extract(entry.getValue())</tt>, but this method may

    * be significantly less expensive because the resultant value may be

    * obtained from a forward index, for example.

    *

    * @param extractor  a ValueExtractor to apply to the Entry's value

    *

    * @return the extracted value

    */

    public Object extract(ValueExtractor extractor);

    /**

    * Update the Entry's value. Calling this method is semantically

    * equivalent to:

    * <pre>

    *   Object oTarget = entry.getValue();

    *   updater.update(oTarget, oValue);

    *   entry.setValue(oTarget, false);

    * </pre>

    * The benefit of using this method is that it may allow the Entry

    * implementation to significantly optimize the operation, such as

    * for purposes of delta updates and backup maintenance.

    *

    * @param updater  a ValueUpdater used to modify the Entry's value

    */

    public void update(ValueUpdater updater, Object oValue);

    /**

    * Determine if this Entry exists in the Map. If the Entry is not

    * present, it can be created by calling {@link #setValue} or

    * {@link #setValue}. If the Entry is present, it can be destroyed by

    * calling {@link #remove}.

    *

    * @return true iff this Entry is existent in the containing Map

    */

    public boolean isPresent();

    /**

    * Remove this Entry from the Map if it is present in the Map.

    * <p/>

    * This method supports both the operation corresponding to

    * {@link Map#remove} and synthetic operations such as

    * eviction. If the containing Map does not differentiate between

    * the two, then this method must be identical to

    * <tt>InvocableMap.this.remove(getKey())</tt>.

    *

    * @param fSynthetic  pass true only if the removal from the Map

    *                    should be treated as a synthetic event

    */

    public void remove(boolean fSynthetic);

    }

24.6数据网格聚合

在除了标量剂,的InvocableMap接口还支持对条目的一个子集,以获得一个单一的结果,执行操作的条目聚合。条目聚合发生在两端并联工作时,用大量的数据网格提供的map-reduce支持。

24-4汇聚在InvocableMap API

/**

* Perform an aggregating operation against the entries specified by the

* passed keys.

*

* @param collKeys  the Collection of keys that specify the entries within

*                  this Map to aggregate across

* @param agent     the EntryAggregator that is used to aggregate across

*                  the specified entries of this Map

*

* @return the result of the aggregation

*/

public Object aggregate(Collection collKeys, EntryAggregator agent);

/**

* Perform an aggregating operation against the set of entries that are

* selected by the given Filter.

* <p/>

* <b>Note:</b> calling this method on partitioned caches requires a

* Coherence Enterprise Edition (or higher) license.

*

* @param filter  the Filter that is used to select entries within this

*                Map to aggregate across

* @param agent   the EntryAggregator that is used to aggregate across

*                the selected entries of this Map

*

* @return the result of the aggregation

*/

public Object aggregate(Filter filter, EntryAggregator agent);

一个简单的EntryAggregator处理一个组的InvocableMap.Entry对象,实现了结果:

24-5 EntryAggregator API

/**

* An EntryAggregator represents processing that can be directed to occur

* against some subset of the entries in an InvocableMap, resulting in a

* aggregated result. Common examples of aggregation include functions

* such as min(), max() and avg(). However, the concept of aggregation

* applies to any process that must evaluate a group of entries to

* come up with a single answer.

*/

public interface EntryAggregator

        extends Serializable

    {

    /**

    * Process a set of InvocableMap Entry objects to produce an

    * aggregated result.

    *

    * @param setEntries  a Set of read-only InvocableMap Entry objects to

    *                    aggregate

    *

    * @return the aggregated result from processing the entries

    */

    public Object aggregate(Set setEntries);

    }

在数据网格的高效的执行力,聚合过程的设计必须能够以并行方式运行。

例如的24-6 ParallelAwareAggregator API并行运行聚合

/**

* A ParallelAwareAggregator is an advanced extension to EntryAggregator

* that is explicitly capable of being run in parallel, for example in a

* distributed environment.

*/

public interface ParallelAwareAggregator

        extends EntryAggregator

    {

    /**

    * Get an aggregator that can take the place of this aggregator in

    * situations in which the InvocableMap can aggregate in parallel.

    *

    * @return the aggregator that is run in parallel

    */

    public EntryAggregator getParallelAggregator();

    /**

    * Aggregate the results of the parallel aggregations.

    *

    * @return the aggregation of the parallel aggregation results

    */

    public Object aggregateResults(Collection collResults);

    }

Coherence包含的一些诶聚合函数,还有:

    Count

    DistinctValues

    DoubleAverage

    DoubleMax

    DoubleMin

    DoubleSum

    LongMax

    LongMin

LongSum

注意事项:
    所有配有连贯性的聚合是平行的感知。

    见com.tangosol.util.aggregator包的列表连贯性聚合。要实现自己的聚合,请参阅AbstractAggregator的抽象基类。

24.7基于节点的执行

Coherence提供调用服务,它允许执行单通代理(称为Invocable的对象)网格内的任何地方。代理可以执行在某个节点的网格,在网格中任何一组特定的节点上并行,或平行的网格中的所有节点上。

调用服务配置缓存配置文件使用<invocation-scheme>的元素。使用的服务的名称,应用程序可以很容易地获得该服务的引用:

InvocationService service = (InvocationService)CacheFactory.getService("MyService");

代理仅仅是可运行的类是应用程序的一部分。一个简单的代理的一个例子是一个请求从JVMGC 

24-7简单的代理请求垃圾收集

/**

* Agent that issues a garbage collection.

*/

public class GCAgent

        extends AbstractInvocable

    {

    public void run()

        {

        System.gc();

        }

    }

执行该代理在整个集群中,它需要一行代码:

service.execute(new GCAgent(), null, null);

下面是一个例子的一个代理,它支持一个网格范围内的请求/响应模型:

24-8代理以支持网格范围内的请求和响应模型

/**

* 代理确定网格节点有多少空闲内存。

*/

public class FreeMemAgent

        extends AbstractInvocable

    {

    public void run()

        {

        Runtime runtime = Runtime.getRuntime();

        int cbFree  = runtime.freeMemory();

        int cbTotal = runtime.totalMemory();

        setResult(new int[] {cbFree, cbTotal});

        }

    }

执行该代理在整个电网和检索所有结果,但它仍然只需要一行代码:

Map map = service.query(new FreeMemAgent(), null);

虽然它很容易做一个网格范围内的请求/响应,它需要更多的代码打印结果:

24-9从电网范围的请求或响应打印结果

Iterator iter = map.entrySet().iterator();

while (iter.hasNext())

    {

    Map.Entry entry  = (Map.Entry) iter.next();

    Member    member = (Member) entry.getKey();

    int[]     anInfo = (int[]) entry.getValue();

    if (anInfo != null) // nullif member died

        System.out.println("Member " + member + " has "

            + anInfo[0] + " bytes free out of "

            + anInfo[1] + " bytes total");

    }

代理操作可以是有状态的,这意味着它们的调用状态序列化和传输网格节点上运行代理。

24-10状态代理业务

/**

* Agent that carries some state with it.

*/

public class StatefulAgent

        extends AbstractInvocable

    {

    public StatefulAgent(String sKey)

        {

        m_sKey = sKey;

        }

    public void run()

        {

        // the agent has the key that it was constructed with

        String sKey = m_sKey;

        // ...

        }

    private String m_sKey;

    }

24.8工作管理

Coherence提供了支持网格的实施CommonJ工作管理器。使用工作管理器,应用程序可以提交收集的工作,必须执行。工作管理器工作在这样一种方式,它是并行执行的,通常在整个网格分配。换句话说,如果有10个工作项目提交和10在网格中的服务器,每个服务器有可能处理一个工作项目。此外,分布在网格工作项目可以定制,从而使某些服务器(例如,作为一个特定的主机服务网关)是第一选择,为了效率和本地运行某些工作项目的数据。

然后,应用程序可以等待要完成的工作,它可以等待多久可以提供超时。用于此目的的API是相当强大,它允许应用程序等待完成第一个工作项,或指定的一组完成的工作项目。通过这个API相结合的方法,它是可以做这样的事情:这里有10个项目执行;7个不重要的项目,等待不超过5秒,这3个重要的项目中,等待不超过30

24-11使用工作管理

Work[] aWork = ...

Collection collBigItems = new ArrayList();

Collection collAllItems = new ArrayList();

for (int i = 0, c = aWork.length; i < c; ++i)

    {

    WorkItem item = manager.schedule(aWork[i]);

    if (i < 3)

        {

        // the first three work items are the important ones

        collBigItems.add(item);

        }

    collAllItems.add(item);

    }

Collection collDone = manager.waitForAll(collAllItems, 5000L);

if (!collDone.containsAll(collBigItems))

    {

    // wait the remainder of 30 seconds for the important work to finish

    manager.waitForAll(collBigItems, 25000L);

    }