This commit is contained in:
Colin Marc 2014-10-07 00:41:15 +02:00
Родитель 88179e7921
Коммит b0abebbdf2
6 изменённых файлов: 256 добавлений и 287 удалений

Просмотреть файл

@ -10,7 +10,7 @@ import (
// A Client represents a connection to an HDFS cluster
type Client struct {
namenode *rpc.Connection
namenode *rpc.NamenodeConnection
}
// New returns a connected Client, or an error if it can't connect
@ -20,7 +20,7 @@ func New(address string) (*Client, error) {
return nil, err
}
namenode, err := rpc.NewConnection(address, currentUser.Username)
namenode, err := rpc.NewNamenodeConnection(address, currentUser.Username)
if err != nil {
return nil, err
}

Просмотреть файл

@ -2,9 +2,9 @@ package hdfs
import (
"github.com/stretchr/testify/assert"
"log"
"os"
"testing"
"log"
)
func getClient(t *testing.T) *Client {

Просмотреть файл

@ -9,7 +9,7 @@ import (
// FileInfo implements os.FileInfo, and provides information about a file or
// directory in HDFS.
type FileInfo struct {
name string
name string
status *hdfs.HdfsFileStatusProto
}
@ -26,7 +26,7 @@ func (fi *FileInfo) Mode() os.FileMode {
}
func (fi *FileInfo) ModTime() time.Time {
return time.Unix(int64(fi.status.GetModificationTime()) / 1000, 0)
return time.Unix(int64(fi.status.GetModificationTime())/1000, 0)
}
func (fi *FileInfo) IsDir() bool {

Просмотреть файл

@ -1,94 +0,0 @@
package rpc
import (
"bufio"
"code.google.com/p/goprotobuf/proto"
"fmt"
"net"
"sync"
"time"
)
const connectionTimeout = 5 * time.Second
type Connection struct {
callId int
user string
conn net.Conn
reqLock sync.Mutex
}
// NewConnection creates a new connection to a Namenode, and preforms an initial
// handshake.
//
// You probably want to use hdfs.New instead, which provides a higher-level
// interface.
func NewConnection(address string, user string) (*Connection, error) {
conn, err := net.DialTimeout("tcp", address, connectionTimeout)
if err != nil {
return nil, err
}
return WrapConnection(conn, user)
}
// WrapConnection wraps an existing net.Conn to a Namenode, and preforms an
// initial handshake.
//
// You probably want to use hdfs.New instead, which provides a higher-level
// interface.
func WrapConnection(conn net.Conn, user string) (*Connection, error) {
c := &Connection{
user: user,
conn: conn,
}
err := c.handshake()
if err != nil {
conn.Close()
return nil, fmt.Errorf("Error performing handshake: %s", err)
}
return c, nil
}
// Execute performs an rpc call. It does this by sending req over the wire and
// unmarshaling the result into resp.
func (c *Connection) Execute(method string, req proto.Message, resp proto.Message) error {
c.reqLock.Lock()
defer c.reqLock.Unlock()
c.callId = (c.callId + 1) % 9
reqBytes, err := makeRequest(c.callId, method, req)
if err != nil {
return err
}
_, err = c.conn.Write(reqBytes)
if err != nil {
c.conn.Close()
return err
}
err = readResponse(c.callId, bufio.NewReader(c.conn), resp)
if err != nil {
c.conn.Close() // TODO don't close on RPC failure
return err
}
return nil
}
func (c *Connection) handshake() error {
handshakeBytes, err := makeConnectionHandshake(c.user)
if err != nil {
return err
}
_, err = c.conn.Write(handshakeBytes)
if err != nil {
return err
}
return nil
}

213
rpc/namenode.go Normal file
Просмотреть файл

@ -0,0 +1,213 @@
package rpc
import (
"code.google.com/p/goprotobuf/proto"
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
hadoop "github.com/colinmarc/hdfs/protocol/hadoop_common"
"net"
"sync"
"time"
)
const (
rpcVersion = 0x09
serviceClass = 0x0
authProtocol = 0x0
protocolClass = "org.apache.hadoop.hdfs.protocol.ClientProtocol"
protocolClassVersion = 1
handshakeCallId = -3
connectionTimeout = 5 * time.Second
)
var clientId = randomClientId()
type NamenodeConnection struct {
currentRequestId int
user string
conn net.Conn
reqLock sync.Mutex
}
// NewNamenodeConnection creates a new connection to a Namenode, and preforms an initial
// handshake.
//
// You probably want to use hdfs.New instead, which provides a higher-level
// interface.
func NewNamenodeConnection(address, user string) (*NamenodeConnection, error) {
conn, err := net.DialTimeout("tcp", address, connectionTimeout)
if err != nil {
return nil, err
}
return WrapNamenodeConnection(conn, user)
}
// WrapNamenodeConnection wraps an existing net.Conn to a Namenode, and preforms an
// initial handshake.
//
// You probably want to use hdfs.New instead, which provides a higher-level
// interface.
func WrapNamenodeConnection(conn net.Conn, user string) (*NamenodeConnection, error) {
c := &NamenodeConnection{
user: user,
conn: conn,
}
err := c.writeNamenodeHandshake()
if err != nil {
conn.Close()
return nil, fmt.Errorf("Error performing handshake: %s", err)
}
return c, nil
}
// Execute performs an rpc call. It does this by sending req over the wire and
// unmarshaling the result into resp.
func (c *NamenodeConnection) Execute(method string, req proto.Message, resp proto.Message) error {
c.reqLock.Lock()
defer c.reqLock.Unlock()
c.currentRequestId = (c.currentRequestId + 1) % 9
err := c.writeRequest(method, req)
if err != nil {
c.conn.Close()
return err
}
err = c.readResponse(resp)
if err != nil {
c.conn.Close() // TODO don't close on RPC failure
return err
}
return nil
}
// RPC definitions
// A request packet:
// +-----------------------------------------------------------+
// | uint32 length of the next three parts |
// +-----------------------------------------------------------+
// | varint length + RpcRequestHeaderProto |
// +-----------------------------------------------------------+
// | varint length + RequestHeaderProto |
// +-----------------------------------------------------------+
// | varint length + Request |
// +-----------------------------------------------------------+
func (c *NamenodeConnection) writeRequest(method string, req proto.Message) error {
rrh := newRpcRequestHeader(c.currentRequestId)
rh := newRequestHeader(method)
reqBytes, err := makePacket(rrh, rh, req)
if err != nil {
return err
}
_, err = c.conn.Write(reqBytes)
return err
}
// A response from the namenode:
// +-----------------------------------------------------------+
// | uint32 length of the next two parts |
// +-----------------------------------------------------------+
// | varint length + RpcResponseHeaderProto |
// +-----------------------------------------------------------+
// | varint length + Response |
// +-----------------------------------------------------------+
func (c *NamenodeConnection) readResponse(resp proto.Message) error {
var packetLength uint32
err := binary.Read(c.conn, binary.BigEndian, &packetLength)
if err != nil {
return err
}
packet := make([]byte, packetLength)
_, err = c.conn.Read(packet)
if err != nil {
return err
}
rrh := &hadoop.RpcResponseHeaderProto{}
err = parsePacket(packet, rrh, resp)
if rrh.GetStatus() != hadoop.RpcResponseHeaderProto_SUCCESS {
return errors.New("TODO failed rpc call")
} else if int(rrh.GetCallId()) != c.currentRequestId {
return errors.New("Error reading response: unexpected sequence number")
}
return nil
}
// A handshake packet:
// +-----------------------------------------------------------+
// | Header, 4 bytes ("hrpc") |
// +-----------------------------------------------------------+
// | Version, 1 byte (default verion 0x09) |
// +-----------------------------------------------------------+
// | RPC service class, 1 byte (0x00) |
// +-----------------------------------------------------------+
// | Auth protocol, 1 byte (Auth method None = 0x00) |
// +-----------------------------------------------------------+
// | uint32 length of the next two parts |
// +-----------------------------------------------------------+
// | varint length + RpcRequestHeaderProto |
// +-----------------------------------------------------------+
// | varint length + IpcConnectionContextProto |
// +-----------------------------------------------------------+
func (c *NamenodeConnection) writeNamenodeHandshake() error {
rpcHeader := []byte{
0x68, 0x72, 0x70, 0x63, // "hrpc"
rpcVersion, serviceClass, authProtocol,
}
rrh := newRpcRequestHeader(handshakeCallId)
cc := newConnectionContext(c.user)
packet, err := makePacket(rrh, cc)
if err != nil {
return err
}
_, err = c.conn.Write(append(rpcHeader, packet...))
return err
}
func newRpcRequestHeader(id int) *hadoop.RpcRequestHeaderProto {
return &hadoop.RpcRequestHeaderProto{
RpcKind: hadoop.RpcKindProto_RPC_PROTOCOL_BUFFER.Enum(),
RpcOp: hadoop.RpcRequestHeaderProto_RPC_FINAL_PACKET.Enum(),
CallId: proto.Int32(int32(id)),
ClientId: clientId,
}
}
func newRequestHeader(methodName string) *hadoop.RequestHeaderProto {
return &hadoop.RequestHeaderProto{
MethodName: proto.String(methodName),
DeclaringClassProtocolName: proto.String(protocolClass),
ClientProtocolVersion: proto.Uint64(uint64(protocolClassVersion)),
}
}
func newConnectionContext(user string) *hadoop.IpcConnectionContextProto {
return &hadoop.IpcConnectionContextProto{
UserInfo: &hadoop.UserInformationProto{
EffectiveUser: proto.String(user),
},
Protocol: proto.String(protocolClass),
}
}
func randomClientId() []byte {
uuid := make([]byte, 16)
rand.Read(uuid)
return uuid
}

Просмотреть файл

@ -3,210 +3,60 @@ package rpc
import (
"bytes"
"code.google.com/p/goprotobuf/proto"
"crypto/rand"
"encoding/binary"
"errors"
hadoop "github.com/colinmarc/hdfs/protocol/hadoop_common"
"io"
)
const (
rpcVersion = 0x09
serviceClass = 0x0
authProtocol = 0x0
protocolClass = "org.apache.hadoop.hdfs.protocol.ClientProtocol"
protocolClassVersion = 1
handshakeCallId = -3
)
const clientName = "go-hdfs"
var clientId = randomClientId()
// A request packet:
// +---------------------------------------------------------------------+
// | uint32 length of the next three parts |
// +---------------------------------------------------------------------+
// | varint length + RpcRequestHeaderProto |
// +---------------------------------------------------------------------+
// | varint length + RequestHeaderProto |
// +---------------------------------------------------------------------+
// | varint length + Request |
// +---------------------------------------------------------------------+
func makeRequest(callId int, method string, msg proto.Message) ([]byte, error) {
rrh, err := makeRpcRequestHeader(callId)
func makeDelimitedMsg(msg proto.Message) ([]byte, error) {
msgBytes, err := proto.Marshal(msg)
if err != nil {
return nil, err
}
rh, err := makeRequestHeader(method)
if err != nil {
return nil, err
}
req, err := proto.Marshal(msg)
if err != nil {
return nil, err
}
rrhLength := proto.EncodeVarint(uint64(len(rrh)))
rhLength := proto.EncodeVarint(uint64(len(rh)))
reqLength := proto.EncodeVarint(uint64(len(req)))
lengthTotal := len(rrhLength) + len(rrh) +
len(rhLength) + len(rh) +
len(reqLength) + len(req)
packetLength := make([]byte, 4)
binary.BigEndian.PutUint32(packetLength, uint32(lengthTotal))
buf := new(bytes.Buffer)
buf.Grow(lengthTotal + 4)
buf.Write(packetLength)
buf.Write(rrhLength)
buf.Write(rrh)
buf.Write(rhLength)
buf.Write(rh)
buf.Write(reqLength)
buf.Write(req)
return buf.Bytes(), nil
lengthBytes := make([]byte, 10)
n := binary.PutUvarint(lengthBytes, uint64(len(msgBytes)))
return append(lengthBytes[:n], msgBytes...), nil
}
// A response from the namenode:
// +-----------------------------------------------------------+
// | Length of the RPC resonse (4 bytes/32 bit int) |
// +-----------------------------------------------------------+
// | varint length + RpcResponseHeaderProto |
// +-----------------------------------------------------------+
// | varint length + Response |
// +-----------------------------------------------------------+
func readResponse(callId int, reader io.Reader, msg proto.Message) error {
var responseLength uint32
err := binary.Read(reader, binary.BigEndian, &responseLength)
if err != nil {
return err
func makePacket(msgs ...proto.Message) ([]byte, error) {
packet := make([]byte, 4, 128)
length := 0
for _, msg := range msgs {
b, err := makeDelimitedMsg(msg)
if err != nil {
return nil, err
}
packet = append(packet, b...)
length += len(b)
}
response := make([]byte, responseLength)
_, err = reader.Read(response)
if err != nil {
return err
}
binary.BigEndian.PutUint32(packet, uint32(length))
return packet, nil
}
rrhLength, n := proto.DecodeVarint(response)
if n == 0 {
return errors.New("Error reading response: unexpected EOF")
}
// Doesn't include the uint32 length
func parsePacket(b []byte, msgs ...proto.Message) error {
reader := bytes.NewReader(b)
for _, msg := range msgs {
msgLength, err := binary.ReadUvarint(reader)
if err != nil {
return err
}
rrh := &hadoop.RpcResponseHeaderProto{}
err = proto.Unmarshal(response[n:int(rrhLength)+n], rrh)
if err != nil {
return err
} else if rrh.GetStatus() != hadoop.RpcResponseHeaderProto_SUCCESS {
return errors.New("TODO failed rpc call")
} else if int(rrh.GetCallId()) != callId {
return errors.New("Error reading response: unexpected sequence number")
}
msgBytes := make([]byte, msgLength)
_, err = reader.Read(msgBytes)
if err != nil {
return err
}
response = response[int(rrhLength)+n:]
msgLength, n := proto.DecodeVarint(response)
if n == 0 {
return errors.New("Error reading response: unexpected EOF")
}
err = proto.Unmarshal(response[n:int(msgLength)+n], msg)
if err != nil {
return err
err = proto.Unmarshal(msgBytes, msg)
if err != nil {
return err
}
}
return nil
}
// A handshake packet:
// +---------------------------------------------------------------------+
// | Header, 4 bytes ("hrpc") |
// +---------------------------------------------------------------------+
// | Version, 1 byte (default verion 9) |
// +---------------------------------------------------------------------+
// | RPC service class, 1 byte (0x00) |
// +---------------------------------------------------------------------+
// | Auth protocol, 1 byte (Auth method None = 0x0) |
// +---------------------------------------------------------------------+
// | uint32 length of the next two parts |
// +---------------------------------------------------------------------+
// | varint length + RpcRequestHeaderProto |
// +---------------------------------------------------------------------+
// | varint length + IpcConnectionContextProto |
// +---------------------------------------------------------------------+
func makeConnectionHandshake(user string) ([]byte, error) {
rpcHeader := []byte{
0x68, 0x72, 0x70, 0x63, // "hrpc"
rpcVersion, serviceClass, authProtocol,
}
buf := bytes.NewBuffer(rpcHeader)
rrh, err := makeRpcRequestHeader(handshakeCallId)
if err != nil {
return nil, err
}
cc, err := makeConnectionContext(user)
if err != nil {
return nil, err
}
rrhLength := proto.EncodeVarint(uint64(len(rrh)))
ccLength := proto.EncodeVarint(uint64(len(cc)))
lengthTotal := len(rrhLength) + len(rrh) + len(ccLength) + len(cc)
packetLength := make([]byte, 4)
binary.BigEndian.PutUint32(packetLength, uint32(lengthTotal))
buf.Grow(lengthTotal + 4)
buf.Write(packetLength)
buf.Write(rrhLength)
buf.Write(rrh)
buf.Write(ccLength)
buf.Write(cc)
return buf.Bytes(), nil
}
func makeRpcRequestHeader(callId int) ([]byte, error) {
rrh := &hadoop.RpcRequestHeaderProto{
RpcKind: hadoop.RpcKindProto_RPC_PROTOCOL_BUFFER.Enum(),
RpcOp: hadoop.RpcRequestHeaderProto_RPC_FINAL_PACKET.Enum(),
CallId: proto.Int32(int32(callId)),
ClientId: clientId,
}
return proto.Marshal(rrh)
}
func makeRequestHeader(methodName string) ([]byte, error) {
rh := &hadoop.RequestHeaderProto{
MethodName: proto.String(methodName),
DeclaringClassProtocolName: proto.String(protocolClass),
ClientProtocolVersion: proto.Uint64(uint64(protocolClassVersion)),
}
return proto.Marshal(rh)
}
func makeConnectionContext(user string) ([]byte, error) {
cc := &hadoop.IpcConnectionContextProto{
UserInfo: &hadoop.UserInformationProto{
EffectiveUser: proto.String(user),
},
Protocol: proto.String(protocolClass),
}
return proto.Marshal(cc)
}
func randomClientId() []byte {
uuid := make([]byte, 16)
rand.Read(uuid)
return uuid
}