This commit is contained in:
Mathias Stearn 2009-11-11 13:43:44 -05:00
Родитель 783bc0ec1f
Коммит 8903dd3753
3 изменённых файлов: 97 добавлений и 16 удалений

Просмотреть файл

@ -10,6 +10,8 @@
#include <stdlib.h>
#include <unistd.h>
#define CHECK(x) if (!(x)) return 0
/* ----------------------------
message stuff
------------------------------ */
@ -48,8 +50,7 @@ void mongo_message_send(const int sock, const mongo_message* mm){
mongo_message * mongo_message_create( int len , int id , int responseTo , int op ){
mongo_message * mm = (mongo_message*)malloc( len );
if ( ! mm )
return 0;
CHECK(mm);
if (!id)
id = rand();
@ -146,6 +147,11 @@ char * mongo_data_append32( char * start , const void * data){
return start + 4;
}
char * mongo_data_append64( char * start , const void * data){
bson_swap_endian64( start , data );
return start + 8;
}
mongo_reply * mongo_read_response( mongo_connection * conn ){
mongo_header head; /* header from network */
mongo_reply_fields fields; /* header from network */
@ -156,14 +162,14 @@ mongo_reply * mongo_read_response( mongo_connection * conn ){
looping_read(conn->sock, &fields, sizeof(fields));
bson_swap_endian32(&len, &head.len);
out = (mongo_reply*)malloc(len);
if (out) return NULL;
CHECK(out = (mongo_reply*)malloc(len));
out->head.len = len;
bson_swap_endian32(&out->head.id, &head.id);
bson_swap_endian32(&out->head.responseTo, &head.responseTo);
bson_swap_endian32(&out->head.op, &head.op);
bson_swap_endian32(&out->fields.flag, &fields.flag);
bson_swap_endian32(&out->fields.cursorID, &fields.cursorID);
bson_swap_endian32(&out->fields.start, &fields.start);
bson_swap_endian32(&out->fields.num, &fields.num);
@ -173,15 +179,18 @@ mongo_reply * mongo_read_response( mongo_connection * conn ){
return out;
}
void mongo_query( mongo_connection * conn , const char * ns , bson * query , bson * fields , int nToReturn , int nToSkip , int options ){
mongo_cursor* mongo_query(mongo_connection* conn, const char* ns, bson* query, bson* fields, int nToReturn, int nToSkip, int options){
int sl;
mongo_cursor * cursor;
char * data;
mongo_message * mm = mongo_message_create( 16 +
4 + /* options */
strlen( ns ) + 1 + /* ns */
4 + 4 + /* skip,return */
bson_size( query ) +
bson_size( fields ) ,
0 , 0 , mongo_op_query );
mongo_message * mm = mongo_message_create( 16 + /* header */
4 + /* options */
strlen( ns ) + 1 + /* ns */
4 + 4 + /* skip,return */
bson_size( query ) +
bson_size( fields ) ,
0 , 0 , mongo_op_query );
data = &mm->data;
data = mongo_data_append32( data , &options );
@ -196,6 +205,26 @@ void mongo_query( mongo_connection * conn , const char * ns , bson * query , bso
mongo_message_send( conn->sock , mm );
free(mm);
CHECK(cursor = malloc(sizeof(cursor)));
cursor->mm = mongo_read_response(conn);
if (!cursor->mm){
free(cursor);
return 0;
}
sl = strlen(ns)+1;
cursor->ns = malloc(sl);
if (!cursor->ns){
free(cursor->mm);
free(cursor);
return 0;
}
memcpy((void*)cursor->ns, ns, sl); /* cast needed to silence GCC warning */
cursor->conn = conn;
cursor->index = -1;
return cursor;
}
int mongo_disconnect( mongo_connection * conn ){
@ -221,3 +250,50 @@ void mongo_exit_on_error( int ret ){
printf( "unexpeted error: %d\n" , ret );
exit(ret);
}
bson_bool_t mongo_cursor_get_more(mongo_cursor* cursor){
char* data;
int sl = strlen(cursor->ns)+1;
int zero = 0;
mongo_message * mm = mongo_message_create(16 /*header*/
+4 /*ZERO*/
+sl
+4 /*numToReturn*/
+8 /*cursorID*/
, 0, 0, mongo_op_get_more);
CHECK(mm);
data = &mm->data;
data = mongo_data_append32(data, &zero);
data = mongo_data_append(data, cursor->ns, sl);
data = mongo_data_append32(data, &zero);
data = mongo_data_append64(data, &cursor->mm->fields.cursorID);
mongo_message_send(cursor->conn->sock, mm);
free(mm);
free(cursor->mm);
CHECK(cursor->mm = mongo_read_response(cursor->conn));
return cursor->mm->fields.num;
}
bson_bool_t mongo_cursor_next(mongo_cursor* cursor){
if (cursor->mm->fields.num == 0)
return 0;
if (cursor->index == (cursor->mm->fields.start + cursor->mm->fields.num)){
if (!mongo_cursor_get_more(cursor))
return 0;
bson_init(&cursor->current, &cursor->mm->objs, 0);
}
if (cursor->index == -1){
bson_init(&cursor->current, &cursor->mm->objs, 0);
}else{
char* bson_addr = cursor->current.data + bson_size(&cursor->current);
bson_init(&cursor->current, bson_addr, 0);
}
cursor->index++;
return 1;
}

Просмотреть файл

@ -23,6 +23,7 @@ typedef struct {
int connected;
} mongo_connection;
#pragma pack(1)
typedef struct {
int len;
int id;
@ -36,6 +37,7 @@ typedef struct {
} mongo_message;
typedef struct {
int flag; /* non-zero on failure */
int64_t cursorID;
int start;
int num;
@ -46,11 +48,14 @@ typedef struct {
mongo_reply_fields fields;
char objs;
} mongo_reply;
#pragma pack()
typedef struct {
mongo_reply * mm; /* message is owned by cursor */
mongo_connection * conn; /* connection is *not* owned by cursor */
const char* ns; /* owned by cursor */
bson current;
int index;
} mongo_cursor;
enum mongo_operations {
@ -84,7 +89,7 @@ int mongo_destory( mongo_connection * conn );
int mongo_insert( mongo_connection * conn , const char * ns , bson * data );
int mongo_insert_batch( mongo_connection * conn , const char * ns , bson ** data , int num );
void mongo_query(mongo_connection* conn, const char* ns, bson* query, bson* fields ,int nToReturn ,int nToSkip, int options);
mongo_cursor* mongo_query(mongo_connection* conn, const char* ns, bson* query, bson* fields ,int nToReturn ,int nToSkip, int options);
/* ----------------------------
HIGHER LEVEL - indexes - command helpers eval

Просмотреть файл

@ -20,11 +20,11 @@ int main(){
ASSERT(sizeof(int64_t) == 8);
ASSERT(sizeof(double) == 8);
ASSERT(sizeof(mongo_header) == 16);
ASSERT(sizeof(mongo_reply_fields) == 16);
ASSERT(sizeof(mongo_header) == 4+4+4+4);
ASSERT(sizeof(mongo_reply_fields) == 4+8+4+4);
/* field offset of obj in mongo_reply */
ASSERT((&mr.objs - (char*)&mr) == 32);
ASSERT((&mr.objs - (char*)&mr) == (4+4+4+4 + 4+8+4+4));
return 0;
}