Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,26 @@ private Constants() {
*/
public static final String AWS_S3_ACCESSPOINT_REQUIRED = "fs.s3a.accesspoint.required";

/**
* Explicit request for the SDK region resolution.
* Value: {@value}.
*/
public static final String SDK_REGION = "sdk";

/**
* Declare as running in EC2.
* Currently hands off to the SDK for resolution; it may change in future.
* Value: {@value}.
*/
public static final String EC2_REGION = "ec2";

/**
* An empty region is the historic fall-through to the SDK.
* Value: ""
*/
public static final String EMPTY_REGION = "";


/**
* Flag for create performance.
* This can be set in the {@code createFile()} builder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,10 @@

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.s3a.impl.AWSClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.awscore.util.AwsHostNameUtils;
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
Expand Down Expand Up @@ -57,17 +51,14 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.s3a.impl.AWSClientConfig;
import org.apache.hadoop.fs.s3a.impl.RegionResolution;
import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
import org.apache.hadoop.fs.store.LogExactlyOnce;

import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ACCESS_GRANTS_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION;
import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_CLASS_NAME;
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED_DEFAULT;
Expand All @@ -77,7 +68,9 @@
import static org.apache.hadoop.fs.s3a.auth.SignerFactory.createHttpSigner;
import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.REQUESTER_PAYS_HEADER;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AUTH_SCHEME_AWS_SIGV_4;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.fs.s3a.impl.RegionResolution.RegionResolutionMechanism.Ec2Metadata;
import static org.apache.hadoop.fs.s3a.impl.RegionResolution.RegionResolutionMechanism.Sdk;
import static org.apache.hadoop.fs.s3a.impl.RegionResolution.calculateRegion;


/**
Expand All @@ -92,11 +85,6 @@ public class DefaultS3ClientFactory extends Configured

private static final String REQUESTER_PAYS_HEADER_VALUE = "requester";

private static final String S3_SERVICE_NAME = "s3";

private static final Pattern VPC_ENDPOINT_PATTERN =
Pattern.compile("^(?:.+\\.)?([a-z0-9-]+)\\.vpce\\.amazonaws\\.(?:com|com\\.cn)$");

/**
* Subclasses refer to this.
*/
Expand All @@ -106,26 +94,14 @@ public class DefaultS3ClientFactory extends Configured
/**
* A one-off warning of default region chains in use.
*/
private static final LogExactlyOnce WARN_OF_DEFAULT_REGION_CHAIN =
private static final LogExactlyOnce DEFAULT_REGION_CHAIN =
new LogExactlyOnce(LOG);

/**
* Warning message printed when the SDK Region chain is in use.
* Message printed when the SDK Region chain is in use.
*/
private static final String SDK_REGION_CHAIN_IN_USE =
"S3A filesystem client is using"
+ " the SDK region resolution chain.";


/** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */
private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG);

/**
* Error message when an endpoint is set with FIPS enabled: {@value}.
*/
@VisibleForTesting
public static final String ERROR_ENDPOINT_WITH_FIPS =
"Non central endpoint cannot be set when " + FIPS_ENDPOINT + " is true";
"S3A filesystem client is using the SDK region resolution chain.";

/**
* A one-off log stating whether S3 Access Grants are enabled.
Expand Down Expand Up @@ -319,162 +295,63 @@ protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration(
* <li> S3 cross region is enabled by default irrespective of region or endpoint
* is set or not.</li>
* </ol>
*
* @param builder S3 client builder.
* @param parameters parameter object
* @param conf conf configuration object
* @param conf conf configuration object
* @param <BuilderT> S3 client builder type
* @param <ClientT> S3 client type
* @return how the region was resolved.
* @throws IllegalArgumentException if endpoint is set when FIPS is enabled.
*/
private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void configureEndpointAndRegion(
BuilderT builder, S3ClientCreationParameters parameters, Configuration conf) {
final String endpointStr = parameters.getEndpoint();
final URI endpoint = getS3Endpoint(endpointStr, conf);

final String configuredRegion = parameters.getRegion();
Region region = null;
String origin = "";

// If the region was configured, set it.
if (configuredRegion != null && !configuredRegion.isEmpty()) {
origin = AWS_REGION;
region = Region.of(configuredRegion);
}
private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> RegionResolution.Resolution configureEndpointAndRegion(
BuilderT builder, S3ClientCreationParameters parameters, Configuration conf) throws IOException {

// FIPs? Log it, then reject any attempt to set an endpoint
final boolean fipsEnabled = parameters.isFipsEnabled();
if (fipsEnabled) {
LOG.debug("Enabling FIPS mode");
}
// always setting it guarantees the value is non-null,
// which tests expect.
builder.fipsEnabled(fipsEnabled);

if (endpoint != null) {
boolean endpointEndsWithCentral =
endpointStr.endsWith(CENTRAL_ENDPOINT);
checkArgument(!fipsEnabled || endpointEndsWithCentral, "%s : %s",
ERROR_ENDPOINT_WITH_FIPS,
endpoint);

// No region was configured,
// determine the region from the endpoint.
if (region == null) {
region = getS3RegionFromEndpoint(endpointStr,
endpointEndsWithCentral);
if (region != null) {
origin = "endpoint";
}
}
final RegionResolution.Resolution resolution =
calculateRegion(parameters, conf);
LOG.debug("Region Resolution: {}", resolution);

// No need to override endpoint with "s3.amazonaws.com".
// Let the client take care of endpoint resolution. Overriding
// the endpoint with "s3.amazonaws.com" causes 400 Bad Request
// errors for non-existent buckets and objects.
// ref: https://github.com/aws/aws-sdk-java-v2/issues/4846
if (!endpointEndsWithCentral) {
builder.endpointOverride(endpoint);
LOG.debug("Setting endpoint to {}", endpoint);
} else {
origin = "central endpoint with cross region access";
LOG.debug("Enabling cross region access for endpoint {}",
endpointStr);
}
}
// always setting to true or false guarantees the value is non-null,
// which tests expect.
builder.fipsEnabled(resolution.isUseFips());

if (region != null) {
builder.region(region);
} else if (configuredRegion == null) {
// no region is configured, and none could be determined from the endpoint.
// Use US_EAST_2 as default.
region = Region.of(AWS_S3_DEFAULT_REGION);
builder.region(region);
origin = "cross region access fallback";
} else if (configuredRegion.isEmpty()) {
final RegionResolution.RegionResolutionMechanism mechanism = resolution.getMechanism();
if (Sdk == mechanism) {
// handing off all resolution to SDK.
// region configuration was set to empty string.
// allow this if people really want it; it is OK to rely on this
// when deployed in EC2.
WARN_OF_DEFAULT_REGION_CHAIN.warn(SDK_REGION_CHAIN_IN_USE);
DEFAULT_REGION_CHAIN.info(SDK_REGION_CHAIN_IN_USE);
LOG.debug(SDK_REGION_CHAIN_IN_USE);
origin = "SDK region chain";
}
boolean isCrossRegionAccessEnabled = conf.getBoolean(AWS_S3_CROSS_REGION_ACCESS_ENABLED,
AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT);
// s3 cross region access
if (isCrossRegionAccessEnabled) {
builder.crossRegionAccessEnabled(true);
} else {

// a region has been determined from configuration,
// or it is falling back to central region.

final Region region = resolution.getRegion();
builder.region(requireNonNull(region));
// s3 cross region access
if (resolution.isCrossRegionAccessEnabled()) {
builder.crossRegionAccessEnabled(true);
}
final URI endpointUri = resolution.getEndpointUri();
if (endpointUri != null && !resolution.isUseCentralEndpoint()) {
LOG.debug("Setting endpoint to {}", endpointUri);
builder.endpointOverride(endpointUri);
}
}
LOG.debug("Setting region to {} from {} with cross region access {}",
region, origin, isCrossRegionAccessEnabled);
return resolution;
}

/**
* Given an endpoint string, create the endpoint URI.
*
* <p>Kept in as subclasses use it.
* @param endpoint possibly null endpoint.
* @param conf config to build the URI from.
* @return an endpoint uri
*/
protected static URI getS3Endpoint(String endpoint, final Configuration conf) {

boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS);

String protocol = secureConnections ? "https" : "http";

if (endpoint == null || endpoint.isEmpty()) {
// don't set an endpoint if none is configured, instead let the SDK figure it out.
return null;
}

if (!endpoint.contains("://")) {
endpoint = String.format("%s://%s", protocol, endpoint);
}

try {
return new URI(endpoint);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}

/**
* Parses the endpoint to get the region.
* If endpoint is the central one, use US_EAST_2.
*
* @param endpoint the configured endpoint.
* @param endpointEndsWithCentral true if the endpoint is configured as central.
* @return the S3 region, null if unable to resolve from endpoint.
*/
@VisibleForTesting
static Region getS3RegionFromEndpoint(final String endpoint,
final boolean endpointEndsWithCentral) {

if (!endpointEndsWithCentral) {
// S3 VPC endpoint parsing
Matcher matcher = VPC_ENDPOINT_PATTERN.matcher(endpoint);
if (matcher.find()) {
LOG.debug("Mapping to VPCE");
LOG.debug("Endpoint {} is vpc endpoint; parsing region as {}", endpoint, matcher.group(1));
return Region.of(matcher.group(1));
}

LOG.debug("Endpoint {} is not the default; parsing", endpoint);
return AwsHostNameUtils.parseSigningRegion(endpoint, S3_SERVICE_NAME).orElse(null);
}

// Select default region here to enable cross-region access.
// If both "fs.s3a.endpoint" and "fs.s3a.endpoint.region" are empty,
// Spark sets "fs.s3a.endpoint" to "s3.amazonaws.com".
// This applies to Spark versions with the changes of SPARK-35878.
// ref:
// https://github.com/apache/spark/blob/v3.5.0/core/
// src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L528
// If we do not allow cross region access, Spark would not be able to
// access any bucket that is not present in the given region.
// Hence, we should use default region us-east-2 to allow cross-region
// access.
return Region.of(AWS_S3_DEFAULT_REGION);
return RegionResolution.buildEndpointUri(endpoint, secureConnections);
}

private static <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void
Expand Down
Loading
Loading