一、
壓縮案例(DEFLATE)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.DeflateCodec;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
/**
 * MapReduce代碼在執行時候會涉及節點和節點之間的數據傳輸
* 1.在數據傳輸過程中,文件的大小是比較大好,還是比較小好?為什麼?如何做
* 小好,網絡開銷小,壓縮(解壓縮)
* Hadoop壓縮格式有6種:使用Java實現(Deflate、gzip、bzip2)、其它工具實現(LZO、LZ4、Snappy)
* 2.數據在內存中是什麼形式,在網絡傳輸中是什麼形式
* Java對象;二進制
* 3.在網絡傳輸時是什麼類型,即採用了什麼數據結構
* SequenceFile、MapFile
*/
public class demo1_壓縮_deflate {
    /**
     * Compresses a local PDF file into DEFLATE format using a Hadoop codec.
     *
     * The codec class is resolved by name via reflection, then instantiated with
     * {@code ReflectionUtils.newInstance} so it receives the Hadoop configuration.
     *
     * @throws ClassNotFoundException if the codec class is not on the classpath
     * @throws IOException            if reading the input or writing the output fails
     */
    public static void main(String[] args) throws ClassNotFoundException, IOException {
        // Hadoop configuration (loads core-site.xml etc. if present on the classpath).
        Configuration conf = new Configuration();
        // Fully-qualified name of the compression codec implementation to use.
        String codecClassName = "org.apache.hadoop.io.compress.DeflateCodec";
        // Load the codec class from its name.
        Class<?> codecClass = Class.forName(codecClassName);
        // Instantiate via ReflectionUtils so the codec is wired to the configuration.
        // Program to the CompressionCodec interface rather than the concrete class.
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        // try-with-resources guarantees both streams are closed even if an exception
        // occurs (the original leaked fin if createOutputStream threw).
        try (FileInputStream fin = new FileInputStream("./BigData培訓教材.pdf");
             CompressionOutputStream comOut =
                     codec.createOutputStream(new FileOutputStream("./demo1.deflate"))) {
            // Copy with a 1024-byte buffer; pass false because try-with-resources
            // now owns stream closing (closing comOut also flushes the compressor).
            IOUtils.copyBytes(fin, comOut, 1024, false);
        }
    }
}
二、
解壓案例(DEFLATE)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.DeflateCodec;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
public class demo2_解壓_deflate {
    /**
     * Decompresses a DEFLATE-compressed file back to its original form.
     *
     * Shows one way to build the codec (direct construction + setConf); two
     * reflection-based alternatives are kept below as commented references.
     *
     * @throws IOException            if reading the input or writing the output fails
     * @throws ClassNotFoundException declared for the reflection-based variants below
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        // Hadoop configuration.
        Configuration conf = new Configuration();
        // Option 1: construct the codec directly and inject the configuration.
        DeflateCodec codec = new DeflateCodec();
        ReflectionUtils.setConf(codec, conf);
        /* Option 2: resolve the concrete class by name via reflection.
        String codecClassName = "org.apache.hadoop.io.compress.DeflateCodec";
        Class<?> codecClass = Class.forName(codecClassName);
        DeflateCodec codec = (DeflateCodec) ReflectionUtils.newInstance(codecClass, conf);
        */
        /* Option 3: same as option 2 but typed to the CompressionCodec interface.
        String codecClassName = "org.apache.hadoop.io.compress.DeflateCodec";
        Class<?> codecClass = Class.forName(codecClassName);
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        */
        // try-with-resources closes both streams even on failure (the original
        // leaked fin/fout if createInputStream threw).
        try (FileInputStream fin = new FileInputStream("./demo1.deflate");
             CompressionInputStream comIn = codec.createInputStream(fin);
             FileOutputStream fout = new FileOutputStream("./demo2.pdf")) {
            // Decompress with a 1024-byte buffer; false because the streams are
            // closed by try-with-resources, not by copyBytes.
            IOUtils.copyBytes(comIn, fout, 1024, false);
        }
    }
}
三、
使用工廠類解壓案例
(CompressionCodecFactory)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
public class demo3_工廠類_解壓_CompressionCodecFactory {
    /**
     * Decompresses a file using a codec chosen automatically by
     * {@link CompressionCodecFactory}, which maps the file extension
     * (".deflate") to the matching codec.
     *
     * @throws IOException if reading the input or writing the output fails
     */
    public static void main(String[] args) throws IOException {
        // Hadoop configuration.
        Configuration conf = new Configuration();
        // Factory that knows all configured codecs.
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        // Option 1: pick the codec from the file's extension.
        CompressionCodec codec = factory.getCodec(new Path("./demo1.deflate"));
        // Option 2: pick the codec by its simple class name.
        //CompressionCodec codec = factory.getCodecByName("DeflateCodec");
        // Option 3: pick the codec by its fully-qualified class name.
        //CompressionCodec codec = factory.getCodecByClassName("org.apache.hadoop.io.compress.DeflateCodec");
        // Bail out if no codec matched the file.
        if (codec == null) {
            System.out.println("no codec");
            // Non-zero exit code signals failure to the caller.
            System.exit(1);
        }
        // Report which codec was selected and its default file extension.
        System.out.println("codec ==> " + codec.toString());
        System.out.println(codec.getDefaultExtension());
        // try-with-resources closes all streams even on failure (the original
        // leaked fin/fout if createInputStream threw).
        try (FileInputStream fin = new FileInputStream("./demo1.deflate");
             CompressionInputStream comIn = codec.createInputStream(fin);
             FileOutputStream fout = new FileOutputStream("./demo3_3.pdf")) {
            // Decompress with a 1024-byte buffer; false because the streams are
            // closed by try-with-resources, not by copyBytes.
            IOUtils.copyBytes(comIn, fout, 1024, false);
        }
    }
}
四、
使用壓縮池進行壓縮
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.*;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
/**
* 什麼是壓縮池?
* CodecPool、Compressor,重用壓縮程序和解壓縮程序
* 壓縮池的作用是什麼?
* 重用壓縮程序和解壓縮程序,儘量縮減系統創建編解碼對象的開銷
*/
public class demo4_壓縮池 {
    /**
     * Compresses a file while reusing a pooled {@link Compressor} from
     * {@link CodecPool}, avoiding the cost of creating a new compressor object.
     *
     * Fixes vs. the original demo:
     * 1. The pooled compressor is now actually passed to
     *    {@code createOutputStream(out, compressor)} — previously it was
     *    borrowed from the pool but never used, defeating the pool's purpose.
     * 2. The compression stream is properly finished and closed — previously
     *    {@code copyBytes(..., false)} never closed it, so buffered compressed
     *    data was never flushed and the output file was left truncated.
     * 3. Streams are closed via try-with-resources even on failure.
     *
     * @throws IOException if reading the input or writing the output fails
     */
    public static void main(String[] args) throws IOException {
        // Hadoop configuration.
        Configuration conf = new Configuration();
        // Factory that knows all configured codecs.
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        // Select the DEFLATE codec by name.
        CompressionCodec codec = factory.getCodecByName("DeflateCodec");
        Compressor compressor = null;
        try {
            // Borrow a reusable compressor from the pool.
            compressor = CodecPool.getCompressor(codec);
            try (FileInputStream fin = new FileInputStream("BigData培訓教材.pdf");
                 CompressionOutputStream comOut =
                         codec.createOutputStream(new FileOutputStream("./demo4.deflate"), compressor)) {
                // Copy with a 1024-byte buffer; streams are closed by try-with-resources.
                IOUtils.copyBytes(fin, comOut, 1024, false);
                // Flush all remaining compressed data before the stream is closed.
                comOut.finish();
            }
        } finally {
            // Return the compressor so other callers can reuse it
            // (Hadoop's returnCompressor is a no-op for null).
            CodecPool.returnCompressor(compressor);
        }
    }
}