ACE/FAQ/APG/Streams
Материал из Wiki.crossplatform.ru
[править] Answerer.cpp — an answering machine based on a one-way ACE_Stream
/**
 * $Id: Answerer.cpp 94345 2011-07-24 04:29:17Z mesnier_p $
 *
 * Streams Listing 01
 *
 * An answering machine based on a one-way ACE_Stream
 */

#include <new>

#include "ace/OS_NS_string.h"
#include "ace/Stream.h"
#include "ace/Message_Block.h"
#include "ace/FILE_IO.h"

#include "MessageInfo.h"
#include "Message.h"
#include "BasicTask.h"
#include "EndTask.h"
#include "Util.h"
#include "RecordingDevice.h"

// Listing 21 code/ch18
// Stage 1: ask the message's recording device to answer the call.
class AnswerIncomingCall : public BasicTask
{
protected:
  // Returns 0 on success, -1 when answer_call() reports a failure.
  virtual int process (Message *message)
  {
    ACE_TRACE ("AnswerIncomingCall::process()");

    if (message->recorder ()->answer_call () < 0)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p\n"),
                         ACE_TEXT ("AnswerIncomingCall")),
                        -1);
    return 0;
  }
};
// Listing 21

// Listing 22 code/ch18
// Stage 2: fetch the caller id from the device and store it in the message.
class GetCallerId : public BasicTask
{
protected:
  // Returns 0 on success, -1 when the device yields no caller id.
  virtual int process (Message *message)
  {
    ACE_TRACE ("GetCallerId::process()");

    CallerId *id;
    id = message->recorder ()->retrieve_callerId ();
    if (!id)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p\n"),
                         ACE_TEXT ("GetCallerId")),
                        -1);

    message->caller_id (id);
    return 0;
  }
};
// Listing 22

// Listing 23 code/ch18
// Stage 3: play the outgoing (greeting) message through the device.
class PlayOutgoingMessage : public BasicTask
{
protected:
  // Returns 0 on success, -1 when play_message() reports a failure.
  virtual int process (Message *message)
  {
    ACE_TRACE ("PlayOutgoingMessage::process()");

    ACE_FILE_Addr outgoing_message =
      this->get_outgoing_message (message);

    int pmrv =
      message->recorder ()->play_message (outgoing_message);
    if (pmrv < 0)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p\n"),
                         ACE_TEXT ("PlayOutgoingMessage")),
                        -1);
    return 0;
  }

  // Location of the greeting; currently a fixed path for every message.
  ACE_FILE_Addr get_outgoing_message (Message *)
  {
    // Exclude 23
    return ACE_FILE_Addr (ACE_TEXT ("/tmp/outgoing_message"));
    // Exclude 23
  }
};
// Listing 23

// Listing 24 code/ch18
// Stage 4: record the caller's message into a file and remember where
// it was stored and what type of message it is.
class RecordIncomingMessage : public BasicTask
{
protected:
  // Returns 0 on success, -1 when record_message() returns no type.
  virtual int process (Message *message)
  {
    ACE_TRACE ("RecordIncomingMessage::process()");

    ACE_FILE_Addr incoming_message =
      this->get_incoming_message_queue ();

    MessageType *type =
      message->recorder ()->record_message (incoming_message);
    if (!type)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p\n"),
                         ACE_TEXT ("RecordIncomingMessage")),
                        -1);

    message->incoming_message (incoming_message, type);
    return 0;
  }

  // Destination of the raw recording; currently a fixed path.
  ACE_FILE_Addr get_incoming_message_queue (void)
  {
    // Exclude 24
    return ACE_FILE_Addr (ACE_TEXT ("/tmp/incoming_message"));
    // Exclude 24
  }
};
// Listing 24

// Listing 25 code/ch18
// Stage 5: release the device once recording is done.
class ReleaseDevice : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE ("ReleaseDevice::process()");
    message->recorder ()->release ();
    return 0;
  }
};
// Listing 25

// Listing 26 code/ch18
// Stage 6: convert the recording according to its message type and
// point the message at the converted file.
class EncodeMessage : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE ("EncodeMessage::process()");

    ACE_FILE_Addr &incoming = message->addr ();
    ACE_FILE_Addr addr = this->get_message_destination (message);

    if (message->is_text ())
      Util::convert_to_unicode (incoming, addr);
    else if (message->is_audio ())
      Util::convert_to_mp3 (incoming, addr);
    else if (message->is_video ())
      Util::convert_to_mpeg (incoming, addr);

    // The destination is recorded even when no conversion matched.
    message->addr (addr);
    return 0;
  }

  // Destination of the encoded message; currently a fixed path.
  ACE_FILE_Addr get_message_destination (Message *)
  {
    // Exclude 26
    return ACE_FILE_Addr (ACE_TEXT ("/tmp/encoded_message"));
    // Exclude 26
  }
};
// Listing 26

// Listing 27 code/ch18
// Stage 7: write an XML meta-data file next to the stored message.
class SaveMetaData : public BasicTask
{
protected:
  // Always returns 0: failing to create the meta-data file is logged
  // but does not stop the message from moving down the stream.
  virtual int process (Message *message)
  {
    ACE_TRACE ("SaveMetaData::process()");

    ACE_TString path (message->addr ().get_path_name ());
    path += ACE_TEXT (".xml");

    ACE_FILE_Connector connector;
    ACE_FILE_IO file;
    ACE_FILE_Addr addr (path.c_str ());
    if (connector.connect (file, addr) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p\n"),
                         ACE_TEXT ("create meta-data file")),
                        0);

    file.truncate (0);
    this->write (file, "<Message>\n");
    // ...
    this->write (file, "</Message>\n");
    file.close ();
    return 0;
  }

private:
  //FUZZ: disable check_for_lack_ACE_OS
  // Send the NUL-terminated string (without its terminator) to the file.
  int write (ACE_FILE_IO &file, const char *str)
  {
    //FUZZ: enable check_for_lack_ACE_OS
    return file.send (str, ACE_OS::strlen (str));
  }
};
// Listing 27

// Listing 28 code/ch18
// Stage 8: tell someone that a new message has arrived.
class NotifySomeone : public BasicTask
{
protected:
  virtual int process (Message *message)
  {
    ACE_TRACE ("NotifySomeone::process()");

    // Format an email to tell someone about the
    // newly received message.
    // ...

    // Display message information in the logfile
    ACE_DEBUG ((LM_INFO,
                ACE_TEXT ("New message from %s ")
                ACE_TEXT ("received and stored at %s\n"),
                message->caller_id ()->string (),
                message->addr ().get_path_name ()));
    return 0;
  }
};
// Listing 28

// Listing 10 code/ch18
// The one-way stream that chains all of the stages above.
class RecordingStream : public ACE_Stream<ACE_MT_SYNCH>
{
public:
  typedef ACE_Stream<ACE_MT_SYNCH> inherited;
  typedef ACE_Module<ACE_MT_SYNCH> Module;

  RecordingStream () : inherited()
  { }
  // Listing 10

  //FUZZ: disable check_for_lack_ACE_OS
  // Listing 1000 code/ch18
  // Open the stream and push one module per processing stage.
  // A TheEndTask tail module is supplied when the caller passes none.
  // Returns 0 on success, -1 on allocation, open or push failure.
  virtual int open (void *arg,
                    Module *head = 0, Module *tail = 0)
  {
    //FUZZ: enable check_for_lack_ACE_OS
    if (tail == 0)
      ACE_NEW_RETURN (tail,
                      Module (ACE_TEXT ("End Module"),
                              new TheEndTask ()),
                      -1);

    // The stream must be open before modules are pushed, so a failure
    // here has to stop us instead of being silently ignored.
    if (this->inherited::open (arg, head, tail) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("%p\n"),
                         ACE_TEXT ("Failed to open superclass")),
                        -1);
    // Listing 1000

    // Listing 1001 code/ch18
    Module *answerIncomingCallModule;
    ACE_NEW_RETURN (answerIncomingCallModule,
                    Module (ACE_TEXT ("Answer Incoming Call"),
                            new AnswerIncomingCall ()),
                    -1);

    // Listing 11 code/ch18
    Module *getCallerIdModule;
    ACE_NEW_RETURN (getCallerIdModule,
                    Module (ACE_TEXT ("Get Caller ID"),
                            new GetCallerId ()),
                    -1);
    // Listing 11

    Module *playOGMModule;
    ACE_NEW_RETURN (playOGMModule,
                    Module (ACE_TEXT ("Play Outgoing Message"),
                            new PlayOutgoingMessage ()),
                    -1);

    Module *recordModule;
    ACE_NEW_RETURN (recordModule,
                    Module (ACE_TEXT ("Record Incoming Message"),
                            new RecordIncomingMessage ()),
                    -1);

    Module *releaseModule;
    ACE_NEW_RETURN (releaseModule,
                    Module (ACE_TEXT ("Release Device"),
                            new ReleaseDevice ()),
                    -1);

    Module *conversionModule;
    ACE_NEW_RETURN (conversionModule,
                    Module (ACE_TEXT ("Encode Message"),
                            new EncodeMessage ()),
                    -1);

    Module *saveMetaDataModule;
    ACE_NEW_RETURN (saveMetaDataModule,
                    Module (ACE_TEXT ("Save Meta-Data"),
                            new SaveMetaData ()),
                    -1);

    Module *notificationModule;
    ACE_NEW_RETURN (notificationModule,
                    Module (ACE_TEXT ("Notify Someone"),
                            new NotifySomeone ()),
                    -1);
    // Listing 1001

    // Listing 12 code/ch18
    // Modules are pushed last-stage-first: the module pushed last
    // (answerIncomingCallModule) is the first one a message meets.
    if (this->push (notificationModule) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Failed to push %p\n"),
                         ACE_TEXT ("notificationModule")),
                        -1);
    if (this->push (saveMetaDataModule) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Failed to push %p\n"),
                         ACE_TEXT ("saveMetaDataModule")),
                        -1);
    if (this->push (conversionModule) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Failed to push %p\n"),
                         ACE_TEXT ("conversionModule")),
                        -1);
    if (this->push (releaseModule) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Failed to push %p\n"),
                         ACE_TEXT ("releaseModule")),
                        -1);
    if (this->push (recordModule) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Failed to push %p\n"),
                         ACE_TEXT ("recordModule")),
                        -1);
    if (this->push (playOGMModule) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Failed to push %p\n"),
                         ACE_TEXT ("playOGMModule")),
                        -1);
    if (this->push (getCallerIdModule) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Failed to push %p\n"),
                         ACE_TEXT ("getCallerIdModule")),
                        -1);
    // The comma after the format string matters: without it the two
    // literals are concatenated and %p is left without an argument.
    if (this->push (answerIncomingCallModule) == -1)
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Failed to push %p\n"),
                         ACE_TEXT ("answerIncomingCallModule")),
                        -1);
    // Listing 12

    return 0;
  }

  // Listing 13 code/ch18
  // Wrap a fresh Message for `recorder` in a message block and send it
  // down the stream.  Returns the result of put(), or -1 when the
  // message block could not be allocated.
  int record (RecordingDevice *recorder)
  {
    ACE_Message_Block * mb = 0;
    ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(Message)), -1);

    // The block's payload allocation can fail independently of the
    // block itself; do not construct into a buffer that is too small.
    if (mb->space () < sizeof(Message))
      {
        mb->release ();
        return -1;
      }

    // Construct the Message in the block's buffer.  Merely casting the
    // raw bytes would leave its pointers (and the embedded
    // ACE_FILE_Addr) uninitialized when later stages read them.
    // NOTE(review): nothing in this file runs ~Message() when the
    // block is released at the end of the stream.
    Message *message = new (mb->wr_ptr ()) Message ();
    mb->wr_ptr (sizeof(Message));

    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("RecordingStream::record() - ")
                ACE_TEXT ("message->recorder(recorder)\n")));
    message->recorder (recorder);

    int rval = this->put (mb);
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("RecordingStream::record() - ")
                ACE_TEXT ("this->put() returns %d\n"),
                rval));
    return rval;
  }
  // Listing 13
};

// Listing 1 code/ch18
int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  RecordingDevice *recorder =
    RecordingDeviceFactory::instantiate (argc, argv);
  // Listing 1

  // The factory returns 0 when the device cannot be initialized.
  if (recorder == 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("Unable to instantiate a recording device\n")),
                      -1);

  // Listing 2 code/ch18
  RecordingStream *recording_stream;
  ACE_NEW_RETURN (recording_stream, RecordingStream, -1);

  if (recording_stream->open (0) < 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("%p\n"),
                       ACE_TEXT ("RecordingStream->open()")),
                      0);
  // Listing 2

  // Listing 3 code/ch18
  for (;;)
    {
      ACE_DEBUG ((LM_INFO,
                  ACE_TEXT ("Waiting for incoming message\n")));
      RecordingDevice *activeRecorder =
        recorder->wait_for_activity ();

      ACE_DEBUG ((LM_INFO,
                  ACE_TEXT ("Initiating recording process\n")));

      recording_stream->record (activeRecorder);
    }
  // Listing 3

  ACE_NOTREACHED (return 0;)
}
[править] BasicTask.h
/* -*- C++ -*- */ // $Id: BasicTask.h 80826 2008-03-04 14:51:23Z wotte $ #ifndef BASIC_TASK_H #define BASIC_TASK_H #include "ace/Task_T.h" #include "ace/ace_wchar.h" // Listing 100 code/ch18 class BasicTask : public ACE_Task<ACE_MT_SYNCH> { public: typedef ACE_Task<ACE_MT_SYNCH> inherited; BasicTask () : inherited() { } virtual int open (void * = 0) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("BasicTask::open() starting ") ACE_TEXT ("%d threads\n"), this->desired_threads ())); return this->activate (THR_NEW_LWP | THR_JOINABLE, this->desired_threads ()); } // Listing 100 // Listing 101 code/ch18 int put (ACE_Message_Block *message, ACE_Time_Value *timeout) { return this->putq (message, timeout); } // Listing 101 // Listing 1020 code/ch18 virtual int svc (void) { for (ACE_Message_Block *message = 0; ; ) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("BasicTask::svc() - ") ACE_TEXT ("waiting for work\n" ))); if (this->getq (message) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("getq")), -1); // Listing 1020 // Listing 1021 code/ch18 if (message->msg_type () == ACE_Message_Block::MB_HANGUP) { if (this->putq (message) == -1) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("Task::svc() putq"))); message->release (); } break; } // Listing 1021 // Listing 1022 code/ch18 Message *recordedMessage = (Message *)message->rd_ptr (); if (this->process (recordedMessage) == -1) { message->release (); ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("process")), -1); } // Listing 1022 // Listing 1023 code/ch18 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("BasicTask::svc() - ") ACE_TEXT ("Continue to next stage\n" ))); if (this->next_step (message) < 0) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("put_next failed"))); message->release (); break; } // Listing 1023 } return 0; } // Listing 103 code/ch18 virtual int close (u_long flags) { int rval = 0; if (flags == 1) { ACE_Message_Block *hangup = new ACE_Message_Block (); hangup->msg_type (ACE_Message_Block::MB_HANGUP); if 
(this->putq (hangup) == -1) { hangup->release (); ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("Task::close() putq")), -1); } rval = this->wait (); } return rval; } // Listing 103 // Listing 105 code/ch18 protected: virtual int next_step (ACE_Message_Block *message_block) { return this->put_next (message_block); } // Listing 105 // Listing 104 code/ch18 virtual int process (Message *message) = 0; virtual int desired_threads (void) { return 1; } }; // Listing 104 #endif /* BASIC_TASK_H */
[править] Command.h
/* -*- C++ -*- */ // $Id: Command.h 80826 2008-03-04 14:51:23Z wotte $ #ifndef COMMAND_H #define COMMAND_H #include "ace/SString.h" #include "ace/Message_Block.h" // Listing 01 code/ch18 class Command : public ACE_Data_Block { public: // Result Values enum { RESULT_PASS = 1, RESULT_SUCCESS = 0, RESULT_FAILURE = -1 }; // Commands enum { CMD_UNKNOWN = -1, CMD_ANSWER_CALL = 10, CMD_RETRIEVE_CALLER_ID, CMD_PLAY_MESSAGE, CMD_RECORD_MESSAGE } commands; int flags_; int command_; void *extra_data_; int numeric_result_; ACE_TString result_; }; // Listing 01 #endif /* COMMAND_H */
[править] CommandModule.cpp
// $Id: CommandModule.cpp 80826 2008-03-04 14:51:23Z wotte $ #include "CommandModule.h" // Listing 01 code/ch18 CommandModule::CommandModule (const ACE_TCHAR *module_name, CommandTask *writer, CommandTask *reader, ACE_SOCK_Stream *peer) : inherited(module_name, writer, reader, peer) { } // Listing 01 // Listing 02 code/ch18 ACE_SOCK_Stream &CommandModule::peer (void) { ACE_SOCK_Stream *peer = (ACE_SOCK_Stream *)this->arg (); return *peer; } // Listing 02
[править] CommandModule.h
/* -*- C++ -*- */ // $Id: CommandModule.h 80826 2008-03-04 14:51:23Z wotte $ #ifndef COMMAND_MODULE_H #define COMMAND_MODULE_H #include "ace/Module.h" #include "ace/SOCK_Stream.h" #include "CommandTask.h" // Listing 01 code/ch18 class CommandModule : public ACE_Module<ACE_MT_SYNCH> { public: typedef ACE_Module<ACE_MT_SYNCH> inherited; typedef ACE_Task<ACE_MT_SYNCH> Task; CommandModule (const ACE_TCHAR *module_name, CommandTask *writer, CommandTask *reader, ACE_SOCK_Stream *peer); ACE_SOCK_Stream &peer (void); }; // Listing 01 #endif /* COMMAND_MODULE_H */
[править] CommandStream.cpp
// $Id: CommandStream.cpp 94310 2011-07-09 19:10:06Z schmidt $ #include "ace/Log_Msg.h" #include "ace/OS_Memory.h" #include "CommandStream.h" #include "Command.h" #include "CommandModule.h" #include "CommandTasks.h" // Gotcha: superclass' open() won't open head/tail modules // Gotcha!! Must open the stream before pushing modules! // Listing 01 code/ch18 int CommandStream::open (void *arg, ACE_Module<ACE_MT_SYNCH> *head, ACE_Module<ACE_MT_SYNCH> *tail) { ACE_TRACE ("CommandStream::open(peer)"); if (this->inherited::open (arg, head, tail) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("Failed to open superclass")), -1); // Listing 01 // Listing 02 code/ch18 CommandModule *answerCallModule; ACE_NEW_RETURN (answerCallModule, AnswerCallModule (this->peer_), -1); CommandModule *retrieveCallerIdModule; ACE_NEW_RETURN (retrieveCallerIdModule, RetrieveCallerIdModule (this->peer_), -1); CommandModule *playMessageModule; ACE_NEW_RETURN (playMessageModule, PlayMessageModule (this->peer_), -1); CommandModule *recordMessageModule; ACE_NEW_RETURN (recordMessageModule, RecordMessageModule (this->peer_), -1); // Listing 02 // Listing 03 code/ch18 if (this->push (answerCallModule) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("Failed to push %p\n"), answerCallModule->name()), -1); if (this->push (retrieveCallerIdModule) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("Failed to push %p\n"), retrieveCallerIdModule->name()), -1); if (this->push (playMessageModule) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("Failed to push %p\n"), playMessageModule->name()), -1); if (this->push (recordMessageModule) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("Failed to push %p\n"), recordMessageModule->name()), -1); // Listing 03 return 0; } // Listing 04 code/ch18 Command *CommandStream::execute (Command *command) { ACE_Message_Block *mb = 0; ACE_NEW_RETURN (mb, ACE_Message_Block (command), 0); if (this->put (mb) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("Fail on put command 
%d: %p\n"), command->command_, ACE_TEXT ("")), 0); this->get (mb); command = (Command *)mb->data_block (); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Command (%d) returns (%d)\n"), command->command_, command->numeric_result_)); return command; } // Listing 04
[править] CommandStream.h
/* -*- C++ -*- */ // $Id: CommandStream.h 80826 2008-03-04 14:51:23Z wotte $ #ifndef COMMAND_STREAM_H #define COMMAND_STREAM_H #include "ace/Module.h" #include "ace/Stream.h" #include "ace/SOCK_Stream.h" #include "ace/Synch_Traits.h" #include "Command.h" // A CommandStream is a bidirectional ACE_Stream implementing a chain // of commands. A message will move down the stream until a // CommandModule is capable of processing it. After processing, it // will move on down the stream to the end. Data received from the // tail will likewise move up the stream until the downstream's // partner module is encoutered. The retrieved data will be processed // and continue on up the stream. // Listing 01 code/ch18 class CommandStream : public ACE_Stream<ACE_MT_SYNCH> { public: typedef ACE_Stream<ACE_MT_SYNCH> inherited; CommandStream (ACE_SOCK_Stream *peer) : inherited (), peer_(peer) { } virtual int open (void *arg, ACE_Module<ACE_MT_SYNCH> *head = 0, ACE_Module<ACE_MT_SYNCH> *tail = 0); Command *execute (Command *command); private: CommandStream () { } ACE_SOCK_Stream *peer_; }; // Listing 01 #endif /* COMMAND_STREAM_H */
[править] CommandTask.cpp
// $Id: CommandTask.cpp 94310 2011-07-09 19:10:06Z schmidt $ #include "CommandTask.h" // Listing 01 code/ch18 CommandTask::CommandTask (int command) : inherited (), command_(command) { } // Listing 01 // Listing 02 code/ch18 int CommandTask::open (void *) { return this->activate (); } // Listing 02 // Listing 03 code/ch18 int CommandTask::put (ACE_Message_Block *message, ACE_Time_Value *timeout) { return this->putq (message, timeout); } // Listing 03 // Listing 04 code/ch18 int CommandTask::process (Command *) { ACE_TRACE ("CommandTask::process()"); return Command::RESULT_FAILURE; } // Listing 04 // Listing 05 code/ch18 int CommandTask::close (u_long flags) { int rval = 0; if (flags == 1) { ACE_Message_Block *hangup = new ACE_Message_Block; hangup->msg_type (ACE_Message_Block::MB_HANGUP); if (this->putq (hangup->duplicate ()) == -1) { ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("Task::close() putq")), -1); } hangup->release (); rval = this->wait (); } return rval; } // Listing 05 // Listing 06 code/ch18 // Listing 061 code/ch18 int CommandTask::svc (void) { ACE_Message_Block *message; for (;;) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("CommandTask::svc() - ") ACE_TEXT ("%s waiting for work\n"), this->module ()->name ())); if (this->getq (message) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("getq")), -1); if (message->msg_type () == ACE_Message_Block::MB_HANGUP) { if (this->putq (message->duplicate ()) == -1) { ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("Task::svc() putq")), -1); } message->release (); break; } // Listing 061 // Listing 062 code/ch18 Command *command = (Command *)message->data_block (); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("CommandTask::svc() - ") ACE_TEXT ("%s got work request %d\n"), this->module ()->name (), command->command_)); if (command->command_ != this->command_) { this->put_next (message->duplicate ()); } // Listing 062 // Listing 063 code/ch18 else { int result = this->process (command); ACE_DEBUG 
((LM_DEBUG, ACE_TEXT ("CommandTask::svc() - ") ACE_TEXT ("%s work request %d result is %d\n"), this->module ()->name (), command->command_, result)); if (result == Command::RESULT_FAILURE) { command->numeric_result_ = -1; } // Listing 063 // Listing 064 code/ch18 else if (result == Command::RESULT_PASS) { this->put_next (message->duplicate ()); } // Listing 064 // Listing 065 code/ch18 else // result == Command::RESULT_SUCCESS { if (this->is_writer ()) { this->sibling ()->putq (message->duplicate ()); } // Listing 065 // Listing 066 code/ch18 else // this->is_reader () { this->put_next (message->duplicate ()); } // Listing 066 } // result == ... } // command->command_ ? = this->command_ // Listing 067 code/ch18 message->release (); } // for (;;) return 0; } // Listing 067 // Listing 06
[править] CommandTask.h
/* -*- C++ -*- */ // $Id: CommandTask.h 80826 2008-03-04 14:51:23Z wotte $ #ifndef COMMAND_TASK_H #define COMMAND_TASK_H #include "ace/Task.h" #include "ace/Module.h" #include "Command.h" // Listing 01 code/ch18 class CommandTask : public ACE_Task<ACE_MT_SYNCH> { public: typedef ACE_Task<ACE_MT_SYNCH> inherited; virtual ~CommandTask () { } virtual int open (void * = 0 ); int put (ACE_Message_Block *message, ACE_Time_Value *timeout); virtual int svc (void); virtual int close (u_long flags); protected: CommandTask (int command); virtual int process (Command *message); int command_; }; // Listing 01 #endif /* COMMAND_TASK_H */
[править] CommandTasks.cpp
// $Id: CommandTasks.cpp 80826 2008-03-04 14:51:23Z wotte $ #include "ace/FILE_Addr.h" #include "ace/FILE_Connector.h" #include "ace/FILE_IO.h" #include "Command.h" #include "CommandTasks.h" #include "RecordingDevice_Text.h" // Listing 011 code/ch18 AnswerCallModule::AnswerCallModule (ACE_SOCK_Stream *peer) : CommandModule (ACE_TEXT ("AnswerCall Module"), new AnswerCallDownstreamTask (), new AnswerCallUpstreamTask (), peer) { } // Listing 011 // Listing 012 code/ch18 AnswerCallDownstreamTask::AnswerCallDownstreamTask (void) : CommandTask(Command::CMD_ANSWER_CALL) { } // Listing 012 // Listing 013 code/ch18 int AnswerCallDownstreamTask::process (Command *command) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Answer Call (downstream)\n"))); TextListenerAcceptor *acceptor = (TextListenerAcceptor *)command->extra_data_; CommandModule *module = (CommandModule*)this->module (); command->numeric_result_ = acceptor->accept (module->peer ()); acceptor->release (); return Command::RESULT_SUCCESS; } // Listing 013 // Listing 014 code/ch18 AnswerCallUpstreamTask::AnswerCallUpstreamTask (void) : CommandTask(Command::CMD_ANSWER_CALL) { } // Listing 014 // Listing 015 code/ch18 int AnswerCallUpstreamTask::process (Command *) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Answer Call (upstream)\n"))); return Command::RESULT_SUCCESS; } // Listing 015 // Listing 021 code/ch18 RetrieveCallerIdModule::RetrieveCallerIdModule (ACE_SOCK_Stream *peer) : CommandModule (ACE_TEXT ("RetrieveCallerId Module"), new RetrieveCallerIdDownstreamTask (), new RetrieveCallerIdUpstreamTask (), peer) { } // Listing 021 // Listing 022 code/ch18 RetrieveCallerIdDownstreamTask::RetrieveCallerIdDownstreamTask (void) : CommandTask(Command::CMD_RETRIEVE_CALLER_ID) { } int RetrieveCallerIdDownstreamTask::process (Command *) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Retrieving Caller ID data\n"))); return Command::RESULT_SUCCESS; } // Listing 022 // Listing 023 code/ch18 RetrieveCallerIdUpstreamTask::RetrieveCallerIdUpstreamTask (void) : 
CommandTask(Command::CMD_RETRIEVE_CALLER_ID) { } int RetrieveCallerIdUpstreamTask::process (Command *command) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Returning Caller ID data\n"))); ACE_INET_Addr remote_addr; CommandModule *module = (CommandModule*)this->module (); module->peer ().get_remote_addr (remote_addr); ACE_TCHAR remote_addr_str[256]; remote_addr.addr_to_string (remote_addr_str, 256); command->result_ = ACE_TString (remote_addr_str); return Command::RESULT_SUCCESS; } // Listing 023 PlayMessageModule::PlayMessageModule (ACE_SOCK_Stream *peer) : CommandModule (ACE_TEXT ("PlayMessage Module"), new PlayMessageDownstreamTask (), new PlayMessageUpstreamTask (), peer) { } PlayMessageDownstreamTask::PlayMessageDownstreamTask (void) : CommandTask(Command::CMD_PLAY_MESSAGE) { } // Listing 032 code/ch18 int PlayMessageDownstreamTask::process (Command *command) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Play Outgoing Message\n"))); ACE_FILE_Connector connector; ACE_FILE_IO file; ACE_FILE_Addr *addr = (ACE_FILE_Addr *)command->extra_data_; if (connector.connect (file, *addr) == -1) { command->numeric_result_ = -1; } else { command->numeric_result_ = 0; CommandModule *module = (CommandModule*)this->module (); char rwbuf[512]; ssize_t rwbytes; while ((rwbytes = file.recv (rwbuf, 512)) > 0) { module->peer ().send_n (rwbuf, rwbytes); } } return Command::RESULT_SUCCESS; } // Listing 032 PlayMessageUpstreamTask::PlayMessageUpstreamTask (void) : CommandTask(Command::CMD_PLAY_MESSAGE) { } int PlayMessageUpstreamTask::process (Command *command) { ACE_FILE_Addr * addr = (ACE_FILE_Addr *)command->extra_data_; ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Outgoing message (%s) sent\n"), addr->get_path_name ())); return Command::RESULT_SUCCESS; } RecordMessageModule::RecordMessageModule (ACE_SOCK_Stream *peer) : CommandModule (ACE_TEXT ("RecordMessage Module"), new RecordMessageDownstreamTask (), new RecordMessageUpstreamTask (), peer) { } RecordMessageDownstreamTask::RecordMessageDownstreamTask (void) : 
CommandTask(Command::CMD_RECORD_MESSAGE) { } int RecordMessageDownstreamTask::process (Command *) { return Command::RESULT_SUCCESS; } RecordMessageUpstreamTask::RecordMessageUpstreamTask (void) : CommandTask(Command::CMD_RECORD_MESSAGE) { } // Listing 033 code/ch18 int RecordMessageUpstreamTask::process (Command *command) { // Collect whatever the peer sends and write into the // specified file. ACE_FILE_Connector connector; ACE_FILE_IO file; ACE_FILE_Addr *addr = (ACE_FILE_Addr *)command->extra_data_; if (connector.connect (file, *addr) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("create file")), Command::RESULT_FAILURE); file.truncate (0); CommandModule *module = (CommandModule*)this->module (); ssize_t total_bytes = 0; char rwbuf[512]; ssize_t rwbytes; while ((rwbytes = module->peer ().recv (rwbuf, 512)) > 0) { total_bytes += file.send_n (rwbuf, rwbytes); } file.close (); ACE_DEBUG ((LM_INFO, ACE_TEXT ("RecordMessageUpstreamTask ") ACE_TEXT ("- recorded %d byte message\n"), total_bytes)); return Command::RESULT_SUCCESS; } // Listing 033
[править] CommandTasks.h
/* -*- C++ -*- */
// $Id: CommandTasks.h 80826 2008-03-04 14:51:23Z wotte $

#ifndef COMMAND_TASKS_H
#define COMMAND_TASKS_H

#include "ace/SOCK_Stream.h"

#include "Command.h"
#include "CommandTask.h"
#include "CommandModule.h"

// CommandModule and CommandTask objects that implement the command
// stream functions.
//
// Each command below gets one module plus a "downstream" and an
// "upstream" task, each overriding CommandTask::process().

// Listing 011 code/ch18
// Module for the "answer call" command.
class AnswerCallModule : public CommandModule
{
public:
  AnswerCallModule (ACE_SOCK_Stream * peer);
};
// Listing 011

// Listing 012 code/ch18
class AnswerCallDownstreamTask : public CommandTask
{
public:
  AnswerCallDownstreamTask ();
protected:
  virtual int process (Command *command);
};
// Listing 012

// Listing 013 code/ch18
class AnswerCallUpstreamTask : public CommandTask
{
public:
  AnswerCallUpstreamTask ();
protected:
  virtual int process (Command *command);
};
// Listing 013

// Listing 02 code/ch18
// Module for the "retrieve caller id" command.
class RetrieveCallerIdModule : public CommandModule
{
public:
  RetrieveCallerIdModule (ACE_SOCK_Stream *peer);
};

class RetrieveCallerIdDownstreamTask : public CommandTask
{
public:
  RetrieveCallerIdDownstreamTask ();
protected:
  virtual int process (Command *command);
};

class RetrieveCallerIdUpstreamTask : public CommandTask
{
public:
  RetrieveCallerIdUpstreamTask ();
protected:
  virtual int process (Command *command);
};
// Listing 02

// Listing 03 code/ch18
// Module for the "play message" command.
class PlayMessageModule : public CommandModule
{
public:
  PlayMessageModule (ACE_SOCK_Stream *peer);
};

class PlayMessageDownstreamTask : public CommandTask
{
public:
  PlayMessageDownstreamTask ();
protected:
  virtual int process (Command *command);
};

class PlayMessageUpstreamTask : public CommandTask
{
public:
  PlayMessageUpstreamTask ();
protected:
  virtual int process (Command *command);
};
// Listing 03

// Listing 04 code/ch18
// Module for the "record message" command.
class RecordMessageModule : public CommandModule
{
public:
  RecordMessageModule (ACE_SOCK_Stream *peer);
};

class RecordMessageDownstreamTask : public CommandTask
{
public:
  RecordMessageDownstreamTask ();
protected:
  virtual int process (Command *command);
};

class RecordMessageUpstreamTask : public CommandTask
{
public:
  RecordMessageUpstreamTask ();
protected:
  virtual int process (Command *command);
};
// Listing 04

#endif /* COMMAND_TASKS_H */
[править] EndTask.h
/* -*- C++ -*- */ // $Id: EndTask.h 94310 2011-07-09 19:10:06Z schmidt $ #ifndef END_TASK_H #define END_TASK_H // Listing 1 code/ch18 class TheEndTask : public BasicTask { protected: virtual int process (Message *) { ACE_TRACE ("EndTask::process()"); return 0; } virtual int next_step (ACE_Message_Block *mb) { ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TheEndTask::next_step() - ") ACE_TEXT ("end of the line.\n"))); mb->release (); return 0; } }; // Listing 1 #endif /* END_TASK_H */
[править] Message.h
/* -*- C++ -*- */ // $Id: Message.h 80826 2008-03-04 14:51:23Z wotte $ #ifndef MESSAGE_H #define MESSAGE_H class RecordingDevice; class Message { public: Message () : device_(0), type_(0), id_(0) { } ~Message () { } RecordingDevice *recorder (void) { return this->device_; } void recorder (RecordingDevice *device) { this->device_ = device; } void type (MessageType *type) { this->type_ = type; } MessageType *type (void) { return this->type_; } void caller_id (CallerId *id) { this->id_ = id; } CallerId *caller_id (void) { return this->id_; } void addr (ACE_FILE_Addr &addr) { this->addr_ = addr; } void incoming_message (ACE_FILE_Addr &addr, MessageType *type) { this->addr_ = addr; this->type_ = type; } ACE_FILE_Addr &addr (void) { return this->addr_; } int is_text (void) { return this->type_->is_text (); } int is_audio (void) { return this->type_->is_audio (); } int is_video (void) { return this->type_->is_video (); } private: RecordingDevice *device_; MessageType *type_; CallerId *id_; ACE_FILE_Addr addr_; }; class AudioMessage : public Message { }; class VideoMessage : public Message { }; #endif /* MESSAGE_H */
[править] MessageInfo.h
/* -*- C++ -*- */ // $Id: MessageInfo.h 94075 2011-05-23 06:57:42Z johnnyw $ #ifndef MESSAGE_INFO_H #define MESSAGE_INFO_H #include "ace/FILE_Addr.h" #include "ace/SString.h" /* Opaque class that represents a caller's ID */ class CallerId { public: CallerId () : id_ (ACE_TEXT ("UNKNOWN")) { } CallerId (ACE_TString id) : id_(id) { } const ACE_TCHAR * string(void) { return this->id_.c_str (); } private: ACE_TString id_; }; class MessageType { public: enum { // Known video codecs FIRST_VIDEO_CODEC = 1, DIVX, // ... LAST_VIDEO_CODEC, // Known audio codecs FIRST_AUDIO_CODEC, MP3, RAWPCM, // ... LAST_AUDIO_CODEC, // Known text codecs FIRST_TEXT_CODEC, RAWTEXT, XML, // ... LAST_TEXT_CODEC, LAST_CODEC }; MessageType (int codec, const ACE_FILE_Addr& addr) : codec_(codec), addr_(addr) { } int get_codec (void) { return this->codec_; } ACE_FILE_Addr &get_addr (void) { return this->addr_; } int is_video (void) { return this->get_codec () > FIRST_VIDEO_CODEC && this->get_codec () < LAST_VIDEO_CODEC; } int is_audio (void) { return this->get_codec () > FIRST_AUDIO_CODEC && this->get_codec () < LAST_AUDIO_CODEC ; } int is_text (void) { return this->get_codec () > FIRST_TEXT_CODEC && this->get_codec () < LAST_TEXT_CODEC ; } private: int codec_; ACE_FILE_Addr addr_; }; # endif /* MESSAGE_INFO_H */
[править] RecordingDevice.h
/* -*- C++ -*- */ // $Id: RecordingDevice.h 80826 2008-03-04 14:51:23Z wotte $ #ifndef RECORDING_DEVICE_H #define RECORDING_DEVICE_H #include "ace/FILE_Addr.h" #include "ace/Event_Handler.h" #include "ace/Log_Msg.h" #include "ace/Reactor.h" #include "ace/Semaphore.h" class CallerId; class MessageType; class RecordingDevice { public: RecordingDevice () { // Initialize the semaphore so that we don't block on the // first call to wait_for_activity(). } virtual ~RecordingDevice () { } virtual const ACE_TCHAR *get_name (void) const { return ACE_TEXT ("UNKNOWN"); } virtual int init (int, ACE_TCHAR *[]) { return 0; } // Answer the incoming call virtual int answer_call (void) = 0; // Fetch some form of caller identification at the hardware level. virtual CallerId *retrieve_callerId (void) = 0; // Fetch the message at the location specified by 'addr' and play // it for the caller. virtual int play_message (ACE_FILE_Addr &addr) = 0; // Record data from our physical device into the file at the // specified address. Return the number of bytes recorded. virtual MessageType *record_message (ACE_FILE_Addr &addr) = 0; // Release the RecordingDevice to accept another incoming call virtual void release (void) { this->release_semaphore (); } // Get the handler of the device so that wait_for_activity() can // wait for data to arrive. virtual ACE_Event_Handler *get_handler (void) const { return 0; } virtual RecordingDevice *wait_for_activity (void) { // Block on a semaphore until it says we're ready to do // work. ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Waiting for semaphore\n"))); this->acquire_semaphore (); // Use the reactor to wait for activity on our handle ACE_Reactor reactor; ACE_Event_Handler *handler = this->get_handler (); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Handler is %@\n"), (void *)handler)); reactor.register_handler (this->get_handler (), ACE_Event_Handler::READ_MASK); reactor.handle_events (); // Error-check this... 
// Leave the semaphore locked so that we'll block until // recording_complete() is invoked. return this; } protected: void acquire_semaphore (void) { this->semaphore_.acquire (); } void release_semaphore (void) { // Reset the semaphore so that wait_for_activity() will // unblock. ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Releasing semaphore\n"))); this->semaphore_.release (); } private: ACE_Semaphore semaphore_; }; #include "RecordingDevice_Text.h" #include "RecordingDevice_USRVM.h" #include "RecordingDevice_QC.h" #include "RecordingDeviceFactory.h" #endif /* RECORDING_DEVICE_H */
[править] RecordingDeviceFactory.cpp
// $Id: RecordingDeviceFactory.cpp 91813 2010-09-17 07:52:52Z johnnyw $ #include "RecordingDevice.h" #include "RecordingDeviceFactory.h" #include "RecordingDevice_Text.h" RecordingDevice *RecordingDeviceFactory::instantiate (int argc, ACE_TCHAR *argv[]) { RecordingDevice * device = 0; // Determine the implementation based on the values of argv // Exclude 2 device = new TextListenerAcceptor (); // Exclude 2 // Initialize the device with the remaining parameters. if (device->init (argc, argv) < 0) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("RecordingDeviceFactory::instantiate() - ") ACE_TEXT ("%s->init(argc, argv)"), device->get_name()), 0); return device; }
[править] RecordingDeviceFactory.h
/* -*- C++ -*- */ // $Id: RecordingDeviceFactory.h 80826 2008-03-04 14:51:23Z wotte $ #ifndef RECORDING_DEVICE_FACTORY_H #define RECORDING_DEVICE_FACTORY_H class RecordingDevice; /* * A factory class that creates an appropriate RecordingDevice * derivative based on command-line parameters. */ class RecordingDeviceFactory { public: // Instantiate the appropriate RecordingDevice implementation static RecordingDevice *instantiate (int argc, ACE_TCHAR *argv[]); }; #endif /* RECORDING_DEVICE_FACTORY_H */
[править] RecordingDevice_QC.h
// $Id: RecordingDevice_QC.h 80826 2008-03-04 14:51:23Z wotte $ class QuickCam : public RecordingDevice { };
[править] RecordingDevice_Text.cpp
/*
 * $Id: RecordingDevice_Text.cpp 80826 2008-03-04 14:51:23Z wotte $
 *
 * A RecordingDevice that listens to a socket and collects text.
 */

#include "MessageInfo.h"
#include "RecordingDevice.h"
#include "RecordingDevice_Text.h"
#include "Util.h"

TextListenerAcceptor::TextListenerAcceptor (void)
  : ACE_Event_Handler (), RecordingDevice ()
{
}

// ACE_Event_Handler interface

// Open the listening socket on 'addr' (address reuse enabled).
// Returns 0 on success, -1 after logging on failure.
int TextListenerAcceptor::open (ACE_INET_Addr &addr)
{
  if (this->acceptor_.open (addr, 1) == -1)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("%p\n"),
                       ACE_TEXT ("acceptor open")),
                      -1);
  return 0;
}

// Handle of the listening socket.
ACE_HANDLE TextListenerAcceptor::get_handle (void) const
{
  return this->acceptor_.get_handle ();
}

// Only reports the connection request; the connection itself is
// accepted later through accept().
int TextListenerAcceptor::handle_input (ACE_HANDLE)
{
  ACE_DEBUG ((LM_INFO,
              ACE_TEXT ("TextListenerAcceptor - connection request\n" )));
  return 0;
}

// Accept a pending connection into 'peer'.
int TextListenerAcceptor::accept (ACE_SOCK_Stream &peer)
{
  return this->acceptor_.accept (peer);
}

// RecordingDevice interface

// Open a listening socket on the port specified by argv[1].
// Returns -1 when that argument is missing or the socket cannot be
// opened, 0 otherwise.
int TextListenerAcceptor::init (int argc, ACE_TCHAR *argv[])
{
  // argv[1] is read below, so refuse to run without it.
  if (argc < 2 || argv == 0 || argv[1] == 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("TextListener - missing listen address\n")),
                      -1);

  ACE_INET_Addr addr (argv[1]);

  if (this->open (addr) < 0)
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("%p\n"),
                       ACE_TEXT ("TextListener - open")),
                      -1);
  return 0;
}

// The acceptor is its own event handler.  The interface is const but
// hands out a mutable handler, hence the const_cast.
ACE_Event_Handler *TextListenerAcceptor::get_handler (void) const
{
  return const_cast<TextListenerAcceptor *> (this);
}

// Wait as the base class does, then hand back a new TextListener bound
// to this acceptor.
RecordingDevice *TextListenerAcceptor::wait_for_activity (void)
{
  this->RecordingDevice::wait_for_activity ();
  return new TextListener (this);
}

// The acceptor never services a call itself; the operations below are
// fixed-result stubs.
int TextListenerAcceptor::answer_call (void)
{
  return -1;
}

CallerId *TextListenerAcceptor::retrieve_callerId (void)
{
  return 0;
}

int TextListenerAcceptor::play_message (ACE_FILE_Addr &addr)
{
  ACE_UNUSED_ARG (addr);
  return 0;
}

MessageType *TextListenerAcceptor::record_message (ACE_FILE_Addr &addr)
{
  ACE_UNUSED_ARG (addr);
  return 0;
}

// Listing 01 code/ch18
// Create the command stream on top of our peer socket and open it.
TextListener::TextListener (TextListenerAcceptor *acceptor)
  : acceptor_ (acceptor)
{
  ACE_TRACE ("TextListener ctor");

  ACE_NEW (this->command_stream_, CommandStream (&(this->peer_)));
  this->command_stream_->open (0);
}
// Listing 01

const ACE_TCHAR *TextListener::get_name (void) const
{
  return ACE_TEXT ("TextListener");
}

// NOTE(review): each operation below allocates a Command and never
// frees the one returned by execute().  Who owns that object is not
// visible from this file -- confirm against CommandStream before
// adding cleanup.

// Listing 02 code/ch18
// Send CMD_ANSWER_CALL down the command stream, passing the acceptor
// as extra data, and return the command's numeric result.
int TextListener::answer_call (void)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("TextListener::answer_call()\n")));

  Command *c = new Command ();
  c->command_ = Command::CMD_ANSWER_CALL;
  c->extra_data_ = this->acceptor_;

  c = this->command_stream_->execute (c);

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("TextListener::answer_call() ")
              ACE_TEXT ("result is %d\n"),
              c->numeric_result_));

  return c->numeric_result_;
}
// Listing 02

// Listing 03 code/ch18
// Send CMD_RETRIEVE_CALLER_ID and wrap the command's result in a new
// CallerId, which the caller receives as a raw pointer.
CallerId *TextListener::retrieve_callerId (void)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("TextListener::retrieve_callerId()\n")));

  Command *c = new Command ();
  c->command_ = Command::CMD_RETRIEVE_CALLER_ID;

  c = this->command_stream_->execute (c);

  CallerId *caller_id = new CallerId (c->result_);
  return caller_id;
}
// Listing 03

// Listing 04 code/ch18
// Play the message stored at 'addr'.  Text is sent straight down the
// command stream; audio and video are first converted to a temporary
// text file, which is played and then removed.
// Returns the command's numeric result, or -1 when the message type is
// not recognized or the conversion fails.
int TextListener::play_message (ACE_FILE_Addr &addr)
{
  // identify_message() hands back a heap object; copy what we need and
  // free it right away so that no return path leaks it.
  MessageType *type = Util::identify_message (addr);
  const bool text = type->is_text () != 0;
  const bool audio = type->is_audio () != 0;
  const bool video = type->is_video () != 0;
  const int codec = type->get_codec ();
  delete type;

  if (text)
    {
      Command *c = new Command ();
      c->command_ = Command::CMD_PLAY_MESSAGE;
      c->extra_data_ = &addr;
      c = this->command_stream_->execute (c);
      return c->numeric_result_;
    }

  ACE_FILE_Addr temp (ACE_TEXT ("/tmp/outgoing_message.text"));
  ACE_FILE_IO *file = 0;
  if (audio)
    file = Util::audio_to_text (addr, temp);
  else if (video)
    file = Util::video_to_text (addr, temp);
  else
    ACE_ERROR_RETURN ((LM_ERROR,
                       ACE_TEXT ("Invalid message type %d\n"),
                       codec),
                      -1);

  // The converters return 0 (after logging) when the temporary file
  // cannot be created.
  if (file == 0)
    return -1;

  int rval = this->play_message (temp);
  file->remove ();
  // Assumes the converter transfers ownership of the ACE_FILE_IO it
  // returns -- TODO confirm if the Util contract changes.
  delete file;
  return rval;
}
// Listing 04

// Listing 05 code/ch18
// Send CMD_RECORD_MESSAGE with 'addr' as extra data.  Returns 0 when
// the command reports -1, otherwise a new RAWTEXT MessageType for
// 'addr'.
MessageType *TextListener::record_message (ACE_FILE_Addr &addr)
{
  Command *c = new Command ();
  c->command_ = Command::CMD_RECORD_MESSAGE;
  c->extra_data_ = &addr;
  c = this->command_stream_->execute (c);
  if (c->numeric_result_ == -1)
    return 0;

  return new MessageType (MessageType::RAWTEXT, addr);
}
// Listing 05

// Listing 06 code/ch18
// Destroys this listener; the object must not be used afterwards.
void TextListener::release (void)
{
  delete this;
}
// Listing 06
[править] RecordingDevice_Text.h
/* -*- C++ -*- */ /* * $Id: RecordingDevice_Text.h 80826 2008-03-04 14:51:23Z wotte $ * * A RecordingDevice that listens to a socket and collects text. */ #ifndef RECORDING_DEVICE_TEXT_H #define RECORDING_DEVICE_TEXT_H #include "ace/FILE_IO.h" #include "ace/FILE_Connector.h" #include "ace/SOCK_Stream.h" #include "ace/SOCK_Acceptor.h" #include "CommandStream.h" #include "MessageInfo.h" #include "RecordingDevice.h" class TextListenerAcceptor : public ACE_Event_Handler, public RecordingDevice { public: TextListenerAcceptor (); // ACE_Event_Handler interface int open (ACE_INET_Addr &addr); ACE_HANDLE get_handle (void) const; int handle_input (ACE_HANDLE); int accept (ACE_SOCK_Stream &peer); // RecordingDevice interface // Open a listening socket on the port specified by argv. int init (int argc, ACE_TCHAR *argv[]); ACE_Event_Handler *get_handler (void) const; virtual RecordingDevice *wait_for_activity (void); virtual int answer_call (void); virtual CallerId *retrieve_callerId (void); virtual int play_message (ACE_FILE_Addr &addr); virtual MessageType *record_message (ACE_FILE_Addr &addr); private: ACE_SOCK_Acceptor acceptor_; }; // Listing 01 code/ch18 class TextListener : public RecordingDevice { public: TextListener (TextListenerAcceptor *acceptor); virtual const ACE_TCHAR *get_name (void) const; int answer_call (void); CallerId *retrieve_callerId (void); int play_message (ACE_FILE_Addr &addr); MessageType *record_message (ACE_FILE_Addr &addr); virtual void release (void); // Listing 01 // Listing 02 code/ch18 private: CommandStream *command_stream_; TextListenerAcceptor *acceptor_; ACE_SOCK_Stream peer_; }; // Listing 02 #endif /* RECORDING_DEVICE_TEXT_H */
[править] RecordingDevice_USRVM.h
// $Id: RecordingDevice_USRVM.h 80826 2008-03-04 14:51:23Z wotte $ class USRoboticsVoiceModem : public RecordingDevice { };
[править] Util.h
/* -*- C++ -*- */ // $Id: Util.h 80826 2008-03-04 14:51:23Z wotte $ #ifndef UTIL_H #define UTIL_H #include "ace/FILE_Addr.h" #include "ace/FILE_Connector.h" #include "ace/FILE_IO.h" class Util { public: static MessageType *identify_message (ACE_FILE_Addr &src) { // Determine the contents of the specified file return new MessageType (MessageType::RAWTEXT, src); } static ACE_FILE_IO *audio_to_text (ACE_FILE_Addr &, ACE_FILE_Addr &dest) { ACE_FILE_Connector connector; ACE_FILE_IO *file = 0; if (connector.connect (*file, dest) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("create file")), 0); // Convert audio data to printable text return file; } static ACE_FILE_IO *video_to_text (ACE_FILE_Addr &, ACE_FILE_Addr &dest) { ACE_FILE_Connector connector; ACE_FILE_IO *file = 0; if (connector.connect (*file, dest) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("create file")), 0); // Extract audio data from video file and convert to printable text return file; } static int convert_to_unicode (ACE_FILE_Addr &src, ACE_FILE_Addr &dest) { ACE_FILE_Connector connector; ACE_FILE_IO input; if (connector.connect (input, src) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("read file")), 0); ACE_FILE_IO output; if (connector.connect (output, dest) == -1) ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("create file")), 0); char rwbuf[512]; ssize_t rwbytes; while ((rwbytes = input.recv (rwbuf, 512)) > 0) { output.send_n (rwbuf, rwbytes); } input.close (); output.close (); // Convert arbirary text to unicode return 0; } static int convert_to_mp3 (ACE_FILE_Addr &, ACE_FILE_Addr &) { // Convert arbitrary audio data to some standard mp3 format return 0; } static int convert_to_mpeg (ACE_FILE_Addr &, ACE_FILE_Addr &) { // Convert arbitrary vidio data to some standard mpeg format return 0; } }; #endif /* UTIL_H */