RCFProto
 All Classes Functions Typedefs
SubscriptionService.hpp
1 
2 //******************************************************************************
3 // RCF - Remote Call Framework
4 //
5 // Copyright (c) 2005 - 2013, Delta V Software. All rights reserved.
6 // http://www.deltavsoft.com
7 //
8 // RCF is distributed under dual licenses - closed source or GPL.
9 // Consult your particular license for conditions of use.
10 //
11 // If you have not purchased a commercial license, you are using RCF
12 // under GPL terms.
13 //
14 // Version: 2.0
15 // Contact: support <at> deltavsoft.com
16 //
17 //******************************************************************************
18 
19 #ifndef INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP
20 #define INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP
21 
22 #include <map>
23 #include <memory>
24 #include <set>
25 #include <string>
26 #include <utility>
27 
28 #include <boost/shared_ptr.hpp>
29 #include <RCF/Export.hpp>
30 #include <RCF/GetInterfaceName.hpp>
31 #include <RCF/PeriodicTimer.hpp>
32 #include <RCF/ServerStub.hpp>
33 #include <RCF/Service.hpp>
34 #include <RCF/Timer.hpp>
35 
36 namespace RCF {
37 
38  class RcfServer;
39  class RcfSession;
40  class ClientTransport;
41  class ServerTransport;
42  class Endpoint;
43  class I_RcfClient;
44 
45  typedef boost::shared_ptr<I_RcfClient> RcfClientPtr;
46  typedef std::auto_ptr<ClientTransport> ClientTransportAutoPtr;
47  typedef boost::shared_ptr<RcfSession> RcfSessionPtr;
48  typedef boost::weak_ptr<RcfSession> RcfSessionWeakPtr;
49  typedef boost::shared_ptr<ServerTransport> ServerTransportPtr;
50 
51  template<typename T> class RcfClient;
52  class I_RequestSubscription;
53  class I_Null;
54 
55  class SubscriptionService;
56 
57  class Subscription;
58  typedef boost::shared_ptr<Subscription> SubscriptionPtr;
59  typedef boost::weak_ptr<Subscription> SubscriptionWeakPtr;
60 
61  typedef boost::function1<void, RcfSession &> OnSubscriptionDisconnect;
62 
63  template<typename T>
64  class Future;
65 
66  class RCF_EXPORT Subscription : boost::noncopyable
67  {
68  private:
69 
70  Subscription(
71  SubscriptionService & subscriptionService,
72  ClientTransportAutoPtr clientTransportAutoPtr,
73  RcfSessionWeakPtr rcfSessionWeakPtr,
74  boost::uint32_t incomingPingIntervalMs,
75  const std::string & publisherUrl,
76  const std::string & topic,
77  OnSubscriptionDisconnect onDisconnect);
78 
79  void setWeakThisPtr(SubscriptionWeakPtr thisWeakPtr);
80 
81  public:
82 
83  ~Subscription();
84 
85  unsigned int getPingTimestamp();
86  bool isConnected();
87  void close();
88  RcfSessionPtr getRcfSessionPtr();
89 
90  private:
91  friend class SubscriptionService;
92 
93  static void onDisconnect(SubscriptionWeakPtr subPtr, RcfSession & session);
94 
95  SubscriptionService & mSubscriptionService;
96  SubscriptionWeakPtr mThisWeakPtr;
97 
98  RecursiveMutex mMutex;
99  RcfSessionWeakPtr mRcfSessionWeakPtr;
100 
101  boost::shared_ptr<I_RcfClient> mConnectionPtr;
102 
103  boost::uint32_t mPingIntervalMs;
104  bool mPingsEnabled;
105  std::string mPublisherUrl;
106  std::string mTopic;
107 
108  OnSubscriptionDisconnect mOnDisconnect;
109  bool mClosed;
110  };
111 
112  typedef boost::shared_ptr<Subscription> SubscriptionPtr;
113  typedef boost::weak_ptr<Subscription> SubscriptionWeakPtr;
114 
115  typedef boost::function2<void, SubscriptionPtr, ExceptionPtr> OnAsyncSubscribeCompleted;
116 
117  class RCF_EXPORT SubscriptionParms
118  {
119  public:
120  SubscriptionParms();
121 
122  void setTopicName(const std::string & publisherName);
123  std::string getTopicName() const;
124  void setPublisherEndpoint(const Endpoint & publisherEp);
125  void setPublisherEndpoint(I_RcfClient & rcfClient);
126  void setOnSubscriptionDisconnect(OnSubscriptionDisconnect onSubscriptionDisconnect);
127  void setOnAsyncSubscribeCompleted(OnAsyncSubscribeCompleted onAsyncSubscribeCompleted);
128 
129  private:
130 
131  friend class SubscriptionService;
132 
133  std::string mPublisherName;
134  ClientStub mClientStub;
135  OnSubscriptionDisconnect mOnDisconnect;
136  OnAsyncSubscribeCompleted mOnAsyncSubscribeCompleted;
137  };
138 
139  class RCF_EXPORT SubscriptionService :
140  public I_Service,
141  boost::noncopyable
142  {
143  public:
144 
145  SubscriptionService(boost::uint32_t pingIntervalMs = 0);
146 
147  ~SubscriptionService();
148 
149  template<typename Interface, typename T>
150  SubscriptionPtr createSubscription(
151  T & t,
152  const SubscriptionParms & parms)
153  {
154  std::string defaultPublisherName = getInterfaceName((Interface *) NULL);
155 
156  boost::shared_ptr< I_Deref<T> > derefPtr(
157  new DerefObj<T>(t));
158 
159  RcfClientPtr rcfClientPtr(
160  createServerStub((Interface *) 0, (T *) 0, derefPtr));
161 
162  return createSubscriptionImpl(rcfClientPtr, parms, defaultPublisherName);
163  }
164 
165  template<typename Interface, typename T>
166  SubscriptionPtr createSubscription(
167  T & t,
168  const RCF::Endpoint & publisherEp)
169  {
170  SubscriptionParms parms;
171  parms.setPublisherEndpoint(publisherEp);
172  return createSubscription<Interface>(t, parms);
173  }
174 
175  SubscriptionPtr createSubscriptionImpl(
176  RcfClientPtr rcfClientPtr,
177  const SubscriptionParms & parms,
178  const std::string & defaultPublisherName);
179 
180  void createSubscriptionImplBegin(
181  RcfClientPtr rcfClientPtr,
182  const SubscriptionParms & parms,
183  const std::string & defaultPublisherName);
184 
185  void createSubscriptionImplEnd(
186  ExceptionPtr ePtr,
187  ClientStubPtr clientStubPtr,
188  boost::int32_t ret,
189  const std::string & publisherName,
190  RcfClientPtr rcfClientPtr,
191  OnSubscriptionDisconnect onDisconnect,
192  OnAsyncSubscribeCompleted onCompletion,
193  boost::uint32_t pubToSubPingIntervalMs,
194  bool pingsEnabled);
195 
196  void closeSubscription(SubscriptionWeakPtr subscriptionPtr);
197 
198  void setPingIntervalMs(boost::uint32_t pingIntervalMs);
199  boost::uint32_t getPingIntervalMs() const;
200 
201  private:
202 
203  void onServerStart(RcfServer &server);
204  void onServerStop(RcfServer &server);
205 
206  RcfServer * mpServer;
207  Mutex mSubscriptionsMutex;
208 
209  typedef std::set<SubscriptionWeakPtr> Subscriptions;
210  Subscriptions mSubscriptions;
211 
212  boost::uint32_t mPingIntervalMs;
213  PeriodicTimer mPeriodicTimer;
214 
215  virtual void onTimer();
216  void pingAllSubscriptions();
217  void harvestExpiredSubscriptions();
218 
219  static void sOnPingCompleted(RecursiveLockPtr lockPtr);
220 
221  public:
222 
223  SubscriptionPtr onRequestSubscriptionCompleted(
224  boost::int32_t ret,
225  const std::string & publisherName,
226  ClientStub & clientStub,
227  RcfClientPtr rcfClientPtr,
228  OnSubscriptionDisconnect onDisconnect,
229  boost::uint32_t pubToSubPingIntervalMs,
230  bool pingsEnabled);
231 
232  private:
233 
234  boost::int32_t doRequestSubscription(
235  ClientStub & clientStubOrig,
236  const std::string & publisherName,
237  boost::uint32_t subToPubPingIntervalMs,
238  boost::uint32_t & pubToSubPingIntervalMs,
239  bool & pingsEnabled);
240 
241  void doRequestSubscriptionAsync(
242  ClientStub & clientStubOrig,
243  const std::string & publisherName,
244  RcfClientPtr rcfClientPtr,
245  const SubscriptionParms & parms);
246 
247  void doRequestSubscriptionAsync_Complete(
248  Future<Void> fv,
249  RcfClientPtr requestClientPtr,
250  const std::string & publisherName,
251  RcfClientPtr rcfClientPtr,
252  OnSubscriptionDisconnect onDisconnect,
253  OnAsyncSubscribeCompleted onCompletion);
254 
255  // Legacy subscription requests.
256 
257  boost::int32_t doRequestSubscription_Legacy(
258  ClientStub & clientStubOrig,
259  const std::string & publisherName,
260  boost::uint32_t subToPubPingIntervalMs,
261  boost::uint32_t & pubToSubPingIntervalMs,
262  bool & pingsEnabled);
263 
264  void doRequestSubscriptionAsync_Legacy(
265  ClientStub & clientStubOrig,
266  const std::string & publisherName,
267  RcfClientPtr rcfClientPtr,
268  const SubscriptionParms & parms);
269 
270  void doRequestSubscriptionAsync_Legacy_Complete(
271  ClientStubPtr clientStubPtr,
272  Future<boost::int32_t> fRet,
273  const std::string & publisherName,
274  RcfClientPtr rcfClientPtr,
275  OnSubscriptionDisconnect onDisconnect,
276  OnAsyncSubscribeCompleted onCompletion,
277  Future<boost::uint32_t> pubToSubPingIntervalMs,
278  bool pingsEnabled);
279 
280  };
281 
282  typedef boost::shared_ptr<SubscriptionService> SubscriptionServicePtr;
283 
284 } // namespace RCF
285 
286 #endif // ! INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP