Skip to content

Instantly share code, notes, and snippets.

@AngusFu
Created April 19, 2022 08:46
Show Gist options
  • Save AngusFu/b99b0ffb001b0bf4009d09b5e3b041d6 to your computer and use it in GitHub Desktop.
Save AngusFu/b99b0ffb001b0bf4009d09b5e3b041d6 to your computer and use it in GitHub Desktop.
Upload Tasks
export class Uploader<FileData, UploadResponse> {
private tasks: ITask<FileData, UploadResponse>[] = [];
constructor(public options: UploadOptions<FileData, UploadResponse>) {}
private async uploadWithRetry(task: ITask<FileData, UploadResponse>) {
let index = 0;
const {
file,
deferred: { resolve, reject },
} = task;
const {
backoff = fibonacci,
maxRetries = 5,
uploadContent,
shouldRetry,
} = this.options;
while (index < 1 + maxRetries) {
try {
resolve(await uploadContent(file));
break;
} catch (err) {
if (shouldRetry?.(err)) {
await wait(backoff((index += 1)));
} else {
reject({ reason: 'unableToRetry' });
}
}
}
reject({ reason: 'maxRetryCountExceeded' });
}
private checkTasks() {
const {
tasks,
options: { parallelLimit = 5 },
} = this;
let available = parallelLimit - tasks.filter((task) => task.running).length;
for (let i = 0; available > 0 && i < tasks.length; i++) {
const task = tasks[i];
if (!task.running) {
task.running = true;
available -= 1;
this.uploadWithRetry(task).finally(() => {
tasks.splice(tasks.indexOf(task), 1);
process.nextTick(() => this.checkTasks());
});
}
}
}
public upload(file: FileData): Promise<UploadResponse> {
return new Promise((resolve, reject) => {
this.tasks.push({
file,
running: false,
deferred: { resolve, reject },
});
process.nextTick(() => this.checkTasks());
});
}
}
export type UploadOptions<FileData, UploadResponse> = {
maxRetries?: number;
parallelLimit?: number;
backoff?(n: number): number;
shouldRetry(err: unknown): boolean | undefined;
uploadContent(file: FileData): Promise<UploadResponse>;
};
type ITask<F, R> = {
file: F;
running: boolean;
deferred: {
resolve(response: R): void;
reject(error: { reason: 'unableToRetry' | 'maxRetryCountExceeded' }): void;
};
};
function wait(seconds: number) {
return new Promise((resolve) => setTimeout(resolve, seconds * 1000));
}
function fibonacci(n: number, ac1 = 1, ac2 = 1): number {
return n < 2 ? ac2 : fibonacci(n - 1, ac2, ac1 + ac2);
}
import crypto from 'crypto';
import { Uploader } from './uploader';
const cache = new Map<string, string>();
const md5 = (file: string | Buffer) =>
crypto.createHash('md5').update(file).digest('hex');
const uploader = new Uploader({
maxRetries: 10,
parallelLimit: 5,
backoff(n) {
return Math.pow(n, 2); // fibonacci(n)
},
shouldRetry(error) {
// 这里可以针对 error 的类型做一些判断
// 有些错误可以尝试重新上传的返回 true 即可
if (error && (error as { code: number }).code === 1234) {
return true;
}
return false;
},
async uploadContent(file: {
name: string;
content: string | Buffer;
extname: string;
}) {
const hash = md5(file.content);
if (cache.has(hash)) {
return cache.get(hash);
}
return fetch('/path/to/upload', { body: file.content });
},
});
uploader
.upload({
name: 'hello.js',
content: 'console.log("hello world");',
extname: 'js',
})
.then((res) => {
// 上传成功
})
.catch((err) => {
// 上传失败
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment