RCFProto
 All Classes Functions Typedefs
Future.hpp
1 
2 //******************************************************************************
3 // RCF - Remote Call Framework
4 //
5 // Copyright (c) 2005 - 2013, Delta V Software. All rights reserved.
6 // http://www.deltavsoft.com
7 //
8 // RCF is distributed under dual licenses - closed source or GPL.
9 // Consult your particular license for conditions of use.
10 //
11 // If you have not purchased a commercial license, you are using RCF
12 // under GPL terms.
13 //
14 // Version: 2.0
15 // Contact: support <at> deltavsoft.com
16 //
17 //******************************************************************************
18 
19 #ifndef INCLUDE_RCF_FUTURE_HPP
20 #define INCLUDE_RCF_FUTURE_HPP
21 
22 #include <RCF/ClientStub.hpp>
23 #include <RCF/Marshal.hpp>
24 
25 namespace RCF {
26 
27  class I_Future
28  {
29  public:
30  virtual ~I_Future() {}
31  virtual void setClientStub(ClientStub *pClientStub) = 0;
32  };
33 
34  template<typename T>
35  class FutureImpl;
36 
37  template<typename T>
38  class Future
39  {
40  public:
41  Future() : mStatePtr(new State())
42  {}
43 
44  Future(T *pt) : mStatePtr( new State(pt))
45  {}
46 
47  Future(T *pt, ClientStub *pClientStub) : mStatePtr( new State(pt))
48  {
49  pClientStub->enrol(mStatePtr.get());
50  }
51 
52  Future(const T &t) : mStatePtr( new State(t))
53  {}
54 
55  operator T&()
56  {
57  return mStatePtr->operator T&();
58  }
59 
60  T& operator*()
61  {
62  return mStatePtr->operator T&();
63  }
64 
65  Future &operator=(const Future &rhs)
66  {
67  mStatePtr = rhs.mStatePtr;
68  return *this;
69  }
70 
71  Future &operator=(const FutureImpl<T> &rhs)
72  {
73  rhs.assignTo(*this);
74  return *this;
75  }
76 
77  Future(const FutureImpl<T> &rhs) : mStatePtr( new State())
78  {
79  rhs.assignTo(*this);
80  }
81 
82  bool ready()
83  {
84  return mStatePtr->ready();
85  }
86 
87  void wait(boost::uint32_t timeoutMs = 0)
88  {
89  mStatePtr->wait(timeoutMs);
90  }
91 
92  void cancel()
93  {
94  mStatePtr->cancel();
95  }
96 
97  void clear()
98  {
99  mStatePtr->clear();
100  }
101 
102  ClientStub & getClientStub()
103  {
104  return mStatePtr->getClientStub();
105  }
106 
107  std::auto_ptr<Exception> getAsyncException()
108  {
109  return mStatePtr->getClientStub().getAsyncException();
110  }
111 
112  private:
113 
114  template<typename U>
115  friend class FutureImpl;
116 
117  class State : public I_Future, boost::noncopyable
118  {
119  public:
120  State() :
121  mpt(),
122  mtPtr( new T() ),
123  mpClientStub()
124  {}
125 
126  State(T *pt) :
127  mpt(pt),
128  mpClientStub()
129  {}
130 
131  State(const T &t) :
132  mpt(),
133  mtPtr( new T(t) ),
134  mpClientStub()
135  {}
136 
137  ~State()
138  {
139  RCF_DTOR_BEGIN
140  unregisterFromCandidates();
141  RCF_DTOR_END
142  }
143 
144  operator T&()
145  {
146  // If a call has been made, check that it has completed, and
147  // that there was no exception.
148 
149  if (mpClientStub)
150  {
151  if (!mpClientStub->ready())
152  {
153  mpClientStub->waitForReady();
154  }
155 
156  std::auto_ptr<Exception> ePtr =
157  mpClientStub->getAsyncException();
158 
159  if (ePtr.get())
160  {
161  ePtr->throwSelf();
162  }
163  }
164 
165  T *pt = mpt ? mpt : mtPtr.get();
166  {
167  Lock lock(gCandidatesMutex());
168  gCandidates().add(pt, this);
169  }
170 
171  return *pt;
172  }
173 
174  void setClientStub(ClientStub *pClientStub)
175  {
176  mpClientStub = pClientStub;
177  }
178 
179  void setClientStub(ClientStub *pClientStub, T * pt)
180  {
181  unregisterFromCandidates();
182 
183  mpClientStub = pClientStub;
184  mpt = pt;
185  mtPtr.reset();
186  }
187 
188  private:
189 
190  T * mpt;
191  boost::scoped_ptr<T> mtPtr;
192  RCF::ClientStub * mpClientStub;
193 
194  public:
195 
196  bool ready()
197  {
198  return mpClientStub->ready();
199  }
200 
201  void wait(boost::uint32_t timeoutMs = 0)
202  {
203  mpClientStub->waitForReady(timeoutMs);
204  }
205 
206  void cancel()
207  {
208  mpClientStub->cancel();
209  }
210 
211  ClientStub & getClientStub()
212  {
213  return *mpClientStub;
214  }
215 
216  void unregisterFromCandidates()
217  {
218  T *pt = mpt ? mpt : mtPtr.get();
219  Lock lock(gCandidatesMutex());
220  I_Future * pFuture = gCandidates().find(pt);
221  if (pFuture)
222  {
223  gCandidates().erase(pt);
224  }
225  }
226 
227  };
228 
229  boost::shared_ptr<State> mStatePtr;
230  };
231 
232  class LogEntryExit
233  {
234  public:
235  LogEntryExit(ClientStub & clientStub) :
236  mClientStub(clientStub),
237  mMsg(clientStub.mCurrentCallDesc)
238  {
239  if (mClientStub.mCallInProgress)
240  {
241  RCF_THROW(_RcfError_ConcurrentCalls());
242  }
243 
244  mClientStub.mCallInProgress = true;
245  RCF_LOG_2() << "RcfClient - begin remote call. " << mMsg;
246  }
247 
248  ~LogEntryExit()
249  {
250  if (!mClientStub.getAsync())
251  {
252  RCF_LOG_2() << "RcfClient - end remote call. " << mMsg;
253  mClientStub.mCallInProgress = false;
254  }
255  }
256 
257  private:
258  ClientStub & mClientStub;
259  const std::string & mMsg;
260  };
261 
262  template<typename T>
263  class FutureImpl
264  {
265  public:
266  FutureImpl(
267  T &t,
268  ClientStub &clientStub,
269  const std::string & interfaceName,
270  int fnId,
271  RemoteCallSemantics rcs,
272  const char * szFunc = "",
273  const char * szArity = "") :
274  mpT(&t),
275  mpClientStub(&clientStub),
276  mFnId(fnId),
277  mRcs(rcs),
278  mSzFunc(szFunc),
279  mSzArity(szArity),
280  mOwn(true)
281  {
282  // TODO: put this in the initializer list instead?
283  clientStub.init(interfaceName, fnId, rcs);
284  }
285 
286  FutureImpl(const FutureImpl &rhs) :
287  mpT(rhs.mpT),
288  mpClientStub(rhs.mpClientStub),
289  mFnId(rhs.mFnId),
290  mRcs(rhs.mRcs),
291  mSzFunc(rhs.mSzFunc),
292  mSzArity(rhs.mSzArity),
293  mOwn(rhs.mOwn)
294  {
295  rhs.mOwn = false;
296  }
297 
298  FutureImpl &operator=(const FutureImpl &rhs)
299  {
300  mpT = rhs.mpT;
301  mpClientStub = rhs.mpClientStub;
302  mFnId = rhs.mFnId;
303  mRcs = rhs.mRcs;
304  mSzFunc = rhs.mSzFunc;
305  mSzArity = rhs.mSzArity;
306 
307  mOwn = rhs.mOwn;
308  rhs.mOwn = false;
309  return *this;
310  }
311 
312  T get()
313  {
314  return operator T();
315  }
316 
317  // Conversion to T kicks off a sync call.
318  operator T() const
319  {
320  mOwn = false;
321  call();
322  T t = *mpT;
323  mpClientStub->clearParameters();
324  return t;
325  }
326 
327  // Assignment to Future<> kicks off an async call.
328  void assignTo(Future<T> &future) const
329  {
330  mOwn = false;
331  mpClientStub->setAsync(true);
332  future.mStatePtr->setClientStub(mpClientStub, mpT);
333  call();
334  }
335 
336  // Void or ignored return value, kicks off a sync call.
337  ~FutureImpl()
338  {
339  if(mOwn)
340  {
341  call();
342 
343  if (!mpClientStub->getAsync())
344  {
345  mpClientStub->clearParameters();
346  }
347  }
348  }
349 
350  private:
351 
352  void call() const
353  {
354  if (mpClientStub->getTransport().isInProcess())
355  {
356  mpClientStub->getTransport().doInProcessCall(*mpClientStub);
357  if (mpClientStub->mAsyncCallback)
358  {
359  mpClientStub->mAsyncCallback = boost::function0<void>();
360  }
361  return;
362  }
363 
364 #if RCF_FEATURE_FILETRANSFER==1
365 
366  // File uploads are done before the call itself.
367  mpClientStub->processUploadStreams();
368 
369 #endif
370 
371  // TODO
372  bool async = mpClientStub->getAsync();
373 
374  mpClientStub->setTries(0);
375 
376  setCurrentCallDesc(mpClientStub->mCurrentCallDesc, mpClientStub->mRequest, mSzFunc, mSzArity);
377 
378  if (async)
379  {
380  callAsync();
381  }
382  else
383  {
384  callSync();
385  }
386  }
387 
388  void callSync() const
389  {
390  // ClientStub::onConnectCompleted() uses the contents of mEncodedByteBuffers
391  // to determine what stage the current call is in. So mEncodedByteBuffers
392  // needs to be cleared after a remote call, even if an exception is thrown.
393 
394  // Error handling code here will generally also need to be present in
395  // ClientStub::onError().
396 
397  LogEntryExit logEntryExit(*mpClientStub);
398 
399  RCF_LOG_3()(mpClientStub)(mpClientStub->mRequest)
400  << "RcfClient - sending synchronous request.";
401 
402  try
403  {
404  mpClientStub->call(mRcs);
405  }
406  catch(const RCF::RemoteException & e)
407  {
408  mpClientStub->mEncodedByteBuffers.resize(0);
409  if (shouldDisconnectOnRemoteError( e.getError() ))
410  {
411  mpClientStub->disconnect();
412  }
413  throw;
414  }
415  catch(const RCF::Exception &)
416  {
417  mpClientStub->mEncodedByteBuffers.resize(0);
418  mpClientStub->disconnect();
419  throw;
420  }
421  catch(...)
422  {
423  mpClientStub->mEncodedByteBuffers.resize(0);
424  mpClientStub->disconnect();
425  throw;
426  }
427  }
428 
429  void callAsync() const
430  {
431  LogEntryExit logEntryExit(*mpClientStub);
432 
433  RCF_LOG_3()(mpClientStub)(mpClientStub->mRequest)
434  << "RcfClient - sending asynchronous request.";
435 
436  std::auto_ptr<RCF::Exception> ape;
437 
438  try
439  {
440  mpClientStub->call(mRcs);
441  }
442  catch(const RCF::Exception & e)
443  {
444  ape.reset( e.clone().release() );
445  }
446  catch(...)
447  {
448  ape.reset( new Exception(_RcfError_NonStdException()) );
449  }
450 
451  if (ape.get())
452  {
453  mpClientStub->onError(*ape);
454  }
455 
456  getTlsAmiNotification().run();
457  }
458 
459  T * mpT;
460  ClientStub * mpClientStub;
461  int mFnId;
462  RemoteCallSemantics mRcs;
463  const char * mSzFunc;
464  const char * mSzArity;
465 
466  mutable bool mOwn;
467  };
468 
469  template<typename T, typename U>
470  bool operator==(const FutureImpl<T> & fi, const U & u)
471  {
472  return fi.operator T() == u;
473  }
474 
475  template<typename T, typename U>
476  bool operator==(const U & u, const FutureImpl<T> & fi)
477  {
478  return u == fi.operator T();
479  }
480 
481  template<typename T>
482  std::ostream & operator<<(std::ostream & os, const FutureImpl<T> & fi)
483  {
484  return os << fi.operator T();
485  }
486 
487 
488 }
489 
490 #endif // INCLUDE_RCF_FUTURE_HPP