mirror of
https://git.mirrors.martin98.com/https://github.com/actions/toolkit
synced 2026-04-02 09:13:19 +08:00
Add option for concurrent cache downloads with timeout (#1484)
* Add option for concurrent cache downloads with timeout * Add release notes * Fix lint
This commit is contained in:
32
packages/cache/src/internal/cacheHttpClient.ts
vendored
32
packages/cache/src/internal/cacheHttpClient.ts
vendored
@@ -20,7 +20,11 @@ import {
|
||||
ITypedResponseWithError,
|
||||
ArtifactCacheList
|
||||
} from './contracts'
|
||||
import {downloadCacheHttpClient, downloadCacheStorageSDK} from './downloadUtils'
|
||||
import {
|
||||
downloadCacheHttpClient,
|
||||
downloadCacheHttpClientConcurrent,
|
||||
downloadCacheStorageSDK
|
||||
} from './downloadUtils'
|
||||
import {
|
||||
DownloadOptions,
|
||||
UploadOptions,
|
||||
@@ -171,14 +175,26 @@ export async function downloadCache(
|
||||
const archiveUrl = new URL(archiveLocation)
|
||||
const downloadOptions = getDownloadOptions(options)
|
||||
|
||||
if (
|
||||
downloadOptions.useAzureSdk &&
|
||||
archiveUrl.hostname.endsWith('.blob.core.windows.net')
|
||||
) {
|
||||
// Use Azure storage SDK to download caches hosted on Azure to improve speed and reliability.
|
||||
await downloadCacheStorageSDK(archiveLocation, archivePath, downloadOptions)
|
||||
if (archiveUrl.hostname.endsWith('.blob.core.windows.net')) {
|
||||
if (downloadOptions.useAzureSdk) {
|
||||
// Use Azure storage SDK to download caches hosted on Azure to improve speed and reliability.
|
||||
await downloadCacheStorageSDK(
|
||||
archiveLocation,
|
||||
archivePath,
|
||||
downloadOptions
|
||||
)
|
||||
} else if (downloadOptions.concurrentBlobDownloads) {
|
||||
// Use concurrent implementation with HttpClient to work around blob SDK issue
|
||||
await downloadCacheHttpClientConcurrent(
|
||||
archiveLocation,
|
||||
archivePath,
|
||||
downloadOptions
|
||||
)
|
||||
} else {
|
||||
// Otherwise, download using the Actions http-client.
|
||||
await downloadCacheHttpClient(archiveLocation, archivePath)
|
||||
}
|
||||
} else {
|
||||
// Otherwise, download using the Actions http-client.
|
||||
await downloadCacheHttpClient(archiveLocation, archivePath)
|
||||
}
|
||||
}
|
||||
|
||||
168
packages/cache/src/internal/downloadUtils.ts
vendored
168
packages/cache/src/internal/downloadUtils.ts
vendored
@@ -203,6 +203,166 @@ export async function downloadCacheHttpClient(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Download the cache using the Actions toolkit http-client concurrently
|
||||
*
|
||||
* @param archiveLocation the URL for the cache
|
||||
* @param archivePath the local path where the cache is saved
|
||||
*/
|
||||
export async function downloadCacheHttpClientConcurrent(
|
||||
archiveLocation: string,
|
||||
archivePath: fs.PathLike,
|
||||
options: DownloadOptions
|
||||
): Promise<void> {
|
||||
const archiveDescriptor = await fs.promises.open(archivePath, 'w')
|
||||
const httpClient = new HttpClient('actions/cache', undefined, {
|
||||
socketTimeout: options.timeoutInMs,
|
||||
keepAlive: true
|
||||
})
|
||||
try {
|
||||
const res = await retryHttpClientResponse(
|
||||
'downloadCacheMetadata',
|
||||
async () => await httpClient.request('HEAD', archiveLocation, null, {})
|
||||
)
|
||||
|
||||
const lengthHeader = res.message.headers['content-length']
|
||||
if (lengthHeader === undefined || lengthHeader === null) {
|
||||
throw new Error('Content-Length not found on blob response')
|
||||
}
|
||||
|
||||
const length = parseInt(lengthHeader)
|
||||
if (Number.isNaN(length)) {
|
||||
throw new Error(`Could not interpret Content-Length: ${length}`)
|
||||
}
|
||||
|
||||
const downloads: {
|
||||
offset: number
|
||||
promiseGetter: () => Promise<DownloadSegment>
|
||||
}[] = []
|
||||
const blockSize = 4 * 1024 * 1024
|
||||
|
||||
for (let offset = 0; offset < length; offset += blockSize) {
|
||||
const count = Math.min(blockSize, length - offset)
|
||||
downloads.push({
|
||||
offset,
|
||||
promiseGetter: async () => {
|
||||
return await downloadSegmentRetry(
|
||||
httpClient,
|
||||
archiveLocation,
|
||||
offset,
|
||||
count
|
||||
)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// reverse to use .pop instead of .shift
|
||||
downloads.reverse()
|
||||
let actives = 0
|
||||
let bytesDownloaded = 0
|
||||
const progress = new DownloadProgress(length)
|
||||
progress.startDisplayTimer()
|
||||
const progressFn = progress.onProgress()
|
||||
|
||||
const activeDownloads: {[offset: number]: Promise<DownloadSegment>} = []
|
||||
let nextDownload:
|
||||
| {offset: number; promiseGetter: () => Promise<DownloadSegment>}
|
||||
| undefined
|
||||
|
||||
const waitAndWrite: () => Promise<void> = async () => {
|
||||
const segment = await Promise.race(Object.values(activeDownloads))
|
||||
await archiveDescriptor.write(
|
||||
segment.buffer,
|
||||
0,
|
||||
segment.count,
|
||||
segment.offset
|
||||
)
|
||||
actives--
|
||||
delete activeDownloads[segment.offset]
|
||||
bytesDownloaded += segment.count
|
||||
progressFn({loadedBytes: bytesDownloaded})
|
||||
}
|
||||
|
||||
while ((nextDownload = downloads.pop())) {
|
||||
activeDownloads[nextDownload.offset] = nextDownload.promiseGetter()
|
||||
actives++
|
||||
|
||||
if (actives >= (options.downloadConcurrency ?? 10)) {
|
||||
await waitAndWrite()
|
||||
}
|
||||
}
|
||||
|
||||
while (actives > 0) {
|
||||
await waitAndWrite()
|
||||
}
|
||||
} finally {
|
||||
httpClient.dispose()
|
||||
await archiveDescriptor.close()
|
||||
}
|
||||
}
|
||||
|
||||
async function downloadSegmentRetry(
|
||||
httpClient: HttpClient,
|
||||
archiveLocation: string,
|
||||
offset: number,
|
||||
count: number
|
||||
): Promise<DownloadSegment> {
|
||||
const retries = 5
|
||||
let failures = 0
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
const timeout = 30000
|
||||
const result = await promiseWithTimeout(
|
||||
timeout,
|
||||
downloadSegment(httpClient, archiveLocation, offset, count)
|
||||
)
|
||||
if (typeof result === 'string') {
|
||||
throw new Error('downloadSegmentRetry failed due to timeout')
|
||||
}
|
||||
|
||||
return result
|
||||
} catch (err) {
|
||||
if (failures >= retries) {
|
||||
throw err
|
||||
}
|
||||
|
||||
failures++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function downloadSegment(
|
||||
httpClient: HttpClient,
|
||||
archiveLocation: string,
|
||||
offset: number,
|
||||
count: number
|
||||
): Promise<DownloadSegment> {
|
||||
const partRes = await retryHttpClientResponse(
|
||||
'downloadCachePart',
|
||||
async () =>
|
||||
await httpClient.get(archiveLocation, {
|
||||
Range: `bytes=${offset}-${offset + count - 1}`
|
||||
})
|
||||
)
|
||||
|
||||
if (!partRes.readBodyBuffer) {
|
||||
throw new Error('Expected HttpClientResponse to implement readBodyBuffer')
|
||||
}
|
||||
|
||||
return {
|
||||
offset,
|
||||
count,
|
||||
buffer: await partRes.readBodyBuffer()
|
||||
}
|
||||
}
|
||||
|
||||
declare class DownloadSegment {
|
||||
offset: number
|
||||
count: number
|
||||
buffer: Buffer
|
||||
}
|
||||
|
||||
/**
|
||||
* Download the cache using the Azure Storage SDK. Only call this method if the
|
||||
* URL points to an Azure Storage endpoint.
|
||||
@@ -287,12 +447,12 @@ export async function downloadCacheStorageSDK(
|
||||
}
|
||||
}
|
||||
|
||||
const promiseWithTimeout = async (
|
||||
const promiseWithTimeout = async <T>(
|
||||
timeoutMs: number,
|
||||
promise: Promise<Buffer>
|
||||
): Promise<unknown> => {
|
||||
promise: Promise<T>
|
||||
): Promise<T | string> => {
|
||||
let timeoutHandle: NodeJS.Timeout
|
||||
const timeoutPromise = new Promise(resolve => {
|
||||
const timeoutPromise = new Promise<string>(resolve => {
|
||||
timeoutHandle = setTimeout(() => resolve('timeout'), timeoutMs)
|
||||
})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user