19 #ifndef INCLUDE_RCF_PUBLISHINGSERVICE_HPP
20 #define INCLUDE_RCF_PUBLISHINGSERVICE_HPP
25 #include <boost/shared_ptr.hpp>
27 #include <RCF/ClientStub.hpp>
28 #include <RCF/Export.hpp>
29 #include <RCF/GetInterfaceName.hpp>
30 #include <RCF/PeriodicTimer.hpp>
31 #include <RCF/Service.hpp>
32 #include <RCF/ThreadLibrary.hpp>
33 #include <RCF/Timer.hpp>
// Forward declarations and smart-pointer aliases used by the publishing API.
40 class ClientTransport;
41 typedef boost::shared_ptr<I_RcfClient> RcfClientPtr;
42 typedef boost::shared_ptr<ClientTransport> ClientTransportPtr;
// Shared (owning) and weak (non-owning) handles to a PublisherBase.
45 typedef boost::shared_ptr<PublisherBase> PublisherPtr;
46 typedef boost::weak_ptr<PublisherBase> PublisherWeakPtr;
48 class PublishingService;
// Callback signatures: both receive an RcfSession and a string; the connect
// callback returns bool, the disconnect callback returns nothing.
// NOTE(review): the string is presumably the topic name, and the bool
// presumably decides whether the subscriber is accepted — confirm against
// the implementation.
50 typedef boost::function2<bool, RcfSession &, const std::string &> OnSubscriberConnect;
51 typedef boost::function2<void, RcfSession &, const std::string &> OnSubscriberDisconnect;
// Parameter bundle passed to a publisher's constructor: a topic name plus
// subscriber connect/disconnect callbacks.
53 class RCF_EXPORT PublisherParms
// Topic name accessors.
57 void setTopicName(
const std::string & topicName);
58 std::string getTopicName()
const;
// Callback setters; see the OnSubscriberConnect / OnSubscriberDisconnect
// typedefs for the expected signatures.
59 void setOnSubscriberConnect(OnSubscriberConnect onSubscriberConnect);
60 void setOnSubscriberDisconnect(OnSubscriberDisconnect onSubscriberDisconnect);
// PublishingService is given direct access to this class's members.
64 friend class PublishingService;
// Stored values.
// NOTE(review): presumably written by the setters above — their definitions
// are not in this header.
66 std::string mTopicName;
67 OnSubscriberConnect mOnSubscriberConnect;
68 OnSubscriberDisconnect mOnSubscriberDisconnect;
// Non-template base class for publishers. Derives from boost::noncopyable,
// so instances cannot be copied.
72 class RCF_EXPORT PublisherBase : boost::noncopyable
// Constructed from a PublishingService and the publisher parameters.
75 PublisherBase(PublishingService & pubService,
const PublisherParms & parms);
// Queries on the publisher; defined outside this header.
78 std::string getTopicName();
79 std::size_t getSubscriberCount();
// PublishingService is given direct access to this class's members.
84 friend class PublishingService;
// NOTE(review): presumably bound to the constructor arguments — the
// constructor definition is not visible here.
88 PublishingService & mPublishingService;
89 PublisherParms mParms;
// Topic name; Publisher<Interface> replaces it with the interface name when
// it is empty.
91 std::string mTopicName;
// Holds the client object that Publisher<Interface> creates in its constructor.
92 RcfClientPtr mRcfClientPtr;
// Typed publisher for the RCF interface `Interface`. Adds a client of type
// Interface::RcfClientT on top of PublisherBase.
96 template<
typename Interface>
97 class Publisher :
public PublisherBase
// Client type generated for the interface.
101 typedef typename Interface::RcfClientT RcfClientT;
// Forwards the service and parameters to PublisherBase, then sets up the
// typed client.
103 Publisher(PublishingService & pubService,
const PublisherParms & parms) :
104 PublisherBase(pubService, parms),
// When no topic name is set, fall back to the interface's name.
107 if (mTopicName.empty())
109 mTopicName = getInterfaceName<Interface>();
// Create the client from a ClientStub constructed with the topic name and
// store it in the base class's type-erased pointer.
112 mRcfClientPtr.reset(
new RcfClientT( ClientStub(mTopicName) ) );
// Cache a typed pointer to that same client object.
113 mpClient =
dynamic_cast<RcfClientT *
>(mRcfClientPtr.get());
// Accessor for the typed client; asserts that mClosed is false.
// NOTE(review): presumably returns *mpClient — the return statement and the
// declaration of mClosed are not visible in this excerpt.
118 RcfClientT & publish()
120 RCF_ASSERT(!mClosed);
// Typed view of the object held by mRcfClientPtr; set in the constructor.
126 RcfClientT * mpClient;
// Forward declarations needed by the
// `friend class RcfClient<I_RequestSubscription>` declaration in
// PublishingService.
129 class I_RequestSubscription;
130 template<
typename T>
class RcfClient;
// Service that creates publishers and keeps a registry of them, keyed by
// topic name.
// NOTE(review): the base-class list is not visible in this excerpt.
132 class RCF_EXPORT PublishingService :
139 ~PublishingService();
// Creates a Publisher<Interface> for this service from `parms` and records it
// in mPublishers under its topic name.
// The registry stores a weak pointer, so the entry does not keep the
// publisher alive.
// NOTE(review): presumably returns publisherPtr — the return statement is not
// visible in this excerpt.
141 template<
typename Interface>
142 boost::shared_ptr< Publisher<Interface> > createPublisher(
143 const PublisherParms & parms)
145 boost::shared_ptr< Publisher<Interface> > publisherPtr(
146 new Publisher<Interface>(*
this, parms) );
148 std::string topicName = publisherPtr->getTopicName();
// The topic name must be non-empty at this point.
150 RCF_ASSERT(topicName.size() > 0);
// Take a Lock on mPublishersMutex before writing to the registry. Assigning
// through operator[] replaces any existing entry with the same topic name.
151 Lock lock(mPublishersMutex);
152 mPublishers[topicName] = publisherPtr;
// Ping interval accessors, in milliseconds.
// NOTE(review): presumably backed by mPingIntervalMs — definitions not
// visible here.
156 void setPingIntervalMs(boost::uint32_t pingIntervalMs);
157 boost::uint32_t getPingIntervalMs()
const;
// Classes given direct access to this class's members.
162 friend class RcfClient<I_RequestSubscription>;
163 friend class PublisherBase;
164 friend class PublishingServicePb;
165 friend class RcfSession;
// Subscription request handlers. The second overload also takes the
// subscriber-to-publisher ping interval and has a non-const reference
// parameter for the publisher-to-subscriber interval.
// NOTE(review): presumably the latter is an output parameter; the meaning of
// the int32 return value is not visible here — confirm in the implementation.
167 boost::int32_t RequestSubscription(
168 const std::string &subscriptionName);
170 boost::int32_t RequestSubscription(
171 const std::string &subscriptionName,
172 boost::uint32_t subToPubPingIntervalMs,
173 boost::uint32_t & pubToSubPingIntervalMs);
// Server/service lifecycle hooks.
// NOTE(review): presumably overrides of the service base class from
// RCF/Service.hpp — the base class is not visible in this excerpt.
177 void onServiceAdded(RcfServer &server);
178 void onServiceRemoved(RcfServer &server);
179 void onServerStart(RcfServer &server);
180 void onServerStop(RcfServer &server);
// Hands a client transport to the publisher identified by publisherName.
// NOTE(review): the first parameter of this declaration is missing from this
// excerpt; the description above is inferred from the names — confirm.
182 void addSubscriberTransport(
184 const std::string &publisherName,
185 ClientTransportAutoPtrPtr clientTransportAutoPtrPtr);
// NOTE(review): presumably removes the named publisher from mPublishers —
// definition not visible here.
187 void closePublisher(
const std::string & name);
// Registry type: string key (the topic name, see createPublisher) mapped to a
// weak pointer to the publisher.
191 typedef std::map<std::string, PublisherWeakPtr> Publishers;
// createPublisher locks mPublishersMutex before modifying mPublishers.
193 Mutex mPublishersMutex;
194 Publishers mPublishers;
196 boost::uint32_t mPingIntervalMs;
197 PeriodicTimer mPeriodicTimer;
// Timer callback and its helpers.
// NOTE(review): presumably onTimer is driven by mPeriodicTimer and calls the
// two helpers below — definitions not visible here.
199 virtual void onTimer();
200 void pingAllSubscriptions();
201 void harvestExpiredSubscriptions();
// Shared-ownership handle to a PublishingService.
205 typedef boost::shared_ptr<PublishingService> PublishingServicePtr;
209 #endif // ! INCLUDE_RCF_PUBLISHINGSERVICE_HPP