一、

壓縮案例(DEFLATE)



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.DeflateCodec;
import org.apache.hadoop.util.ReflectionUtils;


import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;


/**
 * Compression example: writes a DEFLATE-compressed copy of a local file using
 * Hadoop's {@code DeflateCodec}.
 *
 * <p>Study notes kept from the original author (translated):
 * <ol>
 *   <li>MapReduce jobs move data between nodes while they run. Smaller files are
 *       preferable for that transfer because the network overhead is lower, which
 *       is why data is compressed before and decompressed after transfer. The
 *       notes list six Hadoop compression formats: Deflate, gzip and bzip2
 *       (described as implemented in Java) and LZO, LZ4 and Snappy (described as
 *       implemented by other tools).</li>
 *   <li>In memory the data is Java objects; on the network it is binary.</li>
 *   <li>The data structures named for network transfer are SequenceFile and
 *       MapFile.</li>
 * </ol>
 */
public class demo1_壓縮_deflate {
    /**
     * Compresses {@code ./BigData培訓教材.pdf} into {@code ./demo1.deflate}.
     *
     * <p>The codec class is loaded by name and instantiated reflectively to show
     * how a codec can be chosen from a string.
     *
     * @param args unused
     * @throws ClassNotFoundException if the codec class name cannot be resolved
     * @throws IOException if a file cannot be opened or the copy fails
     */
    public static void main(String[] args) throws ClassNotFoundException, IOException {
        // Hadoop configuration handed to the codec when it is instantiated.
        Configuration conf = new Configuration();

        // Fully-qualified name of the codec class to use, resolved via reflection.
        String codecClassName = "org.apache.hadoop.io.compress.DeflateCodec";
        Class<?> codecClass = Class.forName(codecClassName);

        // ReflectionUtils.newInstance creates the codec and applies conf to it.
        // The result could equally be held as the CompressionCodec interface type.
        DeflateCodec codec = (DeflateCodec) ReflectionUtils.newInstance(codecClass, conf);

        // try-with-resources closes every stream that was opened, even when a
        // later constructor or createOutputStream throws. Closing comOut first
        // (resources close in reverse order) finishes the compressed output
        // before the underlying file stream is closed.
        try (FileInputStream fin = new FileInputStream("./BigData培訓教材.pdf");
             FileOutputStream fout = new FileOutputStream("./demo1.deflate");
             CompressionOutputStream comOut = codec.createOutputStream(fout)) {
            // 1024 is the copy buffer size in bytes; false because the
            // try-with-resources block owns closing the streams.
            IOUtils.copyBytes(fin, comOut, 1024, false);
        }
    }
}


二、

解壓案例(DEFLATE)




import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.DeflateCodec;
import org.apache.hadoop.util.ReflectionUtils;


import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;


/**
 * Decompression example: expands the DEFLATE file produced by the compression
 * demo back into a PDF using Hadoop's {@code DeflateCodec}.
 */
public class demo2_解壓_deflate {
    /**
     * Decompresses {@code ./demo1.deflate} into {@code ./demo2.pdf}.
     *
     * @param args unused
     * @throws IOException if a file cannot be opened or the copy fails
     * @throws ClassNotFoundException declared for the reflective alternatives
     *         shown in the comments below; the active code path does not throw it
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        // Hadoop configuration handed to the codec.
        Configuration conf = new Configuration();

        // Approach 1: construct the codec directly, then apply the configuration.
        DeflateCodec codec = new DeflateCodec();
        ReflectionUtils.setConf(codec, conf);

        /* Approach 2: load the codec class by name and keep the concrete type.
        String codecClassName = "org.apache.hadoop.io.compress.DeflateCodec";
        Class<?> codecClass = Class.forName(codecClassName);
        DeflateCodec codec = (DeflateCodec)ReflectionUtils.newInstance(codecClass,conf);
        */
        /* Approach 3: load the codec class by name and keep the interface type.
        String codecClassName = "org.apache.hadoop.io.compress.DeflateCodec";
        Class<?> codecClass = Class.forName(codecClassName);
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass,conf);
        */

        // try-with-resources closes every stream that was opened, even when a
        // later constructor or createInputStream throws.
        try (FileInputStream fin = new FileInputStream("./demo1.deflate");
             FileOutputStream fout = new FileOutputStream("./demo2.pdf");
             CompressionInputStream comIn = codec.createInputStream(fin)) {
            // 1024 is the copy buffer size in bytes; false because the
            // try-with-resources block owns closing the streams.
            IOUtils.copyBytes(comIn, fout, 1024, false);
        }
    }
}


三、

使用工廠類解壓案例

(CompressionCodecFactory)




import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;


import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;


/**
 * Decompression example that lets {@code CompressionCodecFactory} choose the
 * codec instead of naming a codec class directly.
 */
public class demo3_工廠類_解壓_CompressionCodecFactory {

    /**
     * Looks up a codec for {@code ./demo1.deflate} and decompresses that file
     * into {@code ./demo3_3.pdf}. Prints {@code no codec} and exits with status 1
     * when the factory returns no codec.
     *
     * @param args unused
     * @throws IOException if a file cannot be opened or the copy fails
     */
    public static void main(String[] args) throws IOException {
        // Hadoop configuration used to build the codec factory.
        Configuration conf = new Configuration();
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);

        // Approach 1: let the factory pick the codec from the file's path.
        CompressionCodec codec = factory.getCodec(new Path("./demo1.deflate"));
        // Approach 2: look the codec up by name.
        //CompressionCodec codec = factory.getCodecByName("DeflateCodec");
        // Approach 3: look the codec up by fully-qualified class name.
        //CompressionCodec codec = factory.getCodecByClassName("org.apache.hadoop.io.compress.DeflateCodec");

        // The factory returned no codec: report it and stop with exit code 1.
        if (codec == null) {
            System.out.println("no codec");
            System.exit(1);
        }

        // Report which codec was selected and its default file extension.
        System.out.println("codec ==> "+codec.toString());
        System.out.println(codec.getDefaultExtension());

        // try-with-resources closes every stream that was opened, even when a
        // later constructor or createInputStream throws.
        try (FileInputStream fin = new FileInputStream("./demo1.deflate");
             FileOutputStream fout = new FileOutputStream("./demo3_3.pdf");
             CompressionInputStream comIn = codec.createInputStream(fin)) {
            // 1024 is the copy buffer size in bytes; false because the
            // try-with-resources block owns closing the streams.
            IOUtils.copyBytes(comIn, fout, 1024, false);
        }
    }
}


四、

使用壓縮池進行壓縮




import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.*;


import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;


/**
 * Compression example that borrows its {@code Compressor} from {@code CodecPool}.
 *
 * <p>Study notes kept from the original author (translated): the codec pool
 * ({@code CodecPool} together with {@code Compressor}) reuses compressors and
 * decompressors, reducing the cost of creating codec objects.
 */
public class demo4_壓縮池 {
    /**
     * Compresses {@code BigData培訓教材.pdf} into {@code ./demo4.deflate} with a
     * pooled compressor, returning the compressor to the pool afterwards.
     *
     * @param args unused
     * @throws IOException if a file cannot be opened or the copy fails
     * @throws IllegalStateException if the factory has no codec named
     *         {@code DeflateCodec}
     */
    public static void main(String[] args) throws IOException {
        // Hadoop configuration used to build the codec factory.
        Configuration conf = new Configuration();

        // Look the codec up by name through the factory.
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodecByName("DeflateCodec");
        if (codec == null) {
            // Fail with a clear message instead of a NullPointerException below.
            throw new IllegalStateException("no codec registered under the name DeflateCodec");
        }

        Compressor compressor = null;
        try {
            // Borrow a compressor for this codec from the pool.
            compressor = CodecPool.getCompressor(codec);

            // Pass the borrowed compressor to createOutputStream so the stream
            // actually uses it. try-with-resources closes the streams before the
            // outer finally runs, so the compressed output is finished and the
            // file handles are released before the compressor goes back.
            try (FileInputStream fin = new FileInputStream("BigData培訓教材.pdf");
                 FileOutputStream fout = new FileOutputStream("./demo4.deflate");
                 CompressionOutputStream comOut = codec.createOutputStream(fout, compressor)) {
                // 1024 is the copy buffer size in bytes; false because the
                // try-with-resources block owns closing the streams.
                IOUtils.copyBytes(fin, comOut, 1024, false);
            }
        } finally {
            // Hand the compressor back to the pool for reuse.
            CodecPool.returnCompressor(compressor);
        }
    }
}