mirror of
https://git.mirrors.martin98.com/https://github.com/actions/toolkit
synced 2026-04-03 18:53:18 +08:00
Use Azure storage SDK to download cache (#497)
* Adds option to download using AzCopy * Bump version number and add release notes * Ensure we use at least v10 * Negate env var so it disables AzCopy * Use Azure storage SDK to download cache * Use same level of parallelism as AzCopy * Fix naming of variable * React to feedback * Bump Node types to Node 12 * Make linter happy * Pass options into restoreCache method * Fix tests * Restructure files and add tests * Add method to get the default download and upload options * Include breaking changes in RELEASES.md Co-authored-by: Josh Gross <joshmgross@github.com>
This commit is contained in:
187
packages/cache/src/internal/cacheHttpClient.ts
vendored
187
packages/cache/src/internal/cacheHttpClient.ts
vendored
@@ -1,18 +1,13 @@
|
||||
import * as core from '@actions/core'
|
||||
import {HttpClient, HttpCodes} from '@actions/http-client'
|
||||
import {HttpClient} from '@actions/http-client'
|
||||
import {BearerCredentialHandler} from '@actions/http-client/auth'
|
||||
import {
|
||||
IHttpClientResponse,
|
||||
IRequestOptions,
|
||||
ITypedResponse
|
||||
} from '@actions/http-client/interfaces'
|
||||
import {IRequestOptions, ITypedResponse} from '@actions/http-client/interfaces'
|
||||
import * as crypto from 'crypto'
|
||||
import * as fs from 'fs'
|
||||
import * as stream from 'stream'
|
||||
import * as util from 'util'
|
||||
import {URL} from 'url'
|
||||
|
||||
import * as utils from './cacheUtils'
|
||||
import {CompressionMethod, SocketTimeout} from './constants'
|
||||
import {CompressionMethod} from './constants'
|
||||
import {
|
||||
ArtifactCacheEntry,
|
||||
InternalCacheOptions,
|
||||
@@ -20,36 +15,21 @@ import {
|
||||
ReserveCacheRequest,
|
||||
ReserveCacheResponse
|
||||
} from './contracts'
|
||||
import {UploadOptions} from '../options'
|
||||
import {downloadCacheHttpClient, downloadCacheStorageSDK} from './downloadUtils'
|
||||
import {
|
||||
DownloadOptions,
|
||||
UploadOptions,
|
||||
getDownloadOptions,
|
||||
getUploadOptions
|
||||
} from '../options'
|
||||
import {
|
||||
isSuccessStatusCode,
|
||||
retryHttpClientResponse,
|
||||
retryTypedResponse
|
||||
} from './requestUtils'
|
||||
|
||||
const versionSalt = '1.0'
|
||||
|
||||
function isSuccessStatusCode(statusCode?: number): boolean {
|
||||
if (!statusCode) {
|
||||
return false
|
||||
}
|
||||
return statusCode >= 200 && statusCode < 300
|
||||
}
|
||||
|
||||
function isServerErrorStatusCode(statusCode?: number): boolean {
|
||||
if (!statusCode) {
|
||||
return true
|
||||
}
|
||||
return statusCode >= 500
|
||||
}
|
||||
|
||||
function isRetryableStatusCode(statusCode?: number): boolean {
|
||||
if (!statusCode) {
|
||||
return false
|
||||
}
|
||||
const retryableStatusCodes = [
|
||||
HttpCodes.BadGateway,
|
||||
HttpCodes.ServiceUnavailable,
|
||||
HttpCodes.GatewayTimeout
|
||||
]
|
||||
return retryableStatusCodes.includes(statusCode)
|
||||
}
|
||||
|
||||
function getCacheApiUrl(resource: string): string {
|
||||
// Ideally we just use ACTIONS_CACHE_URL
|
||||
const baseUrl: string = (
|
||||
@@ -110,75 +90,6 @@ export function getCacheVersion(
|
||||
.digest('hex')
|
||||
}
|
||||
|
||||
export async function retry<T>(
|
||||
name: string,
|
||||
method: () => Promise<T>,
|
||||
getStatusCode: (arg0: T) => number | undefined,
|
||||
maxAttempts = 2
|
||||
): Promise<T> {
|
||||
let response: T | undefined = undefined
|
||||
let statusCode: number | undefined = undefined
|
||||
let isRetryable = false
|
||||
let errorMessage = ''
|
||||
let attempt = 1
|
||||
|
||||
while (attempt <= maxAttempts) {
|
||||
try {
|
||||
response = await method()
|
||||
statusCode = getStatusCode(response)
|
||||
|
||||
if (!isServerErrorStatusCode(statusCode)) {
|
||||
return response
|
||||
}
|
||||
|
||||
isRetryable = isRetryableStatusCode(statusCode)
|
||||
errorMessage = `Cache service responded with ${statusCode}`
|
||||
} catch (error) {
|
||||
isRetryable = true
|
||||
errorMessage = error.message
|
||||
}
|
||||
|
||||
core.debug(
|
||||
`${name} - Attempt ${attempt} of ${maxAttempts} failed with error: ${errorMessage}`
|
||||
)
|
||||
|
||||
if (!isRetryable) {
|
||||
core.debug(`${name} - Error is not retryable`)
|
||||
break
|
||||
}
|
||||
|
||||
attempt++
|
||||
}
|
||||
|
||||
throw Error(`${name} failed: ${errorMessage}`)
|
||||
}
|
||||
|
||||
export async function retryTypedResponse<T>(
|
||||
name: string,
|
||||
method: () => Promise<ITypedResponse<T>>,
|
||||
maxAttempts = 2
|
||||
): Promise<ITypedResponse<T>> {
|
||||
return await retry(
|
||||
name,
|
||||
method,
|
||||
(response: ITypedResponse<T>) => response.statusCode,
|
||||
maxAttempts
|
||||
)
|
||||
}
|
||||
|
||||
export async function retryHttpClientResponse<T>(
|
||||
name: string,
|
||||
method: () => Promise<IHttpClientResponse>,
|
||||
maxAttempts = 2
|
||||
): Promise<IHttpClientResponse> {
|
||||
return await retry(
|
||||
name,
|
||||
method,
|
||||
(response: IHttpClientResponse) => response.message.statusCode,
|
||||
maxAttempts
|
||||
)
|
||||
}
|
||||
|
||||
export async function getCacheEntry(
|
||||
keys: string[],
|
||||
paths: string[],
|
||||
@@ -212,47 +123,23 @@ export async function getCacheEntry(
|
||||
return cacheResult
|
||||
}
|
||||
|
||||
async function pipeResponseToStream(
|
||||
response: IHttpClientResponse,
|
||||
output: NodeJS.WritableStream
|
||||
): Promise<void> {
|
||||
const pipeline = util.promisify(stream.pipeline)
|
||||
await pipeline(response.message, output)
|
||||
}
|
||||
|
||||
export async function downloadCache(
|
||||
archiveLocation: string,
|
||||
archivePath: string
|
||||
archivePath: string,
|
||||
options?: DownloadOptions
|
||||
): Promise<void> {
|
||||
const writeStream = fs.createWriteStream(archivePath)
|
||||
const httpClient = new HttpClient('actions/cache')
|
||||
const downloadResponse = await retryHttpClientResponse(
|
||||
'downloadCache',
|
||||
async () => httpClient.get(archiveLocation)
|
||||
)
|
||||
const archiveUrl = new URL(archiveLocation)
|
||||
const downloadOptions = getDownloadOptions(options)
|
||||
|
||||
// Abort download if no traffic received over the socket.
|
||||
downloadResponse.message.socket.setTimeout(SocketTimeout, () => {
|
||||
downloadResponse.message.destroy()
|
||||
core.debug(`Aborting download, socket timed out after ${SocketTimeout} ms`)
|
||||
})
|
||||
|
||||
await pipeResponseToStream(downloadResponse, writeStream)
|
||||
|
||||
// Validate download size.
|
||||
const contentLengthHeader = downloadResponse.message.headers['content-length']
|
||||
|
||||
if (contentLengthHeader) {
|
||||
const expectedLength = parseInt(contentLengthHeader)
|
||||
const actualLength = utils.getArchiveFileSizeIsBytes(archivePath)
|
||||
|
||||
if (actualLength !== expectedLength) {
|
||||
throw new Error(
|
||||
`Incomplete download. Expected file size: ${expectedLength}, actual file size: ${actualLength}`
|
||||
)
|
||||
}
|
||||
if (
|
||||
downloadOptions.useAzureSdk &&
|
||||
archiveUrl.hostname.endsWith('.blob.core.windows.net')
|
||||
) {
|
||||
// Use Azure storage SDK to download caches hosted on Azure to improve speed and reliability.
|
||||
await downloadCacheStorageSDK(archiveLocation, archivePath, downloadOptions)
|
||||
} else {
|
||||
core.debug('Unable to validate download, no Content-Length header')
|
||||
// Otherwise, download using the Actions http-client.
|
||||
await downloadCacheHttpClient(archiveLocation, archivePath)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -329,10 +216,16 @@ async function uploadFile(
|
||||
const fileSize = fs.statSync(archivePath).size
|
||||
const resourceUrl = getCacheApiUrl(`caches/${cacheId.toString()}`)
|
||||
const fd = fs.openSync(archivePath, 'r')
|
||||
const uploadOptions = getUploadOptions(options)
|
||||
|
||||
const concurrency = options?.uploadConcurrency ?? 4 // # of HTTP requests in parallel
|
||||
const MAX_CHUNK_SIZE = options?.uploadChunkSize ?? 32 * 1024 * 1024 // 32 MB Chunks
|
||||
core.debug(`Concurrency: ${concurrency} and Chunk Size: ${MAX_CHUNK_SIZE}`)
|
||||
const concurrency = utils.assertDefined(
|
||||
'uploadConcurrency',
|
||||
uploadOptions.uploadConcurrency
|
||||
)
|
||||
const maxChunkSize = utils.assertDefined(
|
||||
'uploadChunkSize',
|
||||
uploadOptions.uploadChunkSize
|
||||
)
|
||||
|
||||
const parallelUploads = [...new Array(concurrency).keys()]
|
||||
core.debug('Awaiting all uploads')
|
||||
@@ -342,10 +235,10 @@ async function uploadFile(
|
||||
await Promise.all(
|
||||
parallelUploads.map(async () => {
|
||||
while (offset < fileSize) {
|
||||
const chunkSize = Math.min(fileSize - offset, MAX_CHUNK_SIZE)
|
||||
const chunkSize = Math.min(fileSize - offset, maxChunkSize)
|
||||
const start = offset
|
||||
const end = offset + chunkSize - 1
|
||||
offset += MAX_CHUNK_SIZE
|
||||
offset += maxChunkSize
|
||||
|
||||
await uploadChunk(
|
||||
httpClient,
|
||||
@@ -360,7 +253,7 @@ async function uploadFile(
|
||||
})
|
||||
.on('error', error => {
|
||||
throw new Error(
|
||||
`Cache upload failed because file read failed with ${error.Message}`
|
||||
`Cache upload failed because file read failed with ${error.message}`
|
||||
)
|
||||
}),
|
||||
start,
|
||||
|
||||
8
packages/cache/src/internal/cacheUtils.ts
vendored
8
packages/cache/src/internal/cacheUtils.ts
vendored
@@ -113,3 +113,11 @@ export async function isGnuTarInstalled(): Promise<boolean> {
|
||||
const versionOutput = await getVersion('tar')
|
||||
return versionOutput.toLowerCase().includes('gnu tar')
|
||||
}
|
||||
|
||||
export function assertDefined<T>(name: string, value?: T): T {
|
||||
if (value === undefined) {
|
||||
throw Error(`Expected ${name} but value was undefiend`)
|
||||
}
|
||||
|
||||
return value
|
||||
}
|
||||
|
||||
134
packages/cache/src/internal/downloadUtils.ts
vendored
Normal file
134
packages/cache/src/internal/downloadUtils.ts
vendored
Normal file
@@ -0,0 +1,134 @@
|
||||
import * as core from '@actions/core'
|
||||
import {HttpClient} from '@actions/http-client'
|
||||
import {IHttpClientResponse} from '@actions/http-client/interfaces'
|
||||
import {BlockBlobClient} from '@azure/storage-blob'
|
||||
import * as buffer from 'buffer'
|
||||
import * as fs from 'fs'
|
||||
import * as stream from 'stream'
|
||||
import * as util from 'util'
|
||||
|
||||
import * as utils from './cacheUtils'
|
||||
import {SocketTimeout} from './constants'
|
||||
import {DownloadOptions} from '../options'
|
||||
import {retryHttpClientResponse} from './requestUtils'
|
||||
|
||||
/**
|
||||
* Pipes the body of a HTTP response to a stream
|
||||
*
|
||||
* @param response the HTTP response
|
||||
* @param output the writable stream
|
||||
*/
|
||||
async function pipeResponseToStream(
|
||||
response: IHttpClientResponse,
|
||||
output: NodeJS.WritableStream
|
||||
): Promise<void> {
|
||||
const pipeline = util.promisify(stream.pipeline)
|
||||
await pipeline(response.message, output)
|
||||
}
|
||||
|
||||
/**
|
||||
* Download the cache using the Actions toolkit http-client
|
||||
*
|
||||
* @param archiveLocation the URL for the cache
|
||||
* @param archivePath the local path where the cache is saved
|
||||
*/
|
||||
export async function downloadCacheHttpClient(
|
||||
archiveLocation: string,
|
||||
archivePath: string
|
||||
): Promise<void> {
|
||||
const writeStream = fs.createWriteStream(archivePath)
|
||||
const httpClient = new HttpClient('actions/cache')
|
||||
const downloadResponse = await retryHttpClientResponse(
|
||||
'downloadCache',
|
||||
async () => httpClient.get(archiveLocation)
|
||||
)
|
||||
|
||||
// Abort download if no traffic received over the socket.
|
||||
downloadResponse.message.socket.setTimeout(SocketTimeout, () => {
|
||||
downloadResponse.message.destroy()
|
||||
core.debug(`Aborting download, socket timed out after ${SocketTimeout} ms`)
|
||||
})
|
||||
|
||||
await pipeResponseToStream(downloadResponse, writeStream)
|
||||
|
||||
// Validate download size.
|
||||
const contentLengthHeader = downloadResponse.message.headers['content-length']
|
||||
|
||||
if (contentLengthHeader) {
|
||||
const expectedLength = parseInt(contentLengthHeader)
|
||||
const actualLength = utils.getArchiveFileSizeIsBytes(archivePath)
|
||||
|
||||
if (actualLength !== expectedLength) {
|
||||
throw new Error(
|
||||
`Incomplete download. Expected file size: ${expectedLength}, actual file size: ${actualLength}`
|
||||
)
|
||||
}
|
||||
} else {
|
||||
core.debug('Unable to validate download, no Content-Length header')
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Download the cache using the Azure Storage SDK. Only call this method if the
|
||||
* URL points to an Azure Storage endpoint.
|
||||
*
|
||||
* @param archiveLocation the URL for the cache
|
||||
* @param archivePath the local path where the cache is saved
|
||||
* @param options the download options with the defaults set
|
||||
*/
|
||||
export async function downloadCacheStorageSDK(
|
||||
archiveLocation: string,
|
||||
archivePath: string,
|
||||
options: DownloadOptions
|
||||
): Promise<void> {
|
||||
const client = new BlockBlobClient(archiveLocation, undefined, {
|
||||
retryOptions: {
|
||||
// Override the timeout used when downloading each 4 MB chunk
|
||||
// The default is 2 min / MB, which is way too slow
|
||||
tryTimeoutInMs: options.timeoutInMs
|
||||
}
|
||||
})
|
||||
|
||||
const properties = await client.getProperties()
|
||||
const contentLength = properties.contentLength ?? -1
|
||||
|
||||
if (contentLength < 0) {
|
||||
// We should never hit this condition, but just in case fall back to downloading the
|
||||
// file as one large stream
|
||||
core.debug(
|
||||
'Unable to determine content length, downloading file with http-client...'
|
||||
)
|
||||
|
||||
await downloadCacheHttpClient(archiveLocation, archivePath)
|
||||
} else {
|
||||
// Use downloadToBuffer for faster downloads, since internally it splits the
|
||||
// file into 4 MB chunks which can then be parallelized and retried independently
|
||||
//
|
||||
// If the file exceeds the buffer maximum length (~1 GB on 32-bit systems and ~2 GB
|
||||
// on 64-bit systems), split the download into multiple segments
|
||||
const maxSegmentSize = buffer.constants.MAX_LENGTH
|
||||
let offset = 0
|
||||
|
||||
const fd = fs.openSync(archivePath, 'w')
|
||||
|
||||
try {
|
||||
while (offset < contentLength) {
|
||||
const segmentSize = Math.min(maxSegmentSize, contentLength - offset)
|
||||
core.debug(
|
||||
`Downloading segment at offset ${offset} with length ${segmentSize}...`
|
||||
)
|
||||
|
||||
const result = await client.downloadToBuffer(offset, segmentSize, {
|
||||
concurrency: options.downloadConcurrency
|
||||
})
|
||||
|
||||
fs.writeFileSync(fd, result)
|
||||
|
||||
core.debug(`Finished segment at offset ${offset}`)
|
||||
offset += segmentSize
|
||||
}
|
||||
} finally {
|
||||
fs.closeSync(fd)
|
||||
}
|
||||
}
|
||||
}
|
||||
101
packages/cache/src/internal/requestUtils.ts
vendored
Normal file
101
packages/cache/src/internal/requestUtils.ts
vendored
Normal file
@@ -0,0 +1,101 @@
|
||||
import * as core from '@actions/core'
|
||||
import {HttpCodes} from '@actions/http-client'
|
||||
import {
|
||||
IHttpClientResponse,
|
||||
ITypedResponse
|
||||
} from '@actions/http-client/interfaces'
|
||||
|
||||
export function isSuccessStatusCode(statusCode?: number): boolean {
|
||||
if (!statusCode) {
|
||||
return false
|
||||
}
|
||||
return statusCode >= 200 && statusCode < 300
|
||||
}
|
||||
|
||||
export function isServerErrorStatusCode(statusCode?: number): boolean {
|
||||
if (!statusCode) {
|
||||
return true
|
||||
}
|
||||
return statusCode >= 500
|
||||
}
|
||||
|
||||
export function isRetryableStatusCode(statusCode?: number): boolean {
|
||||
if (!statusCode) {
|
||||
return false
|
||||
}
|
||||
const retryableStatusCodes = [
|
||||
HttpCodes.BadGateway,
|
||||
HttpCodes.ServiceUnavailable,
|
||||
HttpCodes.GatewayTimeout
|
||||
]
|
||||
return retryableStatusCodes.includes(statusCode)
|
||||
}
|
||||
|
||||
export async function retry<T>(
|
||||
name: string,
|
||||
method: () => Promise<T>,
|
||||
getStatusCode: (arg0: T) => number | undefined,
|
||||
maxAttempts = 2
|
||||
): Promise<T> {
|
||||
let response: T | undefined = undefined
|
||||
let statusCode: number | undefined = undefined
|
||||
let isRetryable = false
|
||||
let errorMessage = ''
|
||||
let attempt = 1
|
||||
|
||||
while (attempt <= maxAttempts) {
|
||||
try {
|
||||
response = await method()
|
||||
statusCode = getStatusCode(response)
|
||||
|
||||
if (!isServerErrorStatusCode(statusCode)) {
|
||||
return response
|
||||
}
|
||||
|
||||
isRetryable = isRetryableStatusCode(statusCode)
|
||||
errorMessage = `Cache service responded with ${statusCode}`
|
||||
} catch (error) {
|
||||
isRetryable = true
|
||||
errorMessage = error.message
|
||||
}
|
||||
|
||||
core.debug(
|
||||
`${name} - Attempt ${attempt} of ${maxAttempts} failed with error: ${errorMessage}`
|
||||
)
|
||||
|
||||
if (!isRetryable) {
|
||||
core.debug(`${name} - Error is not retryable`)
|
||||
break
|
||||
}
|
||||
|
||||
attempt++
|
||||
}
|
||||
|
||||
throw Error(`${name} failed: ${errorMessage}`)
|
||||
}
|
||||
|
||||
export async function retryTypedResponse<T>(
|
||||
name: string,
|
||||
method: () => Promise<ITypedResponse<T>>,
|
||||
maxAttempts = 2
|
||||
): Promise<ITypedResponse<T>> {
|
||||
return await retry(
|
||||
name,
|
||||
method,
|
||||
(response: ITypedResponse<T>) => response.statusCode,
|
||||
maxAttempts
|
||||
)
|
||||
}
|
||||
|
||||
export async function retryHttpClientResponse<T>(
|
||||
name: string,
|
||||
method: () => Promise<IHttpClientResponse>,
|
||||
maxAttempts = 2
|
||||
): Promise<IHttpClientResponse> {
|
||||
return await retry(
|
||||
name,
|
||||
method,
|
||||
(response: IHttpClientResponse) => response.message.statusCode,
|
||||
maxAttempts
|
||||
)
|
||||
}
|
||||
Reference in New Issue
Block a user