ACE/FAQ/APG/Proactor

Материал из Wiki.crossplatform.ru

Перейти к: навигация, поиск

Содержание


[править] HA_Proactive_Status.h

/* -*- C++ -*- */
/*
** $Id: HA_Proactive_Status.h 80826 2008-03-04 14:51:23Z wotte $
**
** Example program from The ACE Programmer's Guide, Chapter 8.
** Copyright 2003 Addison-Wesley. All Rights Reserved.
*/
 
#ifndef __HA_PROACTIVE_STATUS_H
#define __HA_PROACTIVE_STATUS_H
 
#include "ace/OS_NS_sys_socket.h"
#include "ace/Asynch_IO.h"
#include "ace/Service_Object.h"
// Listing 1 code/ch08
#include "ace/Asynch_IO.h"
 
#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
 
class HA_Proactive_Service : public ACE_Service_Handler
{
public:
  ~HA_Proactive_Service ()
   {
     if (this->handle () != ACE_INVALID_HANDLE)
       ACE_OS::closesocket (this->handle ());
   }
 
  virtual void open (ACE_HANDLE h, ACE_Message_Block&);
 
  // This method will be called when an asynchronous read
  // completes on a stream.
  virtual void handle_read_stream
    (const ACE_Asynch_Read_Stream::Result &result);
 
  // This method will be called when an asynchronous write
  // completes on a stream.
  virtual void handle_write_stream
    (const ACE_Asynch_Write_Stream::Result &result);
 
private:
  ACE_Asynch_Read_Stream reader_;
  ACE_Asynch_Write_Stream writer_;
};
// Listing 1
 
// Listing 2 code/ch08
#include "ace/Asynch_Acceptor.h"
#include "ace/INET_Addr.h"
 
class HA_Proactive_Acceptor :
    public ACE_Asynch_Acceptor<HA_Proactive_Service>
{
public:
  virtual int validate_connection
    (const ACE_Asynch_Accept::Result& result,
     const ACE_INET_Addr &remote,
     const ACE_INET_Addr &local);
};
// Listing 2
 
#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS */
 
#if 0
// Listing 3 code/ch08
template <class HANDLER>
class ACE_Asynch_Acceptor : public ACE_Handler
      ...
protected:
  virtual HANDLER *make_handler (void)
  {
    return new HANDLER;
  }
// Listing 3
#endif
 
class HA_Proactive_Status : public ACE_Service_Object
{
public:
  // Initializes object when dynamic linking occurs.
  virtual int init (int argc, ACE_TCHAR *argv[]);
 
  // Terminates object when dynamic unlinking occurs.
  virtual int fini (void);
};
 
#endif /* __HA_PROACTIVE_STATUS_H */

[править] HA_Proactive_Status.cpp

/*
** $Id: HA_Proactive_Status.cpp 91626 2010-09-07 10:59:20Z johnnyw $
**
** Example program from The ACE Programmer's Guide, Chapter 8.
** Copyright 2003 Addison-Wesley. All Rights Reserved.
*/
 
#include "HA_Proactive_Status.h"
#include "ace/Log_Msg.h"
#include "ace/Message_Block.h"
#include "ace/Proactor.h"
#include "ace/os_include/arpa/os_inet.h"
 
#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
 
// Listing 1 code/ch08
void
HA_Proactive_Service::open (ACE_HANDLE h, ACE_Message_Block&)
{
  this->handle (h);
  if (this->reader_.open (*this) != 0 ||
      this->writer_.open (*this) != 0   )
    {
      ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
                  ACE_TEXT ("HA_Proactive_Service open")));
      delete this;
      return;
    }
 
  ACE_Message_Block *mb = 0;
  ACE_NEW_NORETURN (mb, ACE_Message_Block (1024));
  if (this->reader_.read (*mb, mb->space ()) != 0)
    {
      ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
                  ACE_TEXT ("HA_Proactive_Service begin read")));
      mb->release ();
      delete this;
      return;
    }
 
  // mb is now controlled by Proactor framework.
  return;
}
// Listing 1
 
// Listing 2 code/ch08
void
HA_Proactive_Service::handle_read_stream
  (const ACE_Asynch_Read_Stream::Result &result)
{
  ACE_Message_Block &mb = result.message_block ();
  if (!result.success () || result.bytes_transferred () == 0)
    {
      mb.release ();
      delete this;
    }
  else
    {
      if (this->writer_.write (mb, mb.length ()) != 0)
        {
          ACE_ERROR ((LM_ERROR,
                      ACE_TEXT ("%p\n"),
                      ACE_TEXT ("starting write")));
          mb.release ();
        }
      else
        {
          ACE_Message_Block *new_mb;
          ACE_NEW_NORETURN (new_mb, ACE_Message_Block (1024));
          this->reader_.read (*new_mb, new_mb->space ());
        }
    }
  return;
}
// Listing 2
 
// Listing 3 code/ch08
void
HA_Proactive_Service::handle_write_stream
(const ACE_Asynch_Write_Stream::Result &result)
{
  result.message_block ().release ();
  return;
}
// Listing 3
 
// The network address check only works for BSD-ish systems. This
// sort of network number accessor should be added to ACE_INET_Addr
// at some point...
#if defined (ACE_WIN32)
int
HA_Proactive_Acceptor::validate_connection
(const ACE_Asynch_Accept::Result&,
 const ACE_INET_Addr&,
 const ACE_INET_Addr&)
{
  return 0;
}
#else
 
// Listing 4 code/ch08
int
HA_Proactive_Acceptor::validate_connection (
   const ACE_Asynch_Accept::Result&,
   const ACE_INET_Addr& remote,
   const ACE_INET_Addr& local)
{
  struct in_addr *remote_addr =
    reinterpret_cast<struct in_addr*> (remote.get_addr ());
  struct in_addr *local_addr =
    reinterpret_cast<struct in_addr*> (local.get_addr ());
  if (inet_netof (*local_addr) == inet_netof (*remote_addr))
    return 0;
 
  return -1;
}
// Listing 4
 
#endif /* ACE_WIN32 */
 
int
ACE_TMAIN (int, ACE_TCHAR *[])
{
  // Listing 5 code/ch08
  ACE_INET_Addr listen_addr;     // Set up with listen port
  HA_Proactive_Acceptor aio_acceptor;
  if (0 != aio_acceptor.open (listen_addr,
                              0,     // bytes_to_read
                              0,     // pass_addresses
                              ACE_DEFAULT_ASYNCH_BACKLOG,
                              1,     // reuse_addr
                              0,     // proactor
                              1))    // validate_new_connection
    ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                       ACE_TEXT ("acceptor open")), 1);
  // Listing 5
 
#if 0
  // Listing 6 code/ch08
  ACE_INET_Addr peer_addr;    // Set up peer addr
  ACE_Asynch_Connector<HA_Proactive_Service> aio_connect;
  aio_connect.connect (peer_addr);
  // Listing 6
#endif
 
  // Listing 7 code/ch08
  ACE_Proactor::instance ()->proactor_run_event_loop ();
  // Listing 7
  return 0;
}
 
#else
 
int
ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("This example requires asynchronous I/O support.\n")));
  return 1;
}
 
#endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */