Remote Call Framework 3.4
PublishingService.hpp
1 
2 //******************************************************************************
3 // RCF - Remote Call Framework
4 //
5 // Copyright (c) 2005 - 2023, Delta V Software. All rights reserved.
6 // https://www.deltavsoft.com
7 //
8 // RCF is distributed under dual licenses - closed source or GPL.
9 // Consult your particular license for conditions of use.
10 //
11 // If you have not purchased a commercial license, you are using RCF under GPL terms.
12 //
13 // Version: 3.4
14 // Contact: support <at> deltavsoft.com
15 //
16 //******************************************************************************
17 
18 #ifndef INCLUDE_RCF_PUBLISHINGSERVICE_HPP
19 #define INCLUDE_RCF_PUBLISHINGSERVICE_HPP
20 
21 #include <functional>
22 #include <map>
23 #include <string>
24 
25 #include <RCF/ClientStub.hpp>
26 #include <RCF/Export.hpp>
27 #include <RCF/PeriodicTimer.hpp>
28 #include <RCF/RcfClient.hpp>
29 #include <RCF/RcfFwd.hpp>
30 #include <RCF/Service.hpp>
31 #include <RCF/ThreadLibrary.hpp>
32 #include <RCF/Timer.hpp>
33 #include <RCF/Tools.hpp>
34 
35 namespace RCF {
36 
38  class RCF_EXPORT PublisherParms
39  {
40  public:
41 
43  void setTopicName(const std::string & topicName);
44 
46  std::string getTopicName() const;
47 
49  void setOnSubscriberConnect(OnSubscriberConnect onSubscriberConnect);
50 
52  void setOnSubscriberDisconnect(OnSubscriberDisconnect onSubscriberDisconnect);
53 
54  private:
55 
56  friend class PublishingService;
57 
58  std::string mTopicName;
59  OnSubscriberConnect mOnSubscriberConnect;
60  OnSubscriberDisconnect mOnSubscriberDisconnect;
61  };
62 
64  class RCF_EXPORT PublisherBase : Noncopyable
65  {
66  public:
67  PublisherBase(PublishingService & pubService, const PublisherParms & parms);
68  ~PublisherBase();
69 
71  std::string getTopicName();
72 
74  std::size_t getSubscriberCount();
75 
77  void close();
78 
80  void dropSubscriber(RcfSessionWeakPtr sessionWeakPtr);
81 
82  protected:
83 
84  friend class PublishingService;
85 
86  void init();
87 
88  PublishingService & mPublishingService;
89  PublisherParms mParms;
90  bool mClosed;
91  std::string mTopicName;
92  RcfClientPtr mRcfClientPtr;
93  };
94 
96  template<typename Interface>
97  class Publisher : public PublisherBase
98  {
99  public:
100 
101  typedef typename Interface::RcfClientT RcfClientT;
102 
103  Publisher(PublishingService & pubService, const PublisherParms & parms) :
104  PublisherBase(pubService, parms),
105  mpClient(NULL)
106  {
107  if (mTopicName.empty())
108  {
109  mTopicName = getInterfaceName<Interface>();
110  }
111 
112  mRcfClientPtr.reset( new RcfClientT( ClientStub(mTopicName) ) );
113  mpClient = dynamic_cast<RcfClientT *>(mRcfClientPtr.get());
114 
115  init();
116  }
117 
119  RcfClientT & publish()
120  {
121  RCF_ASSERT(!mClosed);
122  return *mpClient;
123  }
124 
125  private:
126 
127  RcfClientT * mpClient;
128  };
129 
130  class I_RequestSubscription;
131  template<typename T> class RcfClient;
132 
133  class RCF_EXPORT PublishingService :
134  public I_Service,
135  Noncopyable
136  {
137  public:
138  PublishingService();
139 
140  ~PublishingService();
141 
142  template<typename Interface>
143  std::shared_ptr< Publisher<Interface> > createPublisher(
144  const PublisherParms & parms)
145  {
146  std::shared_ptr< Publisher<Interface> > publisherPtr(
147  new Publisher<Interface>(*this, parms) );
148 
149  std::string topicName = publisherPtr->getTopicName();
150 
151  RCF_ASSERT(topicName.size() > 0);
152  Lock lock(mPublishersMutex);
153  mPublishers[topicName] = publisherPtr;
154  return publisherPtr;
155  }
156 
157  void setPingIntervalMs(std::uint32_t pingIntervalMs);
158  std::uint32_t getPingIntervalMs() const;
159 
160  void closeSubscription(const std::string connectionGuid);
161 
162 
163  private:
164 
165  friend class RcfClient<I_RequestSubscription>;
166  friend class PublisherBase;
167  friend class PublishingServicePb;
168  friend class RcfSession;
169 
170  std::int32_t RequestSubscription(
171  const std::string &subscriptionName);
172 
173  std::int32_t RequestSubscription(
174  const std::string &subscriptionName,
175  std::uint32_t subToPubPingIntervalMs,
176  std::uint32_t & pubToSubPingIntervalMs);
177 
178  private:
179 
180  void onServiceAdded(RcfServer &server);
181  void onServiceRemoved(RcfServer &server);
182  void onServerStart(RcfServer &server);
183  void onServerStop(RcfServer &server);
184 
185  void addSubscriberTransport(
186  RcfSession &session,
187  const std::string &publisherName,
188  ClientTransportUniquePtrPtr clientTransportUniquePtrPtr,
189  const std::string & connectionGuid);
190 
191  void closePublisher(const std::string & name);
192 
193  // TODO: should be weak_ptr here probably. ~Publisher() will need
194  // to clean them up ?
195  typedef std::map<std::string, PublisherWeakPtr> Publishers;
196 
197  Mutex mPublishersMutex;
198  Publishers mPublishers;
199 
200  std::uint32_t mPingIntervalMs;
201  PeriodicTimer mPeriodicTimer;
202 
203  // Indexed map of subscriber connections, so subscribers can call in and actively terminate their connection.
204  std::map<std::string, std::weak_ptr<ClientTransportUniquePtr>> mSubscriberConnections;
205 
206  virtual void onTimer();
207  void pingAllSubscriptions();
208  void harvestExpiredSubscriptions();
209 
210  };
211 
212  typedef std::shared_ptr<PublishingService> PublishingServicePtr;
213 
214 } // namespace RCF
215 
216 #endif // ! INCLUDE_RCF_PUBLISHINGSERVICE_HPP
RcfClientT & publish()
Returns a reference to the RcfClient<> instance to use when publishing messages.
Definition: PublishingService.hpp:119
Represents a server side session, associated with a client connection.
Definition: RcfSession.hpp:64
Controls the client side of a RCF connection.
Definition: ClientStub.hpp:82
std::function< void(RcfSession &, const std::string &)> OnSubscriberDisconnect
Describes a user-provided callback function to be called on the publisher side, whenever a subscriber...
Definition: RcfFwd.hpp:74
Provides RCF server-side functionality.
Definition: RcfServer.hpp:53
Represents a single publisher within a RcfServer. To create a publisher, use RcfServer::createPublish...
Definition: PublishingService.hpp:97
Definition: AmiIoHandler.hpp:23
General configuration of a publisher.
Definition: PublishingService.hpp:38
std::function< bool(RcfSession &, const std::string &)> OnSubscriberConnect
Describes a user-provided callback function to be called on the publisher side, whenever a subscriber...
Definition: RcfFwd.hpp:71
RCF_EXPORT bool init(RcfConfigT *=nullptr)
Reference-counted initialization of RCF library. May be called multiple times (see deinit())...
Base class of all publishers.
Definition: PublishingService.hpp:64