概述
ACE_Task是ACE中的任務或主動對象“處理結構”的基類。ACE使用此類來實現主動對象模式。所有希望成為“主動對象”的對象都必須由此類派生。同時可將它看作是更高級的、更為面向對象的執行緒。
ACE_Task處理的是對象,因此更有利於構造OO程式,產生更好的OO軟體,而且,它還包括了一種用於
與其他任務通信的易於使用的機制。
ACE_Task可用作:
<1>更高級的執行緒(常稱其為任務)
<2>主動對象模式中的主動對象
PS.ACE任務:
每個任務都含有一或多個執行緒,以及一個底層訊息佇列。各個任務通過訊息佇列進行通信。至於訊息佇列實現的內在細節程式設計師不必關注。傳送任務用putq() 將訊息插入到另一任務的訊息佇列中,接收任務通過使用getq()將訊息提取出來。這樣的體系結構大大簡化了多執行緒程式的編程模型。
步驟
從ACE_Task類派生的子類應實現以下業務邏輯:
<1>實現服務初始化和終止方法。
open()方法應該包含所有專屬於任務的初始化代碼。其中可能包括諸如連線控制塊、鎖和記憶體這樣的資源。close()方法用於終止。
<2>調用啟用(Activation)方法。
在主動對象實例化後,必須通過調用activate()啟用它。要在主動對象中創建的執行緒數目及其它參數,被傳遞給activate()方法。它將使svc()方法成為所有它生成的執行緒的啟動點。
<3>實現服務專有的處理方法。
在主動對象被啟用後,各個新執行緒在svc() 方法中啟動。程式設計師必須在子類中定義此方法。
實例
//消費者類定義
#i nclude "ace/OS.h"
#i nclude "ace/Task.h"
#i nclude "ace/Message_Block.h"
//The Consumer Task.
class Consumer :
public ACE_Task<ACE_MT_SYNCH>
{
public:
int open(void*)
{
ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));
//Activate the Task
activate(THR_NEW_LWP,1);
return 0;
}
//The Service Processing routine
int svc(void)
{
//Get ready to receive message from Producer
ACE_Message_Block * mb = 0;
do
{
mb = 0;
//Get message of underlying queue
getq(mb);
ACE_DEBUG((LM_DEBUG,
"(%t) Got message: %d from remote task\n", *mb->rd_ptr()));
}while(*mb->rd_ptr()<10);
return 0;
}
int close(u_long)
{
ACE_DEBUG((LM_DEBUG, "Consumer closes down \n"));
return 0;
}
};
//生產者類定義
class Producer :
public ACE_Task<ACE_MT_SYNCH>
{
public:
Producer(Consumer * consumer) :
consumer_(consumer), data_(0)
{
mb = new ACE_Message_Block((char *)&data_, sizeof(data_));
}
int open(void *)
{
ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));
//Activate the Task
activate(THR_NEW_LWP, 1);
return 0;
}
//The Service Processing routine
int svc(void)
{
while(data_ < 11)
{
//Send message to consumer
ACE_DEBUG((LM_DEBUG,
"(%t) Sending message: %d to remote task\n", data_));
consumer_->putq(mb_);
//Go to sleep for a sec.
ACE_OS::sleep(1);
data_++;
}
return 0;
}
int close(void)
{
ACE_DEBUG((LM_DEBUG, "Producer closes down \n"));
return 0;
}
private:
char data_;
Consumer * consumer_;
ACE_Message_Block * mb_;
};
//main()函式
int main(int argc, char *argv[])
{
Consumer * consumer = new Consumer;
Producer * producer = new Producer(consumer);
producer->open(0);
consumer->open(0);
//Wait for all the tasks to exit. ACE_Thread_Manager::instance()->wait();
ACE_OS::exit(0);
}
分析:
以上為經典的生產者-消費者例子,演示了兩個任務如何使用底層的訊息佇列進行通信。我們可以將生產者和消費者看作是不同的ACE_Task類型的對象。方案十分簡單,但卻是面向對象的,在編寫面向對象的多執行緒程式或主動對象的實例時,我們可採用此方案,它提供了比低級執行緒API更好的方法。
框架
ACE的ACE_Task框架提供了一種基於訊息的編程模式,可以Windows編程的訊息循環進行類比。
ACE_Task | Windows 訊息循環 | 說明 | |
訊息類型 | ACE_Message_Block* | MSG | Windows 訊息用MSG結構表示,ACE_Task中因為不能預計各種套用中訊息的類型,所以ACE_Message_Block基本上可以理解為是對一個指針的封裝,這個指針指向實際的一塊記憶體或是一個對象等等。在創建ACE_Message_Block時,可以指定是由ACE_Message_Block來管理記憶體(構造函式中指定一個size_t類型的大小),還是由我們自己管理記憶體(構造函式中指定一個指針)。而一個ACE_Message_Block類型的指針,就是一個訊息,我們通過傳遞它來進行邏輯的業務處理。 |
傳送訊息 | ACE_Task::putq | SendMessage | 事實上,到底用SendMessage還是PostMessage與ACE_Task::putq來進行類比,我很為難,PostMessage傳送一個訊息後立刻返回,這與通常的ACE_Task::putq行為非常類似,因為ACE_Task是運行在另外一個執行緒上,ACE_Task::putq只是完成將訊息插入到訊息佇列的工作,理論上它應該立刻返回,但實際上,ACE_Task的訊息佇列有容量大小限制,這個限制由我們自己限定,噹噹前訊息佇列滿時,ACE_Task::putq將阻塞一直到可以插入,這時候就比較類似與SendMessage, |
取出訊息 | ACE_Task::getq | GetMessage | GetMessage和ACE_Task::getq在當前訊息佇列沒有訊息時都會阻塞到有訊息可用為止,所以對於它們的使用比較類似,通常會寫一個訊息循環函式 BOOL bRet; MSG msg; while( (bRet = GetMessage( &msg, NULL, 0, 0 )) != 0) { if (bRet == -1) { // handle the error and possibly exit } else { TranslateMessage(&msg); DispatchMessage(&msg); } } ACE_Message_Block * msg; while(getq(msg) != -1) // int putq (ACE_Message_Block *, ACE_Time_Value *timeout = 0); { // process msg here } |
訊息處理函式 | 默認沒有提供 | WNDPROC | 通過TranslateMessage和DispatchMessage,Windows將訊息投遞到相映視窗的WNDPROC上,ACE_Task沒有提供類似與WNDPROC的回調函式,如果願意,我們可以在ACE_Task上寫出類似的結構,而通常,我們直接在訊息循環中編寫處理訊息的代碼 |
儘管看起來ACE_Task提供的訊息系統與WIndows的訊息系統很象,但實際上,它們還是有比較大的區別,要搭架一個基於ACE_Task的訊息系統,通常要做如下的步驟:
編寫一個派生自ACE_Task的類,指定它的同步模式
ACE_Task的訊息佇列可以由多個處理執行緒共享使用,所以需要提供同步模式,例如ACE_MT_SYNCH和ACE_NULL_SYNCH分別表示基於多執行緒的同步和不使用同步,這個參數是ACE_Task的一個模板參數。 class My_Task : public ACE_Task<ACE_MT_SYNCH>
{
public:
virtual int svc();
}
重載ACE_Task的svc 方法,編寫訊息循環相關的代碼 int My_Task::svc()
{
ACE_Message_Block * msg;
while(getq(msg) != -1) // int putq (ACE_Message_Block *, ACE_Time_Value *timeout = 0);
{
// process msg here
}
}
svc 方法相當與處理執行緒的入口方法。
假設My_Task是一個基於ACE_Task的類,創建一個唯一的My_Task實例,這個可以通過
typedef ACE_Singleton<MyTask, SYNCH_METHOD> MYTASK;
然後總是使用MYTASK::instance方法來獲取一個My_Task的指針來完成。 在適當位置(一般是程式開始的時候),讓My_Task開始工作
MYTASK::intance()->activate(
THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED , // 執行緒創建的屬性
n_threads = 1, // 執行緒的數目,即有多少處理執行緒
...)在有訊息發生的時候傳送訊息 ACE_Message_Block * msg;
// fill the msg
...
MYTASK::intance()->putq(msg);
最後考慮一個使用ACE_Task的實例,在一個編寫WEB伺服器的項目中,類Request_Handler負責處理HTTP請求,Request_Hanlder派生自ACE_Task,當有請求時,其他的代碼將Http請求構造成一個ACE_Message_Block,並調用Request_Handler的putq方法將請求插入訊息佇列,Request_Handler 配置為根據CPU的數目創建處理執行緒,Request_Handler的svc方法從佇列中獲取請求進行處理,然後將處理的結果構造成為一個ACE_Message_Block,插入到Response_Handler的訊息佇列,Response_Handler也派生自ACE_Task,但它只有一個處理執行緒,它僅僅將相應的數據寫回給客戶端。