[libvirt] [PATCH 3/5] rpc: invoke the message dispatch callback with client unlocked

Daniel P. Berrangé posted 5 patches 7 years, 2 months ago
[libvirt] [PATCH 3/5] rpc: invoke the message dispatch callback with client unlocked
Posted by Daniel P. Berrangé 7 years, 2 months ago
Currently if the virNetServer instance is created with max_workers==0 to
request a non-threaded dispatch process, we deadlock during dispatch

  #0  0x00007fb845f6f42d in __lll_lock_wait () from /lib64/libpthread.so.0
  #1  0x00007fb845f681d3 in pthread_mutex_lock () from /lib64/libpthread.so.0
  #2  0x000055a6628bb305 in virMutexLock (m=<optimized out>) at util/virthread.c:89
  #3  0x000055a6628a984b in virObjectLock (anyobj=<optimized out>) at util/virobject.c:435
  #4  0x000055a66286fcde in virNetServerClientIsAuthenticated (client=client@entry=0x55a663a7b960)
      at rpc/virnetserverclient.c:1565
  #5  0x000055a66286cc17 in virNetServerProgramDispatchCall (msg=0x55a663a7bc50, client=0x55a663a7b960,
      server=0x55a663a77550, prog=0x55a663a78020) at rpc/virnetserverprogram.c:407
  #6  virNetServerProgramDispatch (prog=prog@entry=0x55a663a78020, server=server@entry=0x55a663a77550,
      client=client@entry=0x55a663a7b960, msg=msg@entry=0x55a663a7bc50) at rpc/virnetserverprogram.c:307
  #7  0x000055a662871d56 in virNetServerProcessMsg (msg=0x55a663a7bc50, prog=0x55a663a78020, client=0x55a663a7b960,
      srv=0x55a663a77550) at rpc/virnetserver.c:148
  #8  virNetServerDispatchNewMessage (client=0x55a663a7b960, msg=0x55a663a7bc50, opaque=0x55a663a77550)
      at rpc/virnetserver.c:227
  #9  0x000055a66286e4c0 in virNetServerClientDispatchRead (client=client@entry=0x55a663a7b960)
      at rpc/virnetserverclient.c:1322
  #10 0x000055a66286e813 in virNetServerClientDispatchEvent (sock=<optimized out>, events=1, opaque=0x55a663a7b960)
      at rpc/virnetserverclient.c:1507
  #11 0x000055a662899be0 in virEventPollDispatchHandles (fds=0x55a663a7bdc0, nfds=<optimized out>)
      at util/vireventpoll.c:508
  #12 virEventPollRunOnce () at util/vireventpoll.c:657
  #13 0x000055a6628982f1 in virEventRunDefaultImpl () at util/virevent.c:327
  #14 0x000055a6628716d5 in virNetDaemonRun (dmn=0x55a663a771b0) at rpc/virnetdaemon.c:858
  #15 0x000055a662864c1d in main (argc=<optimized out>, argv=0x7ffd105b4838) at logging/log_daemon.c:1235

Signed-off-by: Daniel P. Berrangé <berrange@redhat.com>
---
 src/rpc/virnetserverclient.c | 79 ++++++++++++++++++++++++++++++--------------
 1 file changed, 55 insertions(+), 24 deletions(-)

diff --git a/src/rpc/virnetserverclient.c b/src/rpc/virnetserverclient.c
index ea0d5abdee..fb4775235d 100644
--- a/src/rpc/virnetserverclient.c
+++ b/src/rpc/virnetserverclient.c
@@ -143,7 +143,7 @@ VIR_ONCE_GLOBAL_INIT(virNetServerClient)
 
 static void virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque);
 static void virNetServerClientUpdateEvent(virNetServerClientPtr client);
-static void virNetServerClientDispatchRead(virNetServerClientPtr client);
+static virNetMessagePtr virNetServerClientDispatchRead(virNetServerClientPtr client);
 static int virNetServerClientSendMessageLocked(virNetServerClientPtr client,
                                                virNetMessagePtr msg);
 
@@ -340,18 +340,41 @@ virNetServerClientCheckAccess(virNetServerClientPtr client)
 }
 #endif
 
+static void virNetServerClientDispatchMessage(virNetServerClientPtr client,
+                                              virNetMessagePtr msg)
+{
+    virObjectLock(client);
+    if (!client->dispatchFunc) {
+        virNetMessageFree(msg);
+        client->wantClose = true;
+        virObjectUnlock(client);
+    } else {
+        virObjectUnlock(client);
+        /* Accessing 'client' is safe, because virNetServerClientSetDispatcher
+         * only permits setting 'dispatchFunc' once, so if non-NULL, it will
+         * never change again
+         */
+        client->dispatchFunc(client, msg, client->dispatchOpaque);
+    }
+}
+
 
 static void virNetServerClientSockTimerFunc(int timer,
                                             void *opaque)
 {
     virNetServerClientPtr client = opaque;
+    virNetMessagePtr msg = NULL;
     virObjectLock(client);
     virEventUpdateTimeout(timer, -1);
     /* Although client->rx != NULL when this timer is enabled, it might have
      * changed since the client was unlocked in the meantime. */
     if (client->rx)
-        virNetServerClientDispatchRead(client);
+        msg = virNetServerClientDispatchRead(client);
     virObjectUnlock(client);
+
+    if (msg) {
+        virNetServerClientDispatchMessage(client, msg);
+    }
 }
 
 
@@ -950,8 +973,13 @@ void virNetServerClientSetDispatcher(virNetServerClientPtr client,
                                      void *opaque)
 {
     virObjectLock(client);
-    client->dispatchFunc = func;
-    client->dispatchOpaque = opaque;
+    /* Only set dispatcher if not already set, to avoid race
+     * with dispatch code that runs without locks held
+     */
+    if (!client->dispatchFunc) {
+        client->dispatchFunc = func;
+        client->dispatchOpaque = opaque;
+    }
     virObjectUnlock(client);
 }
 
@@ -1196,26 +1224,32 @@ static ssize_t virNetServerClientRead(virNetServerClientPtr client)
 
 
 /*
- * Read data until we get a complete message to process
+ * Read data until we get a complete message to process.
+ * If a complete message is available, it will be returned
+ * from this method, for dispatch by the caller.
+ *
+ * Returns a complete message for dispatch, or NULL if none is
+ * yet available, or an error occurred. On error, the wantClose
+ * flag will be set.
  */
-static void virNetServerClientDispatchRead(virNetServerClientPtr client)
+static virNetMessagePtr virNetServerClientDispatchRead(virNetServerClientPtr client)
 {
  readmore:
     if (client->rx->nfds == 0) {
         if (virNetServerClientRead(client) < 0) {
             client->wantClose = true;
-            return; /* Error */
+            return NULL; /* Error */
         }
     }
 
     if (client->rx->bufferOffset < client->rx->bufferLength)
-        return; /* Still not read enough */
+        return NULL; /* Still not read enough */
 
     /* Either done with length word header */
     if (client->rx->bufferLength == VIR_NET_MESSAGE_LEN_MAX) {
         if (virNetMessageDecodeLength(client->rx) < 0) {
             client->wantClose = true;
-            return;
+            return NULL;
         }
 
         virNetServerClientUpdateEvent(client);
@@ -1236,7 +1270,7 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
             virNetMessageQueueServe(&client->rx);
             virNetMessageFree(msg);
             client->wantClose = true;
-            return;
+            return NULL;
         }
 
         /* Now figure out if we need to read more data to get some
@@ -1246,7 +1280,7 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
                 virNetMessageQueueServe(&client->rx);
                 virNetMessageFree(msg);
                 client->wantClose = true;
-                return; /* Error */
+                return NULL; /* Error */
             }
 
             /* Try getting the file descriptors (may fail if blocking) */
@@ -1256,7 +1290,7 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
                     virNetMessageQueueServe(&client->rx);
                     virNetMessageFree(msg);
                     client->wantClose = true;
-                    return;
+                    return NULL;
                 }
                 if (rv == 0) /* Blocking */
                     break;
@@ -1270,7 +1304,7 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
                  * again next time we run this method
                  */
                 client->rx->bufferOffset = client->rx->bufferLength;
-                return;
+                return NULL;
             }
         }
 
@@ -1313,16 +1347,6 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
             }
         }
 
-        /* Send off to for normal dispatch to workers */
-        if (msg) {
-            if (!client->dispatchFunc) {
-                virNetMessageFree(msg);
-                client->wantClose = true;
-            } else {
-                client->dispatchFunc(client, msg, client->dispatchOpaque);
-            }
-        }
-
         /* Possibly need to create another receive buffer */
         if (client->nrequests < client->nrequests_max) {
             if (!(client->rx = virNetMessageNew(true))) {
@@ -1338,6 +1362,8 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
             }
         }
         virNetServerClientUpdateEvent(client);
+
+        return msg;
     }
 }
 
@@ -1482,6 +1508,7 @@ static void
 virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque)
 {
     virNetServerClientPtr client = opaque;
+    virNetMessagePtr msg = NULL;
 
     virObjectLock(client);
 
@@ -1504,7 +1531,7 @@ virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque)
                 virNetServerClientDispatchWrite(client);
             if (events & VIR_EVENT_HANDLE_READABLE &&
                 client->rx)
-                virNetServerClientDispatchRead(client);
+                msg = virNetServerClientDispatchRead(client);
 #if WITH_GNUTLS
         }
 #endif
@@ -1517,6 +1544,10 @@ virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque)
         client->wantClose = true;
 
     virObjectUnlock(client);
+
+    if (msg) {
+        virNetServerClientDispatchMessage(client, msg);
+    }
 }
 
 
-- 
2.14.3

--
libvir-list mailing list
libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list
Re: [libvirt] [PATCH 3/5] rpc: invoke the message dispatch callback with client unlocked
Posted by John Ferlan 7 years, 2 months ago

On 03/06/2018 12:58 PM, Daniel P. Berrangé wrote:
> Currently if the virNetServer instance is created with max_workers==0 to
> request a non-threaded dispatch process, we deadlock during dispatch
> 
>   #0  0x00007fb845f6f42d in __lll_lock_wait () from /lib64/libpthread.so.0
>   #1  0x00007fb845f681d3 in pthread_mutex_lock () from /lib64/libpthread.so.0
>   #2  0x000055a6628bb305 in virMutexLock (m=<optimized out>) at util/virthread.c:89
>   #3  0x000055a6628a984b in virObjectLock (anyobj=<optimized out>) at util/virobject.c:435
>   #4  0x000055a66286fcde in virNetServerClientIsAuthenticated (client=client@entry=0x55a663a7b960)
>       at rpc/virnetserverclient.c:1565
>   #5  0x000055a66286cc17 in virNetServerProgramDispatchCall (msg=0x55a663a7bc50, client=0x55a663a7b960,
>       server=0x55a663a77550, prog=0x55a663a78020) at rpc/virnetserverprogram.c:407
>   #6  virNetServerProgramDispatch (prog=prog@entry=0x55a663a78020, server=server@entry=0x55a663a77550,
>       client=client@entry=0x55a663a7b960, msg=msg@entry=0x55a663a7bc50) at rpc/virnetserverprogram.c:307
>   #7  0x000055a662871d56 in virNetServerProcessMsg (msg=0x55a663a7bc50, prog=0x55a663a78020, client=0x55a663a7b960,
>       srv=0x55a663a77550) at rpc/virnetserver.c:148
>   #8  virNetServerDispatchNewMessage (client=0x55a663a7b960, msg=0x55a663a7bc50, opaque=0x55a663a77550)
>       at rpc/virnetserver.c:227
>   #9  0x000055a66286e4c0 in virNetServerClientDispatchRead (client=client@entry=0x55a663a7b960)
>       at rpc/virnetserverclient.c:1322
>   #10 0x000055a66286e813 in virNetServerClientDispatchEvent (sock=<optimized out>, events=1, opaque=0x55a663a7b960)
>       at rpc/virnetserverclient.c:1507
>   #11 0x000055a662899be0 in virEventPollDispatchHandles (fds=0x55a663a7bdc0, nfds=<optimized out>)
>       at util/vireventpoll.c:508
>   #12 virEventPollRunOnce () at util/vireventpoll.c:657
>   #13 0x000055a6628982f1 in virEventRunDefaultImpl () at util/virevent.c:327
>   #14 0x000055a6628716d5 in virNetDaemonRun (dmn=0x55a663a771b0) at rpc/virnetdaemon.c:858
>   #15 0x000055a662864c1d in main (argc=<optimized out>, argv=0x7ffd105b4838) at logging/log_daemon.c:1235
> 
> Signed-off-by: Daniel P. Berrangé <berrange@redhat.com>
> ---
>  src/rpc/virnetserverclient.c | 79 ++++++++++++++++++++++++++++++--------------
>  1 file changed, 55 insertions(+), 24 deletions(-)
> 

I had the same syntax-check notes that Jim had.

Beyond that since both callers Unlock prior to calling
virNetServerClientDispatchMessage and the code relocks right away, would
calling with client lock'd still be prudent/possible?  Callers would
then be able to

    if (msg)
        virNetServerClientDispatchMessage(...)
    else
        Unlock(client)

Once we get to virNetServerProgramDispatch it expects an unlocked
client. Maybe we should comment some of the other callers in the path to
indicate whether server/client need to be locked. Not required though -
only helpful for future spelunkers of this code.

[...]

> @@ -1313,16 +1347,6 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
>              }
>          }
>  
> -        /* Send off to for normal dispatch to workers */
> -        if (msg) {
> -            if (!client->dispatchFunc) {
> -                virNetMessageFree(msg);
> -                client->wantClose = true;
> -            } else {
> -                client->dispatchFunc(client, msg, client->dispatchOpaque);
> -            }
> -        }
> -

I thought about the order change here w/r/t the following code being run
before the dispatch options above; however, that would perhaps more of a
concern for the path where we have no workers. If we have a worker, then
it gets queued and the following would then occur first anyway, right?
So, IOW, I think the reordering here is fine. It would happen before the
worker got around to handling the client and moving it around to
virNetServerClientDispatchMessage could get ugly w/r/t locks.

Reviewed-by: John Ferlan <jferlan@redhat.com>

John


>          /* Possibly need to create another receive buffer */
>          if (client->nrequests < client->nrequests_max) {
>              if (!(client->rx = virNetMessageNew(true))) {
> @@ -1338,6 +1362,8 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
>              }
>          }
>          virNetServerClientUpdateEvent(client);
> +
> +        return msg;
>      }
>  }
>  

[...]

--
libvir-list mailing list
libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list
Re: [libvirt] [PATCH 3/5] rpc: invoke the message dispatch callback with client unlocked
Posted by Daniel P. Berrangé 7 years, 2 months ago
On Wed, Mar 07, 2018 at 06:48:25PM -0500, John Ferlan wrote:
> 
> 
> On 03/06/2018 12:58 PM, Daniel P. Berrangé wrote:
> > Currently if the virNetServer instance is created with max_workers==0 to
> > request a non-threaded dispatch process, we deadlock during dispatch
> > 
> >   #0  0x00007fb845f6f42d in __lll_lock_wait () from /lib64/libpthread.so.0
> >   #1  0x00007fb845f681d3 in pthread_mutex_lock () from /lib64/libpthread.so.0
> >   #2  0x000055a6628bb305 in virMutexLock (m=<optimized out>) at util/virthread.c:89
> >   #3  0x000055a6628a984b in virObjectLock (anyobj=<optimized out>) at util/virobject.c:435
> >   #4  0x000055a66286fcde in virNetServerClientIsAuthenticated (client=client@entry=0x55a663a7b960)
> >       at rpc/virnetserverclient.c:1565
> >   #5  0x000055a66286cc17 in virNetServerProgramDispatchCall (msg=0x55a663a7bc50, client=0x55a663a7b960,
> >       server=0x55a663a77550, prog=0x55a663a78020) at rpc/virnetserverprogram.c:407
> >   #6  virNetServerProgramDispatch (prog=prog@entry=0x55a663a78020, server=server@entry=0x55a663a77550,
> >       client=client@entry=0x55a663a7b960, msg=msg@entry=0x55a663a7bc50) at rpc/virnetserverprogram.c:307
> >   #7  0x000055a662871d56 in virNetServerProcessMsg (msg=0x55a663a7bc50, prog=0x55a663a78020, client=0x55a663a7b960,
> >       srv=0x55a663a77550) at rpc/virnetserver.c:148
> >   #8  virNetServerDispatchNewMessage (client=0x55a663a7b960, msg=0x55a663a7bc50, opaque=0x55a663a77550)
> >       at rpc/virnetserver.c:227
> >   #9  0x000055a66286e4c0 in virNetServerClientDispatchRead (client=client@entry=0x55a663a7b960)
> >       at rpc/virnetserverclient.c:1322
> >   #10 0x000055a66286e813 in virNetServerClientDispatchEvent (sock=<optimized out>, events=1, opaque=0x55a663a7b960)
> >       at rpc/virnetserverclient.c:1507
> >   #11 0x000055a662899be0 in virEventPollDispatchHandles (fds=0x55a663a7bdc0, nfds=<optimized out>)
> >       at util/vireventpoll.c:508
> >   #12 virEventPollRunOnce () at util/vireventpoll.c:657
> >   #13 0x000055a6628982f1 in virEventRunDefaultImpl () at util/virevent.c:327
> >   #14 0x000055a6628716d5 in virNetDaemonRun (dmn=0x55a663a771b0) at rpc/virnetdaemon.c:858
> >   #15 0x000055a662864c1d in main (argc=<optimized out>, argv=0x7ffd105b4838) at logging/log_daemon.c:1235
> > 
> > Signed-off-by: Daniel P. Berrangé <berrange@redhat.com>
> > ---
> >  src/rpc/virnetserverclient.c | 79 ++++++++++++++++++++++++++++++--------------
> >  1 file changed, 55 insertions(+), 24 deletions(-)
> > 
> 
> I had the same syntax-check notes that Jim had.
> 
> Beyond that since both callers Unlock prior to calling
> virNetServerClientDispatchMessage and the code relocks right away, would
> calling with client lock'd still be prudent/possible?  Callers would
> then be able to
> 
>     if (msg)
>         virNetServerClientDispatchMessage(...)
>     else
>         Unlock(client)
> 
> Once we get to virNetServerProgramDispatch it expects an unlocked
> client. Maybe we should comment some of the other callers in the path to
> indicate whether server/client need to be locked. Not required though -
> only helpful for future spelunkers of this code.

I generally like to avoid situations where one method locks the object and
then calls another method which unlocks it. IMHO, it leads to harder to
understand code where the lock/unlock calls are spread out.

Since we own the reference on client, I think unlocking & locking again
is harmless here.

> > @@ -1313,16 +1347,6 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
> >              }
> >          }
> >  
> > -        /* Send off to for normal dispatch to workers */
> > -        if (msg) {
> > -            if (!client->dispatchFunc) {
> > -                virNetMessageFree(msg);
> > -                client->wantClose = true;
> > -            } else {
> > -                client->dispatchFunc(client, msg, client->dispatchOpaque);
> > -            }
> > -        }
> > -
> 
> I thought about the order change here w/r/t the following code being run
> before the dispatch options above; however, that would perhaps more of a
> concern for the path where we have no workers. If we have a worker, then
> it gets queued and the following would then occur first anyway, right?
> So, IOW, I think the reordering here is fine. It would happen before the
> worker got around to handling the client and moving it around to
> virNetServerClientDispatchMessage could get ugly w/r/t locks.

Yeah, that was my rationale too - since worker threads are asynchronous
we're effectively running the second block first regardless, so the
ordering should be harmless.

> 
> Reviewed-by: John Ferlan <jferlan@redhat.com>
> 
> John
> 
> 
> >          /* Possibly need to create another receive buffer */
> >          if (client->nrequests < client->nrequests_max) {
> >              if (!(client->rx = virNetMessageNew(true))) {
> > @@ -1338,6 +1362,8 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
> >              }
> >          }
> >          virNetServerClientUpdateEvent(client);
> > +
> > +        return msg;
> >      }
> >  }
> >  
> 
> [...]

Regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|

--
libvir-list mailing list
libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list
Re: [libvirt] [PATCH 3/5] rpc: invoke the message dispatch callback with client unlocked
Posted by Jim Fehlig 7 years, 2 months ago
On 03/06/2018 10:58 AM, Daniel P. Berrangé wrote:
> Currently if the virNetServer instance is created with max_workers==0 to
> request a non-threaded dispatch process, we deadlock during dispatch
> 
>    #0  0x00007fb845f6f42d in __lll_lock_wait () from /lib64/libpthread.so.0
>    #1  0x00007fb845f681d3 in pthread_mutex_lock () from /lib64/libpthread.so.0
>    #2  0x000055a6628bb305 in virMutexLock (m=<optimized out>) at util/virthread.c:89
>    #3  0x000055a6628a984b in virObjectLock (anyobj=<optimized out>) at util/virobject.c:435
>    #4  0x000055a66286fcde in virNetServerClientIsAuthenticated (client=client@entry=0x55a663a7b960)
>        at rpc/virnetserverclient.c:1565
>    #5  0x000055a66286cc17 in virNetServerProgramDispatchCall (msg=0x55a663a7bc50, client=0x55a663a7b960,
>        server=0x55a663a77550, prog=0x55a663a78020) at rpc/virnetserverprogram.c:407
>    #6  virNetServerProgramDispatch (prog=prog@entry=0x55a663a78020, server=server@entry=0x55a663a77550,
>        client=client@entry=0x55a663a7b960, msg=msg@entry=0x55a663a7bc50) at rpc/virnetserverprogram.c:307
>    #7  0x000055a662871d56 in virNetServerProcessMsg (msg=0x55a663a7bc50, prog=0x55a663a78020, client=0x55a663a7b960,
>        srv=0x55a663a77550) at rpc/virnetserver.c:148
>    #8  virNetServerDispatchNewMessage (client=0x55a663a7b960, msg=0x55a663a7bc50, opaque=0x55a663a77550)
>        at rpc/virnetserver.c:227
>    #9  0x000055a66286e4c0 in virNetServerClientDispatchRead (client=client@entry=0x55a663a7b960)
>        at rpc/virnetserverclient.c:1322
>    #10 0x000055a66286e813 in virNetServerClientDispatchEvent (sock=<optimized out>, events=1, opaque=0x55a663a7b960)
>        at rpc/virnetserverclient.c:1507
>    #11 0x000055a662899be0 in virEventPollDispatchHandles (fds=0x55a663a7bdc0, nfds=<optimized out>)
>        at util/vireventpoll.c:508
>    #12 virEventPollRunOnce () at util/vireventpoll.c:657
>    #13 0x000055a6628982f1 in virEventRunDefaultImpl () at util/virevent.c:327
>    #14 0x000055a6628716d5 in virNetDaemonRun (dmn=0x55a663a771b0) at rpc/virnetdaemon.c:858
>    #15 0x000055a662864c1d in main (argc=<optimized out>, argv=0x7ffd105b4838) at logging/log_daemon.c:1235
> 
> Signed-off-by: Daniel P. Berrangé <berrange@redhat.com>
> ---
>   src/rpc/virnetserverclient.c | 79 ++++++++++++++++++++++++++++++--------------
>   1 file changed, 55 insertions(+), 24 deletions(-)
> 
> diff --git a/src/rpc/virnetserverclient.c b/src/rpc/virnetserverclient.c
> index ea0d5abdee..fb4775235d 100644
> --- a/src/rpc/virnetserverclient.c
> +++ b/src/rpc/virnetserverclient.c
> @@ -143,7 +143,7 @@ VIR_ONCE_GLOBAL_INIT(virNetServerClient)
>   
>   static void virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque);
>   static void virNetServerClientUpdateEvent(virNetServerClientPtr client);
> -static void virNetServerClientDispatchRead(virNetServerClientPtr client);
> +static virNetMessagePtr virNetServerClientDispatchRead(virNetServerClientPtr client);
>   static int virNetServerClientSendMessageLocked(virNetServerClientPtr client,
>                                                  virNetMessagePtr msg);
>   
> @@ -340,18 +340,41 @@ virNetServerClientCheckAccess(virNetServerClientPtr client)
>   }
>   #endif
>   
> +static void virNetServerClientDispatchMessage(virNetServerClientPtr client,
> +                                              virNetMessagePtr msg)
> +{
> +    virObjectLock(client);
> +    if (!client->dispatchFunc) {
> +        virNetMessageFree(msg);
> +        client->wantClose = true;
> +        virObjectUnlock(client);
> +    } else {
> +        virObjectUnlock(client);
> +        /* Accessing 'client' is safe, because virNetServerClientSetDispatcher
> +         * only permits setting 'dispatchFunc' once, so if non-NULL, it will
> +         * never change again
> +         */
> +        client->dispatchFunc(client, msg, client->dispatchOpaque);
> +    }
> +}
> +
>   
>   static void virNetServerClientSockTimerFunc(int timer,
>                                               void *opaque)
>   {
>       virNetServerClientPtr client = opaque;
> +    virNetMessagePtr msg = NULL;
>       virObjectLock(client);
>       virEventUpdateTimeout(timer, -1);
>       /* Although client->rx != NULL when this timer is enabled, it might have
>        * changed since the client was unlocked in the meantime. */
>       if (client->rx)
> -        virNetServerClientDispatchRead(client);
> +        msg = virNetServerClientDispatchRead(client);
>       virObjectUnlock(client);
> +
> +    if (msg) {
> +        virNetServerClientDispatchMessage(client, msg);
> +    }

syntax-check complains about curly brackets around single-line body.

>   }
>   
>   
> @@ -950,8 +973,13 @@ void virNetServerClientSetDispatcher(virNetServerClientPtr client,
>                                        void *opaque)
>   {
>       virObjectLock(client);
> -    client->dispatchFunc = func;
> -    client->dispatchOpaque = opaque;
> +    /* Only set dispatcher if not already set, to avoid race
> +     * with dispatch code that runs without locks held
> +     */
> +    if (!client->dispatchFunc) {
> +        client->dispatchFunc = func;
> +        client->dispatchOpaque = opaque;
> +    }
>       virObjectUnlock(client);
>   }
>   
> @@ -1196,26 +1224,32 @@ static ssize_t virNetServerClientRead(virNetServerClientPtr client)
>   
>   
>   /*
> - * Read data until we get a complete message to process
> + * Read data until we get a complete message to process.
> + * If a complete message is available, it will be returned
> + * from this method, for dispatch by the caller.
> + *
> + * Returns a complete message for dispatch, or NULL if none is
> + * yet available, or an error occurred. On error, the wantClose
> + * flag will be set.
>    */
> -static void virNetServerClientDispatchRead(virNetServerClientPtr client)
> +static virNetMessagePtr virNetServerClientDispatchRead(virNetServerClientPtr client)
>   {
>    readmore:
>       if (client->rx->nfds == 0) {
>           if (virNetServerClientRead(client) < 0) {
>               client->wantClose = true;
> -            return; /* Error */
> +            return NULL; /* Error */
>           }
>       }
>   
>       if (client->rx->bufferOffset < client->rx->bufferLength)
> -        return; /* Still not read enough */
> +        return NULL; /* Still not read enough */
>   
>       /* Either done with length word header */
>       if (client->rx->bufferLength == VIR_NET_MESSAGE_LEN_MAX) {
>           if (virNetMessageDecodeLength(client->rx) < 0) {
>               client->wantClose = true;
> -            return;
> +            return NULL;
>           }
>   
>           virNetServerClientUpdateEvent(client);
> @@ -1236,7 +1270,7 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
>               virNetMessageQueueServe(&client->rx);
>               virNetMessageFree(msg);
>               client->wantClose = true;
> -            return;
> +            return NULL;
>           }
>   
>           /* Now figure out if we need to read more data to get some
> @@ -1246,7 +1280,7 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
>                   virNetMessageQueueServe(&client->rx);
>                   virNetMessageFree(msg);
>                   client->wantClose = true;
> -                return; /* Error */
> +                return NULL; /* Error */
>               }
>   
>               /* Try getting the file descriptors (may fail if blocking) */
> @@ -1256,7 +1290,7 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
>                       virNetMessageQueueServe(&client->rx);
>                       virNetMessageFree(msg);
>                       client->wantClose = true;
> -                    return;
> +                    return NULL;
>                   }
>                   if (rv == 0) /* Blocking */
>                       break;
> @@ -1270,7 +1304,7 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
>                    * again next time we run this method
>                    */
>                   client->rx->bufferOffset = client->rx->bufferLength;
> -                return;
> +                return NULL;
>               }
>           }
>   
> @@ -1313,16 +1347,6 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
>               }
>           }
>   
> -        /* Send off to for normal dispatch to workers */
> -        if (msg) {
> -            if (!client->dispatchFunc) {
> -                virNetMessageFree(msg);
> -                client->wantClose = true;
> -            } else {
> -                client->dispatchFunc(client, msg, client->dispatchOpaque);
> -            }
> -        }
> -
>           /* Possibly need to create another receive buffer */
>           if (client->nrequests < client->nrequests_max) {
>               if (!(client->rx = virNetMessageNew(true))) {
> @@ -1338,6 +1362,8 @@ static void virNetServerClientDispatchRead(virNetServerClientPtr client)
>               }
>           }
>           virNetServerClientUpdateEvent(client);
> +
> +        return msg;
>       }
>   }
>   
> @@ -1482,6 +1508,7 @@ static void
>   virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque)
>   {
>       virNetServerClientPtr client = opaque;
> +    virNetMessagePtr msg = NULL;
>   
>       virObjectLock(client);
>   
> @@ -1504,7 +1531,7 @@ virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque)
>                   virNetServerClientDispatchWrite(client);
>               if (events & VIR_EVENT_HANDLE_READABLE &&
>                   client->rx)
> -                virNetServerClientDispatchRead(client);
> +                msg = virNetServerClientDispatchRead(client);
>   #if WITH_GNUTLS
>           }
>   #endif
> @@ -1517,6 +1544,10 @@ virNetServerClientDispatchEvent(virNetSocketPtr sock, int events, void *opaque)
>           client->wantClose = true;
>   
>       virObjectUnlock(client);
> +
> +    if (msg) {
> +        virNetServerClientDispatchMessage(client, msg);
> +    }

Same here. Looks good otherwise, but again I haven't spent a lot of time in this 
area of the code.

Reviewed-by: Jim Fehlig <jfehlig@suse.com>

Regards,
Jim

--
libvir-list mailing list
libvir-list@redhat.com
https://www.redhat.com/mailman/listinfo/libvir-list