6 6 |
|
7 7 | import aws.sdk.kotlin.services.s3.*
|
8 8 | import aws.sdk.kotlin.services.s3.S3Client
|
9 9 | import aws.sdk.kotlin.services.s3.model.*
|
10 10 | import aws.sdk.kotlin.services.s3.model.BucketLocationConstraint
|
11 11 | import aws.sdk.kotlin.services.s3.model.ExpirationStatus
|
12 12 | import aws.sdk.kotlin.services.s3.model.LifecycleRule
|
13 13 | import aws.sdk.kotlin.services.s3.paginators.listObjectsV2Paginated
|
14 14 | import aws.sdk.kotlin.services.s3.waiters.waitUntilBucketExists
|
15 15 | import aws.sdk.kotlin.services.s3.waiters.waitUntilBucketNotExists
|
16 - | import aws.sdk.kotlin.services.s3control.*
|
17 - | import aws.sdk.kotlin.services.s3control.model.*
|
18 16 | import aws.sdk.kotlin.services.sts.StsClient
|
17 + | import aws.smithy.kotlin.runtime.http.HttpBody
|
18 + | import aws.smithy.kotlin.runtime.http.HttpMethod
|
19 + | import aws.smithy.kotlin.runtime.http.SdkHttpClient
|
20 + | import aws.smithy.kotlin.runtime.http.complete
|
21 + | import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine
|
19 22 | import aws.smithy.kotlin.runtime.http.request.HttpRequest
|
23 + | import aws.smithy.kotlin.runtime.io.use
|
20 24 | import aws.smithy.kotlin.runtime.text.ensurePrefix
|
25 + | import aws.smithy.kotlin.runtime.util.Uuid
|
21 26 | import kotlinx.coroutines.*
|
22 27 | import kotlinx.coroutines.flow.*
|
28 + | import kotlinx.coroutines.sync.Mutex
|
29 + | import kotlinx.coroutines.sync.withLock
|
23 30 | import kotlinx.coroutines.withTimeout
|
24 - | import java.io.OutputStreamWriter
|
25 - | import java.net.URL
|
26 - | import java.util.*
|
27 - | import javax.net.ssl.HttpsURLConnection
|
28 31 | import kotlin.time.Duration.Companion.seconds
|
29 32 |
|
30 33 | object S3TestUtils {
|
31 34 |
|
32 35 | const val DEFAULT_REGION = "us-west-2"
|
33 36 |
|
34 37 | // The E2E test account only has permission to operate on buckets with the prefix "s3-test-bucket-"
|
35 38 | private const val TEST_BUCKET_PREFIX = "s3-test-bucket-"
|
36 39 |
|
40 + | private var sharedBucket: String? = null
|
41 + | private val bucketMutex = Mutex()
|
42 + |
|
43 + | private val sharedDirectoryBuckets: MutableMap<String, String> = mutableMapOf()
|
44 + | private val directoryBucketMutex = Mutex()
|
45 + |
|
46 + | suspend fun getOrCreateSharedBucket(client: S3Client, region: String = DEFAULT_REGION): String = sharedBucket ?: bucketMutex.withLock {
|
47 + | sharedBucket ?: getTestBucket(client, region).also { sharedBucket = it }
|
48 + | }
|
49 + |
|
50 + | suspend fun cleanupSharedBucket(client: S3Client) {
|
51 + | sharedBucket?.let { bucket ->
|
52 + | deleteBucketContents(client, bucket)
|
53 + | }
|
54 + | }
|
55 + |
|
56 + | suspend fun getOrCreateSharedDirectoryBuckets(client: S3Client, suffix: String): List<String> = directoryBucketMutex.withLock {
|
57 + | (0 until 3).map { index ->
|
58 + | val key = "$suffix:$index"
|
59 + | sharedDirectoryBuckets[key] ?: getTestDirectoryBucket(client, suffix).also {
|
60 + | sharedDirectoryBuckets[key] = it
|
61 + | }
|
62 + | }
|
63 + | }
|
64 + |
|
65 + | suspend fun cleanupSharedDirectoryBuckets(client: S3Client, suffix: String) {
|
66 + | (0 until 3).forEach { index ->
|
67 + | val key = "$suffix:$index"
|
68 + | sharedDirectoryBuckets[key]?.let { bucket ->
|
69 + | deleteBucketContents(client, bucket)
|
70 + | }
|
71 + | }
|
72 + | }
|
73 + |
|
37 74 | private const val S3_MAX_BUCKET_NAME_LENGTH = 63 // https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
|
38 75 | private const val S3_EXPRESS_DIRECTORY_BUCKET_SUFFIX = "--x-s3"
|
39 76 |
|
40 77 | suspend fun getTestBucket(
|
41 78 | client: S3Client,
|
42 79 | region: String? = null,
|
43 80 | accountId: String? = null,
|
44 81 | ): String = getBucketWithPrefix(client, TEST_BUCKET_PREFIX, region, accountId)
|
45 82 |
|
46 83 | suspend fun getBucketWithPrefix(
|
47 84 | client: S3Client,
|
48 85 | prefix: String,
|
49 86 | region: String? = null,
|
50 87 | accountId: String? = null,
|
51 88 | ): String = withTimeout(60.seconds) {
|
52 89 | val buckets = client.listBuckets()
|
53 90 | .buckets
|
54 91 | ?.mapNotNull { it.name }
|
55 92 |
|
56 93 | var testBucket = buckets?.firstOrNull { bucketName ->
|
57 94 | bucketName.startsWith(prefix) &&
|
58 95 | region?.let {
|
59 96 | client.getBucketLocation {
|
60 97 | bucket = bucketName
|
61 98 | expectedBucketOwner = accountId
|
62 99 | }.locationConstraint?.value == region
|
63 100 | } ?: true
|
64 101 | }
|
65 102 |
|
66 103 | if (testBucket == null) {
|
67 - | testBucket = prefix + UUID.randomUUID()
|
104 + | testBucket = prefix + Uuid.random().toString()
|
68 105 | println("Creating S3 bucket: $testBucket")
|
69 106 |
|
70 107 | client.createBucket {
|
71 108 | bucket = testBucket
|
72 109 | createBucketConfiguration {
|
73 110 | locationConstraint = BucketLocationConstraint.fromValue(region ?: client.config.region!!)
|
74 111 | }
|
75 112 | }
|
76 113 |
|
77 114 | client.waitUntilBucketExists { bucket = testBucket }
|
78 115 | } else {
|
79 116 | println("Using existing S3 bucket: $testBucket")
|
80 117 | }
|
81 118 |
|
82 119 | client.putBucketLifecycleConfiguration {
|
83 120 | bucket = testBucket
|
84 121 | lifecycleConfiguration {
|
85 122 | rules = listOf(
|
86 123 | LifecycleRule {
|
87 124 | expiration { days = 1 }
|
88 125 | filter { this.prefix = "" }
|
89 126 | status = ExpirationStatus.Enabled
|
90 127 | id = "delete-old"
|
91 128 | },
|
92 129 | )
|
93 130 | }
|
94 131 | }
|
95 132 |
|
96 133 | testBucket
|
97 134 | }
|
98 135 |
|
99 136 | suspend fun getTestDirectoryBucket(client: S3Client, suffix: String) = withTimeout(60.seconds) {
|
100 137 | var testBucket = client.listBuckets()
|
101 138 | .buckets
|
102 139 | ?.mapNotNull { it.name }
|
103 140 | ?.firstOrNull { it.startsWith(TEST_BUCKET_PREFIX) && it.endsWith(S3_EXPRESS_DIRECTORY_BUCKET_SUFFIX) }
|
104 141 |
|
105 142 | if (testBucket == null) {
|
106 143 | // Adding S3 Express suffix surpasses the bucket name length limit... trim the UUID if needed
|
107 144 | testBucket = TEST_BUCKET_PREFIX +
|
108 - | UUID.randomUUID().toString().subSequence(0 until (S3_MAX_BUCKET_NAME_LENGTH - TEST_BUCKET_PREFIX.length - suffix.ensurePrefix("--").length)) +
|
145 + | Uuid.random().toString().subSequence(0 until (S3_MAX_BUCKET_NAME_LENGTH - TEST_BUCKET_PREFIX.length - suffix.ensurePrefix("--").length)) +
|
109 146 | suffix.ensurePrefix("--")
|
110 147 |
|
111 148 | println("Creating S3 Express directory bucket: $testBucket")
|
112 149 |
|
113 150 | val availabilityZone = testBucket // s3-test-bucket-UUID--use1-az4--x-s3
|
114 151 | .removeSuffix(S3_EXPRESS_DIRECTORY_BUCKET_SUFFIX) // s3-test-bucket-UUID--use1-az4
|
115 152 | .substringAfterLast("--") // use1-az4
|
116 153 |
|
117 154 | client.createBucket {
|
118 155 | bucket = testBucket
|
119 156 | createBucketConfiguration {
|
120 157 | location = LocationInfo {
|
121 158 | type = LocationType.AvailabilityZone
|
122 159 | name = availabilityZone
|
123 160 | }
|
124 161 | bucket = BucketInfo {
|
125 162 | type = BucketType.Directory
|
126 163 | dataRedundancy = DataRedundancy.SingleAvailabilityZone
|
127 164 | }
|
128 165 | }
|
129 166 | }
|
167 + | } else {
|
168 + | println("Using existing S3 Express directory bucket: $testBucket")
|
130 169 | }
|
170 + |
|
171 + | client.putBucketLifecycleConfiguration {
|
172 + | bucket = testBucket
|
173 + | lifecycleConfiguration {
|
174 + | rules = listOf(
|
175 + | LifecycleRule {
|
176 + | expiration { days = 1 }
|
177 + | filter { this.prefix = "" }
|
178 + | status = ExpirationStatus.Enabled
|
179 + | id = "delete-old"
|
180 + | },
|
181 + | )
|
182 + | }
|
183 + | }
|
184 + |
|
131 185 | testBucket
|
132 186 | }
|
133 187 |
|
134 188 | suspend fun deleteBucketAndAllContents(client: S3Client, bucketName: String): Unit = coroutineScope {
|
135 189 | deleteBucketContents(client, bucketName)
|
136 190 |
|
137 191 | try {
|
192 + | println("Deleting S3 bucket: $bucketName")
|
138 193 | client.deleteBucket { bucket = bucketName }
|
139 194 |
|
140 195 | client.waitUntilBucketNotExists {
|
141 196 | bucket = bucketName
|
142 197 | }
|
143 198 | } catch (ex: Exception) {
|
144 199 | println("Failed to delete bucket: $bucketName")
|
145 200 | throw ex
|
146 201 | }
|
147 202 | }
|
148 203 |
|
149 - | suspend fun deleteBucketContents(client: S3Client, bucketName: String): Unit = coroutineScope {
|
204 + | suspend fun deleteBucketContents(client: S3Client, bucketName: String) = coroutineScope {
|
150 205 | val scope = this
|
151 206 |
|
152 207 | try {
|
153 - | println("Deleting S3 buckets contents: $bucketName")
|
208 + | println("Deleting contents of S3 bucket: $bucketName")
|
154 209 | val dispatcher = Dispatchers.Default.limitedParallelism(64)
|
155 210 | val jobs = mutableListOf<Job>()
|
156 211 |
|
157 212 | client.listObjectsV2Paginated { bucket = bucketName }
|
158 213 | .mapNotNull { it.contents }
|
159 214 | .collect { contents ->
|
160 215 | val job = scope.launch(dispatcher) {
|
161 216 | client.deleteObjects {
|
162 217 | bucket = bucketName
|
163 218 | delete {
|
164 219 | objects = contents.mapNotNull(Object::key).map { ObjectIdentifier { key = it } }
|
165 220 | }
|
166 221 | }
|
167 222 | }
|
168 223 | jobs.add(job)
|
169 224 | }
|
170 225 |
|
171 226 | jobs.joinAll()
|
172 227 | } catch (ex: Exception) {
|
173 228 | println("Failed to delete buckets contents: $bucketName")
|
174 229 | throw ex
|
175 230 | }
|
176 231 | }
|
177 232 |
|
178 - | fun responseCodeFromPut(presignedRequest: HttpRequest, content: String): Int {
|
179 - | val url = URL(presignedRequest.url.toString())
|
180 - | val connection: HttpsURLConnection = url.openConnection() as HttpsURLConnection
|
181 - | presignedRequest.headers.forEach { key, values ->
|
182 - | connection.setRequestProperty(key, values.first())
|
183 - | }
|
184 - |
|
185 - | connection.doOutput = true
|
186 - | connection.requestMethod = "PUT"
|
187 - | val out = OutputStreamWriter(connection.outputStream)
|
188 - | out.write(content)
|
189 - | out.close()
|
190 - |
|
191 - | if (connection.errorStream != null) {
|
192 - | error("request failed: ${connection.errorStream?.bufferedReader()?.readText()}")
|
193 - | }
|
233 + | suspend fun responseCodeFromPut(engine: HttpClientEngine, presignedRequest: HttpRequest, content: String): Int {
|
234 + | val request = HttpRequest(
|
235 + | method = HttpMethod.PUT,
|
236 + | url = presignedRequest.url,
|
237 + | headers = presignedRequest.headers,
|
238 + | body = HttpBody.fromBytes(content.encodeToByteArray()),
|
239 + | )
|
194 240 |
|
195 - | return connection.responseCode
|
241 + | val call = SdkHttpClient(engine).call(request)
|
242 + | val statusCode = call.response.status.value
|
243 + | call.complete()
|
244 + | return statusCode
|
196 245 | }
|
197 246 |
|
198 247 | internal suspend fun getAccountId(): String {
|
199 - | println("Getting account ID")
|
200 - |
|
201 248 | val accountId = StsClient {
|
202 249 | region = "us-west-2"
|
203 250 | }.use {
|
204 251 | it.getCallerIdentity().account
|
205 252 | }
|
206 253 |
|
207 254 | return checkNotNull(accountId) { "Unable to get AWS account ID" }
|
208 255 | }
|
209 - |
|
210 - | internal suspend fun deleteMultiPartUploads(client: S3Client, bucketName: String) {
|
211 - | client.listMultipartUploads {
|
212 - | bucket = bucketName
|
213 - | }.uploads?.forEach { upload ->
|
214 - | client.abortMultipartUpload {
|
215 - | bucket = bucketName
|
216 - | key = upload.key
|
217 - | uploadId = upload.uploadId
|
218 - | }
|
219 - | }
|
220 - | }
|
221 256 | }
|