ACE/FAQ/APG/ThreadManagement
Материал из Wiki.crossplatform.ru
Содержание |
[править] Async Cancel
// $Id: Async_Cancel.cpp 94362 2011-08-04 15:54:26Z mesnier_p $ #include "ace/OS_NS_unistd.h" #include "ace/Task.h" #include "ace/Log_Msg.h" #if defined (ACE_HAS_PTHREADS) && !defined (ACE_LACKS_PTHREAD_CANCEL) // Only works on Pthreads... // Listing 1 code/ch13 class CanceledTask : public ACE_Task<ACE_MT_SYNCH> { public: virtual int svc (void) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Starting thread\n"))); if (this->set_cancel_mode () < 0) return -1; while (1) { // Put this thread in a compute loop.. no // cancellation points are available. } } int set_cancel_mode (void) { cancel_state new_state; // Set the cancel state to asynchronous and enabled. new_state.cancelstate = PTHREAD_CANCEL_ENABLE; new_state.canceltype = PTHREAD_CANCEL_ASYNCHRONOUS; if (ACE_Thread::setcancelstate (new_state, 0) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("cancelstate")), -1); return 0; } }; // Listing 1 // Listing 2 code/ch13 int ACE_TMAIN (int, ACE_TCHAR *[]) { CanceledTask task; task.activate (); ACE_OS::sleep (1); ACE_Thread_Manager::instance ()->cancel_task (&task, 1); task.wait (); return 0; } // Listing 2 #else /* ACE_HAS_PTHREADS */ int ACE_TMAIN (int, ACE_TCHAR *[]) { ACE_OS::puts ("This example works on Pthreads platforms.\n"); return 0; } #endif /* ACE_HAS_PTHREADS */
[править] Coop Cancel
// $Id: Coop_Cancel.cpp 84565 2009-02-23 08:20:39Z johnnyw $ #include "ace/config-lite.h" #if defined (ACE_HAS_THREADS) #include "ace/OS_NS_time.h" #include "ace/OS_NS_unistd.h" #include "ace/Task.h" #include "ace/Log_Msg.h" // Listing 1 code/ch13 class CanceledTask : public ACE_Task<ACE_MT_SYNCH> { public: virtual int svc (void) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) starting up\n"))); // Cache our ACE_Thread_Manager pointer. ACE_Thread_Manager *mgr = this->thr_mgr (); while (1) { if (mgr->testcancel (mgr->thr_self ())) return 0; ACE_Message_Block *mb = 0; ACE_Time_Value tv (0, 1000); tv += ACE_OS::time (0); int result = this->getq (mb, &tv); if (result == -1 && errno == EWOULDBLOCK) continue; else { // Do real work. } } ACE_NOTREACHED (return 0); } }; // Listing 1 // Listing 2 code/ch13 int ACE_TMAIN (int, ACE_TCHAR *[]) { CanceledTask task; task.activate (); ACE_OS::sleep (1); ACE_Thread_Manager::instance ()->cancel_task (&task); task.wait (); return 0; } // Listing 2 #else #include "ace/OS_main.h" #include "ace/OS_NS_stdio.h" int ACE_TMAIN (int, ACE_TCHAR *[]) { ACE_OS::puts (ACE_TEXT ("This example requires threads.")); return 0; } #endif /* ACE_HAS_THREADS */
[править] ExitHandler
// $Id: ExitHandler.cpp 84565 2009-02-23 08:20:39Z johnnyw $ // Listing 1 code/ch13 #include "ace/Task.h" #include "ace/Log_Msg.h" class ExitHandler : public ACE_At_Thread_Exit { public: virtual void apply (void) { ACE_DEBUG ((LM_INFO, ACE_TEXT ("(%t) is exiting\n"))); // Shut down all devices. } }; // Listing 1 // Listing 2 code/ch13 class HA_CommandHandler : public ACE_Task_Base { public: HA_CommandHandler(ExitHandler& eh) : eh_(eh) { } virtual int svc (void) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) starting up\n"))); this->thr_mgr ()->at_exit (eh_); // Do something. // Forcefully exit. ACE_Thread::exit (); // NOT REACHED return 0; } private: ExitHandler& eh_; }; // Listing 2 // Listing 3 code/ch13 int ACE_TMAIN (int, ACE_TCHAR *[]) { ExitHandler eh; HA_CommandHandler handler (eh); handler.activate (); ACE_Thread_Manager::instance ()->wait (); return 0; } // Listing 3 #if 0 // Listing 4 code/ch13 int ACE_TMAIN (int, ACE_TCHAR *[]) { ExitHandler eh; ACE_Thread_Manager tm; HA_CommandHandler handler (eh); handler.thr_mgr (&tm); handler.activate (); tm.wait(); return 0; } // Listing 4 #endif
[править] Pool
// $Id: Pool.cpp 91626 2010-09-07 10:59:20Z johnnyw $ #include "ace/config-lite.h" #if defined (ACE_HAS_THREADS) #include "ace/Task.h" #include "ace/Log_Msg.h" // Listing 1 code/ch13 class HA_CommandHandler : public ACE_Task<ACE_MT_SYNCH> { public: virtual int svc (void) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) starting up\n"))); ACE_Message_Block *mb = 0; if (this->getq (mb) == -1) return -1; // ... do something with the message. return 0; } }; // Listing 1 // Listing 2 code/ch13 int ACE_TMAIN (int, ACE_TCHAR *[]) { HA_CommandHandler handler; // Create 4 threads. handler.activate (THR_NEW_LWP | THR_JOINABLE, 4); handler.wait (); return 0; } // Listing 2 #else #include "ace/OS_main.h" #include "ace/OS_NS_stdio.h" int ACE_TMAIN (int, ACE_TCHAR *[]) { ACE_OS::puts (ACE_TEXT ("This example requires threads.")); return 0; } #endif /* ACE_HAS_THREADS */
[править] Priorities
// $Id: Priorities.cpp 80826 2008-03-04 14:51:23Z wotte $ #include "ace/config-lite.h" #if defined (ACE_HAS_THREADS) #include "ace/Task.h" #include "ace/Log_Msg.h" #include "ace/OS_NS_unistd.h" // Listing 2 code/ch13 class HA_CommandHandler : public ACE_Task<ACE_MT_SYNCH> { public: HA_CommandHandler (const char *name) : name_ (name) { } virtual int svc (void) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) starting up %C\n"), name_)); ACE_OS::sleep (2); ACE_Message_Block *mb = 0; while (this->getq (mb) != -1) { if (mb->msg_type () == ACE_Message_Block::MB_BREAK) { mb->release (); break; } process_message (mb); mb->release (); } return 0; } void process_message (ACE_Message_Block *) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Processing message %C\n"), name_)); // Simulate compute bound task. for (int i = 0; i < 100; i++) ; } private: const char *name_; }; // Listing 2 #if !defined (ACE_THR_PRI_OTHER_MAX) // This should be fixed in ACE... There's no _MAX, _MIN values for // thread priorities. #if defined (ACE_WIN32) # define ACE_THR_PRI_OTHER_MAX ((ACE_THR_PRI_OTHER_DEF) + 1) #elif defined (VXWORKS) # define ACE_THR_PRI_OTHER_MAX 0 #endif #endif // Listing 1 code/ch13 int ACE_TMAIN (int, ACE_TCHAR *[]) { HA_CommandHandler hp_handler ("HighPriority"); hp_handler.activate (THR_NEW_LWP | THR_JOINABLE, 1, 1, ACE_THR_PRI_OTHER_MAX); HA_CommandHandler lp_handler ("LowPriority"); lp_handler.activate (THR_NEW_LWP | THR_JOINABLE, 1, 1, ACE_THR_PRI_OTHER_DEF); ACE_Message_Block mb; for (int i = 0; i < 100; i++) { ACE_Message_Block *mb_hp, *mb_lp; mb_hp = mb.clone (); mb_lp = mb.clone (); hp_handler.putq (mb_hp); lp_handler.putq (mb_lp); } ACE_Message_Block stop (0, ACE_Message_Block::MB_BREAK); hp_handler.putq (stop.clone ()); lp_handler.putq (stop.clone ()); hp_handler.wait (); lp_handler.wait (); return 0; } // Listing 1 #else #include "ace/OS_main.h" #include "ace/OS_NS_stdio.h" int ACE_TMAIN (int, ACE_TCHAR *[]) { ACE_OS::puts (ACE_TEXT ("This example requires threads.")); return 0; 
} #endif /* ACE_HAS_THREADS */
[править] SecurityContext.h
/**
 * $Id: SecurityContext.h 80826 2008-03-04 14:51:23Z wotte $
 *
 * Sample code from The ACE Programmer's Guide,
 * copyright 2003 Addison-Wesley. All Rights Reserved.
 */

// The guard name deliberately avoids a leading double underscore:
// such identifiers are reserved for the C++ implementation.
#ifndef SECURITYCONTEXT_H_
#define SECURITYCONTEXT_H_

/// Plain aggregate holding security information for a thread.
struct SecurityContext
{
  /// User name as a C string.  The struct has no constructor, so the
  /// pointer is indeterminate until assigned.  Presumably not owned
  /// by this struct -- confirm against the code that fills it in.
  const char * user;
};

#endif /* SECURITYCONTEXT_H_ */
[править] Signals
// $Id: Signals.cpp 80826 2008-03-04 14:51:23Z wotte $ #include "ace/config-lite.h" #if defined (ACE_HAS_THREADS) #include "ace/OS_NS_time.h" #include "ace/OS_NS_unistd.h" #include "ace/Task.h" #include "ace/Log_Msg.h" #include "ace/Signal.h" #include "ace/Sig_Handler.h" // Listing 1 code/ch13 class SignalableTask : public ACE_Task<ACE_MT_SYNCH> { public: virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0) { if (signum == SIGUSR1) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) received a %S signal\n"), signum)); handle_alert (); } return 0; } virtual int svc (void) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Starting thread\n"))); while (1) { ACE_Message_Block* mb = 0; ACE_Time_Value tv (0, 1000); tv += ACE_OS::time (0); int result = this->getq (mb, &tv); if (result == -1 && errno == EWOULDBLOCK) continue; else process_message (mb); } ACE_NOTREACHED (return 0); } void handle_alert (); void process_message (ACE_Message_Block *mb); }; // Listing 1 void SignalableTask::process_message (ACE_Message_Block *) { } void SignalableTask::handle_alert () { } // Listing 2 code/ch13 int ACE_TMAIN (int, ACE_TCHAR *[]) { SignalableTask handler; handler.activate (THR_NEW_LWP | THR_JOINABLE , 5); ACE_Sig_Handler sh; sh.register_handler (SIGUSR1, &handler); ACE_OS::sleep (1); ACE_Thread_Manager::instance () -> kill_grp (handler.grp_id (), SIGUSR1); handler.wait (); return 0; } // Listing 2 #else #include "ace/OS_main.h" #include "ace/OS_NS_stdio.h" int ACE_TMAIN (int, ACE_TCHAR *[]) { ACE_OS::puts (ACE_TEXT ("This example requires threads.")); return 0; } #endif /* ACE_HAS_THREADS */
[править] Signals2
// $Id: Signals2.cpp 84565 2009-02-23 08:20:39Z johnnyw $ #include "ace/config-lite.h" #if defined (ACE_HAS_THREADS) #include "ace/OS_NS_time.h" #include "ace/OS_NS_unistd.h" #include "ace/Task.h" #include "ace/Log_Msg.h" #include "ace/Signal.h" #include "ace/Sig_Handler.h" class SignalableTask : public ACE_Task<ACE_MT_SYNCH> { public: virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0) { if (signum == SIGUSR1) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) received a %S signal\n"), signum)); handle_alert(); } return 0; } virtual int svc (void) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Starting thread\n"))); while (1) { ACE_Message_Block* mb = 0; ACE_Time_Value tv (0, 1000); tv += ACE_OS::time (0); int result = this->getq(mb, &tv); if (result == -1 && errno == EWOULDBLOCK) continue; else process_message (mb); } ACE_NOTREACHED (return 0); } void handle_alert (); void process_message (ACE_Message_Block *mb); }; void SignalableTask::process_message (ACE_Message_Block *) { return; } void SignalableTask::handle_alert (void) { return; } // Listing 1 code/ch13 int ACE_TMAIN (int, ACE_TCHAR *[]) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Main thread\n"))); SignalableTask handler; handler.activate (THR_NEW_LWP | THR_JOINABLE, 5); ACE_Sig_Handler sh; sh.register_handler (SIGUSR1, &handler); ACE_OS::sleep (1); // Allow threads to start for (int i = 0; i < 5; i++) ACE_OS::kill (ACE_OS::getpid (), SIGUSR1); handler.wait (); return 0; } // Listing 1 #else #include "ace/OS_main.h" #include "ace/OS_NS_stdio.h" int ACE_TMAIN (int, ACE_TCHAR *[]) { ACE_OS::puts (ACE_TEXT ("This example requires threads.")); return 0; } #endif /* ACE_HAS_THREADS */
[править] Start Hook
// $Id: Start_Hook.cpp 84565 2009-02-23 08:20:39Z johnnyw $ #include "ace/Thread_Hook.h" #include "ace/Task.h" #include "ace/Log_Msg.h" #include "SecurityContext.h" // Listing 1 code/ch13 class HA_ThreadHook : public ACE_Thread_Hook { public: virtual ACE_THR_FUNC_RETURN start (ACE_THR_FUNC func, void* arg) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%t) New Thread Spawned\n"))); // Create the context on the thread's own stack. ACE_TSS<SecurityContext> secCtx; // Special initialization. add_sec_context_thr (secCtx); return (*func) (arg); } void add_sec_context_thr (ACE_TSS<SecurityContext> &secCtx); }; // Listing 1 void HA_ThreadHook::add_sec_context_thr(ACE_TSS<SecurityContext> &secCtx) { secCtx->user = 0; } class HA_CommandHandler : public ACE_Task_Base { public: virtual int svc (void) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) starting up\n"))); // Do something. return 0; } }; // Listing 2 code/ch13 int ACE_TMAIN (int, ACE_TCHAR *[]) { HA_ThreadHook hook; ACE_Thread_Hook::thread_hook (&hook); HA_CommandHandler handler; handler.activate (); handler.wait(); return 0; } // Listing 2
[править] State
// $Id: State.cpp 91813 2010-09-17 07:52:52Z johnnyw $ #include "ace/Task.h" class HA_CommandHandler : public ACE_Task_Base { public: virtual int svc (void) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Handler Thread running\n"))); return 0; } }; int ACE_TMAIN (int, ACE_TCHAR *[]) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Main Thread running\n"))); // Listing 1 code/ch13 HA_CommandHandler handler; int result = handler.activate (THR_NEW_LWP | THR_JOINABLE | THR_SUSPENDED); ACE_ASSERT (result == 0); ACE_UNUSED_ARG (result); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) The current thread count is %d\n"), handler.thr_count ())); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) The group identifier is %d\n"), handler.grp_id ())); handler.resume (); handler.wait (); // Listing 1 return 0; }