使用C#和MemoryCache组件实现轮流调用APIKey以提高并发能力

来自:博客园
时间:2024-03-09
阅读:

前言

使用场景是需要使用一个接口,这个接口有限制每个 APIKey 的请求量在 5次/s

一开始是最苯的做法,每次调用之后等个 200 毫秒,这样就不会超出这个限制

但是这样效率也太低了,刚好发现我们拥有不少 APIKey ,那么直接改成并发的吧,安排!

本文做一个简单的记录

思路

将每个 APIKey 的调用情况保存在内存里

C# 提供的 MemoryCache 组件是个 key-value 结构,并且可以设置每个值的过期时间

我把 APIKey 作为 key 存入,value 则是已使用的次数,并设置过期时间为 1 秒

这样只需要判断某个 APIKey 的使用次数是否小于 5 ,小于5就拿来用,大于5就读取配置拿新一个的 APIKey 。

使用 fluent-console

fluent-console 是我之前开发的 C# Console 应用模板,提供「现代化的控制台应用的开发体验」脚手架,能像 Web 应用那样很优雅地整合各种组件,包括依赖注入、配置、日志等功能。

项目地址: https://github.com/Deali-Axy/fluent-dotnet-console

本文需要用到 MemoryCache 等组件,用这个模板会比较方便,首先使用这个模板创建一个项目

# 安装模板
dotnet new install FluentConsole.Templates

创建项目

dotnet new flu-cli -n MyProject

准备配置文件

在配置文件里准备好 APIKeys

编辑项目的 appsettings.json

{
  "Logging": {
    "LogLevel": {
      "Default": "Debug"
    }
  },
  "AppSettings": {
    "Name": "The name of this app is fluent console demo",
    "Boolean": true,
    "DemoList": [
      "item1",
      "item2",
      "item3"
    ],
    "ApiKeys": [
      "apikey-xxx",
      "apikey-xxx",
      "apikey-xxx",
      "apikey-xxx",
      "apikey-xxx",
      "apikey-xxx",
      "apikey-xxx",
      "apikey-xxx"
    ]
  }
}

fluent-console 模板已经处理好了配置相关的逻辑,后续直接使用即可

配置服务

编辑 Program.cs 文件

添加需要的服务

services.AddMemoryCache();
services.AddScoped<ApiService>();

等下来 ApiService 里写代码

ApiService

Services 文件夹下创建 ApiService.cs 文件

先把依赖注入进来

using Flurl;
using Flurl.Http;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace MyProject.Services;

public class ApiService {
  private readonly ILogger<ApiService> _logger;
  private readonly AppSettings _settings;
  private readonly IMemoryCache _cache;

  public ApiService(IOptions<AppSettings> appOptions, IMemoryCache cache, ILogger<ApiService> logger) {
    _cache = cache;
    _logger = logger;
    _settings = appOptions.Value;
  }
}

封装 keys 管理

这里写了一个方法来获取一个可用的 APIKey

因为需要考虑并发运行,对 _cache 对象加锁

思路很简单上面已经介绍了,直接写成代码,同时写了很清楚的注释

private string? GetNextApiKey() {
  lock (_cache) {
    foreach (var key in _settings.ApiKeys) {
      if (_cache.TryGetValue(key, out int count)) {
        // 如果该 API Key 在缓存中存在,则检查其调用次数
        // 如果达到调用次数上限,则不再使用该 API Key,继续下一个 API Key
        if (count >= 5) {
          continue;
        }

        // 如果调用次数未达到上限,则增加调用次数并返回该 API Key
        _cache.Set(key, count + 1, DateTimeOffset.Now.AddSeconds(1));
        return key;
      }

      // 如果 API Key 不在缓存中,则将其添加到缓存中并返回
      _cache.Set(key, 1, DateTimeOffset.Now.AddSeconds(1));
      return key;
    }
  }

  return null; // 所有 API Key 都已被使用
}

修改属性

因为一开始是单线程版本,我直接用 ApiKey 来读取固定的 APIKey 配置。

现在直接把原本单个 APIKey 的属性改成调用 GetNextApiKey 方法

private string? ApiKey => GetNextApiKey();

请求接口的方法

改动不大,只需要添加一个判断就行

上面的 GetNextApiKey 在没有获取到可用 APIKey 的时候会返回 null

判断一下是否为空就行,没有 APIKey 就等个 1 秒。

public async Task<ApiResponse> RequestData(string somedata) {
  var key = ApiKey;
  
  // 所有API Key 都被用完
  if (key == null) {
    awAIt Task.Delay(1000);
    return await RequestData(somedata);
  }

  var url = "https://api.dealiaxy.com"
    .AppendPathSegment("one")
    .AppendPathSegment("service")
    .AppendPathSegment("v1")
    .SetQueryParams(new {
      key, somedata,
    });

  _logger.LogDebug("请求接口: {url}", url);
  var resp = await url.GetJsonAsync<ApiResponse>();

  return resp;
}

这里我使用了 Flurl 这个库来实现 URL 构建 + 网络请求 ,真滴好用!

并行调用接口

这里我写了一个闭包,其实也可以用 lambda 。

构建一个任务列表,然后使用 await Task.WhenAll 来等待全部任务执行完。

最后保存结果到 json 文件里

public async Task RequestParallel() {
  // 这里准备一些数据,几万个就行
  var data = new string[];

  var results = new List<ResultData>();

  // 写一个闭包来调用接口
  async Task MakeApiCall(int index) {
    var item = data[index];

    var resp = await RequestData(item);
    _logger.LogInformation("调用接口,数据: {data}, status: {status}, message: {message}", item, resp.status, resp.message);
    results.Add(resp.data);
  }

  var tasks = new List<Task>();
  for (var index = 0; index < data.Count; index++) {
    var i = index; // 由于闭包,需要在循环中创建一个新变量以避免问题
    tasks.Add(Task.Run(() => MakeApiCall(i)));
  }

  _logger.LogInformation("共有 {count} 个数据,开始请求", data.Count);
  await Task.WhenAll(tasks);

  _logger.LogInformation("搞定,写入文件");
  await File.WriteAllTextAsync("results.json", JsonSerializer.Serialize(results));
}

搞定!

显示进度

这时候来了个新问题

这么多数据,就算是并行执行,也需要一段时间

这时候显示进度显示就成了一个迫切需求

C# 内置了一个 IProgress ,但是只能设置个 total 之后直接更新当前进度,虽然 MakeApiCall 方法有个 index 表示任务的序号,但并发执行的时候是乱序的,显然不能用这个 index 来更新进度。

这时候只能再搞个 int progress ,每个任务就 +1

真麻烦,我直接上 ShellProgressBar 组件,之前用 C# 写爬虫的时候用过,详见这篇文章: C#爬虫开发小结

这个组件有个 Tick 模式就可以实现这个功能。

上代码吧,每个任务里执行一下 bar.Tick 就行了,很方便?

public async Task RequestParallel() {
  // 这里准备一些数据,几万个就行
  var data = new string[];

  var results = new List<ResultData>();

  var bar = new ProgressBar(data.Count, "正在执行");

  // 写一个闭包来调用接口
  async Task MakeApiCall(int index) {
    var item = data[index];

    var resp = await RequestData(item);
    _logger.LogInformation("调用接口,数据: {data}, status: {status}, message: {message}", item, resp.status, resp.message);
    results.Add(resp.data);

    // 更新进度
    bar.Tick();
  }

  var tasks = new List<Task>();
  for (var index = 0; index < data.Count; index++) {
    var i = index; // 由于闭包,需要在循环中创建一个新变量以避免问题
    tasks.Add(Task.Run(() => MakeApiCall(i)));
  }

  _logger.LogInformation("共有 {count} 个数据,开始请求", data.Count);
  await Task.WhenAll(tasks);

  _logger.LogInformation("搞定,写入文件");
  await File.WriteAllTextAsync("results.json", JsonSerializer.Serialize(results));
}
返回顶部
顶部