1. 引言
在本教程中,我們將學習如何使用 Spring Boot 將 CSV 文件中的數據導入 Elasticsearch。從 CSV 文件中導入數據是常見用例,當我們需要從遺留系統或外部來源遷移數據,或準備測試數據集時,經常會用到。
2. 使用 Docker 設置 Elasticsearch
為了使用 Elasticsearch,我們將使用 Docker 在本地設置它。請按照以下步驟啓動 Elasticsearch 容器:
docker pull docker.elastic.co/elasticsearch/elasticsearch:8.17.0接下來,我們使用以下命令運行容器:
docker run -d --name elasticsearch -p 9200:9200 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:8.17.0
讓我們創建一個示例 Excel 文件“products.csv”幷包含以下數據:
id,name,category,price,stock
1,Microwave,Appliances,705.77,136
2,Vacuum Cleaner,Appliances,1397.23,92
...3. 使用手動循環處理 CSV 數據
第一種方法涉及使用手動 for 循環讀取和索引 CSV 文件中的記錄,並將其導入到 Elasticsearch 中。 要實現此方法,我們將使用 Apache Commons CSV 庫解析 CSV 文件,以及 Elasticsearch Rest High-Level Client 與 Elasticsearch 搜索引擎集成。
讓我們首先將所需的依賴項添加到我們的 <em ref="pom.xml">pom.xml</em> 文件中:
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.17.11</version>
</dependency><p>添加依賴後,我們需要設置 Elasticsearch 的配置。我們創建一個配置類來設置 <em >RestHighLevelClient</em>:</p>
@Configuration
public class ElasticsearchConfig {
@Bean
public RestHighLevelClient restHighLevelClient() {
return RestClients.create(ClientConfiguration.builder()
.connectedTo("localhost:9200")
.build()).rest();
}
}接下來,我們創建一個 Product 類來表示 CSV 數據:
@Document(indexName = "products")
public class Product {
@Id
private String id;
private String name;
private String category;
private double price;
private int stock;
// Getters and setters
}
稍後,我們將為我們的 Spring Boot 應用程序創建一個服務,用於處理 CSV 導入過程。 在服務中,我們使用 for 循環來迭代 CSV 文件中的每一條記錄:
@Autowired
private RestHighLevelClient restHighLevelClient;
public void importCSV(File file) {
try (Reader reader = new FileReader(file)) {
Iterable<CSVRecord> records = CSVFormat.DEFAULT
.withHeader("id", "name", "category", "price", "stock")
.withFirstRecordAsHeader()
.parse(reader);
for (CSVRecord record : records) {
IndexRequest request = new IndexRequest("products")
.id(record.get("id"))
.source(Map.of(
"name", record.get("name"),
"category", record.get("category"),
"price", Double.parseDouble(record.get("price")),
"stock", Integer.parseInt(record.get("stock"))
));
restHighLevelClient.index(request, RequestOptions.DEFAULT);
}
} catch (Exception e) {
// handle exception
}
}對於每個記錄,我們構造一個 IndexRequest 對象,以準備數據進行 Elasticsearch 索引。然後,使用 RestHighLevelClient 索引數據,它是與 Elasticsearch 交互的主要客户端庫。
以下是如何從 CSV 文件導入數據到 Elasticsearch 索引:
File csvFile = Paths.get("src", "test", "resources", "products.csv").toFile();
importCSV(csvFile);接下來,我們查詢第一個索引並驗證其內容與預期值是否匹配:
IndexRequest firstRequest = captor.getAllValues().get(0);
assertEquals(Map.of(
"name", "Microwave",
"category", "Appliances",
"price", 705.77,
"stock", 136
), firstRequest.sourceAsMap());這種方法簡單直接,使我們對整個過程擁有完全的控制權。 然而,它更適合處理較小的數據集,因為對於大型文件來説,它可能效率低下且耗時。
4. 使用 Spring Batch 進行可擴展的數據導入
Spring Batch 是 Java 中用於批處理的強大框架。它非常適合通過分塊處理數據來處理大規模的數據導入。
要使用 Spring Batch,我們需要將 Spring Batch 依賴 添加到我們的 pom.xml 文件中:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.4.1</version>
</dependency>4.1. 定義 Spring 配置文件
接下來,讓我們創建一個配置類來定義批處理作業。在這一配置中,我們使用 <em @EnableBatchProcessing 註解來激活 Spring Batch 功能,從而允許我們創建和管理批處理作業。
我們設置了一個 FlatFileItemReader 來讀取 CSV 文件,以及一個 ItemWriter 將數據寫入 Elasticsearch。 此外,我們還創建並配置了 RestHighLevelClient Bean 在 Spring 配置文件中:
@Configuration
@EnableBatchProcessing
public class BatchConfig {
// ...
@Autowired
private RestHighLevelClient restHighLevelClient
}4.2. 定義讀取器
為了從 CSV 文件讀取數據,我們創建一個名為 reader() 的方法,並定義一個 FlatFileItemReader。 我們將使用 FlatFileItemReaderBuilder 來配置讀取器,並設置各種參數。
@Bean
public FlatFileItemReader<Product> reader() {
return new FlatFileItemReaderBuilder<Product>()
.name("productReader")
.resource(new FileSystemResource("products.csv"))
.delimited()
.names("id", "name", "category", "price", "stock")
.fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
setTargetType(Product.class);
}})
.build();
}我們使用 name() 方法為讀者分配一個名稱,這有助於在批處理作業中識別它。 此外,resource() 方法通過使用 FileSystemResource 指定 CSV 文件的位置,“products.csv”。 文件應為逗號分隔格式,這通過 delimited() 方法進行指定。
names() 方法列出 CSV 文件中的列標題並將其映射到 Product 類中的字段。 此外, fieldSetMapper() 方法使用 BeanWrapperFieldSetMapper 將 CSV 文件中的每一行映射到 Product 對象。
4.3. 定義 Writer
接下來,我們創建一個 writer() 方法,用於將處理後的數據寫入 Elasticsearch。該方法定義了一個 ItemWriter,它接收一個包含 Product 對象的列表。它使用 RestHighLevelClient 與 Elasticsearch 交互:
@Bean
public ItemWriter<Product> writer(RestHighLevelClient restHighLevelClient) {
return products -> {
for (Product product : products) {
IndexRequest request = new IndexRequest("products")
.id(product.getId())
.source(Map.of(
"name", product.getName(),
"category", product.getCategory(),
"price", product.getPrice(),
"stock", product.getStock()
));
restHighLevelClient.index(request, RequestOptions.DEFAULT);
}
};
}對於列表中每個產品,我們創建一條 IndexRequest,以指定 Elasticsearch 索引和文檔結構。 id() 方法使用 Product 對象中的 ID 為每個文檔分配一個唯一的 ID。
source() 方法將 Product 對象的字段(如 name、category、price 和 stock)映射為 Elasticsearch 可以存儲的鍵值格式。
配置請求後,我們使用 client.index() 方法將 Product 記錄發送到 Elasticsearch,確保 product 能夠被搜索和檢索。
4.4. 定義 Spring Batch 任務
最後,讓我們創建 importJob() 方法,並使用 Spring Batch 的 <em >JobBuilder</em> 和 <em >StepBuilder</em> 來配置任務及其步驟:
@Bean
public Job importJob(JobRepository jobRepository, PlatformTransactionManager transactionManager,
RestHighLevelClient restHighLevelClient) {
return new JobBuilder("importJob", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.<Product, Product>chunk(10, transactionManager)
.reader(reader())
.writer(writer(restHighLevelClient))
.build())
.build();
}在此示例中,我們使用 JobBuilder 來配置工作項。它接受“importJob”作為工作項名稱和 JobRepository 作為參數。 我們還配置了一個名為“step1”的步驟,並指定工作項將同時處理 10 條記錄。 transactionManager 確保在分塊處理過程中數據的一致性。
reader() 和 writer() 方法已集成到步驟中,用於從 CSV 到 Elasticsearch 的數據流處理。 接下來,我們使用 start() 方法將工作項與步驟連接起來。 這種連接確保步驟作為工作項的一部分執行。 完成此配置後,可以使用 Spring 的 JobLauncher 運行工作項。
4.5. 運行批處理作業
讓我們看一下使用 <em >JobLauncher</em> 運行 Spring Batch 作業的代碼。我們將創建一個 <em >CommandLineRunner</em> Bean,在應用程序啓動時執行該作業:
@Configuration
public class JobRunnerConfig {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job importJob;
@Bean
public CommandLineRunner runJob() {
return args -> {
try {
JobExecution execution = jobLauncher.run(importJob, new JobParameters());
} catch (Exception e) {
// handle exception
}
};
}
}在作業成功運行後,我們可以通過使用 curl 發起請求來測試結果:
curl -X GET "http://localhost:9200/products/_search" \
-H "Content-Type: application/json" \
-d '{
"query": {
"match_all": {}
}
}'讓我們來看預期的結果:
{
...
"hits": {
"total": {
"value": 25,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "products",
"_type": "_doc",
"_id": "1",
"_score": 1.0,
"_source": {
"id": "1",
"name": "Microwave",
"category": "Appliances",
"price": 705.77,
"stock": 136
}
},
{
"_index": "products",
"_type": "_doc",
"_id": "2",
"_score": 1.0,
"_source": {
"id": "1",
"name": "Vacuum Cleaner",
"category": "Appliances",
"price": 1397.23,
"stock": 92
}
}
...
]
}
}此方法比之前的那些方法設置更為複雜,但它提供了導入數據的可擴展性和靈活性。
5. 使用 Logstash 導入 CSV 數據
Logstash 是 Elastic Stack 的一部分,旨在進行數據處理和導入。
我們可以使用 Docker 快速設置 Logstash。首先,讓我們拉取並運行 Logstash 鏡像:
docker pull docker.elastic.co/logstash/logstash:8.17.0在獲取圖像後,我們為 Logstash 創建一個配置文件“csv-to-es.conf”。該文件定義了 Logstash 如何讀取 CSV 文件以及如何將數據發送到 Elasticsearch。
input {
file {
path => "/path/to/your/products.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
csv {
separator => ","
columns => ["id", "name", "category", "price", "stock"]
}
mutate {
convert => { "price" => "float" }
convert => { "stock" => "integer" }
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "products"
}
stdout {
codec => json_lines
}
}
本文件中,我們定義了數據管道的輸入、過濾和輸出階段。輸入階段指定要讀取的 CSV 文件,而過濾階段則處理和轉換數據。 最終,輸出階段將處理後的數據發送到 Elasticsearch。
配置完成後,我們需要調用 docker run 命令來執行 Logstash 管道:
docker run --rm -v $(pwd)/csv-to-es.conf:/usr/share/logstash/pipeline/logstash.conf \
-v $(pwd)/products.csv:/usr/share/logstash/products.csv \
docker.elastic.co/logstash/logstash:8.17.0
此命令將我們的配置文件和 CSV 文件掛載到 Logstash 容器,並運行數據管道以將數據導入 Elasticsearch。 在命令成功運行後,我們可以再次運行 curl 查詢以驗證結果。
Logstash 能夠高效地將 CSV 數據導入 Elasticsearch,無需自定義代碼,因此成為處理大型數據集和設置自動化數據管道的流行選擇。
6. 總結
現在我們已經探討了三種從 CSV 文件導入到 Elasticsearch 的方法,接下來讓我們比較它們的優缺點:
| 方法 | 優點 | 缺點 |
|---|---|---|
| 手動循環 | 易於實現;完全控制 | 不適合大型文件 |
| Spring Batch | 可擴展處理大型數據集 | 初學者設置複雜 |
| Logstash | 無需編碼;高性能 | 需要 Logstash 安裝 |
7. 結論
在本文中,我們介紹瞭如何使用三種方法將 CSV 數據導入 Elasticsearch:手動循環、Spring Batch 和 Logstash。每種方法都有其優勢,適用於不同的用例。