概況
CSocket是MFC在CAsyncSocket基礎上派生的一個同步阻塞Socket的封裝類,它的定義包含在<afxsock.h>中。它是如何又把CAsyncSocket變成同步的,而且還能回響同樣的Socket事件呢?
其實很簡單,CSocket在Connect()返回WSAEWOULDBLOCK錯誤時,不是在OnConnect(),OnReceive()這些事件終端函數裡去等待。你先必須明白Socket事件是如何到達這些事件函數裡的。這些事件處理函式是CSocketWnd視窗對象回調的,而視窗對象收到來自Socket的事件,又是靠執行緒訊息佇列分發過來的。總之,Socket事件首先是作為一個訊息發給CSocketWnd視窗對象,這個訊息肯定需要經過執行緒訊息佇列的分發,最終CSocketWnd視窗對象收到這些訊息就調用相應的回調函式(OnConnect()等)。
所以,CSocket在調用Connect()之後,如果返回一個WSAEWOULDBLOCK錯誤時,它馬上調用一個用於提取訊息的函式PumpMessage(...),就是從當前執行緒的訊息佇列里取關心的訊息.
情況
PumpMessage會遇到下面幾種情況:
1 提取出了(從訊息佇列中移出來Remove),用戶正在使用的一個Socket傳送的WM_SOCKET_NOTIFY訊息和對應的 FD_XXX事件,返回True.
2 提取出了(從訊息佇列中移出來Remove),用戶正在使用的一個Socket傳送的WM_SOCKET_NOTIFY訊息和對應的 FD_Close事件,返回True.
3 提取出了(從訊息佇列中移出來Remove),PumpMessage(..)設定的定時器的WM_TIMER訊息,TimeOut事件為 CSocket的一個成員變數,m_nTimeOut=2000ms,返回True
4 用戶調用了CancelBlockingCall() 設定錯誤代碼為WSAEINTR(被中斷了),返回False
5 用戶一直沒有取到用戶正在使用的一個Socket傳送的WM_SOCKET_NOTIFY訊息和對應的FD_XXX事件,但是取到了同一個執行緒中的其他Socket的WM_SOCKET_NOTIFY訊息及其對應的訊息,則將這些訊息,加入到一個輔助性的佇列中去,以後處理.
6 沒有取到任何WM_SOCKET_NOTIFY訊息,則開始查看(不是取出來,而是查看)本執行緒的訊息佇列中是否有其它訊息,如果有的話,調用虛函式OnMessagePending(),來處理這些訊息(OnMessagePending()用戶可以自定義。在阻塞時,用戶想要處理的訊息),如果沒有,則調用WaitMessage()開始等待訊息的到來.
說明
代碼說明如下:
A 先看Connect,因為Connect的阻塞的實現和Accept,Receive,ReceiveFrom,Send,SendTo都有點不同.
也許你們會奇怪為何是ConnectHelper(...),而不是Connect(...).其實ConnectHelper(...)才是Connect(..)
調用
真正調用的東西,如下:
BOOL CAsyncSocket::Connect(const SOCKADDR* lpSockAddr,int nSockAddrLen)
{
return ConnectHelper(lpSockAddr,nSockAddrLen);
}
//ConnectHelper(...)為一虛函式
//繼承自CAsyncSocket,Csocket有重新定義過.
//這也是為什麼CSocket是Public繼承的原因
BOOL CSocket::ConnectHelper(...)
{
//一旦調用 就先檢查當前是否有一個阻塞操作正在進行
//如果是,立馬返回,並設定錯誤代碼.
......
......
m_nConnectError = -1;
//注意它只調用了一次CAsyncSocket::ConnectHelper(...)
if(!CAsyncSocket::ConnectHelper(...))
{
//由於Connect(...)要求自己和Server進行三步握手
//即需要傳送一個Packet,而且等待回復(這個也就是
//涉及連線和傳送數據操作的Socket API會阻塞的原因)
//所以CAsyncSocket::ConnectHelper(...)會立即返回,
//並且設定錯誤為WSAEWOULDBLOCK(調用該函式會導致阻塞)
if(WSAGetLastError() == WSAEWOULDBLOCK)
{
//進入訊息循環,以從執行緒訊息佇列里查看FD_CONNECT訊息,
//收到FD_CONNECT訊息(在PumpMessage中會修改m_nConnectError),返回
//或者WM_TIMER/FD_CLOSE(return true,但不會修改m_nConnectError),
//繼續調用PumpMessage來取訊息
//或者錯誤,那么就返回socket_error
while(PumpMessages(FD_CONNECT))
{
if (m_nConnectError != -1)
{
WSASetLastError(m_nConnectError);
return (m_nConnectError == 0);
}
} //end while
}
return false;
}
return true;
}
//在PumpMessages中會設定一個定時器,時間為m_nTimeOut=2000ms
//當在這個時間之內,依然沒有得到訊息的話,就返回
BOOL CSocket::PumpMessages(UINT uStopFlag)
{
//一旦進入這個函式,就設定Socket當前狀態為阻塞
BOOL bBlocking = TRUE;
m_pbBlocking = &bBlocking;
....................
.....................
....................
CWinThread* pThread = AfxGetThread();
//bBlocking是一個標誌,
// 用來判斷用戶是否取消對Connect()的調用
//即是否調用CancelBlockingCall()
while(bBlocking)
{
//#define WM_SOCKET_NOTIFY 0x0373
//#define WM_SOCKET_DEAD 0x0374
MSG msg;
//在此處只是取WM_SOCKET_NOTIFY 和WM_SOCKET_DEAD訊息
if (::PeekMessage(&msg,pState->m_hSocketWindow,WM_SOCKET_NOTIFY,WM_SOCKET_DEAD,
PM_REMOVE))
{
if (msg.message == WM_SOCKET_NOTIFY && (SOCKET)msg.wParam == m_hSocket)
{
//這個是PumpMessage的第2種情況
if (WSAGETSELECTEVENT(msg.lParam) == FD_CLOSE)
{ break;}
//這個是PumpMessage的第1種情況
if(WSAGETSELECTEVENT(msg.lParam) == uStopFlag)
{ ......; break;}
}
//這個是PumpMessage的第5種情況
if (msg.wParam != 0 || msg.lParam != 0)
CSocket::AuxQueueAdd(msg.message,msg.wParam,msg.lParam);
}
//這個是PumpMessage的第3種情況
else if (::PeekMessage(&msg,pState->m_hSocketWindow,WM_TIMER,WM_TIMER,PM_REMOVE))
{ break;}
//這個是PumpMessage的第6種情況
if (bPeek && ::PeekMessage(&msg,NULL,0,0,PM_NOREMOVE))
{
if (OnMessagePending())
{
}
else
{
// 等待訊息的到來
WaitMessage();
.....
}
}
}//end while
////這個是PumpMessage的第4種情況
if (!bBlocking)
{
WSASetLastError(WSAEINTR);
return FALSE;
}
m_pbBlocking = NULL;
//將WM_SOCKET_NOTIFY訊息傳送到Creat CSocketWnd執行緒的訊息佇列中
//以便處理其他的Socket訊息
::PostMessage(pState->m_hSocketWindow,WM_SOCKET_NOTIFY,0,0);
return TRUE;
}
B 再看Receive(..)
//其實CSocket的這種實現方式決定了,應用程式不能夠在一個執行緒中Create一個socket,
//然後創建一個新的執行緒來專門Receive,因為這個新的執行緒將永遠不能取到FD_Read的事件,
//因為並不是在這個執行緒中創建的CSocketWnd對象,它無法接受到傳送到CSocketWnd的訊息
//(在Windows中接受訊息的主體是視窗)
//Receive和Connect的實現方式的最大區別為
//Connect 是不斷的調用PumpMessage(..)
//而Receive則不斷的調用自身
int CSocket::Receive(void* lpBuf,int nBufLen,int nFlags)
{
if (m_pbBlocking != NULL)
{
WSASetLastError(WSAEINPROGRESS);
returnFALSE;
}
int nResult;
while ((nResult = CAsyncSocket::Receive(lpBuf,nBufLen,nFlags)) == SOCKET_ERROR)
{
if (GetLastError() == WSAEWOULDBLOCK)
{
//一旦提取到FD_READ///FD_CLOSE///WM_TIMER時
// 就再次調用CAsyncSocket::Receive(...)
if (!PumpMessages(FD_READ))
return SOCKET_ERROR;
}
else
return SOCKET_ERROR;
}
return nResult;
}
總結
CSocket模式與socket API模式的最大區別在於它實現了:
1、將socket事件訊息化
2、用訊息阻塞方式實現同步
3、用序列化方式讀寫數據以防止死鎖,簡化讀取數據模型(流數據)。
多執行緒編程時,由於CSocket不能跨執行緒使用,所以,新建工作執行緒中會有一個CSocket對象,而該對象會重新與已知SOCKET句柄重新Attach,實現在新的執行緒中以CSocket對象的方式編程。
然後使用訊息通信的方式,實現在新建執行緒中讀寫數據。
通過訊息、執行緒事件等同步機制 實現UI執行緒與CSocket對象所在工作執行緒的通信,並不會影響UI的回響。
在BOOL CSocket::PumpMessages(UINT uStopFlag)中
看OnMessagePending:
但上面針對的都是當前執行緒的WM_PAIT訊息,而不是跨執行緒的,而多執行緒模式下的SOCKET都是放在工作執行緒中執行的!