Is there any way to allow compression in Publish/Subscribe communication?

Post by biaosiegel »

Hi,

My server updates its clients using publish/subscribe.
I noticed that these updates are quite large.
For example, publishing a single std::vector<MyObj> produced a serialized stream of 107 MB.

For diagnostic purposes, I compressed this stream with Boost.Iostreams (gzip), which reduced it to 12 MB:

Code:

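#include <sstream>
#include <vector>
#include <boost/iostreams/copy.hpp>
#include <boost/iostreams/filter/gzip.hpp>
#include <boost/iostreams/filtering_streambuf.hpp>
#include <SF/OBinaryStream.hpp>
#include <SF/vector.hpp>

void doSomething(const std::vector<TaskSummary> &vTaskSummary)
{
   // Serialize the vector with SF to measure the uncompressed size.
   std::stringstream stream, compstream;
   SF::OBinaryStream out(stream);
   out << vTaskSummary;
   std::size_t size = stream.tellp(); // 107 MB

   // Read the serialized bytes back through a gzip compressor.
   boost::iostreams::filtering_streambuf<boost::iostreams::input> in;
   in.push(boost::iostreams::gzip_compressor());
   in.push(stream);
   boost::iostreams::copy(in, compstream);
   auto otherSize = compstream.tellp(); // 12 MB
}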
Is there any way to allow compression in this communication?
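
As a rough sanity check on the figures above (107 MB uncompressed, 12 MB after gzip), the sketch below computes the compression ratio and the space saved. The helper names are made up for illustration and are not part of RCF or Boost.

```cpp
#include <cassert>

// Hypothetical helpers for illustration; not part of RCF or Boost.

// How many times smaller the compressed stream is than the original.
inline double compressionRatio(double originalSize, double compressedSize)
{
    assert(originalSize > 0.0 && compressedSize > 0.0);
    return originalSize / compressedSize;
}

// Percentage of the original size eliminated by compression.
inline double spaceSavingsPercent(double originalSize, double compressedSize)
{
    assert(originalSize > 0.0 && compressedSize >= 0.0);
    return 100.0 * (1.0 - compressedSize / originalSize);
}
```

With the sizes reported above, compressionRatio(107.0, 12.0) comes out a little under 9, and spaceSavingsPercent(107.0, 12.0) a little under 89.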

An example of this communication:

Code:

#include <iostream>
#include <vector>

#include <RCF/RCF.hpp>
#include <SF/vector.hpp>

RCF_BEGIN(I_TestDataTransfer, "I_TestDataTransfer")
   RCF_METHOD_V1(void, Transfer, const std::vector<char>&)
RCF_END(I_TestDataTransfer)

class TestDataTransfer
{
public:
   void Transfer(const std::vector<char> &data)
   {
      std::cout << "Received vector " << data.size() << std::endl;
   }
};

int main(int argc, char* argv[])
{
   RCF::RcfInitDeinit rcfInit;
   RCF::enableLogging(RCF::LogToDebugWindow(), 4);
   RCF::setDefaultMaxMessageLength(1 * 1024); // 1 KB (1024 bytes)

   // BEGIN Publisher config
   RCF::RcfServer publishingServer( RCF::TcpEndpoint(50001) );
   publishingServer.start();

   typedef boost::shared_ptr< RCF::Publisher<I_TestDataTransfer> > DataTransferPublisherPtr;
   DataTransferPublisherPtr pubPtr = publishingServer.createPublisher<I_TestDataTransfer>();
   // END Publisher config

   // BEGIN Subscriber config
   RCF::RcfServer subscriptionServer( RCF::TcpEndpoint(-1) );
   subscriptionServer.start();

   TestDataTransfer dataTransfer;
   RCF::SubscriptionParms subParms;
   subParms.setPublisherEndpoint( RCF::TcpEndpoint(50001) );
   RCF::SubscriptionPtr subPtr = subscriptionServer.createSubscription<I_TestDataTransfer>(
      dataTransfer, 
      subParms);
   // END Subscriber config

   std::vector<char> data;

   // BEGIN Transfer OK!! 951 < RCF::getDefaultMaxMessageLength()
   data.resize(900);
   pubPtr->publish().Transfer(data);
   // LOG: MulticastClientTransport::send() - entry. [Args: lengthByteBuffers(data)=951, timeoutMs=10000, ]
   // END...

   // BEGIN Transfer NOT OK!! 1071 > RCF::getDefaultMaxMessageLength()
   data.resize(1020);
   pubPtr->publish().getClientStub().setEnableCompression(true);
   pubPtr->publish().Transfer(data);
   // LOG: MulticastClientTransport::send() - entry. [Args: lengthByteBuffers(data)=1071, timeoutMs=10000, ]
   // END...

   std::cin.get();

   return 0;
}
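
The two log lines in the example suggest a fixed per-message overhead: 951 - 900 = 51 bytes and 1071 - 1020 = 51 bytes. Assuming that overhead stays constant for this interface (an observation from this one run, not a documented RCF constant), a small sketch can estimate whether an uncompressed payload fits under the limit. The helper names are hypothetical, and treating a message exactly equal to the limit as acceptable is also an assumption.

```cpp
#include <cassert>
#include <cstddef>

// Overhead observed in the log output above (951 - 900 and 1071 - 1020).
// Assumption: constant for this interface; not a documented RCF value.
const std::size_t kObservedOverheadBytes = 51;

// Hypothetical helper: estimated on-wire size of an uncompressed Transfer() call.
inline std::size_t estimateMessageBytes(std::size_t payloadBytes,
                                        std::size_t overheadBytes = kObservedOverheadBytes)
{
    return payloadBytes + overheadBytes;
}

// Hypothetical helper: true if the estimated message does not exceed the limit.
// Assumption: a message exactly at the limit is accepted.
inline bool fitsWithinLimit(std::size_t payloadBytes,
                            std::size_t maxMessageBytes,
                            std::size_t overheadBytes = kObservedOverheadBytes)
{
    return estimateMessageBytes(payloadBytes, overheadBytes) <= maxMessageBytes;
}

// Hypothetical helper: largest payload that still fits under the limit.
inline std::size_t maxPayloadBytes(std::size_t maxMessageBytes,
                                   std::size_t overheadBytes = kObservedOverheadBytes)
{
    assert(maxMessageBytes >= overheadBytes);
    return maxMessageBytes - overheadBytes;
}
```

With the 1024-byte limit set in the example, maxPayloadBytes(1024) gives 973 bytes, which is consistent with the 900-byte publish succeeding and the 1020-byte one being rejected.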

Re: Is there any way to allow compression in Publish/Subscribe communication?

Post by jarl »

You can use a message filter to compress message data before transmitting over the connection:

Code:

#include <RCF/ZlibCompressionFilter.hpp>

pubPtr->publish().getClientStub().setMessageFilters( 
            RCF::FilterPtr( new RCF::ZlibStatelessCompressionFilter() ) );
Published messages will then be compressed before being sent out to all the subscribers.
Kind Regards

Jarl Lindrud
Delta V Software
http://www.deltavsoft.com
