RCFProto
 All Classes Functions Typedefs
SubscriptionService.hpp
1 
2 //******************************************************************************
3 // RCF - Remote Call Framework
4 //
5 // Copyright (c) 2005 - 2013, Delta V Software. All rights reserved.
6 // http://www.deltavsoft.com
7 //
8 // RCF is distributed under dual licenses - closed source or GPL.
9 // Consult your particular license for conditions of use.
10 //
11 // If you have not purchased a commercial license, you are using RCF
12 // under GPL terms.
13 //
14 // Version: 2.0
15 // Contact: support <at> deltavsoft.com
16 //
17 //******************************************************************************
18 
19 #ifndef INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP
20 #define INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP
21 
22 #include <map>
23 #include <memory>
24 #include <set>
25 #include <string>
26 #include <utility>
27 
28 #include <boost/shared_ptr.hpp>
29 #include <RCF/Export.hpp>
30 #include <RCF/GetInterfaceName.hpp>
31 #include <RCF/PeriodicTimer.hpp>
32 #include <RCF/ServerStub.hpp>
33 #include <RCF/Service.hpp>
34 #include <RCF/Timer.hpp>
35 
36 namespace RCF {
37 
38  class RcfServer;
39  class RcfSession;
40  class ClientTransport;
41  class ServerTransport;
42  class Endpoint;
43  class I_RcfClient;
44 
45  typedef boost::shared_ptr<I_RcfClient> RcfClientPtr;
46  typedef std::auto_ptr<ClientTransport> ClientTransportAutoPtr;
47  typedef boost::shared_ptr<RcfSession> RcfSessionPtr;
48  typedef boost::weak_ptr<RcfSession> RcfSessionWeakPtr;
49  typedef boost::shared_ptr<ServerTransport> ServerTransportPtr;
50 
51  template<typename T> class RcfClient;
52  class I_RequestSubscription;
53  class I_Null;
54 
55  class SubscriptionService;
56 
57  class Subscription;
58  typedef boost::shared_ptr<Subscription> SubscriptionPtr;
59  typedef boost::weak_ptr<Subscription> SubscriptionWeakPtr;
60 
61  typedef boost::function1<void, RcfSession &> OnSubscriptionDisconnect;
62 
63  template<typename T>
64  class Future;
65 
66  class RCF_EXPORT Subscription : boost::noncopyable
67  {
68  private:
69 
70  typedef RcfClient<I_RequestSubscription> AsyncClient;
71  typedef boost::shared_ptr<AsyncClient> AsyncClientPtr;
72 
73  Subscription(
74  SubscriptionService & subscriptionService,
75  ClientTransportAutoPtr clientTransportAutoPtr,
76  RcfSessionWeakPtr rcfSessionWeakPtr,
77  boost::uint32_t incomingPingIntervalMs,
78  const std::string & publisherUrl,
79  const std::string & topic,
80  OnSubscriptionDisconnect onDisconnect);
81 
82  void setWeakThisPtr(SubscriptionWeakPtr thisWeakPtr);
83 
84  public:
85 
86  ~Subscription();
87 
88  unsigned int getPingTimestamp();
89  bool isConnected();
90  void close();
91  RcfSessionPtr getRcfSessionPtr();
92 
93  private:
94  friend class SubscriptionService;
95 
96  static void onDisconnect(SubscriptionWeakPtr subPtr, RcfSession & session);
97 
98  SubscriptionService & mSubscriptionService;
99  SubscriptionWeakPtr mThisWeakPtr;
100 
101  RecursiveMutex mMutex;
102  RcfSessionWeakPtr mRcfSessionWeakPtr;
103 
104  boost::shared_ptr< RcfClient<I_Null> > mConnectionPtr;
105 
106  boost::uint32_t mPingIntervalMs;
107  bool mPingsEnabled;
108  std::string mPublisherUrl;
109  std::string mTopic;
110 
111  OnSubscriptionDisconnect mOnDisconnect;
112  bool mClosed;
113  };
114 
115  typedef boost::shared_ptr<Subscription> SubscriptionPtr;
116  typedef boost::weak_ptr<Subscription> SubscriptionWeakPtr;
117 
118  typedef boost::function2<void, SubscriptionPtr, ExceptionPtr> OnAsyncSubscribeCompleted;
119 
120  class RCF_EXPORT SubscriptionParms
121  {
122  public:
123  SubscriptionParms();
124 
125  void setTopicName(const std::string & publisherName);
126  std::string getTopicName() const;
127  void setPublisherEndpoint(const Endpoint & publisherEp);
128  void setPublisherEndpoint(I_RcfClient & rcfClient);
129  void setOnSubscriptionDisconnect(OnSubscriptionDisconnect onSubscriptionDisconnect);
130  void setOnAsyncSubscribeCompleted(OnAsyncSubscribeCompleted onAsyncSubscribeCompleted);
131 
132  private:
133 
134  friend class SubscriptionService;
135 
136  std::string mPublisherName;
137  ClientStub mClientStub;
138  OnSubscriptionDisconnect mOnDisconnect;
139  OnAsyncSubscribeCompleted mOnAsyncSubscribeCompleted;
140  };
141 
142  class RCF_EXPORT SubscriptionService :
143  public I_Service,
144  boost::noncopyable
145  {
146  public:
147 
148  SubscriptionService(boost::uint32_t pingIntervalMs = 0);
149 
150  ~SubscriptionService();
151 
152  template<typename Interface, typename T>
153  SubscriptionPtr createSubscription(
154  T & t,
155  const SubscriptionParms & parms)
156  {
157  std::string defaultPublisherName = getInterfaceName((Interface *) NULL);
158 
159  boost::shared_ptr< I_Deref<T> > derefPtr(
160  new DerefObj<T>(t));
161 
162  RcfClientPtr rcfClientPtr(
163  createServerStub((Interface *) 0, (T *) 0, derefPtr));
164 
165  return createSubscriptionImpl(rcfClientPtr, parms, defaultPublisherName);
166  }
167 
168  template<typename Interface, typename T>
169  SubscriptionPtr createSubscription(
170  T & t,
171  const RCF::Endpoint & publisherEp)
172  {
173  SubscriptionParms parms;
174  parms.setPublisherEndpoint(publisherEp);
175  return createSubscription<Interface>(t, parms);
176  }
177 
178  SubscriptionPtr createSubscriptionImpl(
179  RcfClientPtr rcfClientPtr,
180  const SubscriptionParms & parms,
181  const std::string & defaultPublisherName);
182 
183  void createSubscriptionImplBegin(
184  RcfClientPtr rcfClientPtr,
185  const SubscriptionParms & parms,
186  const std::string & defaultPublisherName);
187 
188  void createSubscriptionImplEnd(
189  Subscription::AsyncClientPtr clientPtr,
190  Future<boost::int32_t> fRet,
191  const std::string & publisherName,
192  RcfClientPtr rcfClientPtr,
193  OnSubscriptionDisconnect onDisconnect,
194  OnAsyncSubscribeCompleted onCompletion,
195  Future<boost::uint32_t> pubToSubPingIntervalMs,
196  bool pingsEnabled);
197 
198  void closeSubscription(SubscriptionWeakPtr subscriptionPtr);
199 
200  void setPingIntervalMs(boost::uint32_t pingIntervalMs);
201  boost::uint32_t getPingIntervalMs() const;
202 
203  private:
204 
205  void onServerStart(RcfServer &server);
206  void onServerStop(RcfServer &server);
207 
208  RcfServer * mpServer;
209  Mutex mSubscriptionsMutex;
210 
211  typedef std::set<SubscriptionWeakPtr> Subscriptions;
212  Subscriptions mSubscriptions;
213 
214  boost::uint32_t mPingIntervalMs;
215  PeriodicTimer mPeriodicTimer;
216 
217  virtual void onTimer();
218  void pingAllSubscriptions();
219  void harvestExpiredSubscriptions();
220 
221  static void sOnPingCompleted(RecursiveLockPtr lockPtr);
222 
223  public:
224 
225  SubscriptionPtr onRequestSubscriptionCompleted(
226  boost::int32_t ret,
227  const std::string & publisherName,
228  RcfClient<I_RequestSubscription> & client,
229  RcfClientPtr rcfClientPtr,
230  OnSubscriptionDisconnect onDisconnect,
231  boost::uint32_t pubToSubPingIntervalMs,
232  bool pingsEnabled);
233  };
234 
235  typedef boost::shared_ptr<SubscriptionService> SubscriptionServicePtr;
236 
237 } // namespace RCF
238 
239 #endif // ! INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP