diff mbox series

[RFC,v2,12/13] Fix: delay termination on consumerd to allow metadata flushing

Message ID 20170918225206.17725-13-jonathan.rajotte-julien@efficios.com
State Superseded, archived
Delegated to: Jérémie Galarneau
Headers show
Series Sessiond teardown overhaul | expand

Commit Message

Jonathan Rajotte Sept. 18, 2017, 10:52 p.m. UTC
Move consumerd ownership to thread_manage_consumer to scope the lifetime
of the consumerd to its manager thread.

"thread_manage_consumer" is responsible for signaling its consumerd to
terminate and for waiting for that termination to complete.

All thread_manage_consumer threads now wait on a dedicated quit pipe,
distinct from the global thread quit pipe. This allows independent
control over the lifetime of these threads.

The termination notification is sent during sessiond_cleanup, after the
destroy session command, to ensure that no sessions are still active at
the moment the consumerds are terminated.

Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien at efficios.com>
---
 src/bin/lttng-sessiond/main.c | 174 +++++++++++++++++++++++++++---------------
 1 file changed, 112 insertions(+), 62 deletions(-)

Comments

Jérémie Galarneau Dec. 14, 2017, 1:57 a.m. UTC | #1
On 18 September 2017 at 18:52, Jonathan Rajotte
<jonathan.rajotte-julien at efficios.com> wrote:
> Move consumerd ownership to thread_manage_consumer to scope the lifetime
> on the consumerd to its manager thread.
>
> "thread_manage_consumer" is responsible for signaling and waiting the
> termination of its consumerd.
>
> All thread_manage_consumer threads now wait on a unique quit pipe
> different from the global thread quit pipe. This allow control over its
> lifetime.
>
> The termination notification is sent during sessiond_cleanup after the
> destroy session command to ensure that no session are still active at
> the moment the consumerds are terminated.
>
> Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien at efficios.com>
> ---
>  src/bin/lttng-sessiond/main.c | 174 +++++++++++++++++++++++++++---------------
>  1 file changed, 112 insertions(+), 62 deletions(-)
>
> diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c
> index a840e8de..fb58ab4b 100644
> --- a/src/bin/lttng-sessiond/main.c
> +++ b/src/bin/lttng-sessiond/main.c
> @@ -206,6 +206,7 @@ static int kernel_poll_pipe[2] = { -1, -1 };
>  static int thread_quit_pipe[2] = { -1, -1 };
>  static int thread_health_teardown_trigger_pipe[2] = { -1, -1 };
>  static int thread_apps_teardown_trigger_pipe[2] = { -1, -1 };
> +static int thread_consumers_teardown_trigger_pipe[2] = { -1, -1 };
>  int thread_apps_notify_teardown_trigger_pipe[2] = { -1, -1 };
>
>  /*
> @@ -495,6 +496,11 @@ static int init_thread_apps_notify_teardown_trigger_pipe(void)
>         return __init_thread_quit_pipe(thread_apps_notify_teardown_trigger_pipe);
>  }
>
> +static int init_thread_consumers_teardown_trigger_pipe(void)
> +{
> +       return __init_thread_quit_pipe(thread_consumers_teardown_trigger_pipe);
> +}
> +
>  /*
>   * Stop first wave threads by closing the thread quit pipe.
>   *  - kernel thread
> @@ -601,14 +607,15 @@ static int generate_lock_file_path(char *path, size_t len)
>  /*
>   * Wait on consumer process termination.
>   *
> - * Need to be called with the consumer data lock held or from a context
> - * ensuring no concurrent access to data (e.g: cleanup).
> + * Need to be called with the consumer data lock held.
>   */
>  static void wait_consumer(struct consumer_data *consumer_data)
>  {
>         pid_t ret;
>         int status;
>
> +       assert(consumer_data);
> +
>         if (consumer_data->pid <= 0) {
>                 return;
>         }
> @@ -626,6 +633,52 @@ static void wait_consumer(struct consumer_data *consumer_data)
>  }
>
>  /*
> + * Signal to the consumer process to terminate.
> + *
> + * Need to be called with the consumer data lock held.
> + */
> +static void kill_consumer(struct consumer_data *consumer_data)
> +{
> +       int ret;
> +
> +       assert(consumer_data);
> +
> +       /* Consumer pid must be a real one. */
> +       if (consumer_data->pid <= 0) {
> +               goto end;
> +       }
> +
> +       ret = kill(consumer_data->pid, SIGTERM);
> +       if (ret) {
> +               PERROR("Error killing consumer daemon");
> +               goto end;
> +       }
> +end:
> +       return;
> +}
> +
> +static int join_thread_consumer(struct consumer_data *consumer_data)
> +{
> +       int ret;
> +       void *status;
> +
> +       assert(consumer_data);
> +
> +       /* Consumer pid must be a real one. */
> +       if (consumer_data->pid <= 0) {
> +               ret = 0;
> +               goto end;
> +       }
> +
> +       ret = pthread_join(consumer_data->thread, &status);
> +       if (ret) {
> +               ERR("Joining consumer thread pid %d", consumer_data->pid);
> +       }
> +end:
> +       return ret;
> +}
> +
> +/*
>   * Cleanup the session daemon's data structures.
>   */
>  static void sessiond_cleanup(void)
> @@ -707,7 +760,6 @@ static void sessiond_cleanup(void)
>         (void) rmdir(path);
>
>         DBG("Cleaning up all sessions");
> -
>         /* Destroy session list mutex */
>         if (session_list_ptr != NULL) {
>                 pthread_mutex_destroy(&session_list_ptr->lock);
> @@ -719,9 +771,35 @@ static void sessiond_cleanup(void)
>                 }
>         }
>
> -       wait_consumer(&kconsumer_data);
> -       wait_consumer(&ustconsumer64_data);
> -       wait_consumer(&ustconsumer32_data);
> +       /*
> +        * Delay the termination of manage_consumer_thread threads to allow
> +        * proper metadata flushing, following the session destroy. Use a
> +        * barrier to ensure that all call_rcu are executed at this point.
> +        */
> +       DBG("Teardown consurmer thread");
> +       rcu_barrier();
> +       ret = notify_thread_pipe(thread_consumers_teardown_trigger_pipe[1]);
> +       if (ret < 0) {
> +               ERR("write error on thread consumer quit pipe");
> +       }
> +
> +       ret = join_thread_consumer(&kconsumer_data);
> +       if (ret) {
> +               errno = ret;
> +               PERROR("join_consumer kernel");
> +       }
> +
> +       ret = join_thread_consumer(&ustconsumer32_data);
> +       if (ret) {
> +               errno = ret;
> +               PERROR("join_consumer ust32");
> +       }
> +
> +       ret = join_thread_consumer(&ustconsumer64_data);
> +       if (ret) {
> +               errno = ret;
> +               PERROR("join_consumer ust64");
> +       }
>
>         DBG("Cleaning up all agent apps");
>         agent_app_ht_clean();
> @@ -1289,14 +1367,20 @@ static void *thread_manage_consumer(void *data)
>         health_code_update();
>
>         /*
> -        * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
> +        * Pass 3 as size here for the thread consumer quit pipe, consumerd_err_sock and the
>          * metadata_sock. Nothing more will be added to this poll set.
>          */
> -       ret = sessiond_set_thread_pollset(&events, 3);
> +       ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
>         if (ret < 0) {
>                 goto error_poll;
>         }
>
> +       /* Add quit pipe */
> +       ret = lttng_poll_add(&events, thread_consumers_teardown_trigger_pipe[0], LPOLLIN | LPOLLERR);
> +       if (ret < 0) {
> +               goto error;
> +       }
> +
>         /*
>          * The error socket here is already in a listening state which was done
>          * just before spawning this thread to avoid a race between the consumer
> @@ -1344,7 +1428,7 @@ restart:
>                 }
>
>                 /* Thread quit pipe has been closed. Killing thread. */
> -               ret = sessiond_check_thread_quit_pipe(pollfd, revents);
> +               ret = (pollfd == thread_consumers_teardown_trigger_pipe[0] && (revents & LPOLLIN)) ? 1 : 0;

I don't mind ternaries, but that's a really long expression.
I think this would be more readable using a good-old 'if'.

Also, is LPOLLERR checked somewhere?

>                 if (ret) {
>                         err = 0;
>                         goto exit;
> @@ -1509,7 +1593,7 @@ restart_poll:
>                          * but continue the current loop to handle potential data from
>                          * consumer.
>                          */
> -                       should_quit = sessiond_check_thread_quit_pipe(pollfd, revents);
> +                       should_quit = (pollfd == thread_consumers_teardown_trigger_pipe[0] && (revents & LPOLLIN)) ? 1 : 0;

Same comment applies here.

>
>                         if (pollfd == sock) {
>                                 /* Event on the consumerd socket */
> @@ -1552,11 +1636,6 @@ restart_poll:
>
>  exit:
>  error:
> -       /*
> -        * We lock here because we are about to close the sockets and some other
> -        * thread might be using them so get exclusive access which will abort all
> -        * other consumer command by other threads.
> -        */
>         pthread_mutex_lock(&consumer_data->lock);
>
>         /* Immediately set the consumerd state to stopped */
> @@ -1570,6 +1649,13 @@ error:
>                 assert(0);
>         }
>
> +       /*
> +        * This thread is responsible for its consumerd. Make sure the
> +        * consumerd teardown is complete before proceding.
> +        */
> +       kill_consumer(consumer_data);
> +       wait_consumer(consumer_data);
> +
>         if (consumer_data->err_sock >= 0) {
>                 ret = close(consumer_data->err_sock);
>                 if (ret) {
> @@ -1600,13 +1686,15 @@ error:
>
>         unlink(consumer_data->err_unix_sock_path);
>         unlink(consumer_data->cmd_unix_sock_path);
> -       pthread_mutex_unlock(&consumer_data->lock);
>
>         /* Cleanup metadata socket mutex. */
>         if (consumer_data->metadata_sock.lock) {
>                 pthread_mutex_destroy(consumer_data->metadata_sock.lock);
>                 free(consumer_data->metadata_sock.lock);
>         }
> +
> +       pthread_mutex_unlock(&consumer_data->lock);
> +
>         lttng_poll_clean(&events);
>
>         if (cmd_socket_wrapper) {
> @@ -2560,27 +2648,6 @@ error:
>  }
>
>  /*
> - * Join consumer thread
> - */
> -static int join_consumer_thread(struct consumer_data *consumer_data)
> -{
> -       void *status;
> -
> -       /* Consumer pid must be a real one. */
> -       if (consumer_data->pid > 0) {
> -               int ret;
> -               ret = kill(consumer_data->pid, SIGTERM);
> -               if (ret) {
> -                       PERROR("Error killing consumer daemon");
> -                       return ret;
> -               }
> -               return pthread_join(consumer_data->thread, &status);
> -       } else {
> -               return 0;
> -       }
> -}
> -
> -/*
>   * Fork and exec a consumer daemon (consumerd).
>   *
>   * Return pid if successful else -1.
> @@ -4741,27 +4808,6 @@ error_create_poll:
>
>         rcu_unregister_thread();
>
> -       /*
> -        * Since we are creating the consumer threads, we own them, so we need
> -        * to join them before our thread exits.
> -        */
> -       ret = join_consumer_thread(&kconsumer_data);
> -       if (ret) {
> -               errno = ret;
> -               PERROR("join_consumer");
> -       }
> -
> -       ret = join_consumer_thread(&ustconsumer32_data);
> -       if (ret) {
> -               errno = ret;
> -               PERROR("join_consumer ust32");
> -       }
> -
> -       ret = join_consumer_thread(&ustconsumer64_data);
> -       if (ret) {
> -               errno = ret;
> -               PERROR("join_consumer ust64");
> -       }
>         return NULL;
>  }
>
> @@ -5785,6 +5831,11 @@ int main(int argc, char **argv)
>                 goto exit_init_data;
>         }
>
> +       if (init_thread_consumers_teardown_trigger_pipe()) {
> +               retval = -1;
> +               goto exit_init_data;
> +       }
> +
>         /* Check if daemon is UID = 0 */
>         is_root = !getuid();
>
> @@ -6406,11 +6457,8 @@ exit_init_data:
>          * perform lookups in those structures.
>          */
>         rcu_barrier();
> -       /*
> -        * sessiond_cleanup() is called when no other thread is running, except
> -        * the ht_cleanup thread, which is needed to destroy the hash tables.
> -        */
>         rcu_thread_online();
> +
>         sessiond_cleanup();
>
>         /*
> @@ -6461,6 +6509,8 @@ exit_init_data:
>                 retval = -1;
>         }
>
> +       /* Consumers thread teardown pipe cleanup */
> +       utils_close_pipe(thread_consumers_teardown_trigger_pipe);
>         /* Health thread teardown pipe cleanup */
>         utils_close_pipe(thread_health_teardown_trigger_pipe);
>         /* Apps thread teardown pipe cleanup */
> --
> 2.11.0
>
diff mbox series

Patch

diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c
index a840e8de..fb58ab4b 100644
--- a/src/bin/lttng-sessiond/main.c
+++ b/src/bin/lttng-sessiond/main.c
@@ -206,6 +206,7 @@  static int kernel_poll_pipe[2] = { -1, -1 };
 static int thread_quit_pipe[2] = { -1, -1 };
 static int thread_health_teardown_trigger_pipe[2] = { -1, -1 };
 static int thread_apps_teardown_trigger_pipe[2] = { -1, -1 };
+static int thread_consumers_teardown_trigger_pipe[2] = { -1, -1 };
 int thread_apps_notify_teardown_trigger_pipe[2] = { -1, -1 };
 
 /*
@@ -495,6 +496,11 @@  static int init_thread_apps_notify_teardown_trigger_pipe(void)
 	return __init_thread_quit_pipe(thread_apps_notify_teardown_trigger_pipe);
 }
 
+static int init_thread_consumers_teardown_trigger_pipe(void)
+{
+	return __init_thread_quit_pipe(thread_consumers_teardown_trigger_pipe);
+}
+
 /*
  * Stop first wave threads by closing the thread quit pipe.
  *  - kernel thread
@@ -601,14 +607,15 @@  static int generate_lock_file_path(char *path, size_t len)
 /*
  * Wait on consumer process termination.
  *
- * Need to be called with the consumer data lock held or from a context
- * ensuring no concurrent access to data (e.g: cleanup).
+ * Need to be called with the consumer data lock held.
  */
 static void wait_consumer(struct consumer_data *consumer_data)
 {
 	pid_t ret;
 	int status;
 
+	assert(consumer_data);
+
 	if (consumer_data->pid <= 0) {
 		return;
 	}
@@ -626,6 +633,52 @@  static void wait_consumer(struct consumer_data *consumer_data)
 }
 
 /*
+ * Signal to the consumer process to terminate.
+ *
+ * Need to be called with the consumer data lock held.
+ */
+static void kill_consumer(struct consumer_data *consumer_data)
+{
+	int ret;
+
+	assert(consumer_data);
+
+	/* Consumer pid must be a real one. */
+	if (consumer_data->pid <= 0) {
+		goto end;
+	}
+
+	ret = kill(consumer_data->pid, SIGTERM);
+	if (ret) {
+		PERROR("Error killing consumer daemon");
+		goto end;
+	}
+end:
+	return;
+}
+
+static int join_thread_consumer(struct consumer_data *consumer_data)
+{
+	int ret;
+	void *status;
+
+	assert(consumer_data);
+
+	/* Consumer pid must be a real one. */
+	if (consumer_data->pid <= 0) {
+		ret = 0;
+		goto end;
+	}
+
+	ret = pthread_join(consumer_data->thread, &status);
+	if (ret) {
+		ERR("Joining consumer thread pid %d", consumer_data->pid);
+	}
+end:
+	return ret;
+}
+
+/*
  * Cleanup the session daemon's data structures.
  */
 static void sessiond_cleanup(void)
@@ -707,7 +760,6 @@  static void sessiond_cleanup(void)
 	(void) rmdir(path);
 
 	DBG("Cleaning up all sessions");
-
 	/* Destroy session list mutex */
 	if (session_list_ptr != NULL) {
 		pthread_mutex_destroy(&session_list_ptr->lock);
@@ -719,9 +771,35 @@  static void sessiond_cleanup(void)
 		}
 	}
 
-	wait_consumer(&kconsumer_data);
-	wait_consumer(&ustconsumer64_data);
-	wait_consumer(&ustconsumer32_data);
+	/*
+	 * Delay the termination of manage_consumer_thread threads to allow
+	 * proper metadata flushing, following the session destroy. Use a
+	 * barrier to ensure that all call_rcu are executed at this point.
+	 */
+	DBG("Teardown consurmer thread");
+	rcu_barrier();
+	ret = notify_thread_pipe(thread_consumers_teardown_trigger_pipe[1]);
+	if (ret < 0) {
+		ERR("write error on thread consumer quit pipe");
+	}
+
+	ret = join_thread_consumer(&kconsumer_data);
+	if (ret) {
+		errno = ret;
+		PERROR("join_consumer kernel");
+	}
+
+	ret = join_thread_consumer(&ustconsumer32_data);
+	if (ret) {
+		errno = ret;
+		PERROR("join_consumer ust32");
+	}
+
+	ret = join_thread_consumer(&ustconsumer64_data);
+	if (ret) {
+		errno = ret;
+		PERROR("join_consumer ust64");
+	}
 
 	DBG("Cleaning up all agent apps");
 	agent_app_ht_clean();
@@ -1289,14 +1367,20 @@  static void *thread_manage_consumer(void *data)
 	health_code_update();
 
 	/*
-	 * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
+	 * Pass 3 as size here for the thread consumer quit pipe, consumerd_err_sock and the
 	 * metadata_sock. Nothing more will be added to this poll set.
 	 */
-	ret = sessiond_set_thread_pollset(&events, 3);
+	ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
 	if (ret < 0) {
 		goto error_poll;
 	}
 
+	/* Add quit pipe */
+	ret = lttng_poll_add(&events, thread_consumers_teardown_trigger_pipe[0], LPOLLIN | LPOLLERR);
+	if (ret < 0) {
+		goto error;
+	}
+
 	/*
 	 * The error socket here is already in a listening state which was done
 	 * just before spawning this thread to avoid a race between the consumer
@@ -1344,7 +1428,7 @@  restart:
 		}
 
 		/* Thread quit pipe has been closed. Killing thread. */
-		ret = sessiond_check_thread_quit_pipe(pollfd, revents);
+		ret = (pollfd == thread_consumers_teardown_trigger_pipe[0] && (revents & LPOLLIN)) ? 1 : 0;
 		if (ret) {
 			err = 0;
 			goto exit;
@@ -1509,7 +1593,7 @@  restart_poll:
 			 * but continue the current loop to handle potential data from
 			 * consumer.
 			 */
-			should_quit = sessiond_check_thread_quit_pipe(pollfd, revents);
+			should_quit = (pollfd == thread_consumers_teardown_trigger_pipe[0] && (revents & LPOLLIN)) ? 1 : 0;
 
 			if (pollfd == sock) {
 				/* Event on the consumerd socket */
@@ -1552,11 +1636,6 @@  restart_poll:
 
 exit:
 error:
-	/*
-	 * We lock here because we are about to close the sockets and some other
-	 * thread might be using them so get exclusive access which will abort all
-	 * other consumer command by other threads.
-	 */
 	pthread_mutex_lock(&consumer_data->lock);
 
 	/* Immediately set the consumerd state to stopped */
@@ -1570,6 +1649,13 @@  error:
 		assert(0);
 	}
 
+	/*
+	 * This thread is responsible for its consumerd. Make sure the
+	 * consumerd teardown is complete before proceding.
+	 */
+	kill_consumer(consumer_data);
+	wait_consumer(consumer_data);
+
 	if (consumer_data->err_sock >= 0) {
 		ret = close(consumer_data->err_sock);
 		if (ret) {
@@ -1600,13 +1686,15 @@  error:
 
 	unlink(consumer_data->err_unix_sock_path);
 	unlink(consumer_data->cmd_unix_sock_path);
-	pthread_mutex_unlock(&consumer_data->lock);
 
 	/* Cleanup metadata socket mutex. */
 	if (consumer_data->metadata_sock.lock) {
 		pthread_mutex_destroy(consumer_data->metadata_sock.lock);
 		free(consumer_data->metadata_sock.lock);
 	}
+
+	pthread_mutex_unlock(&consumer_data->lock);
+
 	lttng_poll_clean(&events);
 
 	if (cmd_socket_wrapper) {
@@ -2560,27 +2648,6 @@  error:
 }
 
 /*
- * Join consumer thread
- */
-static int join_consumer_thread(struct consumer_data *consumer_data)
-{
-	void *status;
-
-	/* Consumer pid must be a real one. */
-	if (consumer_data->pid > 0) {
-		int ret;
-		ret = kill(consumer_data->pid, SIGTERM);
-		if (ret) {
-			PERROR("Error killing consumer daemon");
-			return ret;
-		}
-		return pthread_join(consumer_data->thread, &status);
-	} else {
-		return 0;
-	}
-}
-
-/*
  * Fork and exec a consumer daemon (consumerd).
  *
  * Return pid if successful else -1.
@@ -4741,27 +4808,6 @@  error_create_poll:
 
 	rcu_unregister_thread();
 
-	/*
-	 * Since we are creating the consumer threads, we own them, so we need
-	 * to join them before our thread exits.
-	 */
-	ret = join_consumer_thread(&kconsumer_data);
-	if (ret) {
-		errno = ret;
-		PERROR("join_consumer");
-	}
-
-	ret = join_consumer_thread(&ustconsumer32_data);
-	if (ret) {
-		errno = ret;
-		PERROR("join_consumer ust32");
-	}
-
-	ret = join_consumer_thread(&ustconsumer64_data);
-	if (ret) {
-		errno = ret;
-		PERROR("join_consumer ust64");
-	}
 	return NULL;
 }
 
@@ -5785,6 +5831,11 @@  int main(int argc, char **argv)
 		goto exit_init_data;
 	}
 
+	if (init_thread_consumers_teardown_trigger_pipe()) {
+		retval = -1;
+		goto exit_init_data;
+	}
+
 	/* Check if daemon is UID = 0 */
 	is_root = !getuid();
 
@@ -6406,11 +6457,8 @@  exit_init_data:
 	 * perform lookups in those structures.
 	 */
 	rcu_barrier();
-	/*
-	 * sessiond_cleanup() is called when no other thread is running, except
-	 * the ht_cleanup thread, which is needed to destroy the hash tables.
-	 */
 	rcu_thread_online();
+
 	sessiond_cleanup();
 
 	/*
@@ -6461,6 +6509,8 @@  exit_init_data:
 		retval = -1;
 	}
 
+	/* Consumers thread teardown pipe cleanup */
+	utils_close_pipe(thread_consumers_teardown_trigger_pipe);
 	/* Health thread teardown pipe cleanup */
 	utils_close_pipe(thread_health_teardown_trigger_pipe);
 	/* Apps thread teardown pipe cleanup */