Adhesive框架系列文章--分布式组件客户端模块使用

时间:2022-07-13 18:08:58

Memcached是一个很常见的分布式组件,现在有很多.NET下开源的Memcached组件的客户端实现,比如EnyimMemcachedClient。在Adhesive框架中实现了一个分布式组件客户端,同时也实现了第一个具体的客户端,也就是Memcached。与其它实现不同的是,这里我们提供了Memcached二进制协议的完整实现,并且也实现了一些特色功能,进一步封装了Memcached的一些基础API。

 

要使用Adhesive.DistributedComponentClient,首先免不了进行配置。和框架的其它模块一样,这里我们也使用了配置服务:

Adhesive框架系列文章--分布式组件客户端模块使用

进一步查看:

Adhesive框架系列文章--分布式组件客户端模块使用

在这里我们定义了一个TestMemcachedCluster:

Adhesive框架系列文章--分布式组件客户端模块使用

对于每一个集群,需要配置其名字以及尝试恢复节点的时间间隔。也就是在节点连接不上的时候,多久尝试进行一次节点的恢复。在Adhesive的实现中,一个集群下应该有多个节点,节点的分配根据Key进行一致性哈希,如果某个节点不可用,进行节点的重新分配,并不会影响程序的使用,当然由于重分配,必定会有1/N的数据丢失。在节点恢复后,又会进行重新分配。下面看一下节点的配置:

Adhesive框架系列文章--分布式组件客户端模块使用

在这里定义了四个节点,随便点进去一个看看:

Adhesive框架系列文章--分布式组件客户端模块使用

这里定义了:

1、节点名字

2、节点地址

3、节点的权重:所谓权重就是在进行Key到Node分配时候的权重,权重越高分配的Key越高,在使用的时候负载也就越重。对于Memcached来说,可以把内存少的节点权重设置低一些。

4、最大的连接数:连接池中允许的最大连接数

5、最小的连接数:连接池中保留的最小连接数

6、最大闲置时间:闲置时间过长的连接会被回收

7、最大忙碌时间:忙碌时间过长的连接会被强制关闭

8、连接超时时间

9、发送数据超时时间

10、接受数据超时时间

11、连接池维护时间间隔:连接池在维护的时候会回收闲置过长的连接,会关闭忙碌过长的连接,会补充连接到最小连接数。

在某些情况下,您可能希望独立使用这个组件,也不是依赖配置服务,那么可以如下操作:

找到DistributedServerConfiguration.cs,把private static DistributedServerConfigurationEntity GetConfig()中的:

var defaultConfig = GetDefaultConfig();
var config = configService.GetConfigItemValue(true, "DistributedServerConfiguration", defaultConfig);

替换为:

var defaultConfig = GetDefaultConfig(); 
var config = LocalConfigService.GetConfig(defaultConfig);

也就是使用本地配置服务来替代通用配置服务。对于本地配置服务会在程序的Config目录下生成.config的配置文件,这样就可以不依赖配置服务直接启动程序,比如:

 

<?xml version="1.0" encoding="utf-8"?>
<DistributedServerConfigurationEntity xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema">
  <ClientClusterConfigurations>
    <item>
      <key>
        <string>TestMemcachedCluster</string>
      </key>
      <value>
        <ClientClusterConfiguration>
          <Name>TestMemcachedCluster</Name>
          <TryRecoverNodeInterval>00:00:10</TryRecoverNodeInterval>
          <ClientNodeConfigurations>
            <item>
              <key>
                <string>MemcachedNode1</string>
              </key>
              <value>
                <ClientNodeConfiguration>
                  <Name>MemcachedNode1</Name>
                  <Address>192.168.135.222:12000</Address>
                  <Weight>Medium</Weight>
                  <MaxConnections>50</MaxConnections>
                  <MinConnections>5</MinConnections>
                  <MaxIdleTime>00:01:00</MaxIdleTime>
                  <MaxBusyTime>00:01:00</MaxBusyTime>
                  <ConnectTimeout>00:00:05</ConnectTimeout>
                  <SendTimeout>00:00:05</SendTimeout>
                  <ReceiveTimeout>00:00:05</ReceiveTimeout>
                  <MaintenanceInterval>00:00:30</MaintenanceInterval>
                </ClientNodeConfiguration>
              </value>
            </item>
            <item>
              <key>
                <string>MemcachedNode2</string>
              </key>
              <value>
                <ClientNodeConfiguration>
                  <Name>MemcachedNode2</Name>
                  <Address>192.168.135.222:12001</Address>
                  <Weight>Medium</Weight>
                  <MaxConnections>50</MaxConnections>
                  <MinConnections>5</MinConnections>
                  <MaxIdleTime>00:01:00</MaxIdleTime>
                  <MaxBusyTime>00:01:00</MaxBusyTime>
                  <ConnectTimeout>00:00:05</ConnectTimeout>
                  <SendTimeout>00:00:05</SendTimeout>
                  <ReceiveTimeout>00:00:05</ReceiveTimeout>
                  <MaintenanceInterval>00:00:30</MaintenanceInterval>
                </ClientNodeConfiguration>
              </value>
            </item>
            <item>
              <key>
                <string>MemcachedNode3</string>
              </key>
              <value>
                <ClientNodeConfiguration>
                  <Name>MemcachedNode3</Name>
                  <Address>192.168.135.221:12000</Address>
                  <Weight>Medium</Weight>
                  <MaxConnections>50</MaxConnections>
                  <MinConnections>5</MinConnections>
                  <MaxIdleTime>00:01:00</MaxIdleTime>
                  <MaxBusyTime>00:01:00</MaxBusyTime>
                  <ConnectTimeout>00:00:05</ConnectTimeout>
                  <SendTimeout>00:00:05</SendTimeout>
                  <ReceiveTimeout>00:00:05</ReceiveTimeout>
                  <MaintenanceInterval>00:00:30</MaintenanceInterval>
                </ClientNodeConfiguration>
              </value>
            </item>
            <item>
              <key>
                <string>MemcachedNode4</string>
              </key>
              <value>
                <ClientNodeConfiguration>
                  <Name>MemcachedNode4</Name>
                  <Address>192.168.135.221:12001</Address>
                  <Weight>Medium</Weight>
                  <MaxConnections>50</MaxConnections>
                  <MinConnections>5</MinConnections>
                  <MaxIdleTime>00:01:00</MaxIdleTime>
                  <MaxBusyTime>00:01:00</MaxBusyTime>
                  <ConnectTimeout>00:00:05</ConnectTimeout>
                  <SendTimeout>00:00:05</SendTimeout>
                  <ReceiveTimeout>00:00:05</ReceiveTimeout>
                  <MaintenanceInterval>00:00:30</MaintenanceInterval>
                </ClientNodeConfiguration>
              </value>
            </item>
          </ClientNodeConfigurations>
        </ClientClusterConfiguration>
      </value>
    </item>
  </ClientClusterConfigurations>
</DistributedServerConfigurationEntity>

在介绍了配置之后,就来介绍如何使用客户端,首先可以获得一个集群:

var client = MemcachedClient.GetClient("TestMemcachedCluster");

然后可以通过client调用各种API了:

Adhesive框架系列文章--分布式组件客户端模块使用

1、清空数据:

Dictionary<string, bool> Flush(TimeSpan expire)

Dictionary<string, bool> Flush()

这里提供了两个重载。可以清空整个集群所有数据,也可以清空一定未来一定时间内将会过期的所有数据。

2、获取服务端版本:

Dictionary<string, string> Version()

将会列出每一个节点的版本号

3、获取服务端状态:

Dictionary<string, Dictionary<string, string>> Stat(StatType statType)

Dictionary<string, Dictionary<string, string>> Stat()

在这里提供了两个重载,可以获取指定类型的状态数据,也可以获取所有状态数据。StatType 定义如下:

    public enum StatType
    {
        General = 0,
        Item = 1,
        Setting = 2,
    }

要注意,在这里同样是按照节点分组的。

4、获取数据的操作:

string Get(string key)

T Get<T>(string key)

string Get(string key, out ulong version)

T Get<T>(string key, out ulong version)

在这里提供了获取纯字符串和获取泛型类型两组方法,也提供了仅仅获取值以及获取值和版本号两组方法。在并发状态下,很可能我们在获取一个Value之后进行了一些修改,但在Set到Memcached的时候已经被其它人Set过一次了,那么我们可以通过Get时候获取到的版本号来确保Set的时候的版本和之前Get时候的版本一致:

 ulong version = 0;
                    var key = Guid.NewGuid().ToString();
                    Assert.IsTrue(client.Add(key, stringValue, expire, version));
                    Assert.AreEqual(stringValue, client.Get(key, out version));
                    Assert.IsTrue(client.Replace(key, stringValue, version));
                    Assert.IsFalse(client.Replace(key, stringValue, version));

上面的测试代码表明在进行一次操作之后版本号会修改,再使用老的版本号进行Replace操作将会失败。

5、存入数据的操作:

bool Set(string key, string value)

bool Set(string key, string value, ulong version)

bool Set(string key, string value, TimeSpan expire)

bool Set(string key, string value, TimeSpan expire, ulong version)

bool Set<T>(string key, T value)

bool Set<T>(string key, T value, ulong version)

bool Set<T>(string key, T value, TimeSpan expire)

bool Set<T>(string key, T value, TimeSpan expire, ulong version)

对于泛型和纯字符串的重载,每一组中都会有普通的Set、检查版本号的Set、设置超时时间的Set以及设置超时时间+检查版本号的Set。

对于Set操作来说,如果Key存在,那么就会替换已有的Value,如果Key不存在就会增加一组KeyValue。

如果赋值了超时时间,那么超过这个时间值就或Get不到,当然由于memcached的限制,超时时间不能超过30天。

6、替换数据的操作:

bool Replace(string key, string value)

bool Replace(string key, string value, ulong version)

bool Replace(string key, string value, TimeSpan expire)

bool Replace(string key, string value, TimeSpan expire, ulong version)

bool Replace<T>(string key, T value)

bool Replace<T>(string key, T value, ulong version)

bool Replace<T>(string key, T value, TimeSpan expire)

bool Replace<T>(string key, T value, TimeSpan expire, ulong version)

方法签名和Set基本一致,只不过要注意,如果Key不存在,那么Replace会失败,也就是返回false。

 var key = Guid.NewGuid().ToString();
                    Assert.IsFalse(client.Replace(key, stringValue));
                    Assert.IsTrue(client.Set(key, ""));
                    Assert.IsTrue(client.Replace(key, stringValue));
                    Assert.AreEqual(client.Get(key), stringValue);

7、增加数据的操作:

bool Add(string key, string value)

bool Add(string key, string value, ulong version)

bool Add(string key, string value, TimeSpan expire)

bool Add(string key, string value, TimeSpan expire, ulong version)

bool Add<T>(string key, T value)

bool Add<T>(string key, T value, ulong version)

bool Add<T>(string key, T value, TimeSpan expire)

bool Add<T>(string key, T value, TimeSpan expire, ulong version)

方法签名和Set基本一致,只不过要注意,如果Key已经存在,那么Add会失败,也就是返回false。

 var key = Guid.NewGuid().ToString();
                    Assert.IsTrue(client.Set(key, ""));
                    Assert.IsFalse(client.Add(key, stringValue, expire));
                    Assert.IsTrue(client.Delete(key));
                    Assert.IsTrue(client.Add(key, stringValue, expire));
                    Assert.AreEqual(client.Get(key), stringValue);

8、递增的操作:

ulong? Increment(string key, ulong amount)

ulong? Increment(string key, ulong amount, ulong version)

ulong? IncrementWithInit(string key, ulong seed, TimeSpan expire, ulong amount)

ulong? IncrementWithInit(string key, ulong seed, ulong amount)

ulong? IncrementWithInit(string key, ulong seed, TimeSpan expire, ulong amount, ulong version)

ulong? IncrementWithInit(string key, ulong seed, ulong amount, ulong version)

在这里主要有两组操作,第一组是增加一个已有的值,第二组是增加一个已有的值或者初始化值。先来看第一组的测试代码:

 var key = Guid.NewGuid().ToString();
                    var result = client.Increment(key, 10);
                    Assert.IsFalse(result.HasValue);
                    Assert.IsTrue(client.Set(key, i));
                    result = client.Increment(key, 10);
                    Assert.IsTrue(result.HasValue);
                    Assert.AreEqual(result.Value, (ulong)i + 10);

第一次调用的时候由于没有值会得到一个空值。在设置了值之后再调用就会得到值了。再来看看第二组的测试代码:

var key = Guid.NewGuid().ToString();
                    var result = client.IncrementWithInit(key, (ulong)i, expire, 10);
                    Assert.IsTrue(result.HasValue);
                    Assert.AreEqual(result.Value, (ulong)i);
                    result = client.IncrementWithInit(key, (ulong)i, expire, 10);
                    Assert.IsTrue(result.HasValue);
                    Assert.AreEqual(result.Value, (ulong)i + 10);

使用i进行初始化,并且递增步进设置为10,第一次由于没有值结果应该就是i,第二次则是i+10了。对于IncrementWithInit,由于附带了初始化Value的功能,所以我们可以提供一个过期时间。

9、和递增相对应的递减操作:

ulong? Decrement(string key, ulong amount)

ulong? Decrement(string key, ulong amount, ulong version)

ulong? DecrementWithInit(string key, ulong seed, TimeSpan expire, ulong amount)

ulong? DecrementWithInit(string key, ulong seed, ulong amount)

ulong? DecrementWithInit(string key, ulong seed, TimeSpan expire, ulong amount, ulong version)

ulong? DecrementWithInit(string key, ulong seed, ulong amount, ulong version)

这里就不过多介绍了,递减和递增相对应,为seed减少一个amount。

10、为字符串加值:

bool Append(string key, string value)

bool Prepend(string key, string value)

分别是把另一个字符串value加到key对应的字符串的后面和前面:

var key = Guid.NewGuid().ToString();
                    Assert.IsTrue(client.Set(key, stringValue, expire));
                    Assert.IsTrue(client.Append(key, "你好"));
                    Assert.AreEqual(stringValue + "你好", client.Get(key));
var key = Guid.NewGuid().ToString();
                    Assert.IsTrue(client.Set(key, stringValue, expire));
                    Assert.IsTrue(client.Prepend(key, "你好"));
                    Assert.AreEqual("你好" + stringValue, client.Get(key));

11、删除一个Value:

bool Delete(string key)

这个没啥好说的,删除后将会Get不到这个Key。

12、快速操作:

由于Set和Delete很常用,并且我们往往不需要知道操作的返回值也允许操作发生失败,因此在这里提供了FastSet和FastDelete的功能。对于这些API,不提供返回值,只是尝试进行Set和Delete操作。相比普通的Set和Delete操作,Fast版本可以有10%以上的性能提高,因为我们不会解析Response的消息,并且在不出错的情况下,Memcached服务端也不会返回Response消息。

void FastSet(string key, string value)

void FastSet(string key, string value, ulong version)

void FastSet(string key, string value, TimeSpan expire)

void FastSet(string key, string value, TimeSpan expire, ulong version)

void FastSet<T>(string key, T value)

void FastSet<T>(string key, T value, ulong version)

void FastSet<T>(string key, T value, TimeSpan expire)

void FastSet<T>(string key, T value, TimeSpan expire, ulong version)

void FastDelete(string key)

 

上面介绍的这些API其实都是Memcached服务端提供的API,下面会介绍一些利用这些API提供的特色功能,进一步方便使用者。在这里我们仅仅介绍使用,不会介绍其实现,大家可以思考一下,如何利用之前提到的12个API来完成下面的功能。

1、判断Key是否存在:

bool Exists(string key)

2、Get+Set:

也就是如果Key存在的话则获取,如果不存在的话,使用Set值为回调方法提供的值,最后返回值

string GetAndSet(string key, Func<string> getValue, TimeSpan expire)

string GetAndSet(string key, Func<string> getValue)

T GetAndSet<T>(string key, Func<T> getValue, TimeSpan expire)

T GetAndSet<T>(string key, Func<T> getValue)

string GetAndSetWhen(string key, Func<string> getValue, TimeSpan exipre, Func<bool> condition)

string GetAndSetWhen(string key, Func<string> getValue, Func<bool> condition)

T GetAndSetWhen<T>(string key, Func<T> getValue, Func<bool> condition)

T GetAndSetWhen<T>(string key, Func<T> getValue, TimeSpan expire, Func<bool> condition)

这里还定义了一组GetAndSetWhen方法,也就是满足condition的时候才进行值的Set。

我们来看一个应用:

   private int InternalGetDataCount(string typeFullName, string databaseName, string tableName, string columnName, DateTime begin, DateTime end, IMongoQuery filterquery)
        {
            try
            {
                var count = memcachedClient.GetAndSetWhen<int?>(GetMemcachedKey(typeFullName, databaseName, tableName, columnName,
                    begin.ToString("yyyyMMddHHmmss"), end.ToString("yyyyMMddHHmmss"), filterquery), () =>
                    {
                        var query = Query.And(Query.LT(columnName, end).GTE(begin), filterquery);
                        var server = CreateSlaveMongoServer(typeFullName);
                        var database = server.GetDatabase(databaseName);
                        var collection = database.GetCollection(tableName);
                        var c = collection.Count(query);
                        Console.WriteLine(string.Format("Db {0} {1} {2} {3} {4} {5} {6} {7}", typeFullName, databaseName, tableName, columnName,
                           begin.ToString("yyyyMMddHHmmss"), end.ToString("yyyyMMddHHmmss"), filterquery == null ? "" : filterquery.ToString(), c));
                        return c;
                    }, TimeSpan.FromDays(1), () => DateTime.Now > end);
                return count.Value;
            }
            catch (Exception ex)
            {
                LocalLoggingService.Error(ex.ToString());
                throw;
            }
        }

比如在这里,我们从Mongodb数据库获取数据量,并希望把结果缓存在Memcached中。但是我们不希望在结束时间大于当前时间的情况下缓存值,因为只有过去时间获取的统计值才不会变动,才可以缓存下来。这是就可以使用GetAndSetWhen了。

3、分布式锁

IDisposable AcquireLock(string key, TimeSpan timeOut)

Memcached由于是单点,因此很适合作为分布式锁,测试代码如下:

 var timeoutCount = 0;
                var key = Guid.NewGuid().ToString();
                Parallel.For(0, 10, i =>
                {
                    try
                    {
                        using (var locker = client.AcquireLock(key, TimeSpan.FromMilliseconds(100)))
                        {
                            Thread.Sleep(1000);
                        }
                    }
                    catch (TimeoutException)
                    {
                        Interlocked.Increment(ref timeoutCount);
                    }
                });

                Assert.AreEqual(timeoutCount, 9);

在这里,我们在获得了锁之后等待1秒,并且允许锁的超时时间为10毫秒。这样的话,在并发情况下,只应该有一次调用可以成功,其它9次都会失败,失败的时候会抛出TimeoutException异常。

4、列表操作:

List<string> GetList(string listKey, int pageSize, int pageIndex)

List<string> GetList(string listKey)

List<T> GetList<T>(string listKey, int pageSize, int pageIndex)

List<T> GetList<T>(string listKey)

 

bool SetListItem(string listKey, string itemKey, string itemValue)

bool SetListItem(string listKey, string itemKey, string itemValue, TimeSpan expire)

bool SetListItem<T>(string listKey, string itemKey, T itemValue, TimeSpan expire)

bool SetListItem<T>(string listKey, string itemKey, T itemValue)

 

bool DeleteListItem(string listKey, string itemKey)

有的时候,我们会在Memcached中存一组数据列表数据,并且又不希望在修改项的时候获取整个列表然后修改后回存回去。这个时候就需要保存多个KeyValue,并且使用一个KeyValue来存放列表中所有的Key。这组API封装了其中的实现。允许我们:

1)获取列表所有数据

2)分页方式获取部分数据

3)设置某个项

4)删除某个项

测试代码如下:

var listKey = Guid.NewGuid().ToString();
                var pageIndex = 2;
                var pageSize = 10;

                Parallel.For(0, 100, i =>
                {
                    var itemKey = Guid.NewGuid().ToString();
                    client.SetListItem(listKey, itemKey, new User(i), TimeSpan.FromMinutes(10));
                });
                var value = client.GetList<User>(listKey, pageSize, pageIndex);
                Assert.AreEqual(value.Count, pageSize);
                Assert.AreEqual(value.First().GetType(), typeof(User));

5、批量获取操作:

Dictionary<string, string> GetMultiple(IList<string> keys)

Dictionary<string, T> GetMultiple<T>(IList<string> keys)

也就是直接获取一组Key的值。由于提供的Key可能分布在多个节点上,因此在这里我们使用并行方式同时到都个节点上获取相应的值。测试代码如下:

 var keys = new System.Collections.Concurrent.ConcurrentBag<string>();
                Parallel.For(0, 1000, i =>
                {
                    var key = Guid.NewGuid().ToString();
                    Assert.IsTrue(client.Set(key, new User(i), TimeSpan.FromMinutes(10)));
                    keys.Add(key);
                });

                var value = client.GetMultiple<User>(keys.ToList());
                Assert.AreEqual(value.Count, keys.Count);
                foreach (var item in value)
                {
                    Assert.AreEqual(item.Value.GetType(), typeof(User));
                }

 

至此,我们介绍了所有API的使用,可以直接查看源代码中的Adhesive.DistributedComponentClient.Test项目以及Adhesive.DistributedComponentClient.UnitTest来了解如何使用。经过测试,Adhesive的这个Memcached客户端的实现和Enyim的实现在性能上有5%左右的提高,并且API更丰富(后面那些特色API)。