19 #ifndef INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP
20 #define INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP
28 #include <boost/shared_ptr.hpp>
29 #include <RCF/Export.hpp>
30 #include <RCF/GetInterfaceName.hpp>
31 #include <RCF/PeriodicTimer.hpp>
32 #include <RCF/ServerStub.hpp>
33 #include <RCF/Service.hpp>
34 #include <RCF/Timer.hpp>
40 class ClientTransport;
41 class ServerTransport;
45 typedef boost::shared_ptr<I_RcfClient> RcfClientPtr;
46 typedef std::auto_ptr<ClientTransport> ClientTransportAutoPtr;
47 typedef boost::shared_ptr<RcfSession> RcfSessionPtr;
48 typedef boost::weak_ptr<RcfSession> RcfSessionWeakPtr;
49 typedef boost::shared_ptr<ServerTransport> ServerTransportPtr;
51 template<
typename T>
class RcfClient;
52 class I_RequestSubscription;
55 class SubscriptionService;
58 typedef boost::shared_ptr<Subscription> SubscriptionPtr;
59 typedef boost::weak_ptr<Subscription> SubscriptionWeakPtr;
61 typedef boost::function1<void, RcfSession &> OnSubscriptionDisconnect;
66 class RCF_EXPORT Subscription : boost::noncopyable
70 typedef RcfClient<I_RequestSubscription> AsyncClient;
71 typedef boost::shared_ptr<AsyncClient> AsyncClientPtr;
74 SubscriptionService & subscriptionService,
75 ClientTransportAutoPtr clientTransportAutoPtr,
76 RcfSessionWeakPtr rcfSessionWeakPtr,
77 boost::uint32_t incomingPingIntervalMs,
78 const std::string & publisherUrl,
79 const std::string & topic,
80 OnSubscriptionDisconnect onDisconnect);
82 void setWeakThisPtr(SubscriptionWeakPtr thisWeakPtr);
88 unsigned int getPingTimestamp();
91 RcfSessionPtr getRcfSessionPtr();
94 friend class SubscriptionService;
96 static void onDisconnect(SubscriptionWeakPtr subPtr, RcfSession & session);
98 SubscriptionService & mSubscriptionService;
99 SubscriptionWeakPtr mThisWeakPtr;
101 RecursiveMutex mMutex;
102 RcfSessionWeakPtr mRcfSessionWeakPtr;
104 boost::shared_ptr< RcfClient<I_Null> > mConnectionPtr;
106 boost::uint32_t mPingIntervalMs;
108 std::string mPublisherUrl;
111 OnSubscriptionDisconnect mOnDisconnect;
115 typedef boost::shared_ptr<Subscription> SubscriptionPtr;
116 typedef boost::weak_ptr<Subscription> SubscriptionWeakPtr;
118 typedef boost::function2<void, SubscriptionPtr, ExceptionPtr> OnAsyncSubscribeCompleted;
120 class RCF_EXPORT SubscriptionParms
125 void setTopicName(
const std::string & publisherName);
126 std::string getTopicName()
const;
127 void setPublisherEndpoint(
const Endpoint & publisherEp);
128 void setPublisherEndpoint(I_RcfClient & rcfClient);
129 void setOnSubscriptionDisconnect(OnSubscriptionDisconnect onSubscriptionDisconnect);
130 void setOnAsyncSubscribeCompleted(OnAsyncSubscribeCompleted onAsyncSubscribeCompleted);
134 friend class SubscriptionService;
136 std::string mPublisherName;
137 ClientStub mClientStub;
138 OnSubscriptionDisconnect mOnDisconnect;
139 OnAsyncSubscribeCompleted mOnAsyncSubscribeCompleted;
142 class RCF_EXPORT SubscriptionService :
148 SubscriptionService(boost::uint32_t pingIntervalMs = 0);
150 ~SubscriptionService();
152 template<
typename Interface,
typename T>
153 SubscriptionPtr createSubscription(
155 const SubscriptionParms & parms)
157 std::string defaultPublisherName = getInterfaceName((Interface *) NULL);
159 boost::shared_ptr< I_Deref<T> > derefPtr(
162 RcfClientPtr rcfClientPtr(
163 createServerStub((Interface *) 0, (T *) 0, derefPtr));
165 return createSubscriptionImpl(rcfClientPtr, parms, defaultPublisherName);
168 template<
typename Interface,
typename T>
169 SubscriptionPtr createSubscription(
173 SubscriptionParms parms;
174 parms.setPublisherEndpoint(publisherEp);
175 return createSubscription<Interface>(t, parms);
178 SubscriptionPtr createSubscriptionImpl(
179 RcfClientPtr rcfClientPtr,
180 const SubscriptionParms & parms,
181 const std::string & defaultPublisherName);
183 void createSubscriptionImplBegin(
184 RcfClientPtr rcfClientPtr,
185 const SubscriptionParms & parms,
186 const std::string & defaultPublisherName);
188 void createSubscriptionImplEnd(
189 Subscription::AsyncClientPtr clientPtr,
190 Future<boost::int32_t> fRet,
191 const std::string & publisherName,
192 RcfClientPtr rcfClientPtr,
193 OnSubscriptionDisconnect onDisconnect,
194 OnAsyncSubscribeCompleted onCompletion,
195 Future<boost::uint32_t> pubToSubPingIntervalMs,
198 void closeSubscription(SubscriptionWeakPtr subscriptionPtr);
200 void setPingIntervalMs(boost::uint32_t pingIntervalMs);
201 boost::uint32_t getPingIntervalMs()
const;
205 void onServerStart(RcfServer &server);
206 void onServerStop(RcfServer &server);
208 RcfServer * mpServer;
209 Mutex mSubscriptionsMutex;
211 typedef std::set<SubscriptionWeakPtr> Subscriptions;
212 Subscriptions mSubscriptions;
214 boost::uint32_t mPingIntervalMs;
215 PeriodicTimer mPeriodicTimer;
217 virtual void onTimer();
218 void pingAllSubscriptions();
219 void harvestExpiredSubscriptions();
221 static void sOnPingCompleted(RecursiveLockPtr lockPtr);
225 SubscriptionPtr onRequestSubscriptionCompleted(
227 const std::string & publisherName,
228 RcfClient<I_RequestSubscription> & client,
229 RcfClientPtr rcfClientPtr,
230 OnSubscriptionDisconnect onDisconnect,
231 boost::uint32_t pubToSubPingIntervalMs,
235 typedef boost::shared_ptr<SubscriptionService> SubscriptionServicePtr;
239 #endif // ! INCLUDE_RCF_SUBSCRIPTIONSERVICE_HPP