1. 概述
Apache Avro 是一種廣泛使用的數據序列化系統,尤其在大數據應用中受到歡迎,因為它具有高效性和模式演進能力。在本教程中,我們將通過 Avro 進行對象轉換到 JSON,以及將整個 Avro 文件轉換為 JSON 文件。這對於數據檢查和調試尤其有用。
在當今數據驅動的世界中,能夠處理不同數據格式的能力至關重要。Apache Avro 經常用於需要高性能和存儲效率的系統,例如 Apache Hadoop。
2. 配置
為了開始,讓我們為我們的 pom.xml文件添加Avro和JSON的依賴項。
我們為本教程添加了Apache Avro的1.11.1版本:1.11.1
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.1</version>
</dependency>3. 將Avro對象轉換為JSON
將Java對象通過Avro轉換為JSON涉及若干步驟,包括:
- 推斷/構建Avro模式
- 將Java對象轉換為Avro的
<em >GenericRecord</em>,最後 - 將對象轉換為JSON
我們將利用Avro的Reflect API從Java對象中動態推斷模式,而不是手動定義模式。
為了演示這一點,讓我們創建一個Point類,該類具有兩個整數屬性,x和y:
public class Point {
private int x;
private int y;
public Point(int x, int y) {
this.x = x;
this.y = y;
}
// Getters and setters
}
我們現在來推斷模式:
public Schema inferSchema(Point p) {
return ReflectData.get().getSchema(p.getClass());
}我們定義了一個名為 inferSchema 的方法,並使用 ReflectData 類中的 getSchema 方法來從 point 對象推斷 schema。 schema 描述了字段 x 和 y 及其數據類型。
接下來,讓我們從 Point 對象創建一個 GenericRecord 對象,並將其轉換為 JSON:
public String convertObjectToJson(Point p, Schema schema) {
try {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
GenericRecord genericRecord = new GenericData.Record(schema);
genericRecord.put("x", p.getX());
genericRecord.put("y", p.getY());
Encoder encoder = EncoderFactory.get().jsonEncoder(schema, outputStream);
datumWriter.write(genericRecord, encoder);
encoder.flush();
outputStream.close();
return outputStream.toString();
} catch (Exception e) {
throw new RuntimeException(e);
}
}方法 convertObjectToJson 將 Point 對象轉換為 JSON 字符串,使用提供的模式。首先,根據提供的模式創建了一個 GenericRecord 對象,並用 Point 對象的元數據填充,然後使用 DatumWriter 通過 JsonEncoder 對象將數據寫入 ByteArrayOutputStream,最後,使用 OutputStream 對象的 toString 方法獲取 JSON 字符串。
讓我們驗證生成的 JSON 內容:
private AvroFileToJsonFile avroFileToJsonFile;
private Point p;
private String expectedOutput;
@BeforeEach
public void setup() {
avroFileToJsonFile = new AvroFileToJsonFile();
p = new Point(2, 4);
expectedOutput = "{\"x\":2,\"y\":4}";
}
@Test
public void whenConvertedToJson_ThenEquals() {
String response = avroFileToJsonFile.convertObjectToJson(p, avroFileToJsonFile.inferSchema(p));
assertEquals(expectedOutput, response);
}4. 將 Avro 文件轉換為 JSON 文件
將整個 Avro 文件轉換為 JSON 文件遵循相似的過程,但涉及從文件讀取數據。這在我們需要將存儲在 Avro 格式的磁盤數據轉換為更易訪問的格式,例如 JSON 時非常常見。
讓我們首先定義一個方法 writeAvroToFile,用於將 Avro 數據寫入文件:
public void writeAvroToFile(Schema schema, List<Point> records, File writeLocation) {
try {
if (writeLocation.exists()) {
if (!writeLocation.delete()) {
System.err.println("Failed to delete existing file.");
return;
}
}
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
dataFileWriter.create(schema, writeLocation);
for (Point record: records) {
GenericRecord genericRecord = new GenericData.Record(schema);
genericRecord.put("x", record.getX());
genericRecord.put("y", record.getY());
dataFileWriter.append(genericRecord);
}
dataFileWriter.close();
} catch (IOException e) {
e.printStackTrace();
System.out.println("Error writing Avro file.");
}
}該方法通過將 Point 對象轉換為 Avro 格式,並按照提供的 Schema 將它們結構化為 GenericRecord 實例來實現。 GenericDatumWrite 用於序列化這些記錄,然後使用 DataFileWriter 將它們寫入 Avro 文件。
讓我們驗證文件是否已寫入並存在:
private File dataLocation;
private File jsonDataLocation;
...
@BeforeEach
public void setup() {
// Load files from the resources folder
ClassLoader classLoader = getClass().getClassLoader();
dataLocation = new File(classLoader.getResource("").getFile(), "data.avro");
jsonDataLocation = new File(classLoader.getResource("").getFile(), "data.json");
...
}
...
@Test
public void whenAvroContentWrittenToFile_ThenExist(){
Schema schema = avroFileToJsonFile.inferSchema(p);
avroFileToJsonFile.writeAvroToFile(schema, List.of(p), dataLocation);
assertTrue(dataLocation.exists());
}接下來,我們將從存儲位置讀取文件並將其寫入另一個文件,格式為 JSON。
讓我們創建一個名為 readAvroFromFileToJsonFile 的方法來處理此操作:
public void readAvroFromFileToJsonFile(File readLocation, File jsonFilePath) {
DatumReader<GenericRecord> reader = new GenericDatumReader<>();
try {
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(readLocation, reader);
DatumWriter<GenericRecord> jsonWriter = new GenericDatumWriter<>(dataFileReader.getSchema());
Schema schema = dataFileReader.getSchema();
OutputStream fos = new FileOutputStream(jsonFilePath);
JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, fos);
while (dataFileReader.hasNext()) {
GenericRecord record = dataFileReader.next();
System.out.println(record.toString());
jsonWriter.write(record, jsonEncoder);
jsonEncoder.flush();
}
dataFileReader.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}我們從 readLocation 讀取 Avro 數據,並將其寫入為 JSON 到 jsonFilePath。我們使用 DataFileReader 從 Avro 文件中讀取 GenericRecord 實例,然後使用 JsonEncoder 和 GenericDatumWriter 將這些記錄序列化為 JSON 格式。
讓我們繼續確認寫入文件生成的 JSON 內容:
@Test
public void whenAvroFileWrittenToJsonFile_ThenJsonContentEquals() throws IOException {
avroFileToJsonFile.readAvroFromFileToJsonFile(dataLocation, jsonDataLocation);
String text = Files.readString(jsonDataLocation.toPath());
assertEquals(expectedOutput, text);
}5. 結論
本文介紹瞭如何將 Avro 內容寫入文件、讀取文件並將其存儲為 JSON 格式的文件,並通過示例説明了該過程。 此外,值得注意的是,模式也可以存儲在單獨的文件中,而不是包含在數據中。