00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #ifndef __XRD_CL_PARALLELOPERATION_HH__
00027 #define __XRD_CL_PARALLELOPERATION_HH__
00028
00029 #include "XrdCl/XrdClOperations.hh"
00030 #include "XrdCl/XrdClOperationHandlers.hh"
00031
00032 #include <atomic>
00033
00034 namespace XrdCl
00035 {
00036
00037 class ParallelHandler: public PipelineHandler
00038 {
00039
00040 };
00041
00042
00048
00049 template<bool HasHndl>
00050 class ParallelOperation: public ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>
00051 {
00052 template<bool> friend class ParallelOperation;
00053
00054 public:
00055
00056
00058
00059 template<bool from>
00060 ParallelOperation( ParallelOperation<from> &&obj ) :
00061 ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>( std::move( obj ) ),
00062 pipelines( std::move( obj.pipelines ) )
00063 {
00064 }
00065
00066
00072
00073 template<class Container>
00074 ParallelOperation( Container &&container )
00075 {
00076 static_assert( !HasHndl, "Constructor is available only operation without handler");
00077
00078 pipelines.reserve( container.size() );
00079 auto begin = std::make_move_iterator( container.begin() );
00080 auto end = std::make_move_iterator( container.end() );
00081 std::copy( begin, end, std::back_inserter( pipelines ) );
00082 container.clear();
00083 }
00084
00085
00087
00088 std::string ToString()
00089 {
00090 std::ostringstream oss;
00091 oss << "Parallel(";
00092 for( int i = 0; i < pipelines.size(); i++ )
00093 {
00094 oss << pipelines[i]->ToString();
00095 if( i != pipelines.size() - 1 )
00096 {
00097 oss << " && ";
00098 }
00099 }
00100 oss << ")";
00101 return oss.str();
00102 }
00103
00104 private:
00105
00106
00111
00112 struct Ctx
00113 {
00114
00118
00119 Ctx( PipelineHandler *handler ): handler( handler )
00120 {
00121
00122 }
00123
00124
00126
00127 ~Ctx()
00128 {
00129 Handle( XRootDStatus() );
00130 }
00131
00132
00137
00138 void Handle( const XRootDStatus &st )
00139 {
00140 PipelineHandler* hdlr = handler.exchange( nullptr );
00141 if( hdlr )
00142 hdlr->HandleResponse( new XRootDStatus( st ), nullptr );
00143 }
00144
00145
00147
00148 std::atomic<PipelineHandler*> handler;
00149 };
00150
00151
00157
00158 XRootDStatus RunImpl()
00159 {
00160 std::shared_ptr<Ctx> ctx( new Ctx( this->handler.release() ) );
00161
00162 try
00163 {
00164 for( size_t i = 0; i < pipelines.size(); ++i )
00165 {
00166 pipelines[i].Run( [ctx]( const XRootDStatus &st ){ if( !st.IsOK() ) ctx->Handle( st ); } );
00167 }
00168 }
00169 catch( const PipelineException& ex )
00170 {
00171 return ex.GetError();
00172 }
00173 catch( const std::exception& ex )
00174 {
00175 return XRootDStatus( stError, ex.what() );
00176 }
00177
00178 return XRootDStatus();
00179 }
00180
00181 std::vector<Pipeline> pipelines;
00182 };
00183
00184
00186
00187 template<class Container>
00188 ParallelOperation<false> Parallel( Container &container )
00189 {
00190 return ParallelOperation<false>( container );
00191 }
00192
00193
00195
00196 inline void PipesToVec( std::vector<Pipeline>& )
00197 {
00198
00199 }
00200
00201
00202
00203
00204
00205 template<typename ... Others>
00206 inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
00207 Others&... others );
00208
00209 template<typename ... Others>
00210 inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
00211 Others&... others );
00212
00213 template<typename ... Others>
00214 inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
00215 Others&... others );
00216
00217
00218
00219
00220 template<typename ... Others>
00221 void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
00222 Others&... others )
00223 {
00224 v.emplace_back( operation );
00225 PipesToVec( v, others... );
00226 }
00227
00228 template<typename ... Others>
00229 void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
00230 Others&... others )
00231 {
00232 v.emplace_back( operation );
00233 PipesToVec( v, others... );
00234 }
00235
00236 template<typename ... Others>
00237 void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
00238 Others&... others )
00239 {
00240 v.emplace_back( std::move( pipeline ) );
00241 PipesToVec( v, others... );
00242 }
00243
00244
00249
00250 template<typename ... Operations>
00251 ParallelOperation<false> Parallel( Operations&& ... operations )
00252 {
00253 constexpr size_t size = sizeof...( operations );
00254 std::vector<Pipeline> v;
00255 v.reserve( size );
00256 PipesToVec( v, operations... );
00257 return Parallel( v );
00258 }
00259 }
00260
#endif // __XRD_CL_PARALLELOPERATION_HH__