知識庫 / Spring / Spring Boot RSS 訂閱

使用 Spring Boot 在 Elasticsearch 中導入 CSV 文件

Spring Boot
HongKong
6
10:52 AM · Dec 06 ,2025

1. 引言

在本教程中,我們將學習如何使用 Spring Boot 將 CSV 文件中的數據導入 Elasticsearch。從 CSV 文件中導入數據是常見用例,當我們需要從遺留系統或外部來源遷移數據,或準備測試數據集時,經常會用到。

2. 使用 Docker 設置 Elasticsearch

為了使用 Elasticsearch,我們將使用 Docker 在本地設置它。請按照以下步驟啓動 Elasticsearch 容器:

docker pull docker.elastic.co/elasticsearch/elasticsearch:8.17.0

接下來,我們使用以下命令運行容器:

docker run -d --name elasticsearch -p 9200:9200 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:8.17.0

讓我們創建一個示例 Excel 文件“products.csv”幷包含以下數據:

id,name,category,price,stock
1,Microwave,Appliances,705.77,136
2,Vacuum Cleaner,Appliances,1397.23,92
...

3. 使用手動循環處理 CSV 數據

第一種方法涉及使用手動 for 循環讀取和索引 CSV 文件中的記錄,並將其導入到 Elasticsearch 中。 要實現此方法,我們將使用 Apache Commons CSV 庫解析 CSV 文件,以及 Elasticsearch Rest High-Level Client 與 Elasticsearch 搜索引擎集成。

讓我們首先將所需的依賴項添加到我們的 <em ref="pom.xml">pom.xml</em> 文件中:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-csv</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.17.11</version>
</dependency>
<p>添加依賴後,我們需要設置 Elasticsearch 的配置。我們創建一個配置類來設置 <em >RestHighLevelClient</em>:</p>
@Configuration
public class ElasticsearchConfig {
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        return RestClients.create(ClientConfiguration.builder()
          .connectedTo("localhost:9200")
          .build()).rest();
    }
}

接下來,我們創建一個 Product 類來表示 CSV 數據:

@Document(indexName = "products")
public class Product {
    @Id
    private String id;
    private String name;
    private String category;
    private double price;
    private int stock;

    // Getters and setters
}

稍後,我們將為我們的 Spring Boot 應用程序創建一個服務,用於處理 CSV 導入過程。 在服務中,我們使用 for 循環來迭代 CSV 文件中的每一條記錄

@Autowired
private RestHighLevelClient restHighLevelClient;

public void importCSV(File file) {
    try (Reader reader = new FileReader(file)) {
        Iterable<CSVRecord> records = CSVFormat.DEFAULT
          .withHeader("id", "name", "category", "price", "stock")
          .withFirstRecordAsHeader()
          .parse(reader);

        for (CSVRecord record : records) {
            IndexRequest request = new IndexRequest("products")
              .id(record.get("id"))
              .source(Map.of(
                "name", record.get("name"),
                "category", record.get("category"),
                "price", Double.parseDouble(record.get("price")),
                "stock", Integer.parseInt(record.get("stock"))
              ));

            restHighLevelClient.index(request, RequestOptions.DEFAULT);
        }
    } catch (Exception e) {
        // handle exception
    }
}

對於每個記錄,我們構造一個 IndexRequest 對象,以準備數據進行 Elasticsearch 索引。然後,使用 RestHighLevelClient 索引數據,它是與 Elasticsearch 交互的主要客户端庫。

以下是如何從 CSV 文件導入數據到 Elasticsearch 索引:

File csvFile = Paths.get("src", "test", "resources", "products.csv").toFile();
importCSV(csvFile);

接下來,我們查詢第一個索引並驗證其內容與預期值是否匹配:

IndexRequest firstRequest = captor.getAllValues().get(0);
assertEquals(Map.of(
  "name", "Microwave",
  "category", "Appliances",
  "price", 705.77,
  "stock", 136
), firstRequest.sourceAsMap());

這種方法簡單直接,使我們對整個過程擁有完全的控制權。 然而,它更適合處理較小的數據集,因為對於大型文件來説,它可能效率低下且耗時。

4. 使用 Spring Batch 進行可擴展的數據導入

Spring Batch 是 Java 中用於批處理的強大框架。它非常適合通過分塊處理數據來處理大規模的數據導入。

要使用 Spring Batch,我們需要將 Spring Batch 依賴 添加到我們的 pom.xml 文件中:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>3.4.1</version>
</dependency>

4.1. 定義 Spring 配置文件

接下來,讓我們創建一個配置類來定義批處理作業。在這一配置中,我們使用 <em @EnableBatchProcessing 註解來激活 Spring Batch 功能,從而允許我們創建和管理批處理作業。

我們設置了一個 FlatFileItemReader 來讀取 CSV 文件,以及一個 ItemWriter 將數據寫入 Elasticsearch。 此外,我們還創建並配置了 RestHighLevelClient Bean 在 Spring 配置文件中:

@Configuration
@EnableBatchProcessing
public class BatchConfig {
    // ...
    @Autowired
    private RestHighLevelClient restHighLevelClient
}

4.2. 定義讀取器

為了從 CSV 文件讀取數據,我們創建一個名為 reader() 的方法,並定義一個 FlatFileItemReader。 我們將使用 FlatFileItemReaderBuilder 來配置讀取器,並設置各種參數。

@Bean
public FlatFileItemReader<Product> reader() {
    return new FlatFileItemReaderBuilder<Product>()
      .name("productReader")
      .resource(new FileSystemResource("products.csv"))
      .delimited()
      .names("id", "name", "category", "price", "stock")
      .fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
          setTargetType(Product.class);
      }})
      .build();
}

我們使用 name() 方法為讀者分配一個名稱,這有助於在批處理作業中識別它。 此外,resource() 方法通過使用 FileSystemResource 指定 CSV 文件的位置,“products.csv”。 文件應為逗號分隔格式,這通過 delimited() 方法進行指定。

names() 方法列出 CSV 文件中的列標題並將其映射到 Product 類中的字段。 此外, fieldSetMapper() 方法使用 BeanWrapperFieldSetMapper 將 CSV 文件中的每一行映射到 Product 對象。

4.3. 定義 Writer

接下來,我們創建一個 writer() 方法,用於將處理後的數據寫入 Elasticsearch。該方法定義了一個 ItemWriter,它接收一個包含 Product 對象的列表。它使用 RestHighLevelClient 與 Elasticsearch 交互:

@Bean
public ItemWriter<Product> writer(RestHighLevelClient restHighLevelClient) {
    return products -> {
        for (Product product : products) {
            IndexRequest request = new IndexRequest("products")
              .id(product.getId())
              .source(Map.of(
                "name", product.getName(),
                "category", product.getCategory(),
                "price", product.getPrice(),
                "stock", product.getStock()
              ));
            restHighLevelClient.index(request, RequestOptions.DEFAULT);
        }
    };
}

對於列表中每個產品,我們創建一條 IndexRequest,以指定 Elasticsearch 索引和文檔結構。 id() 方法使用 Product 對象中的 ID 為每個文檔分配一個唯一的 ID。

source() 方法將 Product 對象的字段(如 namecategorypricestock)映射為 Elasticsearch 可以存儲的鍵值格式。

配置請求後,我們使用 client.index() 方法將 Product 記錄發送到 Elasticsearch,確保 product 能夠被搜索和檢索。

4.4. 定義 Spring Batch 任務

最後,讓我們創建 importJob() 方法,並使用 Spring Batch 的 <em >JobBuilder</em><em >StepBuilder</em> 來配置任務及其步驟:

@Bean
public Job importJob(JobRepository jobRepository, PlatformTransactionManager transactionManager, 
  RestHighLevelClient restHighLevelClient) {
    return new JobBuilder("importJob", jobRepository)
      .start(new StepBuilder("step1", jobRepository)
        .<Product, Product>chunk(10, transactionManager)
        .reader(reader())
        .writer(writer(restHighLevelClient))
        .build())
      .build();
}

在此示例中,我們使用 JobBuilder 來配置工作項。它接受“importJob”作為工作項名稱和 JobRepository 作為參數。 我們還配置了一個名為“step1”的步驟,並指定工作項將同時處理 10 條記錄。 transactionManager 確保在分塊處理過程中數據的一致性。

reader()writer() 方法已集成到步驟中,用於從 CSV 到 Elasticsearch 的數據流處理。 接下來,我們使用 start() 方法將工作項與步驟連接起來。 這種連接確保步驟作為工作項的一部分執行。 完成此配置後,可以使用 Spring 的 JobLauncher 運行工作項。

4.5. 運行批處理作業

讓我們看一下使用 <em >JobLauncher</em> 運行 Spring Batch 作業的代碼。我們將創建一個 <em >CommandLineRunner</em> Bean,在應用程序啓動時執行該作業:

@Configuration
public class JobRunnerConfig {
    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job importJob;

    @Bean
    public CommandLineRunner runJob() {
        return args -> {
            try {
                JobExecution execution = jobLauncher.run(importJob, new JobParameters());
            } catch (Exception e) {
                // handle exception
            }
        };
    }
}

在作業成功運行後,我們可以通過使用 curl 發起請求來測試結果:

curl -X GET "http://localhost:9200/products/_search" \
  -H "Content-Type: application/json" \
  -d '{
        "query": {
          "match_all": {}
        }
      }'

讓我們來看預期的結果:

{
  ...
  "hits": {
    "total": {
      "value": 25,
      "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "products",
        "_type": "_doc",
        "_id": "1",
        "_score": 1.0,
        "_source": {
          "id": "1",
          "name": "Microwave",
          "category": "Appliances",
          "price": 705.77,
          "stock": 136
        }
      },
      {
        "_index": "products",
        "_type": "_doc",
        "_id": "2",
        "_score": 1.0,
        "_source": {
          "id": "1",
          "name": "Vacuum Cleaner",
          "category": "Appliances",
          "price": 1397.23,
          "stock": 92
        }
      }
      ...
    ]
  }
}

此方法比之前的那些方法設置更為複雜,但它提供了導入數據的可擴展性和靈活性。

5. 使用 Logstash 導入 CSV 數據

Logstash 是 Elastic Stack 的一部分,旨在進行數據處理和導入。

我們可以使用 Docker 快速設置 Logstash。首先,讓我們拉取並運行 Logstash 鏡像:

docker pull docker.elastic.co/logstash/logstash:8.17.0

在獲取圖像後,我們為 Logstash 創建一個配置文件“csv-to-es.conf”。該文件定義了 Logstash 如何讀取 CSV 文件以及如何將數據發送到 Elasticsearch。

input {
    file {
        path => "/path/to/your/products.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    csv {
        separator => ","
        columns => ["id", "name", "category", "price", "stock"]
    }

    mutate {
        convert => { "price" => "float" }
        convert => { "stock" => "integer" }
    }
}

output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "products"
    }

    stdout {
        codec => json_lines
    }
}

本文件中,我們定義了數據管道的輸入、過濾和輸出階段。輸入階段指定要讀取的 CSV 文件,而過濾階段則處理和轉換數據。 最終,輸出階段將處理後的數據發送到 Elasticsearch。

配置完成後,我們需要調用 docker run 命令來執行 Logstash 管道:

docker run --rm -v $(pwd)/csv-to-es.conf:/usr/share/logstash/pipeline/logstash.conf \
  -v $(pwd)/products.csv:/usr/share/logstash/products.csv \
  docker.elastic.co/logstash/logstash:8.17.0

此命令將我們的配置文件和 CSV 文件掛載到 Logstash 容器,並運行數據管道以將數據導入 Elasticsearch。 在命令成功運行後,我們可以再次運行 curl 查詢以驗證結果。

Logstash 能夠高效地將 CSV 數據導入 Elasticsearch,無需自定義代碼,因此成為處理大型數據集和設置自動化數據管道的流行選擇。

6. 總結

現在我們已經探討了三種從 CSV 文件導入到 Elasticsearch 的方法,接下來讓我們比較它們的優缺點:

方法 優點 缺點
手動循環 易於實現;完全控制 不適合大型文件
Spring Batch 可擴展處理大型數據集 初學者設置複雜
Logstash 無需編碼;高性能 需要 Logstash 安裝

7. 結論

在本文中,我們介紹瞭如何使用三種方法將 CSV 數據導入 Elasticsearch:手動循環、Spring Batch 和 Logstash。每種方法都有其優勢,適用於不同的用例。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.