1 1 | /*
|
2 2 | * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
3 3 | * SPDX-License-Identifier: Apache-2.0
|
4 4 | */
|
5 5 | package aws.sdk.kotlin.e2etest
|
6 6 |
|
7 7 | import aws.sdk.kotlin.services.s3.*
|
8 - | import aws.sdk.kotlin.services.s3.S3Client
|
9 8 | import aws.sdk.kotlin.services.s3.model.*
|
10 9 | import aws.sdk.kotlin.services.s3.model.BucketLocationConstraint
|
11 10 | import aws.sdk.kotlin.services.s3.model.ExpirationStatus
|
12 11 | import aws.sdk.kotlin.services.s3.model.LifecycleRule
|
13 12 | import aws.sdk.kotlin.services.s3.paginators.listObjectsV2Paginated
|
14 13 | import aws.sdk.kotlin.services.s3.waiters.waitUntilBucketExists
|
15 14 | import aws.sdk.kotlin.services.s3.waiters.waitUntilBucketNotExists
|
16 15 | import aws.sdk.kotlin.services.s3control.*
|
17 16 | import aws.sdk.kotlin.services.s3control.model.*
|
18 17 | import aws.sdk.kotlin.services.sts.StsClient
|
19 18 | import aws.smithy.kotlin.runtime.http.request.HttpRequest
|
20 - | import aws.smithy.kotlin.runtime.text.ensurePrefix
|
19 + | import aws.smithy.kotlin.runtime.time.Instant
|
20 + | import aws.smithy.kotlin.runtime.time.TimestampFormat
|
21 + | import aws.smithy.kotlin.runtime.util.asyncLazy
|
21 22 | import kotlinx.coroutines.*
|
22 23 | import kotlinx.coroutines.flow.*
|
23 - | import kotlinx.coroutines.withTimeout
|
24 24 | import java.io.OutputStreamWriter
|
25 25 | import java.net.URL
|
26 26 | import java.util.*
|
27 27 | import javax.net.ssl.HttpsURLConnection
|
28 28 | import kotlin.time.Duration.Companion.seconds
|
29 29 |
|
/**
 * Shared helpers for the S3 end-to-end tests: creating test buckets whose names embed a per-run identifier,
 * deleting them again, executing presigned PUT requests, resolving the AWS account ID, and building S3 clients.
 */
object S3TestUtils {
    const val DEFAULT_REGION = "us-west-2"

    // The E2E test account only has permission to operate on buckets with the prefix "s3-test-bucket-".
    // The trailing hyphen is added when the bucket name is assembled.
    private const val TEST_BUCKET_PREFIX = "s3-test-bucket"

    // Final name segment of S3 Express directory buckets; the leading "--" is added when the name is assembled.
    private const val S3_EXPRESS_DIRECTORY_BUCKET_SUFFIX = "x-s3"

    /**
     * Identifier embedded in the name of every bucket created through this object. It is computed on first access
     * from the current time in condensed ISO 8601 form and lowercased (presumably because bucket names may not
     * contain uppercase characters), and is printed once when it is first computed.
     */
    val testRunId by lazy {
        Instant
            .now()
            .format(TimestampFormat.ISO_8601_CONDENSED)
            .lowercase()
            .also { println("Starting test run ID $it") }
    }

    /**
     * Returns the region configured on [client], failing with a descriptive [IllegalStateException] when the client
     * has no region instead of a bare null-pointer failure.
     */
    private fun configuredRegion(client: S3Client): String =
        checkNotNull(client.config.region) { "S3 client has no region configured; pass a region explicitly" }

    /**
     * Creates a bucket named with the test bucket prefix. See [createBucket] for details.
     *
     * @param client the client used for all S3 calls
     * @param suffix trailing segment of the bucket name
     * @param region location constraint for the new bucket; defaults to the region configured on [client]
     * @return the name of the created bucket
     */
    suspend fun createTestBucket(
        client: S3Client,
        suffix: String,
        region: String = configuredRegion(client),
    ): String = createBucket(client, TEST_BUCKET_PREFIX, suffix, region)

    /**
     * Creates a bucket named `<prefix>-<testRunId>-<suffix>`, waits until it exists, and attaches a lifecycle rule
     * (`delete-old`) with an empty prefix filter and a one day expiration. The whole operation is bounded by a 60
     * second timeout.
     *
     * @param client the client used for all S3 calls
     * @param prefix leading segment of the bucket name
     * @param suffix trailing segment of the bucket name
     * @param region location constraint for the new bucket; defaults to the region configured on [client]
     * @return the name of the created bucket
     */
    suspend fun createBucket(
        client: S3Client,
        prefix: String,
        suffix: String,
        region: String = configuredRegion(client),
    ): String = withTimeout(60.seconds) {
        val bucketName = "$prefix-$testRunId-$suffix"
        println("Creating S3 bucket: $bucketName")

        client.createBucket {
            bucket = bucketName
            createBucketConfiguration {
                locationConstraint = BucketLocationConstraint.fromValue(region)
            }
        }

        client.waitUntilBucketExists { bucket = bucketName }

        client.putBucketLifecycleConfiguration {
            bucket = bucketName
            lifecycleConfiguration {
                rules = listOf(
                    LifecycleRule {
                        expiration { days = 1 }
                        filter { this.prefix = "" }
                        status = ExpirationStatus.Enabled
                        id = "delete-old"
                    },
                )
            }
        }

        bucketName
    }

    /**
     * Creates an S3 Express directory bucket named with the test bucket prefix. See [createDirectoryBucket].
     *
     * @param client the client used for all S3 calls
     * @param availabilityZone availability zone used as the bucket location and embedded in the bucket name
     * @param suffix segment of the bucket name placed after the test run ID
     * @return the name of the created bucket
     */
    suspend fun createTestDirectoryBucket(
        client: S3Client,
        availabilityZone: String,
        suffix: String,
    ): String = createDirectoryBucket(client, TEST_BUCKET_PREFIX, availabilityZone, suffix)

    /**
     * Creates a single-availability-zone directory bucket named
     * `<prefix>-<testRunId>-<suffix>--<availabilityZone>--x-s3`. The call is bounded by a 60 second timeout. Unlike
     * [createBucket], this neither waits for the bucket to exist nor configures a lifecycle rule.
     *
     * @param client the client used for all S3 calls
     * @param prefix leading segment of the bucket name
     * @param availabilityZone availability zone used as the bucket location and embedded in the bucket name
     * @param suffix segment of the bucket name placed after the test run ID
     * @return the name of the created bucket
     */
    suspend fun createDirectoryBucket(
        client: S3Client,
        prefix: String,
        availabilityZone: String,
        suffix: String,
    ): String = withTimeout(60.seconds) {
        val bucketName = "$prefix-$testRunId-$suffix--$availabilityZone--$S3_EXPRESS_DIRECTORY_BUCKET_SUFFIX"
        println("Creating S3 Express directory bucket: $bucketName")

        client.createBucket {
            bucket = bucketName
            createBucketConfiguration {
                location = LocationInfo {
                    type = LocationType.AvailabilityZone
                    name = availabilityZone
                }
                bucket = BucketInfo {
                    type = BucketType.Directory
                    dataRedundancy = DataRedundancy.SingleAvailabilityZone
                }
            }
        }

        bucketName
    }

    /**
     * Deletes all objects and aborts all in-progress multipart uploads in [bucketName], then deletes the bucket and
     * waits until it no longer exists. A failure of the bucket deletion is logged and rethrown.
     *
     * @param client the client used for all S3 calls
     * @param bucketName the bucket to remove
     */
    suspend fun deleteBucket(client: S3Client, bucketName: String): Unit = coroutineScope {
        deleteBucketContents(client, bucketName)
        deleteMultiPartUploads(client, bucketName)

        try {
            client.deleteBucket { bucket = bucketName }

            client.waitUntilBucketNotExists {
                bucket = bucketName
            }
        } catch (ex: Exception) {
            println("Failed to delete bucket: $bucketName")
            throw ex
        }
    }

    /**
     * Deletes every object listed in [bucketName]. Each page of the object listing is deleted with one batch request
     * launched on a dispatcher limited to 64 parallel tasks; the function returns after all batches have completed.
     * Failures are logged and rethrown.
     */
    private suspend fun deleteBucketContents(client: S3Client, bucketName: String): Unit = coroutineScope {
        val scope = this

        try {
            println("Deleting S3 buckets contents: $bucketName")
            val dispatcher = Dispatchers.Default.limitedParallelism(64)
            val jobs = mutableListOf<Job>()

            client.listObjectsV2Paginated { bucket = bucketName }
                .mapNotNull { it.contents }
                .collect { contents ->
                    val job = scope.launch(dispatcher) {
                        client.deleteObjects {
                            bucket = bucketName
                            delete {
                                objects = contents.mapNotNull(Object::key).map { ObjectIdentifier { key = it } }
                            }
                        }
                    }
                    jobs.add(job)
                }

            jobs.joinAll()
        } catch (ex: Exception) {
            println("Failed to delete buckets contents: $bucketName")
            throw ex
        }
    }

    /**
     * Aborts every in-progress multipart upload in [bucketName]. The listing is followed across pages using the
     * markers returned by the service, so uploads beyond the first page are aborted as well.
     */
    private suspend fun deleteMultiPartUploads(client: S3Client, bucketName: String) {
        var nextKey: String? = null
        var nextUploadId: String? = null

        do {
            val page = client.listMultipartUploads {
                bucket = bucketName
                keyMarker = nextKey
                uploadIdMarker = nextUploadId
            }

            page.uploads?.forEach { upload ->
                client.abortMultipartUpload {
                    bucket = bucketName
                    key = upload.key
                    uploadId = upload.uploadId
                }
            }

            nextKey = page.nextKeyMarker
            nextUploadId = page.nextUploadIdMarker
            // Stop when the listing is complete, or when no continuation marker was returned (avoids re-listing forever)
        } while (page.isTruncated == true && nextKey != null)
    }

    /**
     * Executes [presignedRequest] as an HTTPS `PUT` with [content] as the request body.
     *
     * Only the first value of each header of [presignedRequest] is sent.
     *
     * @param presignedRequest the presigned request supplying the URL and headers
     * @param content the request body
     * @return the HTTP status code of the response
     * @throws IllegalStateException if the server returned an error response; the message contains the response body
     */
    fun responseCodeFromPut(presignedRequest: HttpRequest, content: String): Int {
        val url = URL(presignedRequest.url.toString())
        val connection: HttpsURLConnection = url.openConnection() as HttpsURLConnection

        try {
            presignedRequest.headers.forEach { key, values ->
                connection.setRequestProperty(key, values.first())
            }

            connection.doOutput = true
            connection.requestMethod = "PUT"
            // `use` closes the writer even if writing the body fails
            OutputStreamWriter(connection.outputStream).use { it.write(content) }

            // Read the status first: the error stream is only available once the response has been received
            val responseCode = connection.responseCode

            connection.errorStream?.use { stream ->
                error("request failed: ${stream.bufferedReader().readText()}")
            }

            return responseCode
        } finally {
            connection.disconnect()
        }
    }

    // Resolved at most once, on first use, via an STS GetCallerIdentity call in the default region
    private val accountId = asyncLazy {
        println("Getting account ID")

        val accountId = StsClient { region = DEFAULT_REGION }.use { sts ->
            sts.getCallerIdentity().account
        }

        checkNotNull(accountId) { "Unable to get AWS account ID" }
    }

    /**
     * Returns the AWS account ID of the caller's credentials.
     *
     * @throws IllegalStateException if STS does not report an account ID
     */
    internal suspend fun getAccountId(): String = accountId.get()

    /**
     * Creates an [S3Client] that uses [DEFAULT_REGION] unless [builder] overrides it.
     *
     * @param builder additional client configuration, applied after the default region is set
     */
    fun createClient(builder: S3Client.Config.Builder.() -> Unit = { }): S3Client = S3Client {
        region = DEFAULT_REGION

        // Apply builder block after setting default region in case of overrides
        builder()
    }
}
|