<?php
namespace Aws\S3;
use Aws;
use Aws\CommandInterface;
use GuzzleHttp\Promise;
use GuzzleHttp\Psr7;
use GuzzleHttp\Promise\PromisorInterface;
use Iterator;
/**
* Transfers files from the local filesystem to S3 or from S3 to the local
* filesystem.
*
* This class does not support copying from the local filesystem to somewhere
* else on the local filesystem or from one S3 bucket to another.
*/
class Transfer implements PromisorInterface
{
private $client;
private $promise;
private $source;
private $sourceMetadata;
private $destination;
private $concurrency;
private $mupThreshold;
private $before;
private $s3Args = [];
/**
* When providing the $source argument, you may provide a string referencing
* the path to a directory on disk to upload, an s3 scheme URI that contains
* the bucket and key (e.g., "s3://bucket/key"), or an \Iterator object
* that yields strings containing filenames that are the path to a file on
* disk or an s3 scheme URI. The "/key" portion of an s3 URI is optional.
*
* When providing an iterator for the $source argument, you must also
* provide a 'base_dir' key value pair in the $options argument.
*
* The $dest argument can be the path to a directory on disk or an s3
* scheme URI (e.g., "s3://bucket/key").
*
* The options array can contain the following key value pairs:
*
* - base_dir: (string) Base directory of the source, if $source is an
* iterator. If the $source option is not an array, then this option is
* ignored.
* - before: (callable) A callback to invoke before each transfer. The
* callback accepts the following positional arguments: string $source,
* string $dest, Aws\CommandInterface $command. The provided command will
* be either a GetObject, PutObject, InitiateMultipartUpload, or
* UploadPart command.
* - mup_threshold: (int) Size in bytes in which a multipart upload should
* be used instead of PutObject. Defaults to 20971520 (20 MB).
* - concurrency: (int, default=5) Number of files to upload concurrently.
* The ideal concurrency value will vary based on the number of files
* being uploaded and the average size of each file. Generally speaking,
* smaller files benefit from a higher concurrency while larger files
* will not.
* - debug: (bool) Set to true to print out debug information for
* transfers. Set to an fopen() resource to write to a specific stream
* rather than writing to STDOUT.
*
* @param S3Client $client Client used for transfers.
* @param string|Iterator $source Where the files are transferred from.
* @param string $dest Where the files are transferred to.
* @param array $options Hash of options.
*/
public function __construct(
S3Client $client,
$source,
$dest,
array $options = []
) {
$this->client = $client;
// Prepare the destination.
$this->destination = $this->prepareTarget($dest);
if ($this->destination['scheme'] === 's3') {
$this->s3Args = $this->getS3Args($this->destination['path']);
}
// Prepare the source.
if (is_string($source)) {
$this->sourceMetadata = $this->prepareTarget($source);
$this->source = $source;
} elseif ($source instanceof Iterator) {
if (empty($options['base_dir'])) {
throw new \InvalidArgumentException('You must provide the source'
. ' argument as a string or provide the "base_dir" option.');
}
$this->sourceMetadata = $this->prepareTarget($options['base_dir']);
$this->source = $source;
} else {
throw new \InvalidArgumentException('source must be the path to a '
. 'directory or an iterator that yields file names.');
}
// Validate schemes.
if ($this->sourceMetadata['scheme'] === $this->destination['scheme']) {
throw new \InvalidArgumentException("You cannot copy from"
. " {$this->sourceMetadata['scheme']} to"
. " {$this->destination['scheme']}."
);
}
// Handle multipart-related options.
$this->concurrency = isset($options['concurrency'])
? $options['concurrency']
: MultipartUploader::DEFAULT_CONCURRENCY;
$this->mupThreshold = isset($options['mup_threshold'])
? $options['mup_threshold']
: 16777216;
if ($this->mupThreshold < MultipartUploader::PART_MIN_SIZE) {
throw new \InvalidArgumentException('mup_threshold must be >= 5MB');
}
// Handle "before" callback option.
if (isset($options['before'])) {
$this->before = $options['before'];
if (!is_callable($this->before)) {
throw new \InvalidArgumentException('before must be a callable.');
}
}
// Handle "debug" option.
if (isset($options['debug'])) {
if ($options['debug'] === true) {
$options['debug'] = fopen('php://output', 'w');
}
$this->addDebugToBefore($options['debug']);
}
}
/**
* Transfers the files.
*/
public function promise()
{
// If the promise has been created, just return it.
if (!$this->promise) {
// Create an upload/download promise for the transfer.
$this->promise = $this->sourceMetadata['scheme'] === 'file'
? $this->createUploadPromise()
: $this->createDownloadPromise();
}
return $this->promise;
}
/**
* Transfers the files synchronously.
*/
public function transfer()
{
$this->promise()->wait();
}
private function prepareTarget($targetPath)
{
$target = [
'path' => $this->normalizePath($targetPath),
'scheme' => $this->determineScheme($targetPath),
];
if ($target['scheme'] !== 's3' && $target['scheme'] !== 'file') {
throw new \InvalidArgumentException('Scheme must be "s3" or "file".');
}
return $target;
}
/**
* Creates an array that contains Bucket and Key by parsing the filename.
*
* @param string $path Path to parse.
*
* @return array
*/
private function getS3Args($path)
{
$parts = explode('/', str_replace('s3://', '', $path), 2);
$args = ['Bucket' => $parts[0]];
if (isset($parts[1])) {
$args['Key'] = $parts[1];
}
return $args;
}
/**
* Parses the scheme from a filename.
*
* @param string $path Path to parse.
*
* @return string
*/
private function determineScheme($path)
{
return !strpos($path, '://') ? 'file' : explode('://', $path)[0];
}
/**
* Normalize a path so that it has UNIX-style directory separators and no trailing /
*
* @param string $path
*
* @return string
*/
private function normalizePath($path)
{
return rtrim(str_replace('\\', '/', $path), '/');
}
private function createDownloadPromise()
{
$parts = $this->getS3Args($this->sourceMetadata['path']);
$prefix = "s3://{$parts['Bucket']}/"
. (isset($parts['Key']) ? $parts['Key'] . '/' : '');
$commands = Aws\map($this->getDownloadsIterator(), function (
$object
) use ($prefix) {
// Prepare the sink.
$sink = $this->destination['path'] . '/'
. preg_replace('/^' . preg_quote($prefix, '/') . '/', '', $object);
// Create the directory if needed.
$dir = dirname($sink);
if (!is_dir($dir) && !mkdir($dir, 0777, true)) {
throw new \RuntimeException("Could not create dir: {$dir}");
}
// Create the command.
return $this->client->getCommand(
'GetObject',
$this->getS3Args($object) + ['@http' => ['sink' => $sink]]
);
});
// Create a GetObject command pool and return the promise.
return (new Aws\CommandPool($this->client, $commands, [
'concurrency' => $this->concurrency,
'before' => $this->before,
'rejected' => function ($reason, $idx, Promise\PromiseInterface $p) {
$p->reject($reason);
}
]))->promise();
}
private function createUploadPromise()
{
// Map each file into a promise that performs the actual transfer.
$files = \Aws\map($this->getUploadsIterator(), function ($file) {
return (filesize($file) >= $this->mupThreshold)
? $this->uploadMultipart($file)
: $this->upload($file);
});
// Create an EachPromise, that will concurrently handle the upload
// operations' yielded promises from the iterator.
return Promise\each_limit_all($files, $this->concurrency);
}
/** @return Iterator */
private function getUploadsIterator()
{
if (is_string($this->source)) {
return Aws\filter(
Aws\recursive_dir_iterator($this->sourceMetadata['path']),
function ($file) { return !is_dir($file); }
);
}
return $this->source;
}
/** @return Iterator */
private function getDownloadsIterator()
{
if (is_string($this->source)) {
$listArgs = $this->getS3Args($this->sourceMetadata['path']);
if (isset($listArgs['Key'])) {
$listArgs['Prefix'] = $listArgs['Key'] . '/';
unset($listArgs['Key']);
}
$files = $this->client
->getPaginator('ListObjects', $listArgs)
->search('Contents[].Key');
$files = Aws\map($files, function ($key) use ($listArgs) {
return "s3://{$listArgs['Bucket']}/$key";
});
return Aws\filter($files, function ($key) {
return substr($key, -1, 1) !== '/';
});
}
return $this->source;
}
private function upload($filename)
{
$args = $this->s3Args;
$args['SourceFile'] = $filename;
$args['Key'] = $this->createS3Key($filename);
$command = $this->client->getCommand('PutObject', $args);
$this->before and call_user_func($this->before, $command);
return $this->client->executeAsync($command);
}
private function uploadMultipart($filename)
{
$args = $this->s3Args;
$args['Key'] = $this->createS3Key($filename);
return (new MultipartUploader($this->client, $filename, [
'bucket' => $args['Bucket'],
'key' => $args['Key'],
'before_initiate' => $this->before,
'before_upload' => $this->before,
'before_complete' => $this->before,
'concurrency' => $this->concurrency,
]))->promise();
}
private function createS3Key($filename)
{
$relative_file_path = ltrim(
preg_replace('#^' . preg_quote($this->sourceMetadata['path']) . '#', '', $filename),
'/\\'
);
if (isset($this->s3Args['Key'])) {
return rtrim($this->s3Args['Key'], '/').'/'.$relative_file_path;
}
return $relative_file_path;
}
private function addDebugToBefore($debug)
{
$before = $this->before;
$sourcePath = $this->sourceMetadata['path'];
$s3Args = $this->s3Args;
$this->before = static function (
CommandInterface $command
) use ($before, $debug, $sourcePath, $s3Args) {
// Call the composed before function.
$before and $before($command);
// Determine the source and dest values based on operation.
switch ($operation = $command->getName()) {
case 'GetObject':
$source = "s3://{$command['Bucket']}/{$command['Key']}";
$dest = $command['@http']['sink'];
break;
case 'PutObject':
$source = $command['SourceFile'];
$dest = "s3://{$command['Bucket']}/{$command['Key']}";
break;
case 'UploadPart':
$part = $command['PartNumber'];
case 'CreateMultipartUpload':
case 'CompleteMultipartUpload':
$sourceKey = $command['Key'];
if (isset($s3Args['Key']) && strpos($sourceKey, $s3Args['Key']) === 0) {
$sourceKey = substr($sourceKey, strlen($s3Args['Key']) + 1);
}
$source = "{$sourcePath}/{$sourceKey}";
$dest = "s3://{$command['Bucket']}/{$command['Key']}";
break;
default:
throw new \UnexpectedValueException(
"Transfer encountered an unexpected operation: {$operation}."
);
}
// Print the debugging message.
$context = sprintf('%s -> %s (%s)', $source, $dest, $operation);
if (isset($part)) {
$context .= " : Part={$part}";
}
fwrite($debug, "Transferring {$context}\n");
};
}
}
|