Message ID | 1464899256-23032-1-git-send-email-mathieu.desnoyers@efficios.com |
---|---|
State | Accepted, archived |
Headers | show |
Merged in master, stable-2.8 and stable-2.7. Thanks! Jérémie On Thu, Jun 2, 2016 at 4:27 PM, Mathieu Desnoyers <mathieu.desnoyers at efficios.com> wrote: > For channels configured with large sub-buffer size, the relayd copies > the entire trace sub-buffer (trace packet) into a large buffer, and then > copies the large buffer to disk. It is inefficient from a point of view > of cache locality. > > Use a 64k buffer on the stack instead, and move the data piece-wise. > > Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com> > --- > src/bin/lttng-relayd/main.c | 69 ++++++++++++++++++++++----------------------- > 1 file changed, 33 insertions(+), 36 deletions(-) > > diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c > index b5b56aa..a1e94dc 100644 > --- a/src/bin/lttng-relayd/main.c > +++ b/src/bin/lttng-relayd/main.c > @@ -81,6 +81,10 @@ static int opt_daemon, opt_background; > */ > #define NR_LTTNG_RELAY_READY 3 > static int lttng_relay_ready = NR_LTTNG_RELAY_READY; > + > +/* Size of receive buffer. */ > +#define RECV_DATA_BUFFER_SIZE 65536 > + > static int recv_child_signal; /* Set to 1 when a SIGUSR1 signal is received. */ > static pid_t child_ppid; /* Internal parent PID use with daemonize. */ > > @@ -2244,6 +2248,9 @@ static int relay_process_data(struct relay_connection *conn) > uint32_t data_size; > struct relay_session *session; > bool new_stream = false, close_requested = false; > + size_t chunk_size = RECV_DATA_BUFFER_SIZE; > + size_t recv_off = 0; > + char data_buffer[chunk_size]; > > ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr, > sizeof(struct lttcomm_relayd_data_hdr), 0); > @@ -2267,36 +2274,11 @@ static int relay_process_data(struct relay_connection *conn) > } > session = stream->trace->session; > data_size = be32toh(data_hdr.data_size); > - if (data_buffer_size < data_size) { > - char *tmp_data_ptr; > - > - tmp_data_ptr = realloc(data_buffer, data_size); > - if (!tmp_data_ptr) { > - ERR("Allocating data buffer"); > - free(data_buffer); > - ret = -1; > - goto end_stream_put; > - } > - data_buffer = tmp_data_ptr; > - data_buffer_size = data_size; > - } > - memset(data_buffer, 0, data_size); > > net_seq_num = be64toh(data_hdr.net_seq_num); > > DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64, > data_size, stream_id, net_seq_num); > - ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); > - if (ret <= 0) { > - if (ret == 0) { > - /* Orderly shutdown. Not necessary to print an error. */ > - DBG("Socket %d did an orderly shutdown", conn->sock->fd); > - } else { > - ERR("Socket %d error %d", conn->sock->fd, ret); > - } > - ret = -1; > - goto end_stream_put; > - } > > pthread_mutex_lock(&stream->lock); > > @@ -2342,16 +2324,33 @@ static int relay_process_data(struct relay_connection *conn) > } > } > > - /* Write data to stream output fd. */ > - size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size); > - if (size_ret < data_size) { > - ERR("Relay error writing data to file"); > - ret = -1; > - goto end_stream_unlock; > - } > + for (recv_off = 0; recv_off < data_size; recv_off += chunk_size) { > + size_t recv_size = min(data_size - recv_off, chunk_size); > > - DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64, > - size_ret, stream->stream_handle); > + ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, 0); > + if (ret <= 0) { > + if (ret == 0) { > + /* Orderly shutdown. Not necessary to print an error. */ > + DBG("Socket %d did an orderly shutdown", conn->sock->fd); > + } else { > + ERR("Socket %d error %d", conn->sock->fd, ret); > + } > + ret = -1; > + goto end_stream_unlock; > + } > + > + /* Write data to stream output fd. */ > + size_ret = lttng_write(stream->stream_fd->fd, data_buffer, > + recv_size); > + if (size_ret < recv_size) { > + ERR("Relay error writing data to file"); > + ret = -1; > + goto end_stream_unlock; > + } > + > + DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64, > + size_ret, stream->stream_handle); > + } > > ret = write_padding_to_file(stream->stream_fd->fd, > be32toh(data_hdr.padding_size)); > @@ -2380,7 +2379,6 @@ end_stream_unlock: > uatomic_set(&session->new_streams, 1); > pthread_mutex_unlock(&session->lock); > } > -end_stream_put: > stream_put(stream); > end: > return ret; > @@ -2698,7 +2696,6 @@ relay_connections_ht_error: > DBG("Thread exited with error"); > } > DBG("Worker thread cleanup complete"); > - free(data_buffer); > error_testpoint: > if (err) { > health_error(); > -- > 2.1.4 >
diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index b5b56aa..a1e94dc 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -81,6 +81,10 @@ static int opt_daemon, opt_background; */ #define NR_LTTNG_RELAY_READY 3 static int lttng_relay_ready = NR_LTTNG_RELAY_READY; + +/* Size of receive buffer. */ +#define RECV_DATA_BUFFER_SIZE 65536 + static int recv_child_signal; /* Set to 1 when a SIGUSR1 signal is received. */ static pid_t child_ppid; /* Internal parent PID use with daemonize. */ @@ -2244,6 +2248,9 @@ static int relay_process_data(struct relay_connection *conn) uint32_t data_size; struct relay_session *session; bool new_stream = false, close_requested = false; + size_t chunk_size = RECV_DATA_BUFFER_SIZE; + size_t recv_off = 0; + char data_buffer[chunk_size]; ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr, sizeof(struct lttcomm_relayd_data_hdr), 0); @@ -2267,36 +2274,11 @@ static int relay_process_data(struct relay_connection *conn) } session = stream->trace->session; data_size = be32toh(data_hdr.data_size); - if (data_buffer_size < data_size) { - char *tmp_data_ptr; - - tmp_data_ptr = realloc(data_buffer, data_size); - if (!tmp_data_ptr) { - ERR("Allocating data buffer"); - free(data_buffer); - ret = -1; - goto end_stream_put; - } - data_buffer = tmp_data_ptr; - data_buffer_size = data_size; - } - memset(data_buffer, 0, data_size); net_seq_num = be64toh(data_hdr.net_seq_num); DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64, data_size, stream_id, net_seq_num); - ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0); - if (ret <= 0) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", conn->sock->fd); - } else { - ERR("Socket %d error %d", conn->sock->fd, ret); - } - ret = -1; - goto end_stream_put; - } pthread_mutex_lock(&stream->lock); @@ -2342,16 +2324,33 @@ static int relay_process_data(struct relay_connection *conn) } } - /* Write data to stream output fd. */ - size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size); - if (size_ret < data_size) { - ERR("Relay error writing data to file"); - ret = -1; - goto end_stream_unlock; - } + for (recv_off = 0; recv_off < data_size; recv_off += chunk_size) { + size_t recv_size = min(data_size - recv_off, chunk_size); - DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64, - size_ret, stream->stream_handle); + ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, 0); + if (ret <= 0) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", conn->sock->fd); + } else { + ERR("Socket %d error %d", conn->sock->fd, ret); + } + ret = -1; + goto end_stream_unlock; + } + + /* Write data to stream output fd. */ + size_ret = lttng_write(stream->stream_fd->fd, data_buffer, + recv_size); + if (size_ret < recv_size) { + ERR("Relay error writing data to file"); + ret = -1; + goto end_stream_unlock; + } + + DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64, + size_ret, stream->stream_handle); + } ret = write_padding_to_file(stream->stream_fd->fd, be32toh(data_hdr.padding_size)); @@ -2380,7 +2379,6 @@ end_stream_unlock: uatomic_set(&session->new_streams, 1); pthread_mutex_unlock(&session->lock); } -end_stream_put: stream_put(stream); end: return ret; @@ -2698,7 +2696,6 @@ relay_connections_ht_error: DBG("Thread exited with error"); } DBG("Worker thread cleanup complete"); - free(data_buffer); error_testpoint: if (err) { health_error();
For channels configured with large sub-buffer size, the relayd copies the entire trace sub-buffer (trace packet) into a large buffer, and then copies the large buffer to disk. It is inefficient from a point of view of cache locality. Use a 64k buffer on the stack instead, and move the data piece-wise. Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers at efficios.com> --- src/bin/lttng-relayd/main.c | 69 ++++++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 36 deletions(-)