00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef __XRD_CL_XROOTD_MSG_HANDLER_HH__
00026 #define __XRD_CL_XROOTD_MSG_HANDLER_HH__
00027
00028 #include "XrdCl/XrdClPostMasterInterfaces.hh"
00029 #include "XrdCl/XrdClXRootDResponses.hh"
00030 #include "XrdCl/XrdClDefaultEnv.hh"
00031 #include "XrdCl/XrdClMessage.hh"
00032 #include "XProtocol/XProtocol.hh"
00033
00034 #include <sys/uio.h>
00035
00036 #include <list>
00037 #include <memory>
00038
00039 namespace XrdCl
00040 {
00041 class PostMaster;
00042 class SIDManager;
00043 class URL;
00044 class LocalFileHandler;
00045
00046
00047
00048
00049 struct RedirectEntry
00050 {
00051 RedirectEntry( const URL &from, const URL &to ) : from( from ), to( to )
00052 {
00053
00054 }
00055
00056 URL from;
00057 URL to;
00058 XRootDStatus status;
00059
00060 std::string ToString( bool prevok = true )
00061 {
00062 const std::string tostr = to.GetLocation();
00063 const std::string fromstr = from.GetLocation();
00064
00065 if( prevok )
00066 {
00067 if( tostr == fromstr )
00068 return "Retrying: " + tostr;
00069 return "Redirected from: " + fromstr + " to: " + tostr;
00070 }
00071 return "Failed at: " + fromstr + ", retrying at: " + tostr;
00072 }
00073 };
00074
00075 class XRootDMsgHandler;
00076
00077
00078
00079
00080 class MsgHandlerRef
00081 {
00082 public:
00083
00084 MsgHandlerRef( XRootDMsgHandler *handler) : ref( handler ), count( 1 )
00085 {
00086
00087 }
00088
00089 XRootDMsgHandler* operator->()
00090 {
00091 return ref;
00092 }
00093
00094 operator bool() const
00095 {
00096 return ref;
00097 }
00098
00099 operator XrdSysMutex&()
00100 {
00101 return mtx;
00102 }
00103
00104 MsgHandlerRef& Self()
00105 {
00106 XrdSysMutexHelper lck( mtx );
00107 ++count;
00108 return *this;
00109 }
00110
00111 void Invalidate()
00112 {
00113 XrdSysMutexHelper lck( mtx );
00114 ref = 0;
00115 }
00116
00117 void Free()
00118 {
00119 XrdSysMutexHelper lck( mtx );
00120 --count;
00121 if( count == 0 )
00122 {
00123 lck.UnLock();
00124 delete this;
00125 }
00126 }
00127
00128 private:
00129
00130 XrdSysMutex mtx;
00131 XRootDMsgHandler *ref;
00132 uint16_t count;
00133 };
00134
00135
00137
00138 class XRootDMsgHandler: public IncomingMsgHandler,
00139 public OutgoingMsgHandler
00140 {
00141 friend class HandleRspJob;
00142
00143 public:
00144
00153
00154 XRootDMsgHandler( Message *msg,
00155 ResponseHandler *respHandler,
00156 const URL *url,
00157 SIDManager *sidMgr,
00158 LocalFileHandler *lFileHandler):
00159 pRequest( msg ),
00160 pResponse( 0 ),
00161 pResponseHandler( respHandler ),
00162 pUrl( *url ),
00163 pSidMgr( sidMgr ),
00164 pLFileHandler( lFileHandler ),
00165 pExpiration( 0 ),
00166 pRedirectAsAnswer( false ),
00167 pHosts( 0 ),
00168 pHasLoadBalancer( false ),
00169 pHasSessionId( false ),
00170 pChunkList( 0 ),
00171 pRedirectCounter( 0 ),
00172
00173 pAsyncOffset( 0 ),
00174 pAsyncReadSize( 0 ),
00175 pAsyncReadBuffer( 0 ),
00176 pAsyncMsgSize( 0 ),
00177
00178 pReadRawStarted( false ),
00179 pReadRawCurrentOffset( 0 ),
00180
00181 pReadVRawMsgOffset( 0 ),
00182 pReadVRawChunkHeaderDone( false ),
00183 pReadVRawChunkHeaderStarted( false ),
00184 pReadVRawSizeError( false ),
00185 pReadVRawChunkIndex( 0 ),
00186 pReadVRawMsgDiscard( false ),
00187
00188 pOtherRawStarted( false ),
00189
00190 pFollowMetalink( false ),
00191
00192 pStateful( false ),
00193
00194 pAggregatedWaitTime( 0 ),
00195
00196 pMsgInFly( false ),
00197
00198 pRef( new MsgHandlerRef( this ) )
00199 {
00200 pPostMaster = DefaultEnv::GetPostMaster();
00201 if( msg->GetSessionId() )
00202 pHasSessionId = true;
00203 memset( &pReadVRawChunkHeader, 0, sizeof( readahead_list ) );
00204 }
00205
00206
00208
00209 ~XRootDMsgHandler()
00210 {
00211 pRef->Free();
00212
00213 DumpRedirectTraceBack();
00214
00215 if( !pHasSessionId )
00216 delete pRequest;
00217 delete pResponse;
00218 std::vector<Message *>::iterator it;
00219 for( it = pPartialResps.begin(); it != pPartialResps.end(); ++it )
00220 delete *it;
00221 }
00222
00223
00229
00230 virtual uint16_t Examine( Message *msg );
00231
00232
00236
00237 virtual uint16_t GetSid() const;
00238
00239
00243
00244 virtual void Process( Message *msg );
00245
00246
00256
00257 virtual Status ReadMessageBody( Message *msg,
00258 int socket,
00259 uint32_t &bytesRead );
00260
00261
00267
00268 virtual uint8_t OnStreamEvent( StreamEvent event,
00269 uint16_t streamNum,
00270 Status status );
00271
00272
00274
00275 virtual void OnStatusReady( const Message *message,
00276 Status status );
00277
00278
00280
00281 virtual bool IsRaw() const;
00282
00283
00292
00293 Status WriteMessageBody( int socket,
00294 uint32_t &bytesRead );
00295
00296
00301
00302 ChunkList* GetMessageBody( uint32_t *&asyncOffset )
00303 {
00304 asyncOffset = &pAsyncOffset;
00305 return pChunkList;
00306 }
00307
00308
00312
00313 void WaitDone( time_t now );
00314
00315
00317
00318 void SetExpiration( time_t expiration )
00319 {
00320 pExpiration = expiration;
00321 }
00322
00323
00326
00327 void SetRedirectAsAnswer( bool redirectAsAnswer )
00328 {
00329 pRedirectAsAnswer = redirectAsAnswer;
00330 }
00331
00332
00334
00335 const Message *GetRequest() const
00336 {
00337 return pRequest;
00338 }
00339
00340
00342
00343 void SetLoadBalancer( const HostInfo &loadBalancer )
00344 {
00345 if( !loadBalancer.url.IsValid() )
00346 return;
00347 pLoadBalancer = loadBalancer;
00348 pHasLoadBalancer = true;
00349 }
00350
00351
00353
00354 void SetHostList( HostList *hostList )
00355 {
00356 delete pHosts;
00357 pHosts = hostList;
00358 }
00359
00360
00362
00363 void SetChunkList( ChunkList *chunkList )
00364 {
00365 pChunkList = chunkList;
00366 if( chunkList )
00367 pChunkStatus.resize( chunkList->size() );
00368 else
00369 pChunkStatus.clear();
00370 }
00371
00372
00374
00375 void SetRedirectCounter( uint16_t redirectCounter )
00376 {
00377 pRedirectCounter = redirectCounter;
00378 }
00379
00380 void SetFollowMetalink( bool followMetalink )
00381 {
00382 pFollowMetalink = followMetalink;
00383 }
00384
00385 void SetStateful( bool stateful )
00386 {
00387 pStateful = stateful;
00388 }
00389
00390 private:
00391
00393
00394 Status ReadRawRead( Message *msg,
00395 int socket,
00396 uint32_t &bytesRead );
00397
00398
00400
00401 Status ReadRawReadV( Message *msg,
00402 int socket,
00403 uint32_t &bytesRead );
00404
00405
00407
00408 Status ReadRawOther( Message *msg,
00409 int socket,
00410 uint32_t &bytesRead );
00411
00412
00415
00416 Status ReadAsync( int socket, uint32_t &btesRead );
00417
00418
00420
00421 void HandleError( Status status, Message *msg = 0 );
00422
00423
00425
00426 Status RetryAtServer( const URL &url );
00427
00428
00430
00431 void HandleResponse();
00432
00433
00435
00436 XRootDStatus *ProcessStatus();
00437
00438
00441
00442 Status ParseResponse( AnyObject *&response );
00443
00444
00447
00448 Status RewriteRequestRedirect( const URL &newUrl );
00449
00450
00452
00453 Status RewriteRequestWait();
00454
00455
00457
00458 Status PostProcessReadV( VectorReadInfo *vReadInfo );
00459
00460
00462
00463 Status UnPackReadVResponse( Message *msg );
00464
00465
00467
00468 void UpdateTriedCGI(uint32_t errNo=0);
00469
00470
00472
00473 void SwitchOnRefreshFlag();
00474
00475
00478
00479 void HandleRspOrQueue();
00480
00481
00483
00484 void HandleLocalRedirect( URL *url );
00485
00486
00491
00492 bool IsRetryable( Message *request );
00493
00494
00501
00502 bool OmitWait( Message *request, const URL &url );
00503
00504
00506
00507 void DumpRedirectTraceBack();
00508
00509
00510
00511
00512 struct ChunkStatus
00513 {
00514 ChunkStatus(): sizeError( false ), done( false ) {}
00515 bool sizeError;
00516 bool done;
00517 };
00518
00519 typedef std::list<std::unique_ptr<RedirectEntry>> RedirectTraceBack;
00520
00521 Message *pRequest;
00522 Message *pResponse;
00523 std::vector<Message *> pPartialResps;
00524 ResponseHandler *pResponseHandler;
00525 URL pUrl;
00526 PostMaster *pPostMaster;
00527 SIDManager *pSidMgr;
00528 LocalFileHandler *pLFileHandler;
00529 Status pStatus;
00530 Status pLastError;
00531 time_t pExpiration;
00532 bool pRedirectAsAnswer;
00533 HostList *pHosts;
00534 bool pHasLoadBalancer;
00535 HostInfo pLoadBalancer;
00536 bool pHasSessionId;
00537 std::string pRedirectUrl;
00538 ChunkList *pChunkList;
00539 std::vector<ChunkStatus> pChunkStatus;
00540 uint16_t pRedirectCounter;
00541
00542 uint32_t pAsyncOffset;
00543 uint32_t pAsyncReadSize;
00544 char* pAsyncReadBuffer;
00545 uint32_t pAsyncMsgSize;
00546
00547 bool pReadRawStarted;
00548 uint32_t pReadRawCurrentOffset;
00549
00550 uint32_t pReadVRawMsgOffset;
00551 bool pReadVRawChunkHeaderDone;
00552 bool pReadVRawChunkHeaderStarted;
00553 bool pReadVRawSizeError;
00554 int32_t pReadVRawChunkIndex;
00555 readahead_list pReadVRawChunkHeader;
00556 bool pReadVRawMsgDiscard;
00557
00558 bool pOtherRawStarted;
00559
00560 bool pFollowMetalink;
00561
00562 bool pStateful;
00563 int pAggregatedWaitTime;
00564
00565 std::unique_ptr<RedirectEntry> pRdirEntry;
00566 RedirectTraceBack pRedirectTraceBack;
00567
00568 bool pMsgInFly;
00569
00570
00571
00572
00573 MsgHandlerRef *pRef;
00574 };
00575 }
00576
00577 #endif // __XRD_CL_XROOTD_MSG_HANDLER_HH__