VirtualBox

Changeset 26409 in vbox


Ignore:
Timestamp:
Feb 10, 2010 2:28:17 PM (15 years ago)
Author:
vboxsync
Message:

Webservice: dynamic thread allocation with a cap of 100 threads; prefix log messages with thread ID

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/VBox/Main/webservice/vboxweb.cpp

    r26398 r26409  
    102102unsigned int            g_uBindToPort = 18083;          // port
    103103unsigned int            g_uBacklog = 100;               // backlog = max queue size for requests
    104 unsigned int            g_cWorkerThreads = 10;          // no. of worker threads
     104unsigned int            g_cMaxWorkerThreads = 100;      // max. no. of worker threads
    105105
    106106bool                    g_fVerbose = false;             // be verbose
     
    117117 ****************************************************************************/
    118118
     119// The one global SOAP queue created by main().
     120class SoapQ;
     121SoapQ               *g_pSoapQ = NULL;
     122
    119123// this mutex protects the auth lib and authentication
    120124util::RWLockHandle  *g_pAuthLibLockHandle;
     
    127131ULONG64             g_cManagedObjects = 0;
    128132
     133// Threads map, so we can quickly map an RTTHREAD struct to a logger prefix
     134typedef std::map<RTTHREAD, com::Utf8Str> ThreadsMap;
     135ThreadsMap          g_mapThreads;
     136
    129137/****************************************************************************
    130138 *
    131  *  main
     139 *  Command line help
    132140 *
    133141 ****************************************************************************/
     
    187195
    188196            case 'T':
    189                 pcszDescr = "Number of worker threads to run in parallel (5).";
     197                pcszDescr = "Maximum number of worker threads to run in parallel (100).";
    190198            break;
    191199
     
    205213        RTStrmPrintf(g_pStdErr, "%-23s%s\n", str.c_str(), pcszDescr);
    206214    }
    207 }
    208 
    209 /**
    210  * Implementation for WEBLOG macro defined in vboxweb.h; this prints a message
    211  * to the console and optionally to the file that may have been given to the
    212  * vboxwebsrv command line.
    213  * @param pszFormat
    214  */
    215 void WebLog(const char *pszFormat, ...)
    216 {
    217     va_list args;
    218     va_start(args, pszFormat);
    219     char *psz = NULL;
    220     RTStrAPrintfV(&psz, pszFormat, args);
    221     va_end(args);
    222 
    223     // terminal
    224     RTPrintf("%s", psz);
    225 
    226     // log file
    227     if (g_pstrLog)
    228     {
    229         RTStrmPrintf(g_pstrLog, "%s", psz);
    230         RTStrmFlush(g_pstrLog);
    231     }
    232 
    233     // logger instance
    234     RTLogLoggerEx(LOG_INSTANCE, RTLOGGRPFLAGS_DJ, LOG_GROUP, "%s", psz);
    235 
    236     RTStrFree(psz);
    237 }
    238 
    239 /**
    240  * Helper for printing SOAP error messages.
    241  * @param soap
    242  */
    243 void WebLogSoapError(struct soap *soap)
    244 {
    245     if (soap_check_state(soap))
    246     {
    247         WebLog("Error: soap struct not initialized\n");
    248         return;
    249     }
    250 
    251     const char *pcszFaultString = *soap_faultstring(soap);
    252     const char **ppcszDetail = soap_faultcode(soap);
    253     WebLog("#### SOAP FAULT: %s [%s]\n",
    254            pcszFaultString ? pcszFaultString : "[no fault string available]",
    255            (ppcszDetail && *ppcszDetail) ? *ppcszDetail : "no details available");
    256215}
    257216
     
    297256    void process();
    298257
     258    /**
     259     * Static function that can be passed to RTThreadCreate and that calls
     260     * process() on the SoapThread instance passed as the thread parameter.
     261     * @param pThread
     262     * @param pvThread
     263     * @return
     264     */
    299265    static int fntWrapper(RTTHREAD pThread, void *pvThread)
    300266    {
     
    304270    }
    305271
    306 private:
    307272    size_t      m_u;            // thread number
    308     SoapQ       *m_pQ;
    309     struct soap *m_soap;
    310     RTTHREAD    m_pThread;
     273    SoapQ       *m_pQ;          // the single SOAP queue that all the threads service
     274    struct soap *m_soap;        // copy of the soap structure for this thread (from soap_copy())
     275    RTTHREAD    m_pThread;      // IPRT thread struct for this thread
    311276};
    312277
    313278/**
    314  * SOAP queue encapsulation. add() adds an item to the queue,
    315  * get() fetches one.
     279 * SOAP queue encapsulation. There is only one instance of this, to
     280 * which add() adds a queue item (called on the main thread),
     281 * and from which get() fetch items, called from each queue thread.
    316282 */
    317283class SoapQ
    318284{
    319285public:
    320     SoapQ(size_t cThreads, const struct soap *pSoap)
    321         : m_mutex(util::LOCKCLASS_OBJECTSTATE),
     286
     287    /**
     288     * Constructor. Creates the soap queue.
     289     * @param pSoap
     290     */
     291    SoapQ(const struct soap *pSoap)
     292        : m_soap(pSoap),
     293          m_mutex(util::LOCKCLASS_OBJECTSTATE),
    322294          m_cIdleThreads(0)
    323295    {
    324296        RTSemEventMultiCreate(&m_event);
    325 
    326         // create cThreads threads
    327         for (size_t u = 0; u < cThreads; ++u)
    328         {
    329             SoapThread *pst = new SoapThread(u + 1,
    330                                              *this,
    331                                              pSoap);
    332             m_llAllThreads.push_back(pst);
    333             ++m_cIdleThreads;
    334         }
    335297    }
    336298
     
    343305     * Adds the given socket to the SOAP queue and posts the
    344306     * member event sem to wake up the workers. Called on the main thread
    345      * whenever a socket has work to do.
     307     * whenever a socket has work to do. Creates a new SOAP thread on the
     308     * first call or when all existing threads are busy.
    346309     * @param s Socket from soap_accept() which has work to do.
    347310     */
     
    349312    {
    350313        uint32_t cItems;
    351         // enqueue the socket of this connection and post eventsem so
    352         // that one of our threads can pick it up
    353314        util::AutoWriteLock qlock(m_mutex COMMA_LOCKVAL_SRC_POS);
     315
     316        // if no threads have yet been created, or if all threads are busy,
     317        // create a new SOAP thread
     318        if (    !m_cIdleThreads
     319                // but only if we're not exceeding the global maximum (default is 100)
     320             && (m_llAllThreads.size() < g_cMaxWorkerThreads)
     321           )
     322        {
     323            SoapThread *pst = new SoapThread(m_llAllThreads.size() + 1,
     324                                             *this,
     325                                             m_soap);
     326            m_llAllThreads.push_back(pst);
     327            g_mapThreads[pst->m_pThread] = com::Utf8StrFmt("[%3u]", pst->m_u);
     328            ++m_cIdleThreads;
     329        }
     330
     331        // enqueue the socket of this connection and post eventsem so that
     332        // one of the threads (possibly the one just creatd) can pick it up
    354333        m_llSocketsQ.push_back(s);
    355334        cItems = m_llSocketsQ.size();
     
    367346     * by one, and the caller MUST call done() when it's done processing.
    368347     * Called from the worker threads.
     348     * @param cIdleThreads out: no. of threads which are currently idle (not counting the caller)
     349     * @param cThreads out: total no. of SOAP threads running
    369350     * @return
    370351     */
    371     int get(size_t &cIdleThreads)
     352    int get(size_t &cIdleThreads, size_t &cThreads)
    372353    {
    373354        while (1)
     
    382363                m_llSocketsQ.pop_front();
    383364                cIdleThreads = --m_cIdleThreads;
     365                cThreads = m_llAllThreads.size();
    384366
    385367                // reset the multi event only if the queue is now empty; otherwise
     
    408390    }
    409391
     392    const struct soap       *m_soap;            // soap structure created by main(), passed to constructor
     393
    410394    util::WriteLockHandle   m_mutex;
    411395    RTSEMEVENTMULTI         m_event;            // posted by add(), blocked on by get()
     
    427411void SoapThread::process()
    428412{
    429     WebLog("Started thread %d\n", m_u);
     413    WebLog("New SOAP thread started\n");
    430414
    431415    while (1)
    432416    {
    433417        // wait for a socket to arrive on the queue
    434         size_t cIdleThreads;
    435         m_soap->socket = m_pQ->get(cIdleThreads);
    436 
    437         WebLog("T%d handles connection from IP=%lu.%lu.%lu.%lu socket=%d (%d threads idle)\n",
    438                 m_u,
    439                 (m_soap->ip>>24)&0xFF,
    440                 (m_soap->ip>>16)&0xFF,
    441                 (m_soap->ip>>8)&0xFF,
    442                 m_soap->ip&0xFF,
    443                 m_soap->socket,
    444                 cIdleThreads);
     418        size_t cIdleThreads, cThreads;
     419        m_soap->socket = m_pQ->get(cIdleThreads, cThreads);
     420
     421        WebLog("Processing connection from IP=%lu.%lu.%lu.%lu socket=%d (%d out of %d threads idle)\n",
     422               m_u,
     423               (m_soap->ip >> 24) & 0xFF,
     424               (m_soap->ip >> 16) & 0xFF,
     425               (m_soap->ip >> 8)  & 0xFF,
     426               m_soap->ip         & 0xFF,
     427               m_soap->socket,
     428               cIdleThreads,
     429               cThreads);
    445430
    446431        // process the request; this goes into the COM code in methodmaps.cpp
     
    455440}
    456441
     442/**
     443 * Implementation for WEBLOG macro defined in vboxweb.h; this prints a message
     444 * to the console and optionally to the file that may have been given to the
     445 * vboxwebsrv command line.
     446 * @param pszFormat
     447 */
     448void WebLog(const char *pszFormat, ...)
     449{
     450    va_list args;
     451    va_start(args, pszFormat);
     452    char *psz = NULL;
     453    RTStrAPrintfV(&psz, pszFormat, args);
     454    va_end(args);
     455
     456    const char *pcszPrefix = "[   ]";
     457    ThreadsMap::iterator it = g_mapThreads.find(RTThreadSelf());
     458    if (it != g_mapThreads.end())
     459        pcszPrefix = it->second.c_str();
     460
     461    // terminal
     462    RTPrintf("%s %s", pcszPrefix, psz);
     463
     464    // log file
     465    if (g_pstrLog)
     466    {
     467        RTStrmPrintf(g_pstrLog, "%s %s", pcszPrefix, psz);
     468        RTStrmFlush(g_pstrLog);
     469    }
     470
     471    // logger instance
     472    RTLogLoggerEx(LOG_INSTANCE, RTLOGGRPFLAGS_DJ, LOG_GROUP, "%s %s", pcszPrefix, psz);
     473
     474    RTStrFree(psz);
     475}
     476
     477/**
     478 * Helper for printing SOAP error messages.
     479 * @param soap
     480 */
     481void WebLogSoapError(struct soap *soap)
     482{
     483    if (soap_check_state(soap))
     484    {
     485        WebLog("Error: soap struct not initialized\n");
     486        return;
     487    }
     488
     489    const char *pcszFaultString = *soap_faultstring(soap);
     490    const char **ppcszDetail = soap_faultcode(soap);
     491    WebLog("#### SOAP FAULT: %s [%s]\n",
     492           pcszFaultString ? pcszFaultString : "[no fault string available]",
     493           (ppcszDetail && *ppcszDetail) ? *ppcszDetail : "no details available");
     494}
     495
    457496/****************************************************************************
    458497 *
    459  * Main
     498 * SOAP queue pumper thread
    460499 *
    461500 ****************************************************************************/
    462501
    463 /**
    464  * Called from main(). This implements the loop that takes SOAP calls
    465  * from HTTP and serves them by handing sockets to the SOAP queue
    466  * worker threads.
    467  */
    468 void beginProcessing()
     502void doQueuesLoop()
    469503{
    470504    // set up gSOAP
     
    489523               m);
    490524
    491         // initialize thread queue, mutex and eventsem, create worker threads
    492         SoapQ soapq(g_cWorkerThreads, &soap);
     525        // initialize thread queue, mutex and eventsem
     526        g_pSoapQ = new SoapQ(&soap);
     527
     528        // start the SOAP processing thread
    493529
    494530        for (uint64_t i = 1;
     
    506542            // add the socket to the queue and tell worker threads to
    507543            // pick up the jobn
    508             size_t cItemsOnQ = soapq.add(s);
     544            size_t cItemsOnQ = g_pSoapQ->add(s);
    509545            WebLog("Request %llu on socket %d queued for processing (%d items on Q)\n", i, s, cItemsOnQ);
    510546
    511             // we have to process main event queue
     547            // process the COM event Q
    512548            int vrc = com::EventQueue::getMainEventQueue()->processEventQueue(0);
    513549        }
     
    515551    soap_done(&soap); // close master socket and detach environment
    516552}
     553
     554/**
     555 * Thread function for the "queue pumper" thread started from main(). This implements
     556 * the loop that takes SOAP calls from HTTP and serves them by handing sockets to the
     557 * SOAP queue worker threads.
     558 */
     559// int fntQPumper(RTTHREAD ThreadSelf, void *pvUser)
     560// {
     561//     // store a log prefix for this thread
     562//     g_mapThreads[RTThreadSelf()] = "[ P ]";
     563//
     564//     doQueuesLoop();
     565//
     566//     return 0;
     567// }
    517568
    518569/**
     
    532583    // intialize runtime
    533584    RTR3Init();
     585
     586    // store a log prefix for this thread
     587    g_mapThreads[RTThreadSelf()] = "[M  ]";
    534588
    535589    RTStrmPrintf(g_pStdErr, VBOX_PRODUCT " web service version " VBOX_VERSION_STRING "\n"
     
    576630
    577631            case 'T':
    578                 g_cWorkerThreads = ValueUnion.u32;
     632                g_cMaxWorkerThreads = ValueUnion.u32;
    579633            break;
    580634
     
    666720    g_pSessionsLockHandle = new util::RWLockHandle(util::LOCKCLASS_OBJECTSTATE);
    667721
     722    // SOAP queue pumper thread
     723//     RTTHREAD  tQPumper;
     724//     if (RTThreadCreate(&tQPumper,
     725//                        fntQPumper,
     726//                        NULL,        // pvUser
     727//                        0,           // cbStack (default)
     728//                        RTTHREADTYPE_MAIN_WORKER,
     729//                        0,           // flags
     730//                        "SoapQPumper"))
     731//     {
     732//         RTStrmPrintf(g_pStdErr, "[!] Cannot start SOAP queue pumper thread\n");
     733//         exit(1);
     734//     }
     735
     736    // watchdog thread
    668737    if (g_iWatchdogTimeoutSecs > 0)
    669738    {
     
    673742                           fntWatchdog,
    674743                           NULL,
    675                            32*1024,
     744                           0,
    676745                           RTTHREADTYPE_MAIN_WORKER,
    677746                           0,
     
    683752    }
    684753
    685     beginProcessing();
     754    doQueuesLoop();
     755
     756//     while (1)
     757//     {
     758//         // we have to process main event queue
     759//         WebLog("Pumping COM event queue\n");
     760//         int vrc = com::EventQueue::getMainEventQueue()->processEventQueue(RT_INDEFINITE_WAIT);
     761//     }
    686762
    687763    com::Shutdown();
     764
     765    return 0;
    688766}
    689767
     
    703781int fntWatchdog(RTTHREAD ThreadSelf, void *pvUser)
    704782{
     783    // store a log prefix for this thread
     784    g_mapThreads[RTThreadSelf()] = "[W  ]";
     785
    705786    WEBDEBUG(("Watchdog thread started\n"));
    706787
Note: See TracChangeset for help on using the changeset viewer.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette