前言
使用场景是需要使用一个接口,这个接口有限制每个 APIKey 的请求量在 5次/s
一开始是最苯的做法,每次调用之后等个 200 毫秒,这样就不会超出这个限制
但是这样效率也太低了,刚好发现我们拥有不少 APIKey ,那么直接改成并发的吧,安排!
本文做一个简单的记录
思路
将每个 APIKey 的调用情况保存在内存里
C# 提供的 MemoryCache 组件是个 key-value 结构,并且可以设置每个值的过期时间
我把 APIKey 作为 key 存入,value 则是已使用的次数,并设置过期时间为 1 秒
这样只需要判断某个 APIKey 的使用次数是否小于 5 ,小于5就拿来用,大于5就读取配置拿新一个的 APIKey 。
使用 fluent-console
fluent-console 是我之前开发的 C# Console 应用模板,提供「现代化的控制台应用的开发体验」脚手架,能像 Web 应用那样很优雅地整合各种组件,包括依赖注入、配置、日志等功能。
项目地址: https://github.com/Deali-Axy/fluent-dotnet-console
本文需要用到 MemoryCache 等组件,用这个模板会比较方便,首先使用这个模板创建一个项目
# 安装模板
dotnet new install FluentConsole.Templates
创建项目
dotnet new flu-cli -n MyProject
准备配置文件
在配置文件里准备好 APIKeys
编辑项目的 appsettings.json
{
"Logging": {
"LogLevel": {
"Default": "Debug"
}
},
"AppSettings": {
"Name": "The name of this app is fluent console demo",
"Boolean": true,
"DemoList": [
"item1",
"item2",
"item3"
],
"ApiKeys": [
"apikey-xxx",
"apikey-xxx",
"apikey-xxx",
"apikey-xxx",
"apikey-xxx",
"apikey-xxx",
"apikey-xxx",
"apikey-xxx"
]
}
}
fluent-console 模板已经处理好了配置相关的逻辑,后续直接使用即可
配置服务
编辑 Program.cs
文件
添加需要的服务
services.AddMemoryCache();
services.AddScoped<ApiService>();
等下来 ApiService
里写代码
ApiService
在 Services
文件夹下创建 ApiService.cs
文件
先把依赖注入进来
using Flurl;
using Flurl.Http;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace MyProject.Services;
public class ApiService {
private readonly ILogger<ApiService> _logger;
private readonly AppSettings _settings;
private readonly IMemoryCache _cache;
public ApiService(IOptions<AppSettings> appOptions, IMemoryCache cache, ILogger<ApiService> logger) {
_cache = cache;
_logger = logger;
_settings = appOptions.Value;
}
}
封装 keys 管理
这里写了一个方法来获取一个可用的 APIKey
因为需要考虑并发运行,对 _cache
对象加锁
思路很简单上面已经介绍了,直接写成代码,同时写了很清楚的注释
private string? GetNextApiKey() {
lock (_cache) {
foreach (var key in _settings.ApiKeys) {
if (_cache.TryGetValue(key, out int count)) {
// 如果该 API Key 在缓存中存在,则检查其调用次数
// 如果达到调用次数上限,则不再使用该 API Key,继续下一个 API Key
if (count >= 5) {
continue;
}
// 如果调用次数未达到上限,则增加调用次数并返回该 API Key
_cache.Set(key, count + 1, DateTimeOffset.Now.AddSeconds(1));
return key;
}
// 如果 API Key 不在缓存中,则将其添加到缓存中并返回
_cache.Set(key, 1, DateTimeOffset.Now.AddSeconds(1));
return key;
}
}
return null; // 所有 API Key 都已被使用
}
修改属性
因为一开始是单线程版本,我直接用 ApiKey
来读取固定的 APIKey 配置。
现在直接把原本单个 APIKey 的属性改成调用 GetNextApiKey
方法
private string? ApiKey => GetNextApiKey();
请求接口的方法
改动不大,只需要添加一个判断就行
上面的 GetNextApiKey
在没有获取到可用 APIKey 的时候会返回 null
判断一下是否为空就行,没有 APIKey 就等个 1 秒。
public async Task<ApiResponse> RequestData(string somedata) {
var key = ApiKey;
// 所有API Key 都被用完
if (key == null) {
awAIt Task.Delay(1000);
return await RequestData(somedata);
}
var url = "https://api.dealiaxy.com"
.AppendPathSegment("one")
.AppendPathSegment("service")
.AppendPathSegment("v1")
.SetQueryParams(new {
key, somedata,
});
_logger.LogDebug("请求接口: {url}", url);
var resp = await url.GetJsonAsync<ApiResponse>();
return resp;
}
这里我使用了 Flurl
这个库来实现 URL 构建 + 网络请求 ,真滴好用!
并行调用接口
这里我写了一个闭包,其实也可以用 lambda 。
构建一个任务列表,然后使用 await Task.WhenAll
来等待全部任务执行完。
最后保存结果到 json 文件里
public async Task RequestParallel() {
// 这里准备一些数据,几万个就行
var data = new string[];
var results = new List<ResultData>();
// 写一个闭包来调用接口
async Task MakeApiCall(int index) {
var item = data[index];
var resp = await RequestData(item);
_logger.LogInformation("调用接口,数据: {data}, status: {status}, message: {message}", item, resp.status, resp.message);
results.Add(resp.data);
}
var tasks = new List<Task>();
for (var index = 0; index < data.Count; index++) {
var i = index; // 由于闭包,需要在循环中创建一个新变量以避免问题
tasks.Add(Task.Run(() => MakeApiCall(i)));
}
_logger.LogInformation("共有 {count} 个数据,开始请求", data.Count);
await Task.WhenAll(tasks);
_logger.LogInformation("搞定,写入文件");
await File.WriteAllTextAsync("results.json", JsonSerializer.Serialize(results));
}
搞定!
显示进度
这时候来了个新问题
这么多数据,就算是并行执行,也需要一段时间
这时候显示进度显示就成了一个迫切需求
C# 内置了一个 IProgress
,但是只能设置个 total 之后直接更新当前进度,虽然 MakeApiCall
方法有个 index
表示任务的序号,但并发执行的时候是乱序的,显然不能用这个 index
来更新进度。
这时候只能再搞个 int progress
,每个任务就 +1
真麻烦,我直接上 ShellProgressBar
组件,之前用 C# 写爬虫的时候用过,详见这篇文章: C#爬虫开发小结
这个组件有个 Tick
模式就可以实现这个功能。
上代码吧,每个任务里执行一下 bar.Tick
就行了,很方便?
public async Task RequestParallel() {
// 这里准备一些数据,几万个就行
var data = new string[];
var results = new List<ResultData>();
var bar = new ProgressBar(data.Count, "正在执行");
// 写一个闭包来调用接口
async Task MakeApiCall(int index) {
var item = data[index];
var resp = await RequestData(item);
_logger.LogInformation("调用接口,数据: {data}, status: {status}, message: {message}", item, resp.status, resp.message);
results.Add(resp.data);
// 更新进度
bar.Tick();
}
var tasks = new List<Task>();
for (var index = 0; index < data.Count; index++) {
var i = index; // 由于闭包,需要在循环中创建一个新变量以避免问题
tasks.Add(Task.Run(() => MakeApiCall(i)));
}
_logger.LogInformation("共有 {count} 个数据,开始请求", data.Count);
await Task.WhenAll(tasks);
_logger.LogInformation("搞定,写入文件");
await File.WriteAllTextAsync("results.json", JsonSerializer.Serialize(results));
}