On Fri, Jul 03, 2009 at 01:50:33PM +0100, Matthew Booth wrote:
This is a repost of my previous patch, rebased against the current
head.
Matt
--
Matthew Booth, RHCA, RHCSS
Red Hat Engineering, Virtualisation Team
M: +44 (0)7977 267231
GPG ID: D33C3490
GPG FPR: 3733 612D 2D05 5458 8A8A 1600 3441 EA19 D33C 3490
>From 5f57439a6139fa03560cb3a5351eedc2ebe42e19 Mon Sep 17 00:00:00 2001
From: Matthew Booth <mbooth(a)redhat.com>
Date: Sat, 27 Jun 2009 22:05:48 +0100
Subject: [PATCH] Remove receive callbacks
This patch fixes a class of race conditions characterised by the
following sequence of events:
LIBRARY                              DAEMON
send download request
                                     receive download request
                                     respond with download response
                                     start sending file chunks
set reply callback to 'download'
run main loop
At this stage the download reply callback receives both the download
reply and some file chunks. The current architecture doesn't provide a clean way
to prevent this from happening.
This patch fixes the above problem by changing the socket receive
handler to do nothing but buffer incoming data, and by providing two new APIs:
guestfs_get_reply
guestfs_free_reply
These will always de-queue exactly 1 message, which is always what is
wanted.
---
src/generator.ml | 211 ++++++++----------
src/guestfs.c | 672 +++++++++++++++++++++++++-----------------------------
src/guestfs.h | 13 +-
3 files changed, 417 insertions(+), 479 deletions(-)
diff --git a/src/generator.ml b/src/generator.ml
index c65e717..c64b8c7 100755
--- a/src/generator.ml
+++ b/src/generator.ml
@@ -3734,83 +3734,6 @@ check_state (guestfs_h *g, const char *caller)
List.iter (
fun (shortname, style, _, _, _, _, _) ->
let name = "guestfs_" ^ shortname in
-
- (* Generate the context struct which stores the high-level
- * state between callback functions.
- *)
- pr "struct %s_ctx {\n" shortname;
- pr " /* This flag is set by the callbacks, so we know we've
done\n";
- pr " * the callbacks as expected, and in the right sequence.\n";
- pr " * 0 = not called, 1 = reply_cb called.\n";
- pr " */\n";
- pr " int cb_sequence;\n";
- pr " struct guestfs_message_header hdr;\n";
- pr " struct guestfs_message_error err;\n";
- (match fst style with
- | RErr -> ()
- | RConstString _ ->
- failwithf "RConstString cannot be returned from a daemon function"
- | RInt _ | RInt64 _
- | RBool _ | RString _ | RStringList _
- | RIntBool _
- | RPVList _ | RVGList _ | RLVList _
- | RStat _ | RStatVFS _
- | RHashtable _
- | RDirentList _ ->
- pr " struct %s_ret ret;\n" name
- );
- pr "};\n";
- pr "\n";
-
- (* Generate the reply callback function. *)
- pr "static void %s_reply_cb (guestfs_h *g, void *data, XDR *xdr)\n"
shortname;
- pr "{\n";
- pr " guestfs_main_loop *ml = guestfs_get_main_loop (g);\n";
- pr " struct %s_ctx *ctx = (struct %s_ctx *) data;\n" shortname
shortname;
- pr "\n";
- pr " /* This should definitely not happen. */\n";
- pr " if (ctx->cb_sequence != 0) {\n";
- pr " ctx->cb_sequence = 9999;\n";
- pr " error (g, \"%%s: internal error: reply callback called
twice\", \"%s\");\n" name;
- pr " return;\n";
- pr " }\n";
- pr "\n";
- pr " ml->main_loop_quit (ml, g);\n";
- pr "\n";
- pr " if (!xdr_guestfs_message_header (xdr, &ctx->hdr)) {\n";
- pr " error (g, \"%%s: failed to parse reply header\",
\"%s\");\n" name;
- pr " return;\n";
- pr " }\n";
- pr " if (ctx->hdr.status == GUESTFS_STATUS_ERROR) {\n";
- pr " if (!xdr_guestfs_message_error (xdr, &ctx->err)) {\n";
- pr " error (g, \"%%s: failed to parse reply error\",
\"%s\");\n"
- name;
- pr " return;\n";
- pr " }\n";
- pr " goto done;\n";
- pr " }\n";
-
- (match fst style with
- | RErr -> ()
- | RConstString _ ->
- failwithf "RConstString cannot be returned from a daemon function"
- | RInt _ | RInt64 _
- | RBool _ | RString _ | RStringList _
- | RIntBool _
- | RPVList _ | RVGList _ | RLVList _
- | RStat _ | RStatVFS _
- | RHashtable _
- | RDirentList _ ->
- pr " if (!xdr_%s_ret (xdr, &ctx->ret)) {\n" name;
- pr " error (g, \"%%s: failed to parse reply\",
\"%s\");\n" name;
- pr " return;\n";
- pr " }\n";
- );
-
- pr " done:\n";
- pr " ctx->cb_sequence = 1;\n";
- pr "}\n\n";
-
(* Generate the action stub. *)
generate_prototype ~extern:false ~semicolon:false ~newline:true
~handle:"g" name style;
@@ -3834,15 +3757,27 @@ check_state (guestfs_h *g, const char *caller)
| _ -> pr " struct %s_args args;\n" name
);
- pr " struct %s_ctx ctx;\n" shortname;
- pr " guestfs_main_loop *ml = guestfs_get_main_loop (g);\n";
+ pr " struct guestfs_message_header hdr = {};\n";
+ pr " struct guestfs_message_error err = {};\n";
+ (match fst style with
+ | RErr -> ()
+ | RConstString _ ->
+ failwithf "RConstString cannot be returned from a daemon function"
+ | RInt _ | RInt64 _
+ | RBool _ | RString _ | RStringList _
+ | RIntBool _
+ | RPVList _ | RVGList _ | RLVList _
+ | RStat _ | RStatVFS _
+ | RHashtable _
+ | RDirentList _ ->
+ pr " struct %s_ret ret = {};\n" name
+ );
+
pr " int serial;\n";
pr "\n";
pr " if (check_state (g, \"%s\") == -1) return %s;\n" name
error_code;
pr " guestfs_set_busy (g);\n";
pr "\n";
- pr " memset (&ctx, 0, sizeof ctx);\n";
- pr "\n";
(* Send the main header and arguments. *)
(match snd style with
@@ -3877,7 +3812,6 @@ check_state (guestfs_h *g, const char *caller)
pr "\n";
(* Send any additional files (FileIn) requested. *)
- let need_read_reply_label = ref false in
List.iter (
function
| FileIn n ->
@@ -3889,83 +3823,130 @@ check_state (guestfs_h *g, const char *caller)
pr " guestfs_end_busy (g);\n";
pr " return %s;\n" error_code;
pr " }\n";
- pr " if (r == -2) /* daemon cancelled */\n";
- pr " goto read_reply;\n";
- need_read_reply_label := true;
pr " }\n";
pr "\n";
| _ -> ()
) (snd style);
(* Wait for the reply from the remote end. *)
- if !need_read_reply_label then pr " read_reply:\n";
- pr " guestfs__switch_to_receiving (g);\n";
- pr " ctx.cb_sequence = 0;\n";
- pr " guestfs_set_reply_callback (g, %s_reply_cb, &ctx);\n"
shortname;
- pr " (void) ml->main_loop_run (ml, g);\n";
- pr " guestfs_set_reply_callback (g, NULL, NULL);\n";
- pr " if (ctx.cb_sequence != 1) {\n";
- pr " error (g, \"%%s reply failed, see earlier error messages\",
\"%s\");\n" name;
- pr " guestfs_end_busy (g);\n";
- pr " return %s;\n" error_code;
+ pr " guestfs_reply_t reply;\n";
+ pr "\n";
+ pr " for (;;) {\n";
+ pr " guestfs_get_reply (g, &reply, 1);\n";
+ pr "\n";
+ pr " if (GUESTFS_CANCEL_FLAG == reply.len) {\n";
+ pr " /* This message was delayed from a previous file transaction.
*/\n";
+ pr " continue;\n";
+ pr " }\n";
+ pr "\n";
+ pr " if (GUESTFS_LAUNCH_FLAG == reply.len) {\n";
+ pr " error (g, \"%%s reply failed, received unexpected launch
message\",\n";
+ pr " \"%s\");\n" name;
+ pr " guestfs_end_busy (g);\n";
+ pr " return %s;\n" error_code;
+ pr " }\n";
+ pr "\n";
+ pr " if (0 == reply.len) {\n";
+ pr " error (g, \"%%s reply failed, see earlier error
messages\", \"%s\");\n" name;
+ pr " guestfs_end_busy (g);\n";
+ pr " return %s;\n" error_code;
+ pr " }\n";
+ pr "\n";
+ pr " break;\n";
pr " }\n";
pr "\n";
- pr " if (check_reply_header (g, &ctx.hdr, GUESTFS_PROC_%s, serial) ==
-1) {\n"
+ pr " if (!xdr_guestfs_message_header (&reply.xdr, &hdr)) {\n";
+ pr " error (g, \"%%s: failed to parse reply header\",
\"%s\");\n" name;
+ pr " goto recv_error;\n";
+ pr " }\n";
+ pr "\n";
+ pr " if (hdr.status == GUESTFS_STATUS_ERROR) {\n";
+ pr " if (!xdr_guestfs_message_error (&reply.xdr, &err))
{\n";
+ pr " error (g, \"%%s: failed to parse reply error\",
\"%s\");\n"
+ name;
+ pr " goto recv_error;\n";
+ pr " }\n";
+ pr " }\n";
+
+ (match fst style with
+ | RErr -> ()
+ | RConstString _ ->
+ failwithf "RConstString cannot be returned from a daemon function"
+ | RInt _ | RInt64 _
+ | RBool _ | RString _ | RStringList _
+ | RIntBool _
+ | RPVList _ | RVGList _ | RLVList _
+ | RStat _ | RStatVFS _
+ | RHashtable _
+ | RDirentList _ ->
+ pr " else if (!xdr_%s_ret (&reply.xdr, &ret)) {\n" name;
+ pr " error (g, \"%%s: failed to parse reply\",
\"%s\");\n" name;
+ pr " goto recv_error;\n";
+ pr " }\n";
+ );
+
+ pr " if (check_reply_header (g, &hdr, GUESTFS_PROC_%s, serial) == -1)
{\n"
(String.uppercase shortname);
- pr " guestfs_end_busy (g);\n";
- pr " return %s;\n" error_code;
+ pr " goto recv_error;\n";
pr " }\n";
pr "\n";
- pr " if (ctx.hdr.status == GUESTFS_STATUS_ERROR) {\n";
- pr " error (g, \"%%s\", ctx.err.error_message);\n";
- pr " free (ctx.err.error_message);\n";
- pr " guestfs_end_busy (g);\n";
- pr " return %s;\n" error_code;
+ pr " if (hdr.status == GUESTFS_STATUS_ERROR) {\n";
+ pr " error (g, \"%%s\", err.error_message);\n";
+ pr " free (err.error_message);\n";
+ pr " goto recv_error;\n";
pr " }\n";
pr "\n";
+ pr " guestfs_free_reply (g, &reply);\n\n";
+
(* Expecting to receive further files (FileOut)? *)
List.iter (
function
| FileOut n ->
pr " if (guestfs__receive_file_sync (g, %s) == -1) {\n" n;
- pr " guestfs_end_busy (g);\n";
- pr " return %s;\n" error_code;
+ pr " guestfs_end_busy (g);\n";
+ pr " return %s;\n" error_code;
pr " }\n";
pr "\n";
| _ -> ()
) (snd style);
- pr " guestfs_end_busy (g);\n";
+ pr " guestfs_end_busy (g);\n\n";
(match fst style with
| RErr -> pr " return 0;\n"
| RInt n | RInt64 n | RBool n ->
- pr " return ctx.ret.%s;\n" n
+ pr " return ret.%s;\n" n
| RConstString _ ->
failwithf "RConstString cannot be returned from a daemon function"
| RString n ->
- pr " return ctx.ret.%s; /* caller will free */\n" n
+ pr " return ret.%s; /* caller will free */\n" n
| RStringList n | RHashtable n ->
pr " /* caller will free this, but we need to add a NULL entry */\n";
- pr " ctx.ret.%s.%s_val =\n" n n;
- pr " safe_realloc (g, ctx.ret.%s.%s_val,\n" n n;
- pr " sizeof (char *) * (ctx.ret.%s.%s_len + 1));\n"
+ pr " ret.%s.%s_val =\n" n n;
+ pr " safe_realloc (g, ret.%s.%s_val,\n" n n;
+ pr " sizeof (char *) * (ret.%s.%s_len + 1));\n"
n n;
- pr " ctx.ret.%s.%s_val[ctx.ret.%s.%s_len] = NULL;\n" n n n n;
- pr " return ctx.ret.%s.%s_val;\n" n n
+ pr " ret.%s.%s_val[ret.%s.%s_len] = NULL;\n" n n n n;
+ pr " return ret.%s.%s_val;\n" n n
| RIntBool _ ->
pr " /* caller with free this */\n";
- pr " return safe_memdup (g, &ctx.ret, sizeof (ctx.ret));\n"
+ pr " return safe_memdup (g, &ret, sizeof (ret));\n"
| RPVList n | RVGList n | RLVList n
| RStat n | RStatVFS n
| RDirentList n ->
pr " /* caller will free this */\n";
- pr " return safe_memdup (g, &ctx.ret.%s, sizeof (ctx.ret.%s));\n" n
n
+ pr " return safe_memdup (g, &ret.%s, sizeof (ret.%s));\n" n n
);
+ pr "\n";
+ pr " recv_error:\n";
+ pr " guestfs_free_reply (g, &reply);\n";
+ pr " guestfs_end_busy (g);\n";
+ pr " return %s;\n" error_code;
+
pr "}\n\n"
) daemon_functions
diff --git a/src/guestfs.c b/src/guestfs.c
index 350d848..79251ca 100644
--- a/src/guestfs.c
+++ b/src/guestfs.c
@@ -21,6 +21,7 @@
#define _BSD_SOURCE /* for mkdtemp, usleep */
#define _GNU_SOURCE /* for vasprintf, GNU strerror_r, strchrnul */
+#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
@@ -78,8 +79,10 @@
static void default_error_cb (guestfs_h *g, void *data, const char *msg);
static void stdout_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data, int
watch, int fd, int events);
-static void sock_read_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data, int
watch, int fd, int events);
-static void sock_write_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data,
int watch, int fd, int events);
+static void sock_event (struct guestfs_main_loop *ml, guestfs_h *g, void *data, int
watch, int fd, int events);
+static void sock_read (guestfs_h *g);
+static void sock_write (guestfs_h *g);
+static int sock_update_events (guestfs_h *g);
static void close_handles (void);
@@ -161,6 +164,9 @@ struct guestfs_h
int stdout_watch; /* Watches qemu stdout for log messages. */
int sock_watch; /* Watches daemon comm socket. */
+ int sock_events; /* events we're listening for on the comm
+ socket */
+
char *tmpdir; /* Temporary directory containing socket. */
char *qemu_help, *qemu_version; /* Output of qemu -help, qemu -version. */
@@ -185,21 +191,18 @@ struct guestfs_h
void * error_cb_data;
guestfs_send_cb send_cb;
void * send_cb_data;
- guestfs_reply_cb reply_cb;
- void * reply_cb_data;
guestfs_log_message_cb log_message_cb;
void * log_message_cb_data;
guestfs_subprocess_quit_cb subprocess_quit_cb;
void * subprocess_quit_cb_data;
- guestfs_launch_done_cb launch_done_cb;
- void * launch_done_cb_data;
/* Main loop used by this handle. */
guestfs_main_loop *main_loop;
/* Messages sent and received from the daemon. */
char *msg_in;
- int msg_in_size, msg_in_allocated;
+ size_t msg_in_size, msg_in_pos, msg_in_consumed, msg_in_len;
+
char *msg_out;
int msg_out_size, msg_out_pos;
@@ -228,6 +231,8 @@ guestfs_create (void)
g->stdout_watch = -1;
g->sock_watch = -1;
+ g->sock_events = 0;
+
g->abort_cb = abort;
g->error_cb = default_error_cb;
g->error_cb_data = NULL;
@@ -265,6 +270,11 @@ guestfs_create (void)
} else
g->memsize = 500;
+ /* Initialise the message receive buffer */
+ g->msg_in_size = GUESTFS_MESSAGE_MAX + sizeof (g->msg_in_len);
As in my comment on the previous revision of this patch, why
is this not '+ 4'?
+ g->msg_in = safe_malloc (g, g->msg_in_size);
+ g->msg_in_pos = g->msg_in_consumed = 0;
+
g->main_loop = guestfs_get_default_main_loop ();
/* Start with large serial numbers so they are easy to spot
@@ -290,9 +300,10 @@ guestfs_create (void)
return g;
error:
- free (g->path);
- free (g->qemu);
- free (g->append);
+ if (g->msg_in) free (g->msg_in);
+ if (g->path) free (g->path);
+ if (g->qemu) free (g->qemu);
+ if (g->append) free (g->append);
As in my previous comment, this is wrong. free (NULL) is fine,
so you don't need the if statements.
Anyway, -1, but I will try out this patch myself next week.
Rich.
--
Richard Jones, Emerging Technologies, Red Hat
http://et.redhat.com/~rjones
virt-p2v converts physical machines to virtual machines. Boot with a
live CD or over the network (PXE) and turn machines into Xen guests.
http://et.redhat.com/~rjones/virt-p2v