RCFProto
 All Classes Functions Typedefs
PublishingService.hpp
1 
2 //******************************************************************************
3 // RCF - Remote Call Framework
4 //
5 // Copyright (c) 2005 - 2013, Delta V Software. All rights reserved.
6 // http://www.deltavsoft.com
7 //
8 // RCF is distributed under dual licenses - closed source or GPL.
9 // Consult your particular license for conditions of use.
10 //
11 // If you have not purchased a commercial license, you are using RCF
12 // under GPL terms.
13 //
14 // Version: 2.0
15 // Contact: support <at> deltavsoft.com
16 //
17 //******************************************************************************
18 
19 #ifndef INCLUDE_RCF_PUBLISHINGSERVICE_HPP
20 #define INCLUDE_RCF_PUBLISHINGSERVICE_HPP
21 
22 #include <map>
23 #include <string>
24 
25 #include <boost/shared_ptr.hpp>
26 
27 #include <RCF/ClientStub.hpp>
28 #include <RCF/Export.hpp>
29 #include <RCF/GetInterfaceName.hpp>
30 #include <RCF/PeriodicTimer.hpp>
31 #include <RCF/Service.hpp>
32 #include <RCF/ThreadLibrary.hpp>
33 #include <RCF/Timer.hpp>
34 
35 namespace RCF {
36 
37  class RcfServer;
38  class I_RcfClient;
39  class RcfSession;
40  class ClientTransport;
41  typedef boost::shared_ptr<I_RcfClient> RcfClientPtr;
42  typedef boost::shared_ptr<ClientTransport> ClientTransportPtr;
43 
44  class PublisherBase;
45  typedef boost::shared_ptr<PublisherBase> PublisherPtr;
46  typedef boost::weak_ptr<PublisherBase> PublisherWeakPtr;
47 
48  class PublishingService;
49 
50  typedef boost::function2<bool, RcfSession &, const std::string &> OnSubscriberConnect;
51  typedef boost::function2<void, RcfSession &, const std::string &> OnSubscriberDisconnect;
52 
53  class RCF_EXPORT PublisherParms
54  {
55  public:
56 
57  void setTopicName(const std::string & topicName);
58  std::string getTopicName() const;
59  void setOnSubscriberConnect(OnSubscriberConnect onSubscriberConnect);
60  void setOnSubscriberDisconnect(OnSubscriberDisconnect onSubscriberDisconnect);
61 
62  private:
63 
64  friend class PublishingService;
65 
66  std::string mTopicName;
67  OnSubscriberConnect mOnSubscriberConnect;
68  OnSubscriberDisconnect mOnSubscriberDisconnect;
69  };
70 
71 
72  class RCF_EXPORT PublisherBase : boost::noncopyable
73  {
74  public:
75  PublisherBase(PublishingService & pubService, const PublisherParms & parms);
76  ~PublisherBase();
77 
78  std::string getTopicName();
79  void close();
80 
81  protected:
82 
83  friend class PublishingService;
84 
85  void init();
86 
87  PublishingService & mPublishingService;
88  PublisherParms mParms;
89  bool mClosed;
90  std::string mTopicName;
91  RcfClientPtr mRcfClientPtr;
92  };
93 
94  // Rename to Topic?
95  template<typename Interface>
96  class Publisher : public PublisherBase
97  {
98  public:
99 
100  typedef typename Interface::RcfClientT RcfClientT;
101 
102  Publisher(PublishingService & pubService, const PublisherParms & parms) :
103  PublisherBase(pubService, parms),
104  mpClient(NULL)
105  {
106  if (mTopicName.empty())
107  {
108  mTopicName = getInterfaceName<Interface>();
109  }
110 
111  mRcfClientPtr.reset( new RcfClientT( ClientStub(mTopicName) ) );
112  mpClient = dynamic_cast<RcfClientT *>(mRcfClientPtr.get());
113 
114  init();
115  }
116 
117  RcfClientT & publish()
118  {
119  RCF_ASSERT(!mClosed);
120  return *mpClient;
121  }
122 
123  private:
124 
125  RcfClientT * mpClient;
126  };
127 
128  class I_RequestSubscription;
129  template<typename T> class RcfClient;
130 
131  class RCF_EXPORT PublishingService :
132  public I_Service,
133  boost::noncopyable
134  {
135  public:
136  PublishingService();
137 
138  ~PublishingService();
139 
140  template<typename Interface>
141  boost::shared_ptr< Publisher<Interface> > createPublisher(
142  const PublisherParms & parms)
143  {
144  boost::shared_ptr< Publisher<Interface> > publisherPtr(
145  new Publisher<Interface>(*this, parms) );
146 
147  std::string topicName = publisherPtr->getTopicName();
148 
149  RCF_ASSERT(topicName.size() > 0);
150  Lock lock(mPublishersMutex);
151  mPublishers[topicName] = publisherPtr;
152  return publisherPtr;
153  }
154 
155  void setPingIntervalMs(boost::uint32_t pingIntervalMs);
156  boost::uint32_t getPingIntervalMs() const;
157 
158 
159  private:
160 
161  friend class RcfClient<I_RequestSubscription>;
162  friend class PublisherBase;
163  friend class PublishingServicePb;
164 
165  boost::int32_t RequestSubscription(
166  const std::string &subscriptionName);
167 
168  boost::int32_t RequestSubscription(
169  const std::string &subscriptionName,
170  boost::uint32_t subToPubPingIntervalMs,
171  boost::uint32_t & pubToSubPingIntervalMs);
172 
173  private:
174 
175  void onServiceAdded(RcfServer &server);
176  void onServiceRemoved(RcfServer &server);
177  void onServerStart(RcfServer &server);
178  void onServerStop(RcfServer &server);
179 
180  void addSubscriberTransport(
181  RcfSession &session,
182  const std::string &publisherName,
183  ClientTransportAutoPtrPtr clientTransportAutoPtrPtr);
184 
185  void closePublisher(const std::string & name);
186 
187  // TODO: should be weak_ptr here probably. ~Publisher() will need
188  // to clean them up ?
189  typedef std::map<std::string, PublisherWeakPtr> Publishers;
190 
191  Mutex mPublishersMutex;
192  Publishers mPublishers;
193 
194  boost::uint32_t mPingIntervalMs;
195  PeriodicTimer mPeriodicTimer;
196 
197  virtual void onTimer();
198  void pingAllSubscriptions();
199  void harvestExpiredSubscriptions();
200 
201  };
202 
203  typedef boost::shared_ptr<PublishingService> PublishingServicePtr;
204 
205 } // namespace RCF
206 
207 #endif // ! INCLUDE_RCF_PUBLISHINGSERVICE_HPP