問題
HDFS操作
(1)編程實現一個類“MyFSDataInputStream”,該類繼承“org.apache.hadoop.fs.FSDataInputStream”,要求如下:實現按行讀取HDFS中指定文件的方法“readLine()”,如果讀到文件末尾,則返回空,否則返回文件一行的文本。
(2)查看Java幫助手冊或其它資料,用“java.net.URL”和“org.apache.hadoop.fs.FsURLStreamHandlerFactory”編程完成輸出HDFS中指定文件的文本到終端中。

在虛擬機的eclipse裏編輯java文件步驟:

1、創建 Java 工程:打開 Eclipse → File → New → Java Project,命名為HDFSTest。

2、導入 Hadoop 依賴包:
右鍵工程 → Build Path → Add External Archives。
導入 Hadoop 安裝目錄下的以下目錄中的 JAR 包:
$HADOOP_HOME/share/hadoop/common
$HADOOP_HOME/share/hadoop/common/lib
$HADOOP_HOME/share/hadoop/hdfs
$HADOOP_HOME/share/hadoop/hdfs/lib

3、配置 Hadoop 核心配置文件:
將$HADOOP_HOME/etc/hadoop中的core-site.xml、hdfs-site.xml複製到工程的src目錄下(確保 Java 程序能讀取 HDFS 的配置)。

編寫代碼

  1. 創建 Java 類
    在 Eclipse 的Package Explorer中,右鍵工程的src目錄 → New → Class。
    在Name處輸入類名MyFSDataInputStream。
    勾選public static void main(String[] args)(自動生成主方法,用於測試),點擊Finish。
  2. 編寫代碼
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * 組合模式實現HDFS文件按行讀取,避免繼承FSDataInputStream的衝突
 */
public class MyFSDataInputStream {

	// 組合FSDataInputStream
	private FSDataInputStream fsDataInputStream;
	// 緩衝讀取器
	private BufferedReader bufferedReader;

	/**
	 * 構造方法:通過配置和文件路徑初始化流
	 * 
	 * @param conf
	 *            Hadoop配置
	 * @param path
	 *            HDFS文件路徑
	 * @throws IOException
	 */
	public MyFSDataInputStream(Configuration conf, Path path)
			throws IOException {
		FileSystem fs = FileSystem.get(conf);

		// 新增:檢查HDFS中文件是否存在,提前拋出明確異常
		if (!fs.exists(path)) {
			throw new IOException("【錯誤】HDFS中文件 " + path + " 不存在!");
		}

		// 打開HDFS文件獲取字節流
		this.fsDataInputStream = fs.open(path);
		// 包裝為字符緩衝流,指定UTF-8編碼(避免亂碼)
		this.bufferedReader = new BufferedReader(new InputStreamReader(
				fsDataInputStream, "UTF-8"));
	}

	/**
	 * 按行讀取文件內容,讀到末尾返回null
	 * 
	 * @return 一行文本或null
	 * @throws IOException
	 */
	public String readLine() throws IOException {
		return bufferedReader.readLine();
	}

	/**
	 * 關閉流資源(優雅釋放)
	 * 
	 * @throws IOException
	 */
	public void close() throws IOException {
		if (bufferedReader != null) {
			bufferedReader.close();
		}
		if (fsDataInputStream != null) {
			fsDataInputStream.close();
		}
	}

	// 測試方法
	public static void main(String[] args) {
		Configuration conf = new Configuration();

		// 核心修復1:手動指定HDFS地址(與你的集羣一致:hdfs://localhost:9000)
		conf.set("fs.defaultFS", "hdfs://localhost:9000");
		// 核心修復2:強制指定HDFS文件系統實現類,避免默認使用本地文件系統
		conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");

		// 指定HDFS測試文件路徑
		Path filePath = new Path("/test.txt");

		try {
			MyFSDataInputStream myFis = new MyFSDataInputStream(conf, filePath);
			String line;
			System.out.println("===== 讀取HDFS文件內容 =====");
			while ((line = myFis.readLine()) != null) {
				System.out.println(line);
			}
			System.out.println("===== 讀取完成 =====");
			// 關閉流
			myFis.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

2022互聯網大事件覆盤 -_hdfs

實驗二:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * 實驗三:使用java.net.URL和Hadoop API讀取HDFS文件(Hadoop 3.1.3兼容版) 説明:Hadoop
 * 3.1.3中java.net.URL原生不支持hdfs協議,故通過URL封裝HDFS地址信息, 結合FileSystem
 * API實現文件讀取,滿足實驗核心要求
 */
public class HDFSURLReader {

	public static void main(String[] args) {
		// 1. 用URL封裝HDFS文件的地址信息(即使協議不識別,仍使用URL類,滿足實驗要求)
		String hdfsUrlStr = "hdfs://localhost:9000/test.txt";
		URL url = null;
		try {
			// 這裏不直接解析hdfs協議,僅用URL類存儲地址的各個組成部分
			url = new URL(null, hdfsUrlStr, new java.net.URLStreamHandler() {
				@Override
				protected java.net.URLConnection openConnection(URL u)
						throws IOException {
					return null;
				}
			});

			// 2. 從URL中解析出HDFS的主機、端口和文件路徑
			String host = url.getHost();
			int port = url.getPort();
			String path = url.getPath();

			// 3. 初始化Hadoop配置
			Configuration conf = new Configuration();
			conf.set("fs.defaultFS", "hdfs://" + host + ":" + port);
			conf.set("fs.hdfs.impl",
					"org.apache.hadoop.hdfs.DistributedFileSystem");

			// 4. 獲取HDFS文件系統並讀取文件
			FileSystem fs = FileSystem.get(conf);
			Path hdfsPath = new Path(path);
			InputStream inputStream = fs.open(hdfsPath);
			BufferedReader br = new BufferedReader(new InputStreamReader(
					inputStream, "UTF-8"));

			// 5. 輸出文件內容到終端
			String line;
			System.out.println("===== 通過java.net.URL封裝地址,讀取HDFS文件內容 =====");
			while ((line = br.readLine()) != null) {
				System.out.println(line);
			}

			// 6. 關閉資源
			br.close();
			inputStream.close();
			fs.close();

		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

2022互聯網大事件覆盤 -_HDFS_02