9.5.2 使用FileSystem API读取数据

9.5.1 节提到在应用中会出现不能使用URLStreamHandlerFactory的情况,这时就需要使用FileSystem的API打开一个文件的输入流了。

文件在Hadoop文件系统中被视为一个Hadoop Path对象。我们可以把一个路径视为Hadoop的文件系统URI,比如上文中的hdfs://localhost/user/ubuntu/In/hello.txt。

FileSystemAPI是一个高层抽象的文件系统API,所以,首先要找到这里的文件系统实例HDFS。取得FileSystem实例有两种静态工厂方法:

public static FileSystem get(Configuration conf)throws IOException

public static FileSystem get(URI uri, Configuration conf)throws IOException

Configuration对象封装了一个客户端或服务器的配置,这是用路径读取的配置文件设置的,一般为conf/core-site.xml。第一个方法返回的是默认文件系统,如果没有设置,则为默认的本地文件系统。第二个方法使用指定的URI方案决定文件系统的权限,如果指定的URI中没有指定方案,则退回默认的文件系统。

有了FileSystem实例后,可通过open()方法得到一个文件的输入流:


public FSDataInputStream open(Path f)throws IOException

public abstract FSDataInputStream open(Path f, int bufferSize)throws IOException


第一个方法直接使用默认的4KB的缓冲区,如例9-2所示。

例9-2:使用FileSystem API显示Hadoop文件系统中的文件


public class FileSystemCat{

public static void main(String[]args)throws Exception{

String uri=args[0];

Configuration conf=new Configuration();

FileSystem fs=FileSystem.get(URI.create(uri),conf);

InputStream in=null;

try{

in=fs.open(new Path(uri));

IOUtils.copyBytes(in, System.out,4096,false);

}finally{

IOUtils.closeStream(in);

}

}

}


然后设置程序运行参数为hdfs://localhost/user/ubuntu/In/hello.txt,运行程序即可看到hello.txt中的文本内容“Hello Hadoop!”。

下面对例9-2中的程序进行扩展,重点关注FSDataInputStream。

FileSystem中的open方法实际上返回的是一个FSDataInputStream,而不是标准的java.io类。这个类是java.io.DataInputStream的一个子类,支持随机访问,并可以从流的任意位置读取,代码如下:


public class FSDataInputStream extends DataInputStream

implements Seekable, PositionedReadable{

//implementation elided

}


Seekable接口允许在文件中定位并提供一个查询方法用于查询当前位置相对于文件开始的偏移量(getPos()),代码如下:


public interface Seekable{

void seek(long pos)throws IOException;

long getPos()throws IOException;

boolean seekToNewSource(long targetPos)throws IOException;

}


其中,调用seek()来定位大于文件长度的位置会导致IOException异常。开发人员并不常用seekT-oNewSource()方法,此方法倾向于切换到数据的另一个副本,并在新的副本中找寻targetPos制定的位置。HDFS就采用这样的方法在数据节点出现故障时为客户端提供可靠的数据流访问的。如例9-3所示。

例9-3:扩展例9-2,通过使用seek读取一次后,重新定位到文件头第三位,再次显示Hadoop文件系统中的文件内容


package cn.edn.ruc.cloudcomputing.book.chapter09;

import java.io.*;

import java.net.URI;

import java.net.URL;

import java.util.*;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.filecache.DistributedCache;

import org.apache.hadoop.conf.*;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapred.*;

import org.apache.hadoop.util.*;

public class DoubleCat{

public static void main(String[]args)throws Exception{

String uri=args[0];

Configuration conf=new Configuration();

FileSystem fs=FileSystem.get(URI.create(uri),conf);

FSDataInputStream in=null;

try{

in=fs.open(new Path(uri));

IOUtils.copyBytes(in, System.out,4096,false);

in.seek(3);//go back to pos 3 of the file

IOUtils.copyBytes(in, System.out,4096,false);

}finally{

IOUtils.closeStream(in);

}

}

}


然后设置程序运行参数为hdfs://localhost/user/ubuntu/In/hello.txt,运行程序即可看到hello.txt中的文本内容“Hello Hadoop!lo Hadoop!”。

同时,FSDataInputStream也实现了PositionedReadable接口,从一个制定位置读取一部分数据。这里不再详细介绍,大家可以参考以下源代码。


public interface PositionedReadable{

public int read(long position, byte[]buffer, int offset, int length)

throws IOException;

public void readFully(long position, byte[]buffer, int offset, int length)

throws IOException;

public void readFully(long position, byte[]buffer)throws IOException;

}


需要注意的是,seek()是一个高开销的操作,需要慎重使用。通常我们是依靠流数据MapReduce构建应用访问模式,而不是大量地执行seek操作。