19 #ifndef INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP
20 #define INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP
28 #include <boost/shared_ptr.hpp>
29 #include <RCF/Export.hpp>
30 #include <RCF/GetInterfaceName.hpp>
31 #include <RCF/PeriodicTimer.hpp>
32 #include <RCF/ServerStub.hpp>
33 #include <RCF/Service.hpp>
34 #include <RCF/Timer.hpp>
// Forward declarations: in the visible part of this header these transport
// classes are only named through smart-pointer typedefs.
40 class ClientTransport;
41 class ServerTransport;
// Smart-pointer aliases for client, transport and session objects.
45 typedef boost::shared_ptr<I_RcfClient> RcfClientPtr;
// NOTE(review): std::auto_ptr was deprecated in C++11 and removed in C++17.
// This alias appears in a parameter list further down, so replacing it would
// change the public interface.
46 typedef std::auto_ptr<ClientTransport> ClientTransportAutoPtr;
47 typedef boost::shared_ptr<RcfSession> RcfSessionPtr;
48 typedef boost::weak_ptr<RcfSession> RcfSessionWeakPtr;
49 typedef boost::shared_ptr<ServerTransport> ServerTransportPtr;
// Forward declaration of the RcfClient class template.
51 template<
typename T>
class RcfClient;
52 class I_RequestSubscription;
55 class SubscriptionService;
// Shared and weak pointer aliases for Subscription.
58 typedef boost::shared_ptr<Subscription> SubscriptionPtr;
59 typedef boost::weak_ptr<Subscription> SubscriptionWeakPtr;
// Callback type: takes an RcfSession & and returns void. Used below as the
// type of subscription-disconnect handlers.
61 typedef boost::function1<void, RcfSession &> OnSubscriptionDisconnect;
// Subscription: noncopyable object (inherits boost::noncopyable) holding the
// state of a single subscription: a reference to the owning
// SubscriptionService, a weak pointer to an RcfSession, a connection object,
// a ping interval, the publisher URL and a disconnect callback.
66 class RCF_EXPORT Subscription : boost::noncopyable
// Parameter list of what is presumably the constructor; the declarator line
// itself is not visible in this listing -- TODO confirm against the full file.
// Note that clientTransportAutoPtr is passed by value as a std::auto_ptr, so
// ownership of the transport transfers to the callee.
71 SubscriptionService & subscriptionService,
72 ClientTransportAutoPtr clientTransportAutoPtr,
73 RcfSessionWeakPtr rcfSessionWeakPtr,
74 boost::uint32_t incomingPingIntervalMs,
75 const std::string & publisherUrl,
76 const std::string & topic,
77 OnSubscriptionDisconnect onDisconnect);
// Supplies a weak pointer to a Subscription; presumably a self-reference that
// is stored in mThisWeakPtr -- confirm in the implementation file.
79 void setWeakThisPtr(SubscriptionWeakPtr thisWeakPtr);
// Returns a ping timestamp as an unsigned int. Units and origin are not
// visible from this header.
85 unsigned int getPingTimestamp();
// Returns a shared pointer to an RcfSession; presumably obtained from
// mRcfSessionWeakPtr -- confirm in the implementation file.
88 RcfSessionPtr getRcfSessionPtr();
// SubscriptionService is granted access to the non-public members.
91 friend class SubscriptionService;
// Static handler taking a weak pointer to a Subscription plus the affected
// session. Being static, it receives the subscription explicitly rather than
// through 'this'.
93 static void onDisconnect(SubscriptionWeakPtr subPtr, RcfSession & session);
// Data members.
95 SubscriptionService & mSubscriptionService;
96 SubscriptionWeakPtr mThisWeakPtr;
// NOTE(review): which members mMutex guards cannot be determined from the
// header alone.
98 RecursiveMutex mMutex;
99 RcfSessionWeakPtr mRcfSessionWeakPtr;
101 boost::shared_ptr<I_RcfClient> mConnectionPtr;
103 boost::uint32_t mPingIntervalMs;
105 std::string mPublisherUrl;
108 OnSubscriptionDisconnect mOnDisconnect;
// NOTE(review): these two typedefs repeat, with identical types, the
// SubscriptionPtr / SubscriptionWeakPtr aliases declared before the
// Subscription class. The redeclaration is legal C++ but redundant.
112 typedef boost::shared_ptr<Subscription> SubscriptionPtr;
113 typedef boost::weak_ptr<Subscription> SubscriptionWeakPtr;
// Callback type: takes a SubscriptionPtr and an ExceptionPtr and returns void.
// Used below as the completion handler type for asynchronous subscribes.
115 typedef boost::function2<void, SubscriptionPtr, ExceptionPtr> OnAsyncSubscribeCompleted;
// SubscriptionParms: parameter object consumed by SubscriptionService when
// creating a subscription. It carries a topic/publisher name, a ClientStub
// and two optional callbacks (see the data members at the end).
117 class RCF_EXPORT SubscriptionParms
// Topic name accessors.
// NOTE(review): the setter's parameter is called publisherName and the class
// has an mPublisherName member, so the topic name and publisher name are
// presumably the same value -- confirm in the implementation file.
122 void setTopicName(
const std::string & publisherName);
123 std::string getTopicName()
const;
// Two ways of specifying the publisher: an Endpoint, or an existing client
// object. How each is applied to mClientStub is not visible from this header.
124 void setPublisherEndpoint(
const Endpoint & publisherEp);
125 void setPublisherEndpoint(I_RcfClient & rcfClient);
// Callback setters for the disconnect and async-completion handlers.
126 void setOnSubscriptionDisconnect(OnSubscriptionDisconnect onSubscriptionDisconnect);
127 void setOnAsyncSubscribeCompleted(OnAsyncSubscribeCompleted onAsyncSubscribeCompleted);
// SubscriptionService is granted access to the members below.
131 friend class SubscriptionService;
// Data members.
133 std::string mPublisherName;
134 ClientStub mClientStub;
135 OnSubscriptionDisconnect mOnDisconnect;
136 OnAsyncSubscribeCompleted mOnAsyncSubscribeCompleted;
// SubscriptionService: declares the API for creating and closing
// subscriptions, together with the bookkeeping visible in its data members
// (a set of weak subscription pointers, a ping interval and a periodic timer).
// NOTE(review): the base-class list following the ':' is not visible in this
// listing, and neither are braces or access specifiers.
139 class RCF_EXPORT SubscriptionService :
// Constructs the service; pingIntervalMs defaults to 0.
145 SubscriptionService(boost::uint32_t pingIntervalMs = 0);
147 ~SubscriptionService();
// Creates a subscription for Interface using the supplied parameters.
// Visible steps: derive a default publisher name from the interface name,
// build a server stub for (Interface, T), then delegate to
// createSubscriptionImpl(). Some parameters and statements of this inline
// definition are not visible in this listing.
149 template<
typename Interface,
typename T>
150 SubscriptionPtr createSubscription(
152 const SubscriptionParms & parms)
// The null Interface pointer presumably serves only to select the
// getInterfaceName overload for Interface -- TODO confirm.
154 std::string defaultPublisherName = getInterfaceName((Interface *) NULL);
// Initializer of derefPtr is not visible in this listing.
156 boost::shared_ptr< I_Deref<T> > derefPtr(
159 RcfClientPtr rcfClientPtr(
160 createServerStub((Interface *) 0, (T *) 0, derefPtr));
162 return createSubscriptionImpl(rcfClientPtr, parms, defaultPublisherName);
// Convenience overload: wraps a publisher endpoint in a SubscriptionParms and
// forwards to the overload above. Its parameter list is not visible here; the
// body refers to 't' and 'publisherEp'.
165 template<
typename Interface,
typename T>
166 SubscriptionPtr createSubscription(
170 SubscriptionParms parms;
171 parms.setPublisherEndpoint(publisherEp);
172 return createSubscription<Interface>(t, parms);
// Non-template implementation behind createSubscription().
175 SubscriptionPtr createSubscriptionImpl(
176 RcfClientPtr rcfClientPtr,
177 const SubscriptionParms & parms,
178 const std::string & defaultPublisherName);
// Begin/End pair with the same inputs as createSubscriptionImpl; presumably
// the two halves of the asynchronous subscribe path -- confirm in the
// implementation file. Some parameters of the End function are not visible.
180 void createSubscriptionImplBegin(
181 RcfClientPtr rcfClientPtr,
182 const SubscriptionParms & parms,
183 const std::string & defaultPublisherName);
185 void createSubscriptionImplEnd(
187 ClientStubPtr clientStubPtr,
189 const std::string & publisherName,
190 RcfClientPtr rcfClientPtr,
191 OnSubscriptionDisconnect onDisconnect,
192 OnAsyncSubscribeCompleted onCompletion,
193 boost::uint32_t pubToSubPingIntervalMs,
// Closes the subscription identified by a weak pointer.
196 void closeSubscription(SubscriptionWeakPtr subscriptionPtr);
// Ping interval accessors (milliseconds, per the Ms suffix).
198 void setPingIntervalMs(boost::uint32_t pingIntervalMs);
199 boost::uint32_t getPingIntervalMs()
const;
// Server lifecycle hooks; presumably overrides of a service base class whose
// name is not visible here.
203 void onServerStart(RcfServer &server);
204 void onServerStop(RcfServer &server);
// Data members.
206 RcfServer * mpServer;
// NOTE(review): by its name this mutex guards mSubscriptions; confirm in the
// implementation file.
207 Mutex mSubscriptionsMutex;
// Subscriptions are tracked through weak pointers, so this set does not by
// itself keep a Subscription alive.
209 typedef std::set<SubscriptionWeakPtr> Subscriptions;
210 Subscriptions mSubscriptions;
212 boost::uint32_t mPingIntervalMs;
213 PeriodicTimer mPeriodicTimer;
// Timer callback and the maintenance helpers declared alongside it. Whether
// onTimer() calls these helpers is not visible from the header.
215 virtual void onTimer();
216 void pingAllSubscriptions();
217 void harvestExpiredSubscriptions();
// Static ping-completion handler. It receives a RecursiveLockPtr, presumably
// to keep a lock alive until the ping completes -- TODO confirm.
219 static void sOnPingCompleted(RecursiveLockPtr lockPtr);
// Builds and returns the SubscriptionPtr once a subscription request has
// completed. Some parameters are not visible in this listing.
223 SubscriptionPtr onRequestSubscriptionCompleted(
225 const std::string & publisherName,
226 ClientStub & clientStub,
227 RcfClientPtr rcfClientPtr,
228 OnSubscriptionDisconnect onDisconnect,
229 boost::uint32_t pubToSubPingIntervalMs,
// Subscription request returning a boost::int32_t. The non-const references
// pubToSubPingIntervalMs and pingsEnabled are output parameters. The meaning
// of the return code is not visible from this header.
234 boost::int32_t doRequestSubscription(
235 ClientStub & clientStubOrig,
236 const std::string & publisherName,
237 boost::uint32_t subToPubPingIntervalMs,
238 boost::uint32_t & pubToSubPingIntervalMs,
239 bool & pingsEnabled);
// Asynchronous counterpart of doRequestSubscription, with a separate
// completion step declared below.
241 void doRequestSubscriptionAsync(
242 ClientStub & clientStubOrig,
243 const std::string & publisherName,
244 RcfClientPtr rcfClientPtr,
245 const SubscriptionParms & parms);
247 void doRequestSubscriptionAsync_Complete(
249 RcfClientPtr requestClientPtr,
250 const std::string & publisherName,
251 RcfClientPtr rcfClientPtr,
252 OnSubscriptionDisconnect onDisconnect,
253 OnAsyncSubscribeCompleted onCompletion);
// "_Legacy" variants mirroring the three functions above; presumably kept for
// compatibility with older peers -- confirm in the implementation file.
257 boost::int32_t doRequestSubscription_Legacy(
258 ClientStub & clientStubOrig,
259 const std::string & publisherName,
260 boost::uint32_t subToPubPingIntervalMs,
261 boost::uint32_t & pubToSubPingIntervalMs,
262 bool & pingsEnabled);
264 void doRequestSubscriptionAsync_Legacy(
265 ClientStub & clientStubOrig,
266 const std::string & publisherName,
267 RcfClientPtr rcfClientPtr,
268 const SubscriptionParms & parms);
// The legacy completion step receives its results as Future<> objects. The
// end of the parameter list is not visible in this listing.
270 void doRequestSubscriptionAsync_Legacy_Complete(
271 ClientStubPtr clientStubPtr,
272 Future<boost::int32_t> fRet,
273 const std::string & publisherName,
274 RcfClientPtr rcfClientPtr,
275 OnSubscriptionDisconnect onDisconnect,
276 OnAsyncSubscribeCompleted onCompletion,
277 Future<boost::uint32_t> pubToSubPingIntervalMs,
// Shared-ownership pointer alias for SubscriptionService.
282 typedef boost::shared_ptr<SubscriptionService> SubscriptionServicePtr;
286 #endif // ! INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP