diff mbox

[RFC,lttng-tools] relayd: optimize receive throughput

Message ID 1464899256-23032-1-git-send-email-mathieu.desnoyers@efficios.com
State Accepted, archived
Headers show

Commit Message

Mathieu Desnoyers June 2, 2016, 8:27 p.m. UTC
For channels configured with large sub-buffer size, the relayd copies
the entire trace sub-buffer (trace packet) into a large buffer, and then
copies the large buffer to disk. It is inefficient from a point of view
of cache locality.

Use a 64k buffer on the stack instead, and move the data piece-wise.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
---
 src/bin/lttng-relayd/main.c | 69 ++++++++++++++++++++++-----------------------
 1 file changed, 33 insertions(+), 36 deletions(-)

Comments

Jérémie Galarneau June 3, 2016, 6:29 a.m. UTC | #1
Merged in master, stable-2.8 and stable-2.7.

Thanks!
Jérémie

On Thu, Jun 2, 2016 at 4:27 PM, Mathieu Desnoyers
<mathieu.desnoyers at efficios.com> wrote:
> For channels configured with large sub-buffer size, the relayd copies
> the entire trace sub-buffer (trace packet) into a large buffer, and then
> copies the large buffer to disk. It is inefficient from a point of view
> of cache locality.
>
> Use a 64k buffer on the stack instead, and move the data piece-wise.
>
> Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com>
> ---
>  src/bin/lttng-relayd/main.c | 69 ++++++++++++++++++++++-----------------------
>  1 file changed, 33 insertions(+), 36 deletions(-)
>
> diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
> index b5b56aa..a1e94dc 100644
> --- a/src/bin/lttng-relayd/main.c
> +++ b/src/bin/lttng-relayd/main.c
> @@ -81,6 +81,10 @@ static int opt_daemon, opt_background;
>   */
>  #define NR_LTTNG_RELAY_READY   3
>  static int lttng_relay_ready = NR_LTTNG_RELAY_READY;
> +
> +/* Size of receive buffer. */
> +#define RECV_DATA_BUFFER_SIZE          65536
> +
>  static int recv_child_signal;  /* Set to 1 when a SIGUSR1 signal is received. */
>  static pid_t child_ppid;       /* Internal parent PID use with daemonize. */
>
> @@ -2244,6 +2248,9 @@ static int relay_process_data(struct relay_connection *conn)
>         uint32_t data_size;
>         struct relay_session *session;
>         bool new_stream = false, close_requested = false;
> +       size_t chunk_size = RECV_DATA_BUFFER_SIZE;
> +       size_t recv_off = 0;
> +       char data_buffer[chunk_size];
>
>         ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
>                         sizeof(struct lttcomm_relayd_data_hdr), 0);
> @@ -2267,36 +2274,11 @@ static int relay_process_data(struct relay_connection *conn)
>         }
>         session = stream->trace->session;
>         data_size = be32toh(data_hdr.data_size);
> -       if (data_buffer_size < data_size) {
> -               char *tmp_data_ptr;
> -
> -               tmp_data_ptr = realloc(data_buffer, data_size);
> -               if (!tmp_data_ptr) {
> -                       ERR("Allocating data buffer");
> -                       free(data_buffer);
> -                       ret = -1;
> -                       goto end_stream_put;
> -               }
> -               data_buffer = tmp_data_ptr;
> -               data_buffer_size = data_size;
> -       }
> -       memset(data_buffer, 0, data_size);
>
>         net_seq_num = be64toh(data_hdr.net_seq_num);
>
>         DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
>                 data_size, stream_id, net_seq_num);
> -       ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
> -       if (ret <= 0) {
> -               if (ret == 0) {
> -                       /* Orderly shutdown. Not necessary to print an error. */
> -                       DBG("Socket %d did an orderly shutdown", conn->sock->fd);
> -               } else {
> -                       ERR("Socket %d error %d", conn->sock->fd, ret);
> -               }
> -               ret = -1;
> -               goto end_stream_put;
> -       }
>
>         pthread_mutex_lock(&stream->lock);
>
> @@ -2342,16 +2324,33 @@ static int relay_process_data(struct relay_connection *conn)
>                 }
>         }
>
> -       /* Write data to stream output fd. */
> -       size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size);
> -       if (size_ret < data_size) {
> -               ERR("Relay error writing data to file");
> -               ret = -1;
> -               goto end_stream_unlock;
> -       }
> +       for (recv_off = 0; recv_off < data_size; recv_off += chunk_size) {
> +               size_t recv_size = min(data_size - recv_off, chunk_size);
>
> -       DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
> -                       size_ret, stream->stream_handle);
> +               ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, 0);
> +               if (ret <= 0) {
> +                       if (ret == 0) {
> +                               /* Orderly shutdown. Not necessary to print an error. */
> +                               DBG("Socket %d did an orderly shutdown", conn->sock->fd);
> +                       } else {
> +                               ERR("Socket %d error %d", conn->sock->fd, ret);
> +                       }
> +                       ret = -1;
> +                       goto end_stream_unlock;
> +               }
> +
> +               /* Write data to stream output fd. */
> +               size_ret = lttng_write(stream->stream_fd->fd, data_buffer,
> +                               recv_size);
> +               if (size_ret < recv_size) {
> +                       ERR("Relay error writing data to file");
> +                       ret = -1;
> +                       goto end_stream_unlock;
> +               }
> +
> +               DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
> +                               size_ret, stream->stream_handle);
> +       }
>
>         ret = write_padding_to_file(stream->stream_fd->fd,
>                         be32toh(data_hdr.padding_size));
> @@ -2380,7 +2379,6 @@ end_stream_unlock:
>                 uatomic_set(&session->new_streams, 1);
>                 pthread_mutex_unlock(&session->lock);
>         }
> -end_stream_put:
>         stream_put(stream);
>  end:
>         return ret;
> @@ -2698,7 +2696,6 @@ relay_connections_ht_error:
>                 DBG("Thread exited with error");
>         }
>         DBG("Worker thread cleanup complete");
> -       free(data_buffer);
>  error_testpoint:
>         if (err) {
>                 health_error();
> --
> 2.1.4
>
diff mbox

Patch

diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
index b5b56aa..a1e94dc 100644
--- a/src/bin/lttng-relayd/main.c
+++ b/src/bin/lttng-relayd/main.c
@@ -81,6 +81,10 @@  static int opt_daemon, opt_background;
  */
 #define NR_LTTNG_RELAY_READY	3
 static int lttng_relay_ready = NR_LTTNG_RELAY_READY;
+
+/* Size of receive buffer. */
+#define RECV_DATA_BUFFER_SIZE		65536
+
 static int recv_child_signal;	/* Set to 1 when a SIGUSR1 signal is received. */
 static pid_t child_ppid;	/* Internal parent PID use with daemonize. */
 
@@ -2244,6 +2248,9 @@  static int relay_process_data(struct relay_connection *conn)
 	uint32_t data_size;
 	struct relay_session *session;
 	bool new_stream = false, close_requested = false;
+	size_t chunk_size = RECV_DATA_BUFFER_SIZE;
+	size_t recv_off = 0;
+	char data_buffer[chunk_size];
 
 	ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
 			sizeof(struct lttcomm_relayd_data_hdr), 0);
@@ -2267,36 +2274,11 @@  static int relay_process_data(struct relay_connection *conn)
 	}
 	session = stream->trace->session;
 	data_size = be32toh(data_hdr.data_size);
-	if (data_buffer_size < data_size) {
-		char *tmp_data_ptr;
-
-		tmp_data_ptr = realloc(data_buffer, data_size);
-		if (!tmp_data_ptr) {
-			ERR("Allocating data buffer");
-			free(data_buffer);
-			ret = -1;
-			goto end_stream_put;
-		}
-		data_buffer = tmp_data_ptr;
-		data_buffer_size = data_size;
-	}
-	memset(data_buffer, 0, data_size);
 
 	net_seq_num = be64toh(data_hdr.net_seq_num);
 
 	DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
 		data_size, stream_id, net_seq_num);
-	ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
-	if (ret <= 0) {
-		if (ret == 0) {
-			/* Orderly shutdown. Not necessary to print an error. */
-			DBG("Socket %d did an orderly shutdown", conn->sock->fd);
-		} else {
-			ERR("Socket %d error %d", conn->sock->fd, ret);
-		}
-		ret = -1;
-		goto end_stream_put;
-	}
 
 	pthread_mutex_lock(&stream->lock);
 
@@ -2342,16 +2324,33 @@  static int relay_process_data(struct relay_connection *conn)
 		}
 	}
 
-	/* Write data to stream output fd. */
-	size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size);
-	if (size_ret < data_size) {
-		ERR("Relay error writing data to file");
-		ret = -1;
-		goto end_stream_unlock;
-	}
+	for (recv_off = 0; recv_off < data_size; recv_off += chunk_size) {
+		size_t recv_size = min(data_size - recv_off, chunk_size);
 
-	DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
-			size_ret, stream->stream_handle);
+		ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, 0);
+		if (ret <= 0) {
+			if (ret == 0) {
+				/* Orderly shutdown. Not necessary to print an error. */
+				DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+			} else {
+				ERR("Socket %d error %d", conn->sock->fd, ret);
+			}
+			ret = -1;
+			goto end_stream_unlock;
+		}
+
+		/* Write data to stream output fd. */
+		size_ret = lttng_write(stream->stream_fd->fd, data_buffer,
+				recv_size);
+		if (size_ret < recv_size) {
+			ERR("Relay error writing data to file");
+			ret = -1;
+			goto end_stream_unlock;
+		}
+
+		DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
+				size_ret, stream->stream_handle);
+	}
 
 	ret = write_padding_to_file(stream->stream_fd->fd,
 			be32toh(data_hdr.padding_size));
@@ -2380,7 +2379,6 @@  end_stream_unlock:
 		uatomic_set(&session->new_streams, 1);
 		pthread_mutex_unlock(&session->lock);
 	}
-end_stream_put:
 	stream_put(stream);
 end:
 	return ret;
@@ -2698,7 +2696,6 @@  relay_connections_ht_error:
 		DBG("Thread exited with error");
 	}
 	DBG("Worker thread cleanup complete");
-	free(data_buffer);
 error_testpoint:
 	if (err) {
 		health_error();