RCFProto
 All Classes Functions Typedefs
PublishingService.hpp
1 
2 //******************************************************************************
3 // RCF - Remote Call Framework
4 //
5 // Copyright (c) 2005 - 2013, Delta V Software. All rights reserved.
6 // http://www.deltavsoft.com
7 //
8 // RCF is distributed under dual licenses - closed source or GPL.
9 // Consult your particular license for conditions of use.
10 //
11 // If you have not purchased a commercial license, you are using RCF
12 // under GPL terms.
13 //
14 // Version: 2.0
15 // Contact: support <at> deltavsoft.com
16 //
17 //******************************************************************************
18 
19 #ifndef INCLUDE_RCF_PUBLISHINGSERVICE_HPP
20 #define INCLUDE_RCF_PUBLISHINGSERVICE_HPP
21 
22 #include <map>
23 #include <string>
24 
25 #include <boost/shared_ptr.hpp>
26 
27 #include <RCF/ClientStub.hpp>
28 #include <RCF/Export.hpp>
29 #include <RCF/GetInterfaceName.hpp>
30 #include <RCF/PeriodicTimer.hpp>
31 #include <RCF/Service.hpp>
32 #include <RCF/ThreadLibrary.hpp>
33 #include <RCF/Timer.hpp>
34 
35 namespace RCF {
36 
37  class RcfServer;
38  class I_RcfClient;
39  class RcfSession;
40  class ClientTransport;
41  typedef boost::shared_ptr<I_RcfClient> RcfClientPtr;
42  typedef boost::shared_ptr<ClientTransport> ClientTransportPtr;
43 
44  class PublisherBase;
45  typedef boost::shared_ptr<PublisherBase> PublisherPtr;
46  typedef boost::weak_ptr<PublisherBase> PublisherWeakPtr;
47 
48  class PublishingService;
49 
50  typedef boost::function2<bool, RcfSession &, const std::string &> OnSubscriberConnect;
51  typedef boost::function2<void, RcfSession &, const std::string &> OnSubscriberDisconnect;
52 
53  class RCF_EXPORT PublisherParms
54  {
55  public:
56 
57  void setTopicName(const std::string & topicName);
58  std::string getTopicName() const;
59  void setOnSubscriberConnect(OnSubscriberConnect onSubscriberConnect);
60  void setOnSubscriberDisconnect(OnSubscriberDisconnect onSubscriberDisconnect);
61 
62  private:
63 
64  friend class PublishingService;
65 
66  std::string mTopicName;
67  OnSubscriberConnect mOnSubscriberConnect;
68  OnSubscriberDisconnect mOnSubscriberDisconnect;
69  };
70 
71 
72  class RCF_EXPORT PublisherBase : boost::noncopyable
73  {
74  public:
75  PublisherBase(PublishingService & pubService, const PublisherParms & parms);
76  ~PublisherBase();
77 
78  std::string getTopicName();
79  std::size_t getSubscriberCount();
80  void close();
81 
82  protected:
83 
84  friend class PublishingService;
85 
86  void init();
87 
88  PublishingService & mPublishingService;
89  PublisherParms mParms;
90  bool mClosed;
91  std::string mTopicName;
92  RcfClientPtr mRcfClientPtr;
93  };
94 
95  // Rename to Topic?
96  template<typename Interface>
97  class Publisher : public PublisherBase
98  {
99  public:
100 
101  typedef typename Interface::RcfClientT RcfClientT;
102 
103  Publisher(PublishingService & pubService, const PublisherParms & parms) :
104  PublisherBase(pubService, parms),
105  mpClient(NULL)
106  {
107  if (mTopicName.empty())
108  {
109  mTopicName = getInterfaceName<Interface>();
110  }
111 
112  mRcfClientPtr.reset( new RcfClientT( ClientStub(mTopicName) ) );
113  mpClient = dynamic_cast<RcfClientT *>(mRcfClientPtr.get());
114 
115  init();
116  }
117 
118  RcfClientT & publish()
119  {
120  RCF_ASSERT(!mClosed);
121  return *mpClient;
122  }
123 
124  private:
125 
126  RcfClientT * mpClient;
127  };
128 
129  class I_RequestSubscription;
130  template<typename T> class RcfClient;
131 
132  class RCF_EXPORT PublishingService :
133  public I_Service,
134  boost::noncopyable
135  {
136  public:
137  PublishingService();
138 
139  ~PublishingService();
140 
141  template<typename Interface>
142  boost::shared_ptr< Publisher<Interface> > createPublisher(
143  const PublisherParms & parms)
144  {
145  boost::shared_ptr< Publisher<Interface> > publisherPtr(
146  new Publisher<Interface>(*this, parms) );
147 
148  std::string topicName = publisherPtr->getTopicName();
149 
150  RCF_ASSERT(topicName.size() > 0);
151  Lock lock(mPublishersMutex);
152  mPublishers[topicName] = publisherPtr;
153  return publisherPtr;
154  }
155 
156  void setPingIntervalMs(boost::uint32_t pingIntervalMs);
157  boost::uint32_t getPingIntervalMs() const;
158 
159 
160  private:
161 
162  friend class RcfClient<I_RequestSubscription>;
163  friend class PublisherBase;
164  friend class PublishingServicePb;
165  friend class RcfSession;
166 
167  boost::int32_t RequestSubscription(
168  const std::string &subscriptionName);
169 
170  boost::int32_t RequestSubscription(
171  const std::string &subscriptionName,
172  boost::uint32_t subToPubPingIntervalMs,
173  boost::uint32_t & pubToSubPingIntervalMs);
174 
175  private:
176 
177  void onServiceAdded(RcfServer &server);
178  void onServiceRemoved(RcfServer &server);
179  void onServerStart(RcfServer &server);
180  void onServerStop(RcfServer &server);
181 
182  void addSubscriberTransport(
183  RcfSession &session,
184  const std::string &publisherName,
185  ClientTransportAutoPtrPtr clientTransportAutoPtrPtr);
186 
187  void closePublisher(const std::string & name);
188 
189  // TODO: should be weak_ptr here probably. ~Publisher() will need
190  // to clean them up ?
191  typedef std::map<std::string, PublisherWeakPtr> Publishers;
192 
193  Mutex mPublishersMutex;
194  Publishers mPublishers;
195 
196  boost::uint32_t mPingIntervalMs;
197  PeriodicTimer mPeriodicTimer;
198 
199  virtual void onTimer();
200  void pingAllSubscriptions();
201  void harvestExpiredSubscriptions();
202 
203  };
204 
205  typedef boost::shared_ptr<PublishingService> PublishingServicePtr;
206 
207 } // namespace RCF
208 
209 #endif // ! INCLUDE_RCF_PUBLISHINGSERVICE_HPP