This commit is contained in:
MarcoF1 2021-08-21 00:14:00 +00:00
Родитель 8561d89d5e
Коммит 19b9ce2424
5 изменённых файлов: 311 добавлений и 27 удалений

Просмотреть файл

@ -86,3 +86,8 @@
#define DEFAULT_CONSOLE_LOG_FILE_NAME "ntttcp-for-linux-log.log"
#define DEFAULT_XML_LOG_FILE_NAME "ntttcp-for-linux-log.xml"
#define DEFAULT_JSON_LOG_FILE_NAME "ntttcp-for-linux-log.json"
/* io_uring constants */
#define QUEUE_DEPTH 256
#define READ_SZ 8192
#define MAX_THREADS 512

Просмотреть файл

@ -185,6 +185,55 @@ int run_ntttcp_sender(struct ntttcp_test_endpoint *tep)
return err_code;
}
struct thread_count {
struct ntttcp_test *test;
uint t; // thread currently on
uint *threads_created;
};
static struct ntttcp_stream_server *create_server_stream(struct ntttcp_test_endpoint *tep, struct thread_count *counter, int first_work_queue_fd, pthread_barrier_t *init_barrier)
{
struct ntttcp_test *test = counter->test;
uint t = counter->t;
uint* threads_created = counter->threads_created;
int err_code = NO_ERROR;
struct ntttcp_stream_server *ss = malloc(sizeof(struct ntttcp_stream_server));
int rc;
ss = tep->server_streams[t];
ss->server_port = test->server_base_port + t;
ss->stream_server_num = t;
ss->first_work_queue_fd = first_work_queue_fd;
if(ss->stream_server_num > 0) {
ss->rings[0].ring_fd = first_work_queue_fd;
}
ss->init_barrier_pt = init_barrier;
if (test->protocol == TCP) {
rc = pthread_create(&tep->threads[t],
NULL,
run_ntttcp_receiver_tcp_stream,
(void*)ss);
}
else {
pthread_barrier_wait(&ss->init_barrier);
rc = pthread_create(&tep->threads[t],
NULL,
run_ntttcp_receiver_udp_stream,
(void*)ss);
}
if (rc) {
PRINT_ERR("pthread_create() create thread failed");
err_code = ERROR_PTHREAD_CREATE;
//continue;
}
(*threads_created)++;
return ss;
}
int run_ntttcp_receiver(struct ntttcp_test_endpoint *tep)
{
int err_code = NO_ERROR;
@ -195,28 +244,37 @@ int run_ntttcp_receiver(struct ntttcp_test_endpoint *tep)
struct ntttcp_stream_server *ss;
int rc;
struct thread_count counter = { .test = tep->test, .threads_created = &threads_created }; // initialize params for method call
if (!check_is_ip_addr_valid_local(test->domain, test->bind_address)) {
PRINT_ERR("cannot listen on the IP address specified");
return ERROR_ARGS;
}
/* create test threads */
int first_work_queue_fd = -1;
/* pthread barrier meant for io_uring since we want the first thread to be created before the next ones to share a work queue */
pthread_barrier_t init_barrier;
pthread_barrier_init(&init_barrier, NULL, 2);
/* create test threads */
for (t = 0; t < test->server_ports; t++) {
ss = tep->server_streams[t];
ss->server_port = test->server_base_port + t;
if (test->protocol == TCP) {
rc = pthread_create(&tep->threads[t], NULL, run_ntttcp_receiver_tcp_stream, (void *)ss);
} else {
rc = pthread_create(&tep->threads[t], NULL, run_ntttcp_receiver_udp_stream, (void *)ss);
counter.t = t;
struct ntttcp_stream_server *ss = create_server_stream(tep, &counter, first_work_queue_fd, &init_barrier);
if (!ss) {
PRINT_ERR("ntttcp_stream server creation failed..." );
}
if (rc) {
PRINT_ERR("pthread_create() create thread failed");
err_code = ERROR_PTHREAD_CREATE;
continue;
if (t == 0) {
pthread_barrier_wait(&init_barrier);
first_work_queue_fd = ss->rings[0].ring_fd;
}
else {
ss->first_work_queue_fd = first_work_queue_fd;
}
threads_created++;
}
/* create synch thread; and put it to the end of the thread array */
@ -387,4 +445,4 @@ int main(int argc, char **argv)
free_ntttcp_test_endpoint_and_test(tep);
return err_code;
}
}

Просмотреть файл

@ -15,6 +15,7 @@
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <liburing.h> /* io_uring library */
#include "const.h"
#include "logger.h"
@ -127,8 +128,15 @@ struct ntttcp_stream_server{
bool verbose;
bool use_epoll;
/* io_uring parameters */
/* io_uring stuff here */
uint stream_server_num;
struct io_uring rings[MAX_THREADS];
int work_queue_fd, first_work_queue_fd;
bool use_iouring;
/* blocking barrier so that all subsequent threads are mad after the first one */
pthread_barrier_t init_barrier;
pthread_barrier_t *init_barrier_pt;
int listener; /* this is the socket to listen on port to accept new connections */
int max_fd; /* track the max socket fd */

Просмотреть файл

@ -319,13 +319,16 @@ int ntttcp_server_listen(struct ntttcp_stream_server *ss)
close(sockfd);
return -1;
}
if (set_socket_non_blocking(sockfd) == -1) {
ASPRINTF(&log, "cannot set socket as non-blocking: %d", sockfd);
PRINT_ERR_FREE(log);
freeaddrinfo(serv_info);
free(local_addr_str);
close(sockfd);
return -1;
/* can't set a nonblocking socket with io_uring */
if(!ss->use_iouring) {
if (set_socket_non_blocking(sockfd) == -1) {
ASPRINTF(&log, "cannot set socket as non-blocking: %d", sockfd);
PRINT_ERR_FREE(log);
freeaddrinfo(serv_info);
free(local_addr_str);
close(sockfd);
return -1;
}
}
if ((i = bind(sockfd, p->ai_addr, p->ai_addrlen)) < 0) {
ASPRINTF(&log,
@ -372,6 +375,11 @@ int ntttcp_server_listen(struct ntttcp_stream_server *ss)
int ntttcp_server_epoll(struct ntttcp_stream_server *ss)
{
/* unblock the other threads */
if(ss->stream_server_num == 0){
pthread_barrier_wait(ss->init_barrier_pt);
}
int err_code = NO_ERROR;
char *log = NULL;
@ -532,6 +540,11 @@ int ntttcp_server_epoll(struct ntttcp_stream_server *ss)
int ntttcp_server_select(struct ntttcp_stream_server *ss)
{
/* unblock the other threads */
if(ss->stream_server_num == 0){
pthread_barrier_wait(ss->init_barrier_pt);
}
int err_code = NO_ERROR;
char *log = NULL;
@ -660,6 +673,204 @@ int ntttcp_server_select(struct ntttcp_stream_server *ss)
return err_code;
}
int add_accept_request(struct ntttcp_stream_server *ss, struct sockaddr_in *client_addr, socklen_t *client_addr_len) {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ss->rings[ss->stream_server_num]);
io_uring_prep_accept(sqe, ss->listener, (struct sockaddr *) client_addr, client_addr_len, 0);
io_uring_sqe_set_data(sqe, (void*)(intptr_t)ss->listener);
//return io_uring_submit(&ss->rings[ss->stream_server_num]);
return 1;
}
int add_read_request(struct ntttcp_stream_server *ss, int client_socket, const struct iovec *buffer_iov) {
struct io_uring_sqe *sqe = io_uring_get_sqe(&ss->rings[ss->stream_server_num]);
/* Linux kernel 5.5 has support for readv, but not for recv() or read() */
//io_uring_prep_recv(sqe, client_socket, buffer_iov->iov_base, buffer_iov->iov_len, 0);
io_uring_prep_read_fixed(sqe, client_socket, buffer_iov->iov_base, buffer_iov->iov_len, 0, 0);
io_uring_sqe_set_data(sqe, (void*)(intptr_t)client_socket);
//return io_uring_submit(&ss->rings[ss->stream_server_num]);
return 1;
}
static int* init_registerfiles(struct ntttcp_stream_server *ss)
{
struct rlimit r;
int i, ret;
static int *files;
static int *registered_files;
ret = getrlimit(RLIMIT_NOFILE, &r);
if (ret < 0) {
fprintf(stderr, "getrlimit: %s\n", strerror(errno));
exit(1);
}
if (r.rlim_max > 32768)
r.rlim_max = 32768;
files = malloc(r.rlim_max * sizeof(int));
if (!files) {
fprintf(stderr, "calloc for registered files failed\n");
exit(1);
}
for (i = 0; i < (int)r.rlim_max; i++)
files[i] = -1;
registered_files = malloc(r.rlim_max * sizeof(int));
if (!registered_files) {
fprintf(stderr, "calloc failed\n");
exit(1);
}
for (i = 0; i < (int)r.rlim_max; i++)
registered_files[i] = -1;
ret = io_uring_register_files(&ss->rings[ss->stream_server_num], files, r.rlim_max);
if (ret < 0) {
fprintf(stderr, "%s: register %d\n", __FUNCTION__, ret);
exit(1);
}
return registered_files;
}
int ntttcp_server_iouring(struct ntttcp_stream_server *ss)
{
int err_code = NO_ERROR;
char *buffer; //receive buffer
char *ip_address_str;
int ip_addr_max_size;
/* io uring params */
struct io_uring_cqe *cqe;
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
if ( (buffer = (char *)malloc(ss->recv_buf_size)) == (char *)NULL) {
PRINT_ERR("cannot allocate memory for receive buffer");
return ERROR_MEMORY_ALLOC;
}
ip_addr_max_size = (ss->domain == AF_INET? INET_ADDRSTRLEN : INET6_ADDRSTRLEN);
if ( (ip_address_str = (char *)malloc(ip_addr_max_size)) == (char *)NULL) {
PRINT_ERR("cannot allocate memory for ip address of peer");
free(buffer);
return ERROR_MEMORY_ALLOC;
}
struct iovec buffer_iov = {.iov_base = buffer, .iov_len = ss->recv_buf_size};
/* can customize params based on flags later */
struct io_uring_params p = {};
int ret;
if(ss->stream_server_num > 0) {
p.wq_fd = ss->rings[0].ring_fd; //ss->first_work_queue_fd;
p.flags |= IORING_SETUP_ATTACH_WQ;
// initialize io uring queue and return an error if something goes wrong
ret = io_uring_queue_init_params(QUEUE_DEPTH, &ss->rings[ss->stream_server_num], &p);
}
else {
// initialize io uring queue and return an error if something goes wrong
ret = io_uring_queue_init_params(QUEUE_DEPTH, &ss->rings[0], &p);
// unblock the other threads, thread 1 initialization is done
pthread_barrier_wait(ss->init_barrier_pt);
}
if(ret < 0) {
char* error_msg = strerror(-ret);
printf("%s\n", error_msg);
printf("io_uring stream %d init failed...\n", ss->stream_server_num);
exit(1);
}
int *registered_files = init_registerfiles(ss);
ret = io_uring_register_files_update(&ss->rings[ss->stream_server_num], ss->listener, &ss->listener, 1);
if (ret < 0) {
fprintf(stderr, "lege io_uring_register_files_update failed: %d %d\n", ss->listener, ret);
exit(1);
}
registered_files[ss->listener] = ss->listener;
ret = io_uring_register_buffers(&ss->rings[ss->stream_server_num], &buffer_iov, 1);
if(ret) {
fprintf(stderr, "Error registering buffers: %s", strerror(-ret));
return 1;
}
/* accept new client, receive data from client */
ret = add_accept_request(ss, &client_addr, &client_addr_len);
io_uring_submit(&ss->rings[ss->stream_server_num]);
while (1) {
if (ss->endpoint->receiver_exit_after_done &&
ss->endpoint->state == TEST_FINISHED)
break;
/* wait on next request to finish before continuring */
ret = io_uring_wait_cqe(&ss->rings[ss->stream_server_num], &cqe);
int cqe_fd = cqe->user_data;
if (ret < 0)
printf("io_uring wait cqe failed...\n");
if (cqe->res < 0) {
printf("cqe_fd: %d, stream server num: %d, cqe->res: %d\n", cqe_fd, ss->stream_server_num, cqe->res);
fprintf(stderr, "Async request failed: %s for event: %d\n",
strerror(-cqe->res), cqe_fd);
exit(1);
}
/* check if the cqe was from a listen or from a read sqe */
if (cqe_fd == ss->listener) {
int sock_conn_fd = cqe->res;
if(registered_files[sock_conn_fd] == -1) {
ret = io_uring_register_files_update(&ss->rings[ss->stream_server_num], sock_conn_fd, &sock_conn_fd, 1);
if (ret < 0) {
fprintf(stderr, "io_uring_register_files_update failed: %d %d\n", sock_conn_fd, ret);
exit(1);
}
registered_files[sock_conn_fd] = sock_conn_fd;
}
add_accept_request(ss, &client_addr, &client_addr_len);
add_read_request(ss, sock_conn_fd, &buffer_iov);
//if there is no synch thread, if any new connection coming, indicates ss started
if ( ss->no_synch )
turn_on_light();
//else, leave the sync thread to fire the trigger
}
else {
int bytes_received = cqe->res;
if (!bytes_received) {
fprintf(stderr, "Empty request!\n");
break;
}
/* add to the byte count atomically and queue the next request */
__sync_fetch_and_add(&(ss->total_bytes_transferred), bytes_received);
add_read_request(ss, cqe_fd, &buffer_iov);
}
/* Mark this request as processed */
io_uring_cqe_seen(&ss->rings[ss->stream_server_num], cqe);
io_uring_submit(&ss->rings[ss->stream_server_num]);
}
free(buffer);
free(ip_address_str);
close(ss->listener);
return err_code;
}
void *run_ntttcp_receiver_tcp_stream(void *ptr)
{
char *log = NULL;
@ -672,13 +883,14 @@ void *run_ntttcp_receiver_tcp_stream(void *ptr)
ASPRINTF(&log, "listen error at port: %d", ss->server_port);
PRINT_ERR_FREE(log);
} else {
/* Decide on which method to transfer data with (io_uring, epoll, select) */
if (ss->use_iouring == true) {
printf("io_uring activated\n");
if ( ntttcp_server_iouring(ss) != NO_ERROR ) {
ASPRINTF(&log, "select error at port: %d", ss->server_port);
PRINT_ERR_FREE(log);
}
}
else {
printf("io_uring not activated\n");
}
if (ss->use_epoll == true) {
else if (ss->use_epoll == true) {
if (ntttcp_server_epoll(ss) != NO_ERROR) {
ASPRINTF(&log, "epoll error at port: %d", ss->server_port);
PRINT_ERR_FREE(log);

Просмотреть файл

@ -22,4 +22,5 @@ void *run_ntttcp_sender_tcp_stream(void *ptr);
int ntttcp_server_listen(struct ntttcp_stream_server *ss);
int ntttcp_server_epoll(struct ntttcp_stream_server *ss);
int ntttcp_server_select(struct ntttcp_stream_server *ss);
void *run_ntttcp_receiver_tcp_stream(void *ptr);
int ntttcp_server_iouring(struct ntttcp_stream_server *ss);
void *run_ntttcp_receiver_tcp_stream(void *ptr);