使用C#和MemoryCache组件实现轮流调用APIKey以提高并发能力

时间:2024-04-15 08:38:08

文章信息

标题:使用C#和MemoryCache组件实现轮流调用API Key以提高并发能力的技巧

摘要:本文介绍了如何利用C#语言中的MemoryCache组件,结合并发编程技巧,实现轮流调用多个API Key以提高系统的并发能力。通过示例代码和详细说明,读者将了解如何有效地管理API Key的调用次数限制,并优化系统性能。

Title: Techniques for Using C# and MemoryCache Component to Rotate API Keys for Improved Concurrency

Abstract: This article explores how to utilize the MemoryCache component in C#, combined with concurrency programming techniques, to rotate through multiple API keys for enhancing system concurrency. With detailed explanations and example codes, readers will learn how to effectively manage API key usage limits and optimize system performance.

前言

使用场景是需要使用一个接口,这个接口有限制每个 APIKey 的请求量在 5次/s

一开始是最苯的做法,每次调用之后等个 200 毫秒,这样就不会超出这个限制

但是这样效率也太低了,刚好发现我们拥有不少 APIKey ,那么直接改成并发的吧,安排!

本文做一个简单的记录

思路

将每个 APIKey 的调用情况保存在内存里

C# 提供的 MemoryCache 组件是个 key-value 结构,并且可以设置每个值的过期时间

我把 APIKey 作为 key 存入,value 则是已使用的次数,并设置过期时间为 1 秒

这样只需要判断某个 APIKey 的使用次数是否小于 5 ,小于5就拿来用,大于5就读取配置拿新一个的 APIKey 。

使用 fluent-console

fluent-console 是我之前开发的 C# Console 应用模板,提供「现代化的控制台应用的开发体验」脚手架,能像 Web 应用那样很优雅地整合各种组件,包括依赖注入、配置、日志等功能。

项目地址: https://github.com/Deali-Axy/fluent-dotnet-console

本文需要用到 MemoryCache 等组件,用这个模板会比较方便,首先使用这个模板创建一个项目

# 安装模板
dotnet new install FluentConsole.Templates

创建项目

dotnet new flu-cli -n MyProject

准备配置文件

在配置文件里准备好 APIKeys

编辑项目的 appsettings.json

{
  "Logging": {
    "LogLevel": {
      "Default": "Debug"
    }
  },
  "AppSettings": {
    "Name": "The name of this app is fluent console demo",
    "Boolean": true,
    "DemoList": [
      "item1",
      "item2",
      "item3"
    ],
    "ApiKeys": [
      "apikey-xxx",
      "apikey-xxx",
      "apikey-xxx",
      "apikey-xxx",
      "apikey-xxx",
      "apikey-xxx",
      "apikey-xxx",
      "apikey-xxx"
    ]
  }
}

fluent-console 模板已经处理好了配置相关的逻辑,后续直接使用即可

配置服务

编辑 Program.cs 文件

添加需要的服务

services.AddMemoryCache();
services.AddScoped<ApiService>();

等下来 ApiService 里写代码

ApiService

Services 文件夹下创建 ApiService.cs 文件

先把依赖注入进来

using Flurl;
using Flurl.Http;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace MyProject.Services;

public class ApiService {
  private readonly ILogger<ApiService> _logger;
  private readonly AppSettings _settings;
  private readonly IMemoryCache _cache;

  public ApiService(IOptions<AppSettings> appOptions, IMemoryCache cache, ILogger<ApiService> logger) {
    _cache = cache;
    _logger = logger;
    _settings = appOptions.Value;
  }
}

封装 keys 管理

这里写了一个方法来获取一个可用的 APIKey

因为需要考虑并发运行,对 _cache 对象加锁

思路很简单上面已经介绍了,直接写成代码,同时写了很清楚的注释

private string? GetNextApiKey() {
  lock (_cache) {
    foreach (var key in _settings.ApiKeys) {
      if (_cache.TryGetValue(key, out int count)) {
        // 如果该 API Key 在缓存中存在,则检查其调用次数
        // 如果达到调用次数上限,则不再使用该 API Key,继续下一个 API Key
        if (count >= 5) {
          continue;
        }

        // 如果调用次数未达到上限,则增加调用次数并返回该 API Key
        _cache.Set(key, count + 1, DateTimeOffset.Now.AddSeconds(1));
        return key;
      }

      // 如果 API Key 不在缓存中,则将其添加到缓存中并返回
      _cache.Set(key, 1, DateTimeOffset.Now.AddSeconds(1));
      return key;
    }
  }

  return null; // 所有 API Key 都已被使用
}

修改属性

因为一开始是单线程版本,我直接用 ApiKey 来读取固定的 APIKey 配置。

现在直接把原本单个 APIKey 的属性改成调用 GetNextApiKey 方法

private string? ApiKey => GetNextApiKey();

请求接口的方法

改动不大,只需要添加一个判断就行

上面的 GetNextApiKey 在没有获取到可用 APIKey 的时候会返回 null

判断一下是否为空就行,没有 APIKey 就等个 1 秒。

public async Task<ApiResponse> RequestData(string somedata) {
  var key = ApiKey;
  
  // 所有API Key 都被用完
  if (key == null) {
    await Task.Delay(1000);
    return await RequestData(somedata);
  }

  var url = "https://api.dealiaxy.com"
    .AppendPathSegment("one")
    .AppendPathSegment("service")
    .AppendPathSegment("v1")
    .SetQueryParams(new {
      key, somedata,
    });

  _logger.LogDebug("请求接口: {url}", url);
  var resp = await url.GetJsonAsync<ApiResponse>();

  return resp;
}

这里我使用了 Flurl 这个库来实现 URL 构建 + 网络请求 ,真滴好用!

并行调用接口

这里我写了一个闭包,其实也可以用 lambda 。

构建一个任务列表,然后使用 await Task.WhenAll 来等待全部任务执行完。

最后保存结果到 json 文件里

public async Task RequestParallel() {
  // 这里准备一些数据,几万个就行
  var data = new string[];

  var results = new List<ResultData>();

  // 写一个闭包来调用接口
  async Task MakeApiCall(int index) {
    var item = data[index];

    var resp = await RequestData(item);
    _logger.LogInformation("调用接口,数据: {data}, status: {status}, message: {message}", item, resp.status, resp.message);
    results.Add(resp.data);
  }

  var tasks = new List<Task>();
  for (var index = 0; index < data.Count; index++) {
    var i = index; // 由于闭包,需要在循环中创建一个新变量以避免问题
    tasks.Add(Task.Run(() => MakeApiCall(i)));
  }

  _logger.LogInformation("共有 {count} 个数据,开始请求", data.Count);
  await Task.WhenAll(tasks);

  _logger.LogInformation("搞定,写入文件");
  await File.WriteAllTextAsync("results.json", JsonSerializer.Serialize(results));
}

搞定!

显示进度

这时候来了个新问题

这么多数据,就算是并行执行,也需要一段时间

这时候显示进度显示就成了一个迫切需求

C# 内置了一个 IProgress ,但是只能设置个 total 之后直接更新当前进度,虽然 MakeApiCall 方法有个 index 表示任务的序号,但并发执行的时候是乱序的,显然不能用这个 index 来更新进度。

这时候只能再搞个 int progress ,每个任务就 +1

真麻烦,我直接上 ShellProgressBar 组件,之前用 C# 写爬虫的时候用过,详见这篇文章: C#爬虫开发小结

这个组件有个 Tick 模式就可以实现这个功能。

上代码吧,每个任务里执行一下 bar.Tick 就行了,很方便????

public async Task RequestParallel() {
  // 这里准备一些数据,几万个就行
  var data = new string[];

  var results = new List<ResultData>();

  var bar = new ProgressBar(data.Count, "正在执行");

  // 写一个闭包来调用接口
  async Task MakeApiCall(int index) {
    var item = data[index];

    var resp = await RequestData(item);
    _logger.LogInformation("调用接口,数据: {data}, status: {status}, message: {message}", item, resp.status, resp.message);
    results.Add(resp.data);

    // 更新进度
    bar.Tick();
  }

  var tasks = new List<Task>();
  for (var index = 0; index < data.Count; index++) {
    var i = index; // 由于闭包,需要在循环中创建一个新变量以避免问题
    tasks.Add(Task.Run(() => MakeApiCall(i)));
  }

  _logger.LogInformation("共有 {count} 个数据,开始请求", data.Count);
  await Task.WhenAll(tasks);

  _logger.LogInformation("搞定,写入文件");
  await File.WriteAllTextAsync("results.json", JsonSerializer.Serialize(results));
}

好好好,这下舒服了。