知識庫 / Spring / Spring Boot RSS 訂閱

使用 Spring Boot 查詢 S3 數據中的 Amazon Athena

Cloud,Spring Boot
HongKong
5
11:05 AM · Dec 06 ,2025

1. 概述

我們經常將大量數據存儲在 Amazon S3 中,但分析這些數據可能具有挑戰性。傳統的做法需要我們移動數據或設置複雜的系統,例如數據倉庫。

Amazon Athena 提供了一種更簡單的解決方案,允許我們直接使用 SQL 查詢 S3 中的數據。

在本教程中,我們將探索如何使用 Amazon Athena 分析我們 S3 存儲中的數據,使用 Spring Boot。 我們將逐步瞭解所需的配置、通過編程方式執行 Athena 查詢以及處理結果。

2. 瞭解 Amazon Athena

Amazon Athena 是一種無服務器查詢服務,允許我們在不設置任何基礎設施的情況下,對存儲在 S3 存儲桶中的數據執行臨時查詢。

使用 Athena 的關鍵優勢之一是,我們只需支付執行查詢時消耗的數據量,使其成為臨時和偶爾的數據分析的經濟高效解決方案。

Athena 還使用“讀時定義模式” (schema-on-read) 將我們的 S3 數據在查詢過程中轉換為類似表的數據結構。具體來説,這意味着我們無需修改源數據,也不需要執行提取、轉換和加載 (ETL) 操作即可查詢數據。我們定義的 Athena 表不包含實際數據,而是存儲了用於查詢源數據的轉換指令。

我們 S3 存儲桶中的數據可以來自各種 AWS 服務,例如 CloudTrail 日誌VPC Flow 日誌ALB 訪問日誌,甚至我們存儲在 S3 中的自定義數據,格式包括 JSON、XML、Parquet 等。

3. 項目設置

在開始使用 Amazon Athena 之前,我們需要包含其依賴項並正確配置我們的應用程序。

3.1. 依賴項

讓我們首先將 Amazon Athena 依賴項 添加到我們項目的 pom.xml 文件中:

<dependencies>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>athena</artifactId>
        <version>2.26.0</version>
    </dependency>
</dependencies>

此依賴項為我們提供了 AthenaClient 以及其他相關類,我們將使用它們與 Athena 服務進行交互。

3.2. 定義 Athena 配置屬性

現在,為了與 Athena 服務交互並執行查詢,我們需要配置 AWS 憑據用於身份驗證,Athena 數據庫名稱用於運行 SQL 查詢,以及查詢結果位置,即 Athena 存儲我們查詢結果的 S3 存儲桶。

我們將這些屬性存儲在項目的 application.yaml 文件中,並使用 @ConfigurationProperties 將值映射到 POJO,該 POJO 由服務層在與 Athena 交互時引用。

@Getter
@Setter
@Validated
@ConfigurationProperties(prefix = "com.baeldung.aws")
class AwsConfigurationProperties {

    @NotBlank
    private String accessKey;

    @NotBlank
    private String secretKey;

    @Valid
    private Athena athena = new Athena();

    @Getter
    @Setter
    public class Athena {

        @Nullable
        private String database = "default";

        @NotBlank
        private String s3OutputLocation;

    }

}

s3OutputLocation 字段代表 Athena 存儲我們查詢結果的 S3 桶位置。 這是必要的,因為 Athena 是無服務器的,它本身不存儲任何數據。 相反,它執行查詢並將結果寫入指定的 S3 位置,我們的應用程序可以從中讀取。

我們還添加了驗證註釋,以確保所有必需的屬性已正確配置。 如果任何定義的驗證失敗,則 Spring 的 ApplicationContext 將無法啓動。 這使我們能夠遵循“快速失敗”模式。

以下是我們的 application.yaml 文件的片段,它定義了將自動映射到我們的 AwsConfigurationProperties 類的必需屬性:

com:
  baeldung:
    aws:
      access-key: ${AWS_ACCESS_KEY}
      secret-key: ${AWS_SECRET_KEY}
      athena:
        database: ${AMAZON_ATHENA_DATABASE}
        s3-output-location: ${AMAZON_ATHENA_S3_OUTPUT_LOCATION}

因此,這種配置允許我們外部化 Athena 屬性並輕鬆地在我們的應用程序中訪問它們。

4. 在 Spring Boot 中配置 Athena

現在我們已經定義了屬性,接下來我們將使用它們來配置與 Athena 交互所需的 Bean。

4.1. 創建 AthenaClient Bean

AthenaClient 是與 Athena 服務交互的主要入口點。我們將創建一個 Bean 來進行設置:

@Bean
public AthenaClient athenaClient() {
    String accessKey = awsConfigurationProperties.getAccessKey();
    String secretKey = awsConfigurationProperties.getSecretKey();
    AwsBasicCredentials awsCredentials = AwsBasicCredentials.create(accessKey, secretKey);
    
    return AthenaClient.builder()
      .credentialsProvider(StaticCredentialsProvider.create(awsCredentials))
      .build();
}

在這裏,我們使用配置好的 AWS 憑據創建一個 AthenaClient 實例。該客户端用於啓動查詢執行和從 S3 存儲桶檢索結果。

4.2. 定義 QueryExecutionContext Bean

接下來,我們需要告訴 Athena 在運行 SQL 查詢時使用哪個數據庫:

@Bean
public QueryExecutionContext queryExecutionContext() {
    String database = awsConfigurationProperties.getAthena().getDatabase();
    return QueryExecutionContext.builder()
      .database(database)
      .build();
}

我們創建一個 QueryExecutionContext Bean,並指定用於查詢的數據庫。數據庫名稱從我們的配置屬性中檢索,如果未明確指定,則默認為 default 數據庫。

4.3. 設置 ResultConfiguration Bean

最後,我們需要配置 Athena 應該存儲我們 SQL 查詢結果的位置:

@Bean
public ResultConfiguration resultConfiguration() {
    String outputLocation = awsConfigurationProperties.getAthena().getS3OutputLocation();
    return ResultConfiguration.builder()
      .outputLocation(outputLocation)
      .build();
}

需要注意的是,用於存儲查詢結果的 S3 存儲桶應與包含我們原始數據存儲桶分開

這種隔離可以防止查詢結果被解釋為額外的原始數據,從而避免產生意外的查詢結果。此外,Athena 應該僅具有對原始存儲桶的只讀訪問權限,並且僅在為存儲結果配置的存儲桶上授予寫入權限。

5. 執行 Athena 查詢

配置完成後,讓我們看看如何使用 Athena 執行查詢。我們將創建一個 QueryService 類,自動注入我們創建的所有 Bean,並暴露一個公共 execute() 方法,該方法封裝了查詢執行的邏輯。

5.1. 啓動查詢執行

首先,我們將使用 AthenaClient 實例來啓動查詢執行:

public <T> List<T> execute(String sqlQuery, Class<T> targetClass) {
    String queryExecutionId;
    try {
        queryExecutionId = athenaClient.startQueryExecution(query -> 
            query.queryString(sqlQuery)
              .queryExecutionContext(queryExecutionContext)
              .resultConfiguration(resultConfiguration)
        ).queryExecutionId();
    } catch (InvalidRequestException exception) {
        log.error("Invalid SQL syntax detected in query {}", sqlQuery, exception);
        throw new QueryExecutionFailureException();
    }

    // ...rest of the implementation in the upcoming sections
}

我們提供 SQL 查詢字符串、QueryExecutionContextResultConfiguration,用於啓動查詢執行。 startQueryExecution() 方法返回一個唯一的 queryExecutionId,我們將使用它來跟蹤查詢的狀態並檢索結果

targetClass 參數指定我們將要將查詢結果映射到的 Java 類。

我們還處理了 Athena SDK 拋出的 InvalidRequestException,該異常發生在提供的 SQL 查詢包含語法錯誤時。 我們捕獲此異常,記錄錯誤消息以及無效查詢,並拋出一個自定義的 QueryExecutionFailureException

5.2. 等待查詢完成

在啓動查詢執行後,我們需要等待其完成,然後再嘗試檢索結果

private static final long WAIT_PERIOD = 30;

private void waitForQueryToComplete(String queryExecutionId) {
    QueryExecutionState queryState;

    do {
        GetQueryExecutionResponse response = athenaClient.getQueryExecution(request -> 
            request.queryExecutionId(queryExecutionId));
        queryState = response.queryExecution().status().state();

        switch (queryState) {
            case FAILED:
            case CANCELLED:
                String error = response.queryExecution().status().athenaError().errorMessage();
                log.error("Query execution failed: {}", error);
                throw new QueryExecutionFailureException();
            case QUEUED:
            case RUNNING:
                TimeUnit.MILLISECONDS.sleep(WAIT_PERIOD);
                break;
            case SUCCEEDED:
                queryState = QueryExecutionState.SUCCEEDED;
                return;
        }
    } while (queryState != QueryExecutionState.SUCCEEDED);
}

我們創建一個私有的 waitForQueryToComplete() 方法,並使用 getQueryExecution() 方法定期輪詢查詢的狀態,直到它達到 SUCCEEDED 狀態。

如果查詢失敗或被取消,我們記錄錯誤消息並拋出我們自定義的 QueryExecutionFailureException如果它已排隊或正在運行,則在檢查之前等待一段時間

我們從我們的 execute() 方法中調用 waitForQueryToComplete() 方法,並傳入啓動查詢執行時接收到的 queryExecutionId

5.3. 處理查詢結果

查詢執行成功後,我們可以檢索結果:

GetQueryResultsResponse queryResult = athenaClient.getQueryResults(request -> 
    request.queryExecutionId(queryExecutionId));

getQueryResults() 方法返回一個 GetQueryResultsResponse 對象,其中包含結果集。我們可以處理這些結果並將它們轉換為由 execute() 方法的 targetClass 參數指定的類的實例:

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JsonOrgModule());

private <T> List<T> transformQueryResult(GetQueryResultsResponse queryResultsResponse, Class<T> targetClass) {
    List<T> response = new ArrayList<T>();
    List<Row> rows = queryResultsResponse.resultSet().rows();
    List<String> headers = rows.get(0).data().stream().map(Datum::varCharValue).toList();

    rows.stream()
      .skip(1)
      .forEach(row -> {
          JSONObject element = new JSONObject();
          List<Datum> data = row.data();
           
          for (int i = 0; i < headers.size(); i++) {
              String key = headers.get(i);
              String value = data.get(i).varCharValue();
              element.put(key, value);
          }
          T obj = OBJECT_MAPPER.convertValue(element, targetClass);
          response.put(obj);
      });
    return response;
}

在這裏,我們從結果集的第 1 行中提取標題,然後處理每一行,將其轉換為一個 JSONObject,其中鍵是列名,值是相應的單元格值。然後我們使用 ObjectMapper 將每個 JSONObject 轉換為指定目標類的實例,代表領域模型。這些領域模型對象被添加到返回的列表中。

需要注意的是,我們的 transformQueryResult() 實現是通用的,可以處理所有類型的讀取查詢,無論表或領域模型如何

5.4. 使用 execute() 方法執行 SQL 查詢

藉助我們已完全實現好的 execute() 方法,我們可以輕鬆地向我們的 S3 數據中運行 SQL 查詢,並將結果作為領域模型對象檢索出來:

String query = "SELECT * FROM users WHERE age < 25;";
User user = queryService.execute(query, User.class);

record User(Integer id, String name, Integer age, String city) {};

在這裏,我們定義了一個 SQL 查詢,用於選擇所有年齡小於 25 歲的用户。我們傳遞了這個查詢和 User 類到我們的 execute() 方法。 User 類是一個簡單的記錄,代表我們期望檢索的數據結構。

execute() 方法負責啓動查詢執行,等待其完成,檢索結果,並將它們轉換為 User 對象列表。 這種抽象允許我們專注於查詢和領域模型,而無需擔心與 Athena 的底層交互。

5.5. 使用 Athena 的參數化語句

需要注意的是,在構建使用用户輸入的數據查詢語句時,應謹慎評估 SQL 注入攻擊的風險。Athena 支持 參數化語句,這允許我們將 SQL 查詢語句與參數值分離,從而提供了一種更安全的方式來執行帶有用户輸入的數據查詢語句。雖然我們在此處使用原始 SQL 查詢語句進行演示目的,但 在構建帶有用户提供輸入的數據查詢語句時,強烈建議使用參數化語句

為了使用參數化查詢,我們可以修改我們的 execute() 方法以接受一個可選的參數列表:

public <T> List<T> execute(String sqlQuery, List<String> parameters, Class<T> targetClass) {
    // ... same as above
    
    queryExecutionId = athenaClient.startQueryExecution(query -> 
        query.queryString(sqlQuery)
          .queryExecutionContext(queryExecutionContext)
          .resultConfiguration(resultConfiguration)
          .executionParameters(parameters)
    ).queryExecutionId();
    
    // ... same as above
}

我們已在 execute() 方法中添加了一個新的 parameters 參數,它是一個字符串值的列表,將在參數化查詢中使用。在啓動查詢執行時,我們使用 executionParameters() 方法傳遞這些 parameters

讓我們看看如何使用我們更新的 execute() 方法:

public List<User> getUsersByName(String name) {
    String query = "SELECT * FROM users WHERE name = ?";
    return queryService.execute(query, List.of(name), User.class);
}

此示例定義了一個帶有佔位符 ‘?’ 的 SQL 查詢,用於 name 參數。我們通過包含單個元素的列表將 name 值傳遞給 execute() 方法,以及查詢和目標類。

6. 自動化數據庫和表創建

要使用 Athena 查詢我們的 S3 數據,首先需要定義一個數據庫和一個表,以便將其映射到我們 S3 存儲桶中的數據。雖然我們可以使用 AWS 管理控制枱手動創建這些對象,但將其作為應用程序啓動過程中的自動化操作更方便。

我們將 SQL 腳本放置在新的 athena-init 目錄中,該目錄位於 src/main/resources 目錄內部。

要執行這些 SQL 腳本,我們將創建一個實現 ApplicationRunner 接口的 AthenaInitializer 類:

@Component
@RequiredArgsConstructor
class AthenaInitializer implements ApplicationRunner {

    private final QueryService queryService;
    private final ResourcePatternResolver resourcePatternResolver;

    @Override
    public void run(ApplicationArguments args) {
        Resource[] initScripts = resourcePatternResolver.getResources("classpath:athena-init/*.sql");
        for (Resource script : initScripts) {
            String sqlScript = FileUtils.readFileToString(script.getFile(), StandardCharsets.UTF_8);
            queryService.execute(sqlScript, Void.class);
        }
    }

}

通過使用 Lombok 進行構造器注入,我們注入了 ResourcePatternResolver 和我們先前創建的 QueryService 實例。

我們使用 ResourcePatternResolver 來定位我們位於 athena-init 目錄下的所有 SQL 腳本。然後,我們遍歷這些腳本,使用 Apache Commons IO 讀取它們的全部內容,並使用我們的 QueryService 執行它們。

我們首先將創建一個 create-database.sql 腳本,以創建自定義數據庫:

CREATE DATABASE IF NOT EXISTS baeldung;

如果不存在,我們將創建一個名為 baeldung 的自定義數據庫。 此處使用的數據庫名稱可以在 application.yaml文件中進行配置,正如我們在教程前面所見

同樣,為了在 baeldung數據庫中創建一個名為 users的表,我們將創建一個名為 create-users-table.sql的腳本,內容如下:

CREATE EXTERNAL TABLE IF NOT EXISTS users (
  id INT,
  name STRING,
  age INT,
  city STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://baeldung-athena-tutorial-bucket/';

此腳本創建了一個名為 users 的外部表,其中包含與我們存儲在 S3 中的 JSON 數據字段相對應的列。我們指定 JsonSerDe 作為行格式,並提供我們存儲 JSON 文件的位置。

重要的是,為了使用 Athena 查詢存儲在 S3 中的數據,必須確保每個 JSON 記錄都位於單個文本行中,並且鍵和值之間沒有空格或換行符:

{"id":1,"name":"Homelander","age":41,"city":"New York"}
{"id":2,"name":"Black Noir","age":58,"city":"Los Angeles"}
{"id":3,"name":"Billy Butcher","age":46,"city":"London"}

7. IAM 權限

為了使我們的應用程序正常運行,我們需要配置針對應用程序中配置的 IAM 用户的一些權限。

我們的策略應配置對 Athena 和 S3 的訪問:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowAthenaQueryExecution",
            "Effect": "Allow",
            "Action": [
                "athena:StartQueryExecution",
                "athena:GetQueryExecution",
                "athena:GetQueryResults"
            ],
            "Resource": "arn:aws:athena:region:account-id:workgroup/primary"
        },
        {
            "Sid": "AllowS3ReadAccessToSourceBucket",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::baeldung-athena-tutorial-bucket",
                "arn:aws:s3:::baeldung-athena-tutorial-bucket/*"
            ]
        },
        {
            "Sid": "AllowS3AccessForAthenaQueryResults",
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::baeldung-athena-tutorial-results-bucket",
                "arn:aws:s3:::baeldung-athena-tutorial-results-bucket/*"
            ]
        },
        {
            "Sid": "AllowGlueCatalogAccessForAthena",
            "Effect": "Allow",
            "Action": [
                "glue:CreateDatabase",
                "glue:GetDatabase",
                "glue:CreateTable",
                "glue:GetTable"
            ],
            "Resource": [
                "arn:aws:glue:region:account-id:catalog",
                "arn:aws:glue:region:account-id:database/baeldung",
                "arn:aws:glue:region:account-id:table/baeldung/users"
            ]
        }
    ]
}

IAM策略由四個關鍵語句組成,用於構建我們Spring Boot應用程序所需的權限。 AllowAthenaQueryExecution 語句提供與Athena本身交互所需的權限,包括啓動查詢、檢查其狀態和檢索結果。

然後,AllowS3ReadAccessToSourceBucket 語句允許讀取包含我們要查詢的源數據的S3 bucket的權限。 AllowS3AccessForAthenaQueryResults 語句專注於Athena存儲查詢結果的S3 bucket,授予Athena寫入配置的S3 bucket中的結果以及我們的應用程序檢索這些結果的權限

最後,為了允許與AWS Glue進行交互,Athena將其用作元數據存儲,我們定義了 AllowGlueCatalogAccessForAthena 語句。 它允許我們創建和檢索數據庫和表定義,這些定義對於Athena理解我們S3數據的結構並執行SQL查詢至關重要。

我們的IAM策略符合最小權限原則,僅授予應用程序正確運行所需的必要權限

8. 結論

在本文中,我們探討了如何使用 Amazon Athena 與 Spring Boot 直接查詢來自 S3 存儲桶中的數據,而無需設置任何複雜的底層基礎設施。

我們討論了啓動查詢執行、等待其完成以及對查詢結果進行通用處理。此外,我們還通過在應用程序啓動期間執行 SQL 腳本,實現了數據庫和表的自動化創建。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.