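Preface

The java.io.InputStream class declares an abstract read method that reads data from an already opened InputStream instance. Each call to read returns the next byte of data (as an int from 0 to 255), or -1 once the end of the stream has been reached. The abstract declaration is:

```java
public abstract int read() throws IOException;
```

Hadoop's DFSClient.DFSInputStream class implements this abstract method. Reading a whole file comes down to reading one byte of one block of an HDFS file. Once that step is understood, the layers above only need to iterate to obtain all of the file's data.

In the previous article, "HDFS File Read Process Analysis: Getting the Block List of a File", we obtained the block list of a file and opened the file. The next step is to read the actual physical block data. The sections below describe this read process in detail.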
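The "iterate to get the whole file" idea can be illustrated with plain JDK classes. ByteLoop below is a hypothetical helper, not part of Hadoop. It drains any InputStream by calling the single-byte read() until it returns -1, relying on nothing but the read() contract described above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper (not Hadoop code): drains an InputStream one byte at a time,
// relying only on the read() contract: a value 0..255, or -1 at end of stream.
final class ByteLoop {
  private ByteLoop() {}

  static byte[] readAll(InputStream in) throws IOException {
    ByteArrayOutputStream collected = new ByteArrayOutputStream();
    int b;
    while ((b = in.read()) != -1) { // -1 marks end of stream
      collected.write(b);           // stores the low 8 bits of b
    }
    return collected.toByteArray();
  }
}
```

For an HDFS file, the stream passed in would be the DFSInputStream opened on that file. Reading byte by byte is shown here only to mirror the text. Real code would normally use the bulk read(byte[], int, int), which avoids one (synchronized, in DFSInputStream's case) method call per byte.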
How the Client reads one byte of a file from a Datanode
Below we analyze the code in DFSClient.DFSInputStream that reads the contents of a file on HDFS, starting with the following method:
```java
@Override
public synchronized int read() throws IOException {
  int ret = read(oneByteBuf, 0, 1);
  return (ret <= 0) ? -1 : (oneByteBuf[0] & 0xff);
}
```
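The `& 0xff` mask in read() matters because Java's byte type is signed. Without it, a stored 0xFF byte would widen to the int -1 and could not be told apart from end of stream. SingleByteResult below is a hypothetical helper that isolates that return expression so the difference can be checked directly.

```java
// Hypothetical helper (not Hadoop code) mirroring the return statement of read():
// turns the result of a one-byte bulk read into the single-byte read() contract.
final class SingleByteResult {
  private SingleByteResult() {}

  static int toReadResult(int bytesRead, byte[] oneByteBuf) {
    // Without "& 0xff", (byte) 0xFF would widen to -1, the end-of-stream marker.
    return (bytesRead <= 0) ? -1 : (oneByteBuf[0] & 0xff);
  }
}
```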
The single-byte read() delegates to read(oneByteBuf, 0, 1), which reads one byte into the one-byte buffer oneByteBuf. That method is implemented as follows:
```java
@Override
public synchronized int read(byte buf[], int off, int len) throws IOException {
  checkOpen(); // make sure the client is still running
  if (closed) {
    throw new IOException("Stream closed");
  }
  failures = 0;
  // getFileLength() returns the total number of bytes in the file;
  // pos is the 0-based offset of the next byte to read (byte pos+1 of the file)
  if (pos < getFileLength()) {
    int retries = 2;
    while (retries > 0) {
      try {
        // blockEnd is the file offset of the last byte of the current block,
        // so pos > blockEnd means pos lies in a block that is not open yet
        if (pos > blockEnd) {
          // find the Datanode holding the byte at offset pos
          // (located through the metadata of the block containing that byte)
          currentNode = blockSeekTo(pos);
        }
        // never read past the end of the current block
        int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L));
        int result = readBuffer(buf, off, realLen); // read up to realLen bytes into buf
        if (result >= 0) {
          pos += result; // advance pos by the number of bytes actually read
        } else {
          // got a EOS from reader though we expect more data on it.
          throw new IOException("Unexpected EOS from the reader");
        }
        if (stats != null && result != -1) {
          stats.incrementBytesRead(result);
        }
        return result;
      } catch (ChecksumException ce) {
        throw ce;
      } catch (IOException e) {
        if (retries == 1) {
          LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
        }
        blockEnd = -1;
        if (currentNode != null) {
          addToDeadNodes(currentNode);
        }
        if (--retries == 0) {
          throw e;
        }
      }
    }
  }
  return -1;
}
```
Reading one byte of file data proceeds as follows:
1. Check that the stream is open. At this point the metadata of the file's block list has already been obtained and an InputStream has been opened on the file.
2. Start reading from the file's first block. The Client must first find the Datanode holding that block. This can be looked up in the cached block list; if it is not there, the Client fetches it from the Namenode with an RPC call.
3. Open a stream to the Datanode holding the block, ready to read the block data.
4. Once connected to the Datanode, read one byte into the byte buffer and return the number of bytes read (1).

Throughout, reading is driven by byte offsets. For a given offset, the Client determines which block it belongs to, using the byte range covered by each block's metadata. It then uses that block to locate a Datanode. In this way it can read all of the file's data continuously, across the consecutive blocks that make up the file.
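The offset-to-block mapping in the steps above can be sketched with plain arrays. BlockLocator is a hypothetical class, not part of Hadoop; Hadoop keeps this information in LocatedBlock objects. It assumes the blocks are given as start offsets and lengths sorted by start offset. It finds the block containing a file offset by binary search, and it clamps a read so it never crosses that block's last byte, mirroring realLen in read().

```java
import java.util.Arrays;

// Hypothetical sketch (not Hadoop code): a file modelled only by the start offsets
// and lengths of its blocks, sorted by start offset.
final class BlockLocator {
  private final long[] starts;  // file offset of the first byte of each block
  private final long[] lengths; // number of bytes in each block

  BlockLocator(long[] starts, long[] lengths) {
    this.starts = starts.clone();
    this.lengths = lengths.clone();
  }

  /** Index of the block containing file offset pos, or -1 if pos is outside the file. */
  int blockIndexFor(long pos) {
    int i = Arrays.binarySearch(starts, pos);
    if (i < 0) {
      i = -i - 2; // (insertion point - 1): last block starting before pos
    }
    if (i < 0 || pos >= starts[i] + lengths[i]) {
      return -1;
    }
    return i;
  }

  /** Like realLen in read(): clamp len so the read stops at blockEnd; -1 past EOF. */
  int clampToBlock(long pos, int len) {
    int i = blockIndexFor(pos);
    if (i < 0) {
      return -1;
    }
    long blockEnd = starts[i] + lengths[i] - 1; // file offset of the block's last byte
    return (int) Math.min((long) len, blockEnd - pos + 1L);
  }
}
```

With this model, repeated reads walk the file block by block. Each time pos moves past blockEnd, the next call resolves the following block.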
Finding the Datanode that holds the byte to be read
The public synchronized int read(byte buf[], int off, int len) throws IOException method above calls blockSeekTo to find the Datanode holding the data at a given byte offset of the file. As one would expect, that Datanode is derived from the block metadata. The Client works out which block contains the offset, then looks up that block's Datanode list in its cached block mapping. Here is the implementation of blockSeekTo:
```java
private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
  // ... ...
  DatanodeInfo chosenNode = null;
  int refetchToken = 1; // only need to get a new access token once
  while (true) {
    // metadata of the block containing the byte at file offset target
    LocatedBlock targetBlock = getBlockAt(target, true);
    assert (target == this.pos) : "Wrong position " + pos + " expect " + target;
    long offsetIntoBlock = target - targetBlock.getStartOffset();
    DNAddrPair retval = chooseDataNode(targetBlock); // choose a Datanode to read from
    chosenNode = retval.info;
    InetSocketAddress targetAddr = retval.addr;

    // Try a local read first; if the data is not local,
    // read from the remote Datanode as usual
    Block blk = targetBlock.getBlock();
    Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
    if (shouldTryShortCircuitRead(targetAddr)) {
      try {
        // create a BlockReader that reads the block directly from local disk
        blockReader = getLocalBlockReader(conf, src, blk, accessToken,
            chosenNode, DFSClient.this.socketTimeout, offsetIntoBlock);
        return chosenNode;
      } catch (AccessControlException ex) {
        LOG.warn("Short circuit access failed ", ex);
        //Disable short circuit reads
        shortCircuitLocalReads = false;
      } catch (IOException ex) {
        if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
          /* Get a new access token and retry. */
          refetchToken--;
          fetchBlockAt(target);
          continue;
        } else {
          LOG.info("Failed to read " + targetBlock.getBlock()
              + " on local machine" + StringUtils.stringifyException(ex));
          LOG.info("Try reading via the datanode on " + targetAddr);
        }
      }
    }

    // Local read failed or not possible: read from the remote Datanode instead
    try {
      s = socketFactory.createSocket();
      LOG.debug("Connecting to " + targetAddr);
      NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), socketTimeout);
      s.setSoTimeout(socketTimeout);
      // create a BlockReader that streams the block from the remote Datanode
      blockReader = RemoteBlockReader.newBlockReader(s, src, blk.getBlockId(),
          accessToken,
          blk.getGenerationStamp(),
          offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
          buffersize, verifyChecksum, clientName);
      return chosenNode;
    } catch (IOException ex) {
      if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
        refetchToken--;
        fetchBlockAt(target);
      } else {
        LOG.warn("Failed to connect to " + targetAddr
            + ", add to deadNodes and continue" + ex);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Connection failure", ex);
        }
        // Put chosen node into dead list, continue.
        // The failed Datanode goes on the Client's dead node list, so the
        // next Datanode selection for this file avoids it.
        addToDeadNodes(chosenNode);
      }
      if (s != null) {
        try {
          s.close();
        } catch (IOException iex) {
        }
      }
      s = null;
    }
  }
}
```
The code above covers the following key points:
Choosing a suitable Datanode to improve read efficiency

When a file is read, the Client first obtains the file's block list metadata from the Namenode. For each block, the Datanode locations are sorted by network topology relative to the Client: the local node first, then nodes on the same rack. The Client also maintains a dead node list. The first Datanode in the block's location list that is not on the dead node list is returned and used to read the data.
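The selection rule just described can be sketched as follows. ReplicaChooser is hypothetical and is not Hadoop's chooseDataNode. Datanodes are represented by plain name strings, and the input list is assumed to be already sorted by network distance, as the Namenode returns it.

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch (not Hadoop's chooseDataNode): replicas arrive sorted by
// network distance from the client, so the first one not on the dead list wins.
final class ReplicaChooser {
  private ReplicaChooser() {}

  static String choose(List<String> sortedReplicas, Set<String> deadNodes) {
    for (String node : sortedReplicas) {
      if (!deadNodes.contains(node)) {
        return node; // closest replica not known to be dead
      }
    }
    return null; // no live replica left; this sketch simply reports that
  }
}
```

Because the list is pre-sorted, a linear scan is enough: skipping dead nodes preserves the locality order, so the choice degrades from local to same-rack to remote only as nodes fail.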
If the Client runs on a cluster Datanode, try reading the block locally

chooseDataNode returns a Datanode. A local read is possible when two conditions hold: the node's address is a local address, and the block's metadata on that node is not in the under-construction state. In that case a BlockReaderLocal object is created to read the block directly from local disk.

While creating the BlockReaderLocal, the Client first looks in a cache for the LocalDatanodeInfo object associated with the local Datanode. This object holds the key information needed to read from the local Datanode, and it also caches the local paths of blocks to be read. Its fields show this:
```java
private ClientDatanodeProtocol proxy = null;
private final Map<Block, BlockLocalPathInfo> cache;
```
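If the cache already holds the information for the block to be read, the read can proceed directly. Otherwise, the Client creates the proxy object, obtains the block's path information (BlockLocalPathInfo), and adds it to the cache to speed up later reads. When the LocalDatanodeInfo entry, in particular the BlockLocalPathInfo, is not found in the cache, the following logic runs: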
```java
// make RPC to local datanode to find local pathnames of blocks
pathinfo = proxy.getBlockLocalPathInfo(blk, token);
```
Here proxy is a ClientDatanodeProtocol, the protocol used for RPC between the Client and the Datanode. The RPC call getBlockLocalPathInfo fetches the local path information of the block. Its implementation in the Datanode class is:
```java
BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
```
The Datanode delegates to getBlockLocalPathInfo of FSDataset, which implements the FSDatasetInterface interface:
```java
@Override // FSDatasetInterface
public BlockLocalPathInfo getBlockLocalPathInfo(Block block) throws IOException {
  // path of the block data file on the local Datanode's file system
  File datafile = getBlockFile(block);
  // path of the block's metadata (checksum) file on the local Datanode's file system
  File metafile = getMetaFile(datafile, block);
  BlockLocalPathInfo info = new BlockLocalPathInfo(block,
      datafile.getAbsolutePath(), metafile.getAbsolutePath());
  return info;
}
```
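getMetaFile derives the metadata file from the block file. The exact rule is not shown in this article. MetaFileNaming below is a hypothetical helper that assumes the naming commonly seen in Hadoop 1.x data directories: a block file blk_<blockId> with a metadata file blk_<blockId>_<generationStamp>.meta beside it. Check FSDataset in your Hadoop version before relying on it.

```java
import java.io.File;

// Hypothetical helper (not Hadoop code). ASSUMED naming convention:
//   block file:    <dir>/blk_<blockId>
//   metadata file: <dir>/blk_<blockId>_<generationStamp>.meta
final class MetaFileNaming {
  private MetaFileNaming() {}

  static File blockFile(File dir, long blockId) {
    return new File(dir, "blk_" + blockId);
  }

  // the metadata file sits next to the block file and adds the generation stamp
  static File metaFile(File blockFile, long generationStamp) {
    return new File(blockFile.getParentFile(),
        blockFile.getName() + "_" + generationStamp + ".meta");
  }
}
```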
With these paths, the block file can be read directly. When checksums are to be verified, the block's metadata file metafile is read as well:
```java
// ... (from the static newBlockReader method of BlockReaderLocal)
// get a local file system
File blkfile = new File(pathinfo.getBlockPath());
dataIn = new FileInputStream(blkfile);

if (!skipChecksum) { // checksums are to be verified
  // get the metadata file
  File metafile = new File(pathinfo.getMetaPath());
  checksumIn = new FileInputStream(metafile);

  // read and handle the common header here. For now just a version
  BlockMetadataHeader header = BlockMetadataHeader.readHeader(new DataInputStream(checksumIn));
  short version = header.getVersion();
  if (version != FSDataset.METADATA_VERSION) {
    LOG.warn("Wrong version (" + version + ") for metadata file for "
        + blk + " ignoring ...");
  }
  DataChecksum checksum = header.getChecksum();
  localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length,
      pathinfo, checksum, true, dataIn, checksumIn);
} else {
  localBlockReader = new BlockReaderLocal(conf, file, blk, token, startOffset, length,
      pathinfo, dataIn);
}
```
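BlockMetadataHeader.readHeader parses the start of the metadata file. MetaHeader below is a hypothetical stand-in, not Hadoop's class. It assumes a 7-byte header: a short version, then a one-byte checksum type, then an int bytesPerChecksum, with the checksums following. Following BlockReaderLocal.newBlockReader, it only warns on a version mismatch rather than failing.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical reader (not Hadoop's BlockMetadataHeader). ASSUMED header layout:
//   short version | byte checksumType | int bytesPerChecksum   (7 bytes)
final class MetaHeader {
  final short version;
  final byte checksumType;
  final int bytesPerChecksum;

  private MetaHeader(short version, byte checksumType, int bytesPerChecksum) {
    this.version = version;
    this.checksumType = checksumType;
    this.bytesPerChecksum = bytesPerChecksum;
  }

  static MetaHeader read(InputStream metaIn, short expectedVersion) throws IOException {
    DataInputStream in = new DataInputStream(metaIn);
    short version = in.readShort();
    if (version != expectedVersion) {
      // only a warning, as in BlockReaderLocal.newBlockReader
      System.err.println("Wrong version (" + version + ") for metadata file, ignoring ...");
    }
    byte type = in.readByte();          // which checksum algorithm
    int bytesPerChecksum = in.readInt(); // data bytes covered by each checksum
    return new MetaHeader(version, type, bytesPerChecksum);
  }
}
```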
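In the local read path, the BlockLocalPathInfo may already be stale by the time the block file is opened. When a block is deleted, the Namenode instructs the Datanode to delete it. If that happens after the BlockLocalPathInfo was returned, the path points to a file that no longer exists. Opening the block file in the try block of newBlockReader then throws an IOException. The corresponding catch block handles it by removing the stale block-to-BlockLocalPathInfo mapping from the cache.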
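The cache-then-invalidate pattern described above can be sketched with JDK collections. LocalPathCache is hypothetical and is not Hadoop's LocalDatanodeInfo. Block IDs stand in for Block objects and plain path strings stand in for BlockLocalPathInfo. The resolver function stands in for the getBlockLocalPathInfo RPC and must return a non-null path.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch (not Hadoop code): block ID -> local block file path.
// A miss asks the resolver (standing in for the getBlockLocalPathInfo RPC);
// a failed open evicts the entry so the next attempt resolves the path again.
final class LocalPathCache {
  private final Map<Long, String> cache = new ConcurrentHashMap<>();
  private final Function<Long, String> resolver; // must return a non-null path

  LocalPathCache(Function<Long, String> resolver) {
    this.resolver = resolver;
  }

  FileInputStream open(long blockId) throws IOException {
    String path = cache.computeIfAbsent(blockId, resolver);
    try {
      return new FileInputStream(new File(path));
    } catch (IOException e) {
      cache.remove(blockId); // the block may have been deleted since it was cached
      throw e;
    }
  }

  boolean isCached(long blockId) {
    return cache.containsKey(blockId);
  }
}
```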
If the Client is not a cluster Datanode, read the block remotely

If the Client is not on the same machine as the chosen Datanode, the block can only be read across the network. The Client first creates a Socket connection:
```java
s = socketFactory.createSocket();
LOG.debug("Connecting to " + targetAddr);
NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), socketTimeout);
s.setSoTimeout(socketTimeout);
```
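The same connection step can be written with java.net.Socket alone. DatanodeConnector is a hypothetical helper. Hadoop's socketFactory and NetUtils.connect, and the local-interface selection, are replaced by the plain JDK calls Socket.connect(SocketAddress, int) and setSoTimeout(int). The point it shows is that two separate timeouts apply: one bounds the TCP connect, the other bounds every later blocking read.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical plain-JDK sketch of the connect step (not Hadoop's NetUtils).
final class DatanodeConnector {
  private DatanodeConnector() {}

  static Socket connect(InetSocketAddress targetAddr, int socketTimeoutMs) throws IOException {
    Socket s = new Socket();
    try {
      s.connect(targetAddr, socketTimeoutMs); // bound the TCP connect
      s.setSoTimeout(socketTimeoutMs);        // bound each later blocking read
      return s;
    } catch (IOException e) {
      s.close(); // as in blockSeekTo, do not leak the socket of a failed attempt
      throw e;
    }
  }
}
```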
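Once the Client is connected to the target Datanode (targetAddr), it again creates a BlockReader to help read the block data, this time a RemoteBlockReader. While creating the RemoteBlockReader, the Client first sends a read request header to the target Datanode. The header travels directly over the socket using the data transfer protocol (DataTransferProtocol), not Hadoop RPC: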
```java
// in and out will be closed when sock is closed (by the caller)
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
    NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT)));

//write the header.
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); // data transfer protocol version (must match the Datanode's)
out.write( DataTransferProtocol.OP_READ_BLOCK ); // operation type: read block
out.writeLong( blockId );     // block ID
out.writeLong( genStamp );    // generation stamp of the block
out.writeLong( startOffset ); // offset within the block to start reading at
out.writeLong( len );         // number of bytes to read
Text.writeString(out, clientName); // client name
accessToken.write(out);
out.flush();
```
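The header is simply a sequence of fixed-width fields, so its wire layout can be reproduced with java.io. ReadBlockHeader below is a hypothetical encoder and makes three simplifications. The two constants are placeholder values, not Hadoop's. Text.writeString is replaced by DataOutputStream.writeUTF. The access token is omitted.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical encoder for the read-request header (not Hadoop code).
// Constants are placeholders; writeUTF stands in for Text.writeString;
// the access token is left out.
final class ReadBlockHeader {
  static final short DATA_TRANSFER_VERSION = 1; // placeholder value
  static final byte OP_READ_BLOCK = 81;         // placeholder value

  private ReadBlockHeader() {}

  static byte[] encode(long blockId, long genStamp, long startOffset, long len,
                       String clientName) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeShort(DATA_TRANSFER_VERSION); // 2 bytes: protocol version
    out.write(OP_READ_BLOCK);              // 1 byte: operation code
    out.writeLong(blockId);                // 8 bytes
    out.writeLong(genStamp);               // 8 bytes: generation stamp
    out.writeLong(startOffset);            // 8 bytes: where in the block to start
    out.writeLong(len);                    // 8 bytes: how many bytes to read
    out.writeUTF(clientName);              // 2-byte length + modified UTF-8
    out.flush();
    return bytes.toByteArray();
  }
}
```

With these simplifications, a header is 35 fixed bytes plus the encoded client name. The Datanode can therefore parse it with the matching DataInputStream reads in the same order.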
It then wraps the socket's input stream in a DataInputStream to read the Datanode's response:
```java
DataInputStream in = new DataInputStream(
    new BufferedInputStream(NetUtils.getInputStream(sock), bufferSize));
```
Finally, it returns a RemoteBlockReader object:
```java
return new RemoteBlockReader(file, blockId, in, checksum, verifyChecksum,
    startOffset, firstChunkOffset, sock);
```
Reading block bytes through the BlockReader
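Now back to read(byte buf[], int off, int len). Once blockSeekTo returns, both the Datanode holding the block and the BlockReader are in place. read can then pull bytes of the block from the BlockReader, which wraps an input stream (InputStream) on the block data: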
```java
int result = readBuffer(buf, off, realLen);
```
readBuffer reads the requested bytes of the block (a single byte in our example) into buf:
```java
private synchronized int readBuffer(byte buf[], int off, int len) throws IOException {
  IOException ioe;

  boolean retryCurrentNode = true;

  while (true) {
    // retry as many times as seekToNewSource allows.
    try {
      return blockReader.read(buf, off, len); // let the BlockReader fill buf
    } catch ( ChecksumException ce ) {
      LOG.warn("Found Checksum error for " + currentBlock + " from " +
          currentNode.getName() + " at " + ce.getPos());
      reportChecksumFailure(src, currentBlock, currentNode);
      ioe = ce;
      // a Datanode that served corrupt data is not retried;
      // it is added to the Client's dead node list below
      retryCurrentNode = false;
    } catch ( IOException e ) {
      if (!retryCurrentNode) {
        LOG.warn("Exception while reading from " + currentBlock +
            " of " + src + " from " + currentNode + ": " +
            StringUtils.stringifyException(e));
      }
      ioe = e;
    }
    boolean sourceFound = false;
    if (retryCurrentNode) {
      /* possibly retry the same node so that transient errors don't
       * result in application level failures (e.g. Datanode could have
       * closed the connection because the client is idle for too long).
       */
      sourceFound = seekToBlockSource(pos);
    } else {
      addToDeadNodes(currentNode); // add to the Client's dead node list
      // reading from the current Datanode failed, so choose another one;
      // seekToNewSource calls blockSeekTo internally to pick a Datanode
      sourceFound = seekToNewSource(pos);
    }
    if (!sourceFound) {
      throw ioe;
    }
    retryCurrentNode = false;
  }
}
```
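The retry policy of readBuffer can be condensed into a self-contained sketch. RetryingReader is a hypothetical simplification, not Hadoop code. Replicas are name strings in preference order, and ReadOp stands in for blockReader.read on a given Datanode. CorruptDataException stands in for Hadoop's ChecksumException.

The policy works like this. A plain IOException earns one retry on the same node. A checksum-style error means the node is not retried. Any failure after the first retry puts the node on the dead list and moves on to the next live replica. When none is left, the last exception is rethrown.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simplification of readBuffer's retry policy (not Hadoop code).
final class RetryingReader {
  /** Reads from the named replica; stands in for blockReader.read on that Datanode. */
  interface ReadOp {
    int readFrom(String node) throws IOException;
  }

  /** Stand-in for Hadoop's ChecksumException. */
  static final class CorruptDataException extends IOException {
    CorruptDataException(String msg) {
      super(msg);
    }
  }

  final Set<String> deadNodes = new HashSet<>();

  int read(List<String> replicas, ReadOp op) throws IOException {
    int current = nextLive(replicas, -1);
    if (current < 0) {
      throw new IOException("No live replica");
    }
    boolean retryCurrentNode = true;
    while (true) {
      String node = replicas.get(current);
      IOException ioe;
      try {
        return op.readFrom(node);
      } catch (CorruptDataException ce) {
        ioe = ce;
        retryCurrentNode = false; // never retry a replica that served bad data
      } catch (IOException e) {
        ioe = e;
      }
      if (!retryCurrentNode) {
        deadNodes.add(node);                   // like addToDeadNodes
        current = nextLive(replicas, current); // like seekToNewSource
        if (current < 0) {
          throw ioe;
        }
      } // otherwise retry the same node once, like seekToBlockSource
      retryCurrentNode = false;
    }
  }

  private int nextLive(List<String> replicas, int after) {
    for (int i = after + 1; i < replicas.size(); i++) {
      if (!deadNodes.contains(replicas.get(i))) {
        return i;
      }
    }
    return -1;
  }
}
```

Retrying the same node once absorbs transient faults such as an idle connection closed by the Datanode. Switching immediately on a checksum error avoids re-reading data already known to be corrupt.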
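Reading block data through BlockReaderLocal or RemoteBlockReader follows very similar logic. Both readers mainly control the offset of the bytes being read and track that offset state; see their source code for details. (Original article by Shi Yanjun, http://shiyanjun.cn)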
How the DataNode handles a block read request
Now let's look at how the DataNode side handles a block read request. If the Client and the DataNode are on different nodes, the block is read remotely, and the Client first sends a request header:
```java
//write the header.
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); // data transfer protocol version (must match the Datanode's)
out.write( DataTransferProtocol.OP_READ_BLOCK ); // operation type: read block
out.writeLong( blockId );     // block ID
out.writeLong( genStamp );    // generation stamp of the block
out.writeLong( startOffset ); // offset within the block to start reading at
out.writeLong( len );         // number of bytes to read
Text.writeString(out, clientName); // client name
accessToken.write(out);
out.flush();
```
The DataNode first verifies that the data transfer version (DataTransferProtocol.DATA_TRANSFER_VERSION) matches, then checks the operation type. For a read operation, DataTransferProtocol.OP_READ_BLOCK, it creates an OutputStream on the Socket the Client connected with. It then uses a BlockSender to send the block data to the Client:
```java
try {
  // create the BlockSender for the requested range of the block
  blockSender = new BlockSender(block, startOffset, length,
      true, true, false, datanode, clientTraceFmt);
} catch(IOException e) {
  out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
  throw e;
}

out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // reply with a response header: success
long read = blockSender.sendBlock(out, baseStream, null); // stream the requested block data
```
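The Datanode-side steps can be sketched end to end with java.io streams. ReadRequestHandler is hypothetical and is not Hadoop's DataXceiver or BlockSender. Its constants are placeholder values. writeUTF/readUTF stand in for Text.writeString/readString, and no access token is read. The "block" is just a byte array, and the requested range is copied out directly instead of being sent packet by packet with checksums.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical Datanode-side sketch (not Hadoop code): validate the version,
// dispatch on the op code, reply with a status short, then send the range.
final class ReadRequestHandler {
  static final short DATA_TRANSFER_VERSION = 1; // placeholder
  static final byte OP_READ_BLOCK = 81;         // placeholder
  static final short OP_STATUS_SUCCESS = 0;     // placeholder
  static final short OP_STATUS_ERROR = 1;       // placeholder

  private ReadRequestHandler() {}

  static void handle(InputStream rawIn, OutputStream rawOut, byte[] block) throws IOException {
    DataInputStream in = new DataInputStream(rawIn);
    DataOutputStream out = new DataOutputStream(rawOut);
    short version = in.readShort();
    if (version != DATA_TRANSFER_VERSION) {
      throw new IOException("Version mismatch: " + version);
    }
    byte op = in.readByte();
    if (op != OP_READ_BLOCK) {
      throw new IOException("Unsupported op: " + op);
    }
    in.readLong(); // blockId (ignored: this sketch serves a single block)
    in.readLong(); // generation stamp (ignored)
    long startOffset = in.readLong();
    long len = in.readLong();
    String clientName = in.readUTF(); // stand-in for Text.readString
    if (startOffset < 0 || len < 0 || startOffset + len > block.length) {
      out.writeShort(OP_STATUS_ERROR); // like a failed BlockSender construction
      out.flush();
      throw new IOException("Bad range requested by " + clientName);
    }
    out.writeShort(OP_STATUS_SUCCESS); // response header first
    out.write(block, (int) startOffset, (int) len); // stands in for sendBlock
    out.flush();
  }
}
```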
