133 строки
4.7 KiB
Go
133 строки
4.7 KiB
Go
// Copyright (c) Microsoft. All rights reserved.
|
|
// Licensed under the MIT license. See LICENSE file in the project root for details.
|
|
package main
|
|
|
|
import (
|
|
"bazil.org/fuse"
|
|
"errors"
|
|
"golang.org/x/net/context"
|
|
"io"
|
|
)
|
|
|
|
// Encapsulates state and routines for reading data from the file handle
|
|
// FileHandleReader implements simple two-buffer scheme which allows to efficiently
|
|
// handle unordered reads which aren't far away from each other, so backend stream can
|
|
// be read sequentially without seek
|
|
type FileHandleReader struct {
|
|
Handle *FileHandle // File handle
|
|
HdfsReader ReadSeekCloser // Backend reader
|
|
Offset int64 // Current offset for backend reader
|
|
Buffer1 *FileFragment // Most recent fragment from the backend reader
|
|
Buffer2 *FileFragment // Least recent fragment read from the backend
|
|
Holes int64 // tracks number of encountered "holes" TODO: find better name
|
|
CacheHits int64 // tracks number of cache hits (read requests from buffer)
|
|
Seeks int64 // tracks number of seeks performed on the backend stream
|
|
}
|
|
|
|
// Opens the reader (creates backend reader)
|
|
func NewFileHandleReader(handle *FileHandle) (*FileHandleReader, error) {
|
|
this := &FileHandleReader{Handle: handle}
|
|
var err error
|
|
this.HdfsReader, err = handle.File.FileSystem.HdfsAccessor.OpenRead(handle.File.AbsolutePath())
|
|
if err != nil {
|
|
Error.Println("[", handle.File.AbsolutePath(), "] Opening: ", err)
|
|
return nil, err
|
|
}
|
|
this.Buffer1 = &FileFragment{}
|
|
this.Buffer2 = &FileFragment{}
|
|
return this, nil
|
|
}
|
|
|
|
// Responds on FUSE Read request. Note: If FUSE requested to read N bytes it expects exactly N, unless EOF
|
|
func (this *FileHandleReader) Read(handle *FileHandle, ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
|
|
totalRead := 0
|
|
buf := resp.Data[0:req.Size]
|
|
fileOffset := req.Offset
|
|
var nr int
|
|
var err error
|
|
for len(buf) > 0 {
|
|
nr, err = this.ReadPartial(handle, fileOffset, buf)
|
|
if err != nil {
|
|
break
|
|
}
|
|
totalRead += nr
|
|
fileOffset += int64(nr)
|
|
buf = buf[nr:]
|
|
}
|
|
resp.Data = resp.Data[0:totalRead]
|
|
if err == io.EOF {
|
|
// EOF isn't a error, reporting successful read to FUSE
|
|
return nil
|
|
} else {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// BLOCKSIZE is the granularity, in bytes, of backend reads: ReadPartial rounds
// the amount it requests from the backend up to a multiple of this value and
// reads through forward gaps of up to twice this size instead of seeking.
var BLOCKSIZE = 64 * 1024
|
|
|
|
// Reads chunk of data (satisfies part of FUSE read request)
|
|
func (this *FileHandleReader) ReadPartial(handle *FileHandle, fileOffset int64, buf []byte) (int, error) {
|
|
// First checking whether we can satisfy request from buffered file fragments
|
|
var nr int
|
|
if this.Buffer1.ReadFromBuffer(fileOffset, buf, &nr) || this.Buffer2.ReadFromBuffer(fileOffset, buf, &nr) {
|
|
this.CacheHits++
|
|
return nr, nil
|
|
}
|
|
|
|
// None of the buffers has the data to satisfy the request, we're going to read more data from backend into Buffer1
|
|
|
|
// Before doing that, swapping buffers to keep MRU/LRU invariant
|
|
this.Buffer2, this.Buffer1 = this.Buffer1, this.Buffer2
|
|
|
|
maxBytesToRead := len(buf)
|
|
minBytesToRead := 1
|
|
|
|
if fileOffset != this.Offset {
|
|
// We're reading not from the offset expected by the backend stream
|
|
// we need to decide whether we do Seek(), or read the skipped data (refered as "hole" below)
|
|
if fileOffset > this.Offset && fileOffset-this.Offset <= int64(BLOCKSIZE*2) {
|
|
holeSize := int(fileOffset - this.Offset)
|
|
this.Holes++
|
|
maxBytesToRead += holeSize // we're going to read the "hole"
|
|
minBytesToRead = holeSize + 1 // we need to read at least one byte starting from requested offset
|
|
} else {
|
|
this.Seeks++
|
|
err := this.HdfsReader.Seek(fileOffset)
|
|
// If seek error happens, return err. Seek to the end of the file is not an error.
|
|
if err != nil && this.Offset > fileOffset{
|
|
Error.Println("[seek", handle.File.AbsolutePath(), " @offset:", this.Offset, "] Seek error to", fileOffset, "(file offset):", err.Error())
|
|
return 0, err
|
|
}
|
|
this.Offset = fileOffset
|
|
}
|
|
}
|
|
|
|
// Ceiling to the nearest BLOCKSIZE
|
|
maxBytesToRead = (maxBytesToRead + BLOCKSIZE - 1) / BLOCKSIZE * BLOCKSIZE
|
|
|
|
// Reading from backend into Buffer1
|
|
err := this.Buffer1.ReadFromBackend(this.HdfsReader, &this.Offset, minBytesToRead, maxBytesToRead)
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
Warning.Println("[", handle.File.AbsolutePath(), "] EOF @", this.Offset)
|
|
return 0, err
|
|
}
|
|
return 0, err
|
|
}
|
|
// Now Buffer1 has the data to satisfy request
|
|
if !this.Buffer1.ReadFromBuffer(fileOffset, buf, &nr) {
|
|
return 0, errors.New("INTERNAL ERROR: FileFragment invariant")
|
|
}
|
|
return nr, nil
|
|
}
|
|
|
|
// Closes the reader
|
|
func (this *FileHandleReader) Close() error {
|
|
if this.HdfsReader != nil {
|
|
Info.Println("[", this.Handle.File.AbsolutePath(), "] ReadStats: holes:", this.Holes, ", cache hits:", this.CacheHits, ", hard seeks:", this.Seeks)
|
|
this.HdfsReader.Close()
|
|
this.HdfsReader = nil
|
|
}
|
|
return nil
|
|
}
|