ACE/FAQ/APG/ThreadPools
Материал из Wiki.crossplatform.ru
Содержание |
[править] Futures
// $Id: Futures.cpp 94312 2011-07-11 00:39:42Z schmidt $ #include "ace/config-lite.h" #if defined (ACE_HAS_THREADS) #include "ace/OS_NS_string.h" #include "ace/OS_NS_time.h" #include "ace/Task.h" #include "ace/Unbounded_Queue.h" #include "ace/Synch.h" #include "ace/SString.h" #include "ace/Method_Request.h" #include "ace/Future.h" #include "ace/Activation_Queue.h" #include "ace/Condition_T.h" #define OUTSTANDING_REQUESTS 20 // Listing 2 code/ch16 class CompletionCallBack: public ACE_Future_Observer<ACE_CString*> { public: virtual void update (const ACE_Future<ACE_CString*> & future) { ACE_CString *result = 0; // Block for the result. future.get (result); ACE_DEBUG ((LM_INFO, ACE_TEXT("%C\n"), result->c_str ())); delete result; } }; // Listing 2 // Listing 1 code/ch16 class LongWork : public ACE_Method_Request { public: virtual int call (void) { ACE_TRACE ("LongWork::call"); ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Attempting long work task\n"))); ACE_OS::sleep (1); char buf[1024]; ACE_OS::strcpy (buf, "Completed assigned task\n"); ACE_CString *msg; ACE_NEW_RETURN (msg, ACE_CString (buf, ACE_OS::strlen (buf) + 1), -1); result_.set (msg); return 0; } ACE_Future<ACE_CString*> &future (void) { ACE_TRACE ("LongWork::future"); return result_; } void attach (CompletionCallBack *cb) { result_.attach (cb); } private: ACE_Future<ACE_CString*> result_; }; // Listing 1 class Exit : public ACE_Method_Request { public: virtual int call (void) { ACE_TRACE ("Exit::call"); return -1; } }; class Worker; class IManager { public: virtual ~IManager (void) { } virtual int return_to_work (Worker *worker) = 0; }; // Listing 3 code/ch16 class Worker: public ACE_Task<ACE_MT_SYNCH> { public: Worker (IManager *manager) : manager_(manager), queue_ (msg_queue ()) { } int perform (ACE_Method_Request *req) { ACE_TRACE ("Worker::perform"); return this->queue_.enqueue (req); } virtual int svc (void) { thread_id_ = ACE_Thread::self (); while (1) { ACE_Method_Request *request = this->queue_.dequeue(); if 
(request == 0) return -1; // Invoke the request int result = request->call (); if (result == -1) break; // Return to work. this->manager_->return_to_work (this); } return 0; } ACE_thread_t thread_id (void); private: IManager *manager_; ACE_thread_t thread_id_; ACE_Activation_Queue queue_; }; // Listing 3 ACE_thread_t Worker::thread_id (void) { return thread_id_; } // Listing 4 code/ch16 class Manager : public ACE_Task_Base, private IManager { public: enum {POOL_SIZE = 5, MAX_TIMEOUT = 5}; Manager () : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_) { ACE_TRACE ("Manager"); } int perform (ACE_Method_Request *req) { ACE_TRACE ("perform"); return this->queue_.enqueue (req); } int svc (void) { ACE_TRACE ("svc"); ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n"))); // Create pool when you get in the first time. create_worker_pool (); while (!done ()) { ACE_Time_Value tv ((long)MAX_TIMEOUT); tv += ACE_OS::time (0); // Get the next message ACE_Method_Request *request = this->queue_.dequeue (&tv); if (request == 0) { shut_down (); break; } // Choose a worker. Worker *worker = choose_worker (); // Ask the worker to do the job. 
worker->perform (request); } return 0; } int shut_down (void); virtual int return_to_work (Worker *worker) { ACE_GUARD_RETURN (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Worker returning to work.\n"))); this->workers_.enqueue_tail (worker); this->workers_cond_.signal (); return 0; } private: Worker *choose_worker (void) { ACE_GUARD_RETURN (ACE_Thread_Mutex, worker_mon, this->workers_lock_, 0) while (this->workers_.is_empty ()) workers_cond_.wait (); Worker *worker = 0; this->workers_.dequeue_head (worker); return worker; } int create_worker_pool (void) { ACE_GUARD_RETURN (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1); for (int i = 0; i < POOL_SIZE; i++) { Worker *worker; ACE_NEW_RETURN (worker, Worker (this), -1); this->workers_.enqueue_tail (worker); worker->activate (); } return 0; } int done (void) { return (shutdown_ == 1); } ACE_thread_t thread_id (Worker *worker) { return worker->thread_id (); } private: int shutdown_; ACE_Thread_Mutex workers_lock_; ACE_Condition<ACE_Thread_Mutex> workers_cond_; ACE_Unbounded_Queue<Worker* > workers_; ACE_Activation_Queue queue_; }; // Listing 4 int Manager::shut_down (void) { ACE_TRACE ("Manager::shut_down"); ACE_Unbounded_Queue<Worker* >::ITERATOR iter = this->workers_.begin (); Worker **worker_ptr = 0; do { iter.next (worker_ptr); Worker *worker = (*worker_ptr); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Attempting shutdown of %d\n"), thread_id (worker))); Exit *req; ACE_NEW_RETURN (req, Exit(), -1); // Send the hangup message worker->perform (req); // Wait for the exit. worker->wait (); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Worker %d shut down.\n"), thread_id (worker))); delete req; delete worker; } while (iter.advance ()); shutdown_ = 1; return 0; } // Listing 5 code/ch16 int ACE_TMAIN (int, ACE_TCHAR *[]) { Manager tp; tp.activate (); ACE_Time_Value tv; tv.msec (100); // Wait for a few seconds every time you send a message. 
CompletionCallBack cb; LongWork workArray[OUTSTANDING_REQUESTS]; for (int i = 0; i < OUTSTANDING_REQUESTS; i++) { workArray[i].attach (&cb); ACE_OS::sleep (tv); tp.perform (&workArray[i]); } ACE_Thread_Manager::instance ()->wait (); return 0; } // Listing 5 #else #include "ace/OS_main.h" #include "ace/OS_NS_stdio.h" int ACE_TMAIN (int, ACE_TCHAR *[]) { ACE_OS::puts (ACE_TEXT ("This example requires threads.")); return 0; } #endif /* ACE_HAS_THREADS */
[править] Leader/Followers Thread Pool
// $Id: LF_ThreadPool.cpp 94310 2011-07-09 19:10:06Z schmidt $ #include "ace/config-lite.h" #if defined (ACE_HAS_THREADS) #include "ace/OS_NS_string.h" #include "ace/OS_NS_sys_time.h" #include "ace/Task.h" #include "ace/Containers.h" #include "ace/Synch.h" #include "ace/Condition_T.h" // Listing 4 code/ch16 class Follower { public: Follower (ACE_Thread_Mutex &leader_lock) : cond_(leader_lock) { owner_ = ACE_Thread::self (); } //FUZZ: disable check_for_lack_ACE_OS int wait (void) { return this->cond_.wait (); } int signal (void) { return this->cond_.signal (); } //FUZZ: enable check_for_lack_ACE_OS ACE_thread_t owner (void) { return this->owner_; } private: ACE_Condition<ACE_Thread_Mutex> cond_; ACE_thread_t owner_; }; // Listing 4 // Listing 1 code/ch16 class LF_ThreadPool : public ACE_Task<ACE_MT_SYNCH> { public: LF_ThreadPool () : shutdown_(0), current_leader_(0) { ACE_TRACE ("LF_ThreadPool::TP"); } virtual int svc (void); void shut_down (void) { shutdown_ = 1; } private: int become_leader (void); Follower *make_follower (void); int elect_new_leader (void); int leader_active (void) { ACE_TRACE ("LF_ThreadPool::leader_active"); return this->current_leader_ != 0; } void leader_active (ACE_thread_t leader) { ACE_TRACE ("LF_ThreadPool::leader_active"); this->current_leader_ = leader; } void process_message (ACE_Message_Block *mb); int done (void) { return (shutdown_ == 1); } private: int shutdown_; ACE_thread_t current_leader_; ACE_Thread_Mutex leader_lock_; ACE_Unbounded_Queue<Follower*> followers_; ACE_Thread_Mutex followers_lock_; static long LONG_TIME; }; // Listing 1 // Listing 2 code/ch16 int LF_ThreadPool::svc (void) { ACE_TRACE ("LF_ThreadPool::svc"); while (!done ()) { become_leader (); // Block until this thread is the leader. ACE_Message_Block *mb = 0; ACE_Time_Value tv (LONG_TIME); tv += ACE_OS::gettimeofday (); // Get a message, elect new leader, then process message. 
if (this->getq (mb, &tv) < 0) { if (elect_new_leader () == 0) break; continue; } elect_new_leader (); process_message (mb); } return 0; } // Listing 2 // Listing 3 code/ch16 int LF_ThreadPool::become_leader (void) { ACE_TRACE ("LF_ThreadPool::become_leader"); ACE_GUARD_RETURN (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1); if (leader_active ()) { Follower *fw = make_follower (); { // Wait until told to do so. while (leader_active ()) fw->wait (); } delete fw; } ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Becoming the leader\n"))); // Mark yourself as the active leader. leader_active (ACE_Thread::self ()); return 0; } Follower* LF_ThreadPool::make_follower (void) { ACE_TRACE ("LF_ThreadPool::make_follower"); ACE_GUARD_RETURN (ACE_Thread_Mutex, follower_mon, this->followers_lock_, 0); Follower *fw; ACE_NEW_RETURN (fw, Follower (this->leader_lock_), 0); this->followers_.enqueue_tail (fw); return fw; } // Listing 3 // Listing 5 code/ch16 int LF_ThreadPool::elect_new_leader (void) { ACE_TRACE ("LF_ThreadPool::elect_new_leader"); ACE_GUARD_RETURN (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1); leader_active (0); // Wake up a follower if (!followers_.is_empty ()) { ACE_GUARD_RETURN (ACE_Thread_Mutex, follower_mon, this->followers_lock_, -1); // Get the old follower. Follower *fw; if (this->followers_.dequeue_head (fw) != 0) return -1; ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Resigning and Electing %d\n"), fw->owner ())); return (fw->signal () == 0) ? 
0 : -1; } else { ACE_DEBUG ((LM_ERROR, ACE_TEXT ("(%t) Oops no followers left\n"))); return -1; } } // Listing 5 void LF_ThreadPool::process_message (ACE_Message_Block *mb) { ACE_TRACE ("LF_ThreadPool::process_message"); int msgId; ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int)); mb->release (); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Started processing message:%d\n"), msgId)); ACE_OS::sleep (1); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Finished processing message:%d\n"), msgId)); } long LF_ThreadPool::LONG_TIME = 5L; int ACE_TMAIN (int, ACE_TCHAR *[]) { LF_ThreadPool tp; tp.activate (THR_NEW_LWP| THR_JOINABLE, 5); // Wait for a few seconds... ACE_OS::sleep (2); ACE_Time_Value tv (1L); ACE_Message_Block *mb = 0; for (int i = 0; i < 30; i++) { ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(int)), -1); ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int)); ACE_OS::sleep (tv); // Add a new work item. tp.putq (mb); } ACE_Thread_Manager::instance ()->wait (); ACE_OS::sleep (10); return 0; } #else #include "ace/OS_main.h" #include "ace/OS_NS_stdio.h" int ACE_TMAIN (int, ACE_TCHAR *[]) { ACE_OS::puts (ACE_TEXT ("This example requires threads.")); return 0; } #endif /* ACE_HAS_THREADS */
[править] Request Handler
/** * $Id: Request_Handler.h 80826 2008-03-04 14:51:23Z wotte $ * * Sample code from The ACE Programmer's Guide, * copyright 2003 Addison-Wesley. All Rights Reserved. */ #ifndef __REQUEST_HANDLER_H_ #define __REQUEST_HANDLER_H_ #include "ace/Svc_Handler.h" #include "ace/SOCK_Stream.h" ACE_BEGIN_VERSIONED_NAMESPACE_DECL class ACE_Thread_Manager; ACE_END_VERSIONED_NAMESPACE_DECL class Request_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> { // = TITLE // This class is the Svc_Handler used by <Acceptor>. public: Request_Handler (ACE_Thread_Manager *tm = 0); // The default constructor makes sure the right reactor is used. protected: virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); virtual int handle_close (ACE_HANDLE fd, ACE_Reactor_Mask = 0); private: size_t nr_msgs_rcvd_; }; #endif /* __REQUEST_HANDLER_H_ */
[править] Thread Pool Reactor
// == == == == == == == == == == == == == == == == == == == == == == == // $Id: TP_Reactor.cpp 82610 2008-08-12 19:46:36Z parsons $ // Stolen from $ACE_ROOT/tests/Thread_Pool_Reactor_Test.cpp // Thread_Pool_Reactor_Test.cpp, v 1.29 2001/03/20 01:07:21 irfan Exp // = AUTHOR // Irfan Pyarali <irfan@cs.wustl.edu> and // Nanbor Wang <nanbor@cs.wustl.edu> // == == == == == == == == == == == == == == == == == == == == == == == #include "ace/config-lite.h" #if defined (ACE_HAS_THREADS) #include "ace/OS_NS_string.h" #include "ace/OS_NS_unistd.h" #include "ace/SOCK_Connector.h" #include "ace/SOCK_Acceptor.h" #include "ace/Acceptor.h" #include "ace/Thread_Manager.h" #include "ace/TP_Reactor.h" #include "ace/Truncate.h" #include "Request_Handler.h" // Accepting end point. This is actually "localhost:10010", but some // platform couldn't resolve the name so we use the IP address // directly here. static const ACE_TCHAR *rendezvous = ACE_TEXT ("127.0.0.1:10010"); // Total number of server threads. static size_t svr_thrno = 5; // Total number of client threads. static size_t cli_runs = 2; // Total connection attemps of a client thread. static size_t cli_conn_no = 2; // Total requests a client thread sends. static size_t cli_req_no = 5; // Delay before a thread sending the next request (in msec.) 
static int req_delay = 50; typedef ACE_Strategy_Acceptor <Request_Handler, ACE_SOCK_ACCEPTOR> ACCEPTOR; Request_Handler::Request_Handler (ACE_Thread_Manager *thr_mgr) : ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> (thr_mgr), nr_msgs_rcvd_(0) { this->reactor (ACE_Reactor::instance ()); } int Request_Handler::handle_input (ACE_HANDLE fd) { ACE_TCHAR buffer[BUFSIZ]; ACE_TCHAR len = 0; ssize_t result = this->peer ().recv (&len, sizeof (ACE_TCHAR)); if (result > 0 && this->peer ().recv_n (buffer, len * sizeof (ACE_TCHAR)) == static_cast<ssize_t> (len * sizeof (ACE_TCHAR))) { ++this->nr_msgs_rcvd_; ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) svr input; fd: 0x%x; input: %s\n"), fd, buffer)); if (ACE_OS::strcmp (buffer, ACE_TEXT ("shutdown")) == 0) ACE_Reactor::instance()->end_reactor_event_loop (); return 0; } else ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Request_Handler: 0x%x peer closed (0x%x)\n"), this, fd)); return -1; } int Request_Handler::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) svr close; fd: 0x%x, rcvd %d msgs\n"), fd, this->nr_msgs_rcvd_)); if (this->nr_msgs_rcvd_ != cli_req_no) ACE_ERROR((LM_ERROR, ACE_TEXT ("(%t) Handler 0x%x: Expected %d messages; got %d\n"), this, cli_req_no, this->nr_msgs_rcvd_)); this->destroy (); return 0; } // Listing 2 code/ch16 static int reactor_event_hook (ACE_Reactor *) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) handling events ....\n"))); return 0; } class ServerTP : public ACE_Task_Base { public: virtual int svc (void) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Running the event loop\n"))); int result = ACE_Reactor::instance ()->run_reactor_event_loop (&reactor_event_hook); if (result == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("Error handling events")), 0); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Done handling events.\n"))); return 0; } }; // Listing 2 class Client: public ACE_Task_Base { public: Client() :addr_(rendezvous) {} virtual int svc() { ACE_OS::sleep (3); const 
ACE_TCHAR *msg = ACE_TEXT ("Message from Connection worker"); ACE_TCHAR buf [BUFSIZ]; buf[0] = ACE_Utils::truncate_cast<ACE_TCHAR> (ACE_OS::strlen (msg) + 1); ACE_OS::strcpy (&buf[1], msg); for (size_t i = 0; i < cli_runs; i++) send_work_to_server(buf); shut_down(); return 0; } private: void send_work_to_server(ACE_TCHAR* arg) { ACE_SOCK_Stream stream; ACE_SOCK_Connector connect; ACE_Time_Value delay (0, req_delay); size_t len = * reinterpret_cast<ACE_TCHAR *> (arg); for (size_t i = 0 ; i < cli_conn_no; i++) { if (connect.connect (stream, addr_) < 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("connect"))); continue; } for (size_t j = 0; j < cli_req_no; j++) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Sending work to server on handle 0x%x, req %d\n"), stream.get_handle (), j+1)); if (stream.send_n (arg, (len + 1) * sizeof (ACE_TCHAR)) == -1) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("send_n"))); continue; } ACE_OS::sleep (delay); } stream.close (); } } void shut_down() { ACE_SOCK_Stream stream; ACE_SOCK_Connector connect; if (connect.connect (stream, addr_) == -1) ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) %p Error while connecting\n"), ACE_TEXT ("connect"))); const ACE_TCHAR *sbuf = ACE_TEXT ("\011shutdown"); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("shutdown stream handle = %x\n"), stream.get_handle ())); if (stream.send_n (sbuf, (ACE_OS::strlen (sbuf) + 1) * sizeof (ACE_TCHAR)) == -1) ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("send_n"))); stream.close (); } private: ACE_INET_Addr addr_; }; // Listing 1 code/ch16 int ACE_TMAIN (int, ACE_TCHAR *[]) { ACE_TP_Reactor sr; ACE_Reactor new_reactor (&sr); ACE_Reactor::instance (&new_reactor); ACCEPTOR acceptor; ACE_INET_Addr accept_addr (rendezvous); if (acceptor.open (accept_addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("open")), 1); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Spawning %d server threads...\n"), svr_thrno)); ServerTP serverTP; serverTP.activate (THR_NEW_LWP | 
THR_JOINABLE, ACE_Utils::truncate_cast<int> (svr_thrno)); Client client; client.activate (); ACE_Thread_Manager::instance ()->wait (); return 0; } // Listing 1 #else #include "ace/OS_main.h" #include "ace/OS_NS_stdio.h" int ACE_TMAIN (int, ACE_TCHAR *[]) { ACE_OS::puts (ACE_TEXT ("This example requires threads.")); return 0; } #endif /* ACE_HAS_THREADS */
[править] Task Thread Pool
// $Id: Task_ThreadPool.cpp 94310 2011-07-09 19:10:06Z schmidt $ #include "ace/config-lite.h" #if defined (ACE_HAS_THREADS) #include "ace/OS_NS_string.h" #include "ace/OS_NS_time.h" #include "ace/Task.h" #include "ace/Synch.h" #include "ace/SString.h" // Listing 2 code/ch16 class Workers : public ACE_Task<ACE_MT_SYNCH> { public: Workers () { } virtual int svc (void) { while (1) { ACE_Message_Block *mb = 0; if (this->getq (mb) == -1) { ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Shutting down\n"))); break; } // Process the message. process_message (mb); } return 0; } // Listing 2 private: void process_message (ACE_Message_Block *mb) { ACE_TRACE ("Workers::process_message"); int msgId; ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int)); mb->release (); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Started processing message %d\n"), msgId)); ACE_OS::sleep (3); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Finished processing message %d\n"), msgId)); } }; // Listing 1 code/ch16 class Manager : public ACE_Task<ACE_MT_SYNCH> { public: enum {POOL_SIZE = 5, MAX_TIMEOUT = 5}; Manager () : shutdown_(0) { ACE_TRACE ("Manager::Manager"); } int svc (void) { ACE_TRACE ("Manager::svc"); ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n"))); // Create pool. Workers pool; pool.activate (THR_NEW_LWP | THR_JOINABLE, POOL_SIZE); while (!done ()) { ACE_Message_Block *mb = 0; ACE_Time_Value tv ((long)MAX_TIMEOUT); tv += ACE_OS::time (0); // Get a message request. if (this->getq (mb, &tv) < 0) { pool.msg_queue ()->deactivate (); pool.wait (); break; } // Ask the worker pool to do the job. pool.putq (mb); } return 0; } private: int done (void); int shutdown_; }; // Listing 1 int Manager::done (void) { return (shutdown_ == 1); } int ACE_TMAIN (int, ACE_TCHAR *[]) { Manager tp; tp.activate (); // Wait for a moment every time you send a message. 
ACE_Time_Value tv; tv.msec (100); ACE_Message_Block *mb = 0; for (int i = 0; i < 30; i++) { ACE_NEW_RETURN (mb, ACE_Message_Block(sizeof(int)), -1); ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int)); ACE_OS::sleep (tv); // Add a new work item. tp.putq (mb); } ACE_Thread_Manager::instance ()->wait (); return 0; } #else #include "ace/OS_main.h" #include "ace/OS_NS_stdio.h" int ACE_TMAIN (int, ACE_TCHAR *[]) { ACE_OS::puts (ACE_TEXT ("This example requires threads.")); return 0; } #endif /* ACE_HAS_THREADS */
[править] Thread Pool
// $Id: ThreadPool.cpp 94310 2011-07-09 19:10:06Z schmidt $ #include "ace/config-lite.h" #if defined (ACE_HAS_THREADS) #include "ace/OS_NS_string.h" #include "ace/OS_NS_time.h" #include "ace/Task.h" #include "ace/Containers.h" #include "ace/Synch.h" #include "ace/SString.h" #include "ace/Method_Request.h" #include "ace/Future.h" #include "ace/Activation_Queue.h" #include "ace/Condition_T.h" class Worker; class IManager { public: virtual ~IManager (void) { } virtual int return_to_work (Worker *worker) = 0; }; // Listing 2 code/ch16 class Worker : public ACE_Task<ACE_MT_SYNCH> { public: Worker (IManager *manager) : manager_(manager) { } virtual int svc (void) { ACE_Thread_ID id; thread_id_ = id; while (1) { ACE_Message_Block *mb = 0; if (this->getq (mb) == -1) ACE_ERROR_BREAK ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("getq"))); if (mb->msg_type () == ACE_Message_Block::MB_HANGUP) { ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Shutting down\n"))); mb->release (); break; } // Process the message. process_message (mb); // Return to work. this->manager_->return_to_work (this); } return 0; } // Listing 2 const ACE_Thread_ID& thread_id (void) { return this->thread_id_; } private: void process_message (ACE_Message_Block *mb) { ACE_TRACE ("Worker::process_message"); int msgId; ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int)); mb->release (); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Started processing message %d\n"), msgId)); ACE_OS::sleep (3); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Finished processing message %d\n"), msgId)); } IManager *manager_; ACE_Thread_ID thread_id_; }; // Listing 1 code/ch16 class Manager: public ACE_Task<ACE_MT_SYNCH>, private IManager { public: enum {POOL_SIZE = 5, MAX_TIMEOUT = 5}; Manager () : shutdown_(0), workers_lock_(), workers_cond_(workers_lock_) { ACE_TRACE ("Manager::Manager"); } int svc (void) { ACE_TRACE ("Manager::svc"); ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) Manager started\n"))); // Create pool. 
create_worker_pool (); while (!done ()) { ACE_Message_Block *mb = 0; ACE_Time_Value tv ((long)MAX_TIMEOUT); tv += ACE_OS::time (0); // Get a message request. if (this->getq (mb, &tv) < 0) { shut_down (); break; } // Choose a worker. Worker *worker = 0; { ACE_GUARD_RETURN (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1); while (this->workers_.is_empty ()) workers_cond_.wait (); this->workers_.dequeue_head (worker); } // Ask the worker to do the job. worker->putq (mb); } return 0; } int shut_down (void); const ACE_Thread_ID& thread_id (Worker *worker); virtual int return_to_work (Worker *worker) { ACE_GUARD_RETURN (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Worker %t returning to work.\n"))); this->workers_.enqueue_tail (worker); this->workers_cond_.signal (); return 0; } private: int create_worker_pool (void) { ACE_GUARD_RETURN (ACE_Thread_Mutex, worker_mon, this->workers_lock_, -1); for (int i = 0; i < POOL_SIZE; i++) { Worker *worker = 0; ACE_NEW_RETURN (worker, Worker (this), -1); this->workers_.enqueue_tail (worker); worker->activate (); } return 0; } int done (void); private: int shutdown_; ACE_Thread_Mutex workers_lock_; ACE_Condition<ACE_Thread_Mutex> workers_cond_; ACE_Unbounded_Queue<Worker* > workers_; }; // Listing 1 int Manager::done (void) { return (shutdown_ == 1); } int Manager::shut_down (void) { ACE_TRACE ("Manager::shut_down"); ACE_Unbounded_Queue<Worker* >::ITERATOR iter = this->workers_.begin (); Worker **worker_ptr = 0; do { iter.next (worker_ptr); Worker *worker = (*worker_ptr); ACE_Thread_ID id = thread_id (worker); char buf [65]; id.to_string (buf); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Attempting shutdown of %C\n"), buf)); // Send the hangup message. ACE_Message_Block *mb = 0; ACE_NEW_RETURN (mb, ACE_Message_Block(0, ACE_Message_Block::MB_HANGUP), -1); worker->putq (mb); // Wait for the exit. 
worker->wait (); ACE_ASSERT (worker->msg_queue ()->is_empty ()); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Worker %C shut down.\n"), buf)); delete worker; } while (iter.advance ()); shutdown_ = 1; return 0; } const ACE_Thread_ID& Manager::thread_id (Worker *worker) { return worker->thread_id (); } int ACE_TMAIN (int, ACE_TCHAR *[]) { Manager tp; tp.activate (); // Wait for a moment every time you send a message. ACE_Time_Value tv; tv.msec (100); ACE_Message_Block *mb = 0; for (int i = 0; i < 30; i++) { ACE_NEW_RETURN (mb, ACE_Message_Block(sizeof(int)), -1); ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int)); ACE_OS::sleep (tv); // Add a new work item. tp.putq (mb); } ACE_Thread_Manager::instance ()->wait (); return 0; } #else #include "ace/OS_main.h" #include "ace/OS_NS_stdio.h" int ACE_TMAIN (int, ACE_TCHAR *[]) { ACE_OS::puts (ACE_TEXT ("This example requires threads.")); return 0; } #endif /* ACE_HAS_THREADS */