//------------------------------------------------------------------------------
// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
// Author: Lukasz Janyst <ljanyst@cern.ch>
//------------------------------------------------------------------------------
// XRootD is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// XRootD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
00017 //------------------------------------------------------------------------------ 00018 00019 #ifndef __XRD_CL_ASYNC_SOCKET_HANDLER_HH__ 00020 #define __XRD_CL_ASYNC_SOCKET_HANDLER_HH__ 00021 00022 #include "XrdCl/XrdClSocket.hh" 00023 #include "XrdCl/XrdClConstants.hh" 00024 #include "XrdCl/XrdClDefaultEnv.hh" 00025 #include "XrdCl/XrdClPoller.hh" 00026 #include "XrdCl/XrdClPostMasterInterfaces.hh" 00027 #include "XrdCl/XrdClTaskManager.hh" 00028 00029 #include <sys/types.h> 00030 #include <sys/socket.h> 00031 00032 namespace XrdCl 00033 { 00034 class Stream; 00035 00036 //---------------------------------------------------------------------------- 00039 //---------------------------------------------------------------------------- 00040 class AsyncSocketHandler: public SocketHandler 00041 { 00042 //------------------------------------------------------------------------ 00043 // We need an extra task for rescheduling of HS request that received 00044 // a wait response. 00045 //------------------------------------------------------------------------ 00046 class WaitTask: public XrdCl::Task 00047 { 00048 public: 00049 WaitTask( XrdCl::AsyncSocketHandler *handler, XrdCl::Message *msg ): 00050 pHandler( handler ), pMsg( msg ) 00051 { 00052 std::ostringstream o; 00053 o << "WaitTask for: 0x" << msg; 00054 SetName( o.str() ); 00055 } 00056 00057 virtual time_t Run( time_t now ) 00058 { 00059 pHandler->RetryHSMsg( pMsg ); 00060 return 0; 00061 } 00062 00063 private: 00064 XrdCl::AsyncSocketHandler *pHandler; 00065 XrdCl::Message *pMsg; 00066 }; 00067 00068 public: 00069 //------------------------------------------------------------------------ 00071 //------------------------------------------------------------------------ 00072 AsyncSocketHandler( Poller *poller, 00073 TransportHandler *transport, 00074 AnyObject *channelData, 00075 uint16_t subStreamNum ); 00076 00077 //------------------------------------------------------------------------ 00079 
//------------------------------------------------------------------------ 00080 ~AsyncSocketHandler(); 00081 00082 //------------------------------------------------------------------------ 00084 //------------------------------------------------------------------------ 00085 void SetAddress( const XrdNetAddr &address ) 00086 { 00087 pSockAddr = address; 00088 } 00089 00090 //------------------------------------------------------------------------ 00092 //------------------------------------------------------------------------ 00093 const XrdNetAddr &GetAddress() const 00094 { 00095 return pSockAddr; 00096 } 00097 00098 //------------------------------------------------------------------------ 00100 //------------------------------------------------------------------------ 00101 Status Connect( time_t timeout ); 00102 00103 //------------------------------------------------------------------------ 00105 //------------------------------------------------------------------------ 00106 Status Close(); 00107 00108 //------------------------------------------------------------------------ 00110 //------------------------------------------------------------------------ 00111 void SetStream( Stream *stream ); 00112 00113 //------------------------------------------------------------------------ 00115 //------------------------------------------------------------------------ 00116 virtual void Event( uint8_t type, XrdCl::Socket */*socket*/ ); 00117 00118 //------------------------------------------------------------------------ 00120 //------------------------------------------------------------------------ 00121 Status EnableUplink() 00122 { 00123 if( !pPoller->EnableWriteNotification( pSocket, true, pTimeoutResolution ) ) 00124 return Status( stFatal, errPollerError ); 00125 return Status(); 00126 } 00127 00128 //------------------------------------------------------------------------ 00130 //------------------------------------------------------------------------ 00131 
Status DisableUplink() 00132 { 00133 if( !pPoller->EnableWriteNotification( pSocket, false ) ) 00134 return Status( stFatal, errPollerError ); 00135 return Status(); 00136 } 00137 00138 //------------------------------------------------------------------------ 00140 //------------------------------------------------------------------------ 00141 const std::string &GetStreamName() 00142 { 00143 return pStreamName; 00144 } 00145 00146 //------------------------------------------------------------------------ 00148 //------------------------------------------------------------------------ 00149 time_t GetLastActivity() 00150 { 00151 return pLastActivity; 00152 } 00153 00154 private: 00155 00156 //------------------------------------------------------------------------ 00157 // Connect returned 00158 //------------------------------------------------------------------------ 00159 void OnConnectionReturn(); 00160 00161 //------------------------------------------------------------------------ 00162 // Got a write readiness event 00163 //------------------------------------------------------------------------ 00164 void OnWrite(); 00165 00166 //------------------------------------------------------------------------ 00167 // Got a write readiness event while handshaking 00168 //------------------------------------------------------------------------ 00169 void OnWriteWhileHandshaking(); 00170 00171 00172 Status WriteMessageAndRaw( Message *toWrite, Message *&sign ); 00173 00174 Status WriteSeparately( Message *toWrite, Message *&sign ); 00175 00176 //------------------------------------------------------------------------ 00177 // Write the current message 00178 //------------------------------------------------------------------------ 00179 Status WriteCurrentMessage( Message *toWrite ); 00180 00181 //------------------------------------------------------------------------ 00182 // Write the message, its signature and its body 00183 
//------------------------------------------------------------------------ 00184 Status WriteVMessage( Message *toWrite, 00185 Message *&sign, 00186 ChunkList *chunks, 00187 uint32_t *asyncOffset ); 00188 00189 //------------------------------------------------------------------------ 00190 // Got a read readiness event 00191 //------------------------------------------------------------------------ 00192 void OnRead(); 00193 00194 //------------------------------------------------------------------------ 00195 // Got a read readiness event while handshaking 00196 //------------------------------------------------------------------------ 00197 void OnReadWhileHandshaking(); 00198 00199 //------------------------------------------------------------------------ 00200 // Read a message 00201 //------------------------------------------------------------------------ 00202 Status ReadMessage( Message *&toRead ); 00203 00204 //------------------------------------------------------------------------ 00205 // Handle fault 00206 //------------------------------------------------------------------------ 00207 void OnFault( Status st ); 00208 00209 //------------------------------------------------------------------------ 00210 // Handle fault while handshaking 00211 //------------------------------------------------------------------------ 00212 void OnFaultWhileHandshaking( Status st ); 00213 00214 //------------------------------------------------------------------------ 00215 // Handle write timeout event 00216 //------------------------------------------------------------------------ 00217 void OnWriteTimeout(); 00218 00219 //------------------------------------------------------------------------ 00220 // Handle read timeout event 00221 //------------------------------------------------------------------------ 00222 void OnReadTimeout(); 00223 00224 //------------------------------------------------------------------------ 00225 // Handle timeout event while handshaking 
00226 //------------------------------------------------------------------------ 00227 void OnTimeoutWhileHandshaking(); 00228 00229 //------------------------------------------------------------------------ 00230 // Get signature for given message 00231 //------------------------------------------------------------------------ 00232 Status GetSignature( Message *toSign, Message *&sign ); 00233 00234 //------------------------------------------------------------------------ 00235 // Initialize the iovec with given message 00236 //------------------------------------------------------------------------ 00237 inline void ToIov( Message &msg, iovec &iov ); 00238 00239 //------------------------------------------------------------------------ 00240 // Update iovec after write 00241 //------------------------------------------------------------------------ 00242 inline void UpdateAfterWrite( Message &msg, iovec &iov, int &bytesRead ); 00243 00244 //------------------------------------------------------------------------ 00245 // Add chunks to the given iovec 00246 //------------------------------------------------------------------------ 00247 inline uint32_t ToIov( ChunkList *chunks, 00248 const uint32_t *offset, 00249 iovec *iov ); 00250 00251 //------------------------------------------------------------------------ 00252 // Update raw data after write 00253 //------------------------------------------------------------------------ 00254 inline void UpdateAfterWrite( ChunkList *chunks, 00255 uint32_t *offset, 00256 iovec *iov, 00257 int &bytesWritten ); 00258 00259 //------------------------------------------------------------------------ 00260 // Retry hand shake message 00261 //------------------------------------------------------------------------ 00262 void RetryHSMsg( Message *msg ); 00263 00264 //------------------------------------------------------------------------ 00265 // Extract the value of a wait response 00266 // 00267 // @param rsp : the server 
response 00268 // @return : if rsp is a wait response then its value 00269 // otherwise -1 00270 //------------------------------------------------------------------------ 00271 inline kXR_int32 HandleWaitRsp( Message *rsp ); 00272 00273 //------------------------------------------------------------------------ 00278 //------------------------------------------------------------------------ 00279 Status ClassifyErrno( int error ); 00280 00281 //------------------------------------------------------------------------ 00282 // Data members 00283 //------------------------------------------------------------------------ 00284 Poller *pPoller; 00285 TransportHandler *pTransport; 00286 AnyObject *pChannelData; 00287 uint16_t pSubStreamNum; 00288 Stream *pStream; 00289 std::string pStreamName; 00290 Socket *pSocket; 00291 Message *pIncoming; 00292 Message *pHSIncoming; 00293 Message *pOutgoing; 00294 Message *pSignature; 00295 Message *pHSOutgoing; 00296 XrdNetAddr pSockAddr; 00297 HandShakeData *pHandShakeData; 00298 bool pHandShakeDone; 00299 uint16_t pTimeoutResolution; 00300 time_t pConnectionStarted; 00301 time_t pConnectionTimeout; 00302 bool pHeaderDone; 00303 std::pair<IncomingMsgHandler*, bool> pIncHandler; 00304 bool pOutMsgDone; 00305 OutgoingMsgHandler *pOutHandler; 00306 uint32_t pIncMsgSize; 00307 uint32_t pOutMsgSize; 00308 time_t pLastActivity; 00309 }; 00310 } 00311 00312 #endif // __XRD_CL_ASYNC_SOCKET_HANDLER_HH__