00001 #ifndef __XRDFILECACHE_FILE_HH__
00002 #define __XRDFILECACHE_FILE_HH__
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "XrdCl/XrdClXRootDResponses.hh"
00022 #include "XrdCl/XrdClDefaultEnv.hh"
00023
00024 #include "XrdOuc/XrdOucCache2.hh"
00025 #include "XrdOuc/XrdOucIOVec.hh"
00026
00027 #include "XrdFileCacheInfo.hh"
00028 #include "XrdFileCacheStats.hh"
00029
00030 #include <string>
00031 #include <map>
00032
00033 class XrdJob;
00034 class XrdOucIOVec;
00035
00036 namespace XrdCl
00037 {
00038 class Log;
00039 }
00040
00041 namespace XrdFileCache
00042 {
00043 class BlockResponseHandler;
00044 class DirectResponseHandler;
00045 class IO;
00046
00047 struct ReadVBlockListRAM;
00048 struct ReadVChunkListRAM;
00049 struct ReadVBlockListDisk;
00050 struct ReadVChunkListDisk;
00051 }
00052
00053
00054 namespace XrdFileCache
00055 {
00056
00057 class File;
00058
00059 class Block
00060 {
00061 public:
00062 std::vector<char> m_buff;
00063 long long m_offset;
00064 File *m_file;
00065 IO *m_io;
00066
00067 int m_refcnt;
00068 int m_errno;
00069 bool m_downloaded;
00070 bool m_prefetch;
00071
00072 Block(File *f, IO *io, long long off, int size, bool m_prefetch) :
00073 m_offset(off), m_file(f), m_io(io), m_refcnt(0),
00074 m_errno(0), m_downloaded(false), m_prefetch(m_prefetch)
00075 {
00076 m_buff.resize(size);
00077 }
00078
00079 char* get_buff(long long pos = 0) { return &m_buff[pos]; }
00080 int get_size() { return (int) m_buff.size(); }
00081 long long get_offset() { return m_offset; }
00082
00083 IO* get_io() const { return m_io; }
00084
00085 bool is_finished() { return m_downloaded || m_errno != 0; }
00086 bool is_ok() { return m_downloaded; }
00087 bool is_failed() { return m_errno != 0; }
00088
00089 void set_downloaded() { m_downloaded = true; }
00090 void set_error(int err) { m_errno = err; }
00091
00092 void reset_error_and_set_io(IO *io)
00093 {
00094 m_errno = 0;
00095 m_io = io;
00096 }
00097 };
00098
00099
00100
00101 class BlockResponseHandler : public XrdOucCacheIOCB
00102 {
00103 public:
00104 Block *m_block;
00105 bool m_for_prefetch;
00106
00107 BlockResponseHandler(Block *b, bool prefetch) :
00108 m_block(b), m_for_prefetch(prefetch) {}
00109
00110 virtual void Done(int result);
00111 };
00112
00113
00114
00115 class DirectResponseHandler : public XrdOucCacheIOCB
00116 {
00117 public:
00118 XrdSysCondVar m_cond;
00119 int m_to_wait;
00120 int m_errno;
00121
00122 DirectResponseHandler(int to_wait) : m_cond(0), m_to_wait(to_wait), m_errno(0) {}
00123
00124 bool is_finished() { XrdSysCondVarHelper _lck(m_cond); return m_to_wait == 0; }
00125 bool is_ok() { XrdSysCondVarHelper _lck(m_cond); return m_to_wait == 0 && m_errno == 0; }
00126 bool is_failed() { XrdSysCondVarHelper _lck(m_cond); return m_errno != 0; }
00127
00128 virtual void Done(int result);
00129 };
00130
00131
00132
00133 class File
00134 {
00135 public:
00136
00138
00139 File(const std::string &path, long long offset, long long fileSize);
00140
00141
00143
00144 ~File();
00145
00146 void BlockRemovedFromWriteQ(Block*);
00148 bool Open();
00149
00151 int ReadV(IO *io, const XrdOucIOVec *readV, int n);
00152
00154 int Read (IO *io, char* buff, long long offset, int size);
00155
00156
00158
00159 bool isOpen() const { return m_is_open; }
00160
00161
00164
00165 bool ioActive(IO *io);
00166
00167
00170
00171 void RequestSyncOfDetachStats();
00172
00173
00176
00177 bool FinalizeSyncBeforeExit();
00178
00179
00181
00182 void Sync();
00183
00184
00186
00187 Stats& GetStats() { return m_stats; }
00188
00189 void ProcessBlockResponse(BlockResponseHandler* brh, int res);
00190 void WriteBlockToDisk(Block* b);
00191
00192 void Prefetch();
00193
00194 float GetPrefetchScore() const;
00195
00197 const char* lPath() const;
00198
00199 std::string& GetLocalPath() { return m_filename; }
00200
00201 XrdSysTrace* GetTrace();
00202
00203 long long GetFileSize() { return m_fileSize; }
00204
00205 void AddIO(IO *io);
00206 int GetPrefetchCountOnIO(IO *io);
00207 void StopPrefetchingOnIO(IO *io);
00208 void RemoveIO(IO *io);
00209
00210
00211 int get_ref_cnt() { return m_ref_cnt; }
00212 int inc_ref_cnt() { return ++m_ref_cnt; }
00213 int dec_ref_cnt() { return --m_ref_cnt; }
00214
00215 private:
00216 enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete };
00217
00218 int m_ref_cnt;
00219
00220 bool m_is_open;
00221
00222 XrdOssDF *m_output;
00223 XrdOssDF *m_infoFile;
00224 Info m_cfi;
00225
00226 std::string m_filename;
00227 long long m_offset;
00228 long long m_fileSize;
00229
00230
00231
00232 struct IODetails
00233 {
00234 int m_active_prefetches;
00235 bool m_allow_prefetching;
00236
00237 IODetails() : m_active_prefetches(0), m_allow_prefetching(true) {}
00238 };
00239
00240 typedef std::map<IO*, IODetails> IoMap_t;
00241 typedef IoMap_t::iterator IoMap_i;
00242
00243 IoMap_t m_io_map;
00244 IoMap_i m_current_io;
00245 int m_ios_in_detach;
00246
00247
00248 std::vector<int> m_writes_during_sync;
00249 int m_non_flushed_cnt;
00250 bool m_in_sync;
00251
00252 typedef std::list<int> IntList_t;
00253 typedef IntList_t::iterator IntList_i;
00254
00255 typedef std::list<Block*> BlockList_t;
00256 typedef BlockList_t::iterator BlockList_i;
00257
00258 typedef std::map<int, Block*> BlockMap_t;
00259 typedef BlockMap_t::iterator BlockMap_i;
00260
00261
00262 BlockMap_t m_block_map;
00263
00264 XrdSysCondVar m_downloadCond;
00265
00266 Stats m_stats;
00267
00268 PrefetchState_e m_prefetchState;
00269
00270 int m_prefetchReadCnt;
00271 int m_prefetchHitCnt;
00272 float m_prefetchScore;
00273
00274 bool m_detachTimeIsLogged;
00275
00276 static const char *m_traceID;
00277 bool overlap(int blk,
00278 long long blk_size,
00279 long long req_off,
00280 int req_size,
00281
00282 long long &off,
00283 long long &blk_off,
00284 long long &size);
00285
00286
00287 Block* PrepareBlockRequest(int i, IO *io, bool prefetch);
00288
00289 void ProcessBlockRequest (Block *b, bool prefetch);
00290 void ProcessBlockRequests(BlockList_t& blks, bool prefetch);
00291
00292 int RequestBlocksDirect(IO *io, DirectResponseHandler *handler, IntList_t& blocks,
00293 char* buff, long long req_off, long long req_size);
00294
00295 int ReadBlocksFromDisk(IntList_t& blocks,
00296 char* req_buf, long long req_off, long long req_size);
00297
00298
00299 bool VReadValidate (const XrdOucIOVec *readV, int n);
00300 bool VReadPreProcess (IO *io, const XrdOucIOVec *readV, int n,
00301 ReadVBlockListRAM& blks_to_process,
00302 ReadVBlockListDisk& blks_on_disk,
00303 std::vector<XrdOucIOVec>& chunkVec);
00304 int VReadFromDisk (const XrdOucIOVec *readV, int n,
00305 ReadVBlockListDisk& blks_on_disk);
00306 int VReadProcessBlocks(IO *io, const XrdOucIOVec *readV, int n,
00307 std::vector<ReadVChunkListRAM>& blks_to_process,
00308 std::vector<ReadVChunkListRAM>& blks_rocessed);
00309
00310 long long BufferSize();
00311
00312 void inc_ref_count(Block*);
00313 void dec_ref_count(Block*);
00314 void free_block(Block*);
00315
00316 bool select_current_io_or_disable_prefetching(bool skip_current);
00317
00318 int offsetIdx(int idx);
00319 };
00320
00321 }
00322
00323 #endif