Page 1 of 1

How to reuse TCP connection among RcfProtoChannel instances

Posted: Mon Sep 29, 2014 9:21 am
by mou
First, the following code is somewhat unreasonable.
I just want to carry out a stress test.
Howerver, I found a TCP connection is not reused in a new RcfProtoChannel, though it connected to the same server.
Is there some way to reuse an existing TCP connection.
Or, is it by design in RCFProto framework that we cannot reuse a TCP connection?

Code: Select all

#include <iostream>

#include <RCF/RCFProto.hpp>

#include "ranking_service.pb.h"

namespace {

void OnCallCompletion(RCF::RcfProtoController* controller,
                      RankingResponse* response) {
  if (controller->Failed()) {
    std::cout << "Error: " << controller->ErrorText() << std::endl;
  } else {
    std::cout << "Received response:" << std::endl;
    std::cout << response->DebugString() << std::endl;
  }

  // delete controller->getChannel();
  delete response;
}

bool AllCompleted(const std::vector<RCF::RcfProtoController>& controllers) {
  for (const RCF::RcfProtoController& controller : controllers) {
    if (!controller.Completed()) {
      return false;
    }
  }

  return true;
}

} // namespace

int main(int argc, char* argv[]) {
  try {
    RCF::init();

    RankingRequest request;
    std::vector<RCF::RcfProtoController> controllers(std::stol(argv[1]));
    const RCF::TcpEndpoint tcp_endpoint("127.0.0.1", 50001);
    for (RCF::RcfProtoController& controller : controllers) {
      RCF::RcfProtoChannel* channel =
          new RCF::RcfProtoChannel(tcp_endpoint);
      channel->setAsynchronousRpcMode(true);
      channel->setConnectTimeoutMs(1000000);
      RankingResponse* response = new RankingResponse;

      RankingService::Stub stub(channel);
      stub.Score(&controller,
                 &request,
                 response,
                 google::protobuf::NewCallback(OnCallCompletion, &controller, response));

      std::cout << "Sending request:" << std::endl;
      std::cout << request.DebugString() << std::endl;
    }

    int i = 0;
    while (!AllCompleted(controllers)) {
      RCF::sleepMs(1);
      ++i;
    }
    std::cout << "Have waited for " << i << " milliseconds." << std::endl;
  } catch(const RCF::Exception & e) {
    std::cout << "RCF::Exception: " << e.getErrorString() << std::endl;
    return 1;
  }

  return 0;
}

Re: How to reuse TCP connection among RcfProtoChannel instan

Posted: Thu Oct 02, 2014 12:23 pm
by jarl
That's correct - with RCFProto there is a one-to-one relationship between RcfProtoChannel and an underlying network connection (TCP in this case). So different RcfProtoChannel instances will have different TCP connections.

Re: How to reuse TCP connection among RcfProtoChannel instan

Posted: Thu Oct 09, 2014 4:14 am
by mou
jarl wrote:That's correct - with RCFProto there is a one-to-one relationship between RcfProtoChannel and an underlying network connection (TCP in this case). So different RcfProtoChannel instances will have different TCP connections.
Do you think this is too heavy?
For each new request, I need use a new RcfProtoChannel and RcfProtoController.

Re: How to reuse TCP connection among RcfProtoChannel instan

Posted: Thu Oct 09, 2014 5:01 am
by jarl
You don't need a new connection for each request... The client is expected to create a single RcfProtoChannel, then use that to create a service stub, and after that all requests that are made on that stub will be using the same underlying connection.

if you have several different services you are calling, or if you have multiple client threads needing to access the service, that's when you would need multiple RcfProtoChannel's.

Re: How to reuse TCP connection among RcfProtoChannel instan

Posted: Thu Oct 09, 2014 5:38 am
by mou
jarl wrote:You don't need a new connection for each request... The client is expected to create a single RcfProtoChannel, then use that to create a service stub, and after that all requests that are made on that stub will be using the same underlying connection.

if you have several different services you are calling, or if you have multiple client threads needing to access the service, that's when you would need multiple RcfProtoChannel's.
I did what you said. But, it threw an exception as the following:
Error: multiple concurrent calls attempted on the same RcfClient<> object. To make concurrent calls, use multiple RcfClient<> objects instead.

The code:

Code: Select all

#include <iostream>
#include <mutex>
#include <queue>

#include <RCF/RCFProto.hpp>

namespace {

size_t g_allocated_size = 0;

std::mutex g_mutex;
std::queue<RCF::RcfProtoController*> g_controller_cache;

void OnCallCompletion(RCF::RcfProtoController* controller,
                      Response* response) {
  if (controller->Failed()) {
    std::cout << "Error: " << controller->ErrorText() << std::endl;
  } else {
    std::cout << "Received response:" << std::endl;
    std::cout << response->DebugString() << std::endl;
  }

  {
    std::lock_guard<std::mutex> guard(g_mutex);
    g_controller_cache.push(controller);
    std::cout << "cache_size = " << g_controller_cache.size() << std::endl;
  }

  delete response;
}

inline bool HasCompleted() {
  std::lock_guard<std::mutex> guard(g_mutex);
  return g_allocated_size == g_controller_cache.size();
}

} // namespace

int main(int argc, char* argv[]) {
  const size_t requests = std::stol(argv[1]);

  try {
    RCF::init();

    ads_proto::RankingRequest request;
    RCF::RcfProtoChannel channel(RCF::TcpEndpoint("127.0.0.1", 7105));
    channel.setAsynchronousRpcMode(true);
    channel.setConnectTimeoutMs(100000);
    ads_proto::RankingService::Stub stub(&channel);

    for (size_t i = 0; i < requests; ++i) {
      RCF::RcfProtoController* controller = nullptr;
      {
        std::lock_guard<std::mutex> guard(g_mutex);
        if (g_controller_cache.empty()) {
          ++g_allocated_size;
          controller = new RCF::RcfProtoController;
        } else {
          controller = g_controller_cache.front();
          g_controller_cache.pop();
        }
      }

      Response* response = new Response;

      stub.Score(controller,
                 &request,
                 response,
                 google::protobuf::NewCallback(OnCallCompletion, controller, response));
    }

    int i = 0;
    while (!HasCompleted()) {
      RCF::sleepMs(1);
      ++i;
    }
    std::cout << "Have waited for " << i << " milliseconds." << std::endl;
  } catch(const RCF::Exception & e) {
    std::cout << "RCF::Exception: " << e.getErrorString() << std::endl;
    return 1;
  }

Re: How to reuse TCP connection among RcfProtoChannel instan

Posted: Thu Oct 09, 2014 11:49 am
by jarl
OK I see what you're doing. The problem is that you can't make multiple simultaneous calls on the same client stub, which is what your code is trying to do.

When making asynchronous calls on a single stub, you can only begin a new call after the previous call has completed. This is a bit of a limitation which we are looking at fixing. In the meantime, for concurrent calls, you'll need separate RcfProtoChannel's.

Re: How to reuse TCP connection among RcfProtoChannel instan

Posted: Fri Oct 10, 2014 1:31 am
by mou
jarl wrote:OK I see what you're doing. The problem is that you can't make multiple simultaneous calls on the same client stub, which is what your code is trying to do.

When making asynchronous calls on a single stub, you can only begin a new call after the previous call has completed. This is a bit of a limitation which we are looking at fixing. In the meantime, for concurrent calls, you'll need separate RcfProtoChannel's.
Okay, looking forward to the next release.
BTW, is there any problem with my sample code in reusing RcfProtoController and RcfProtoChannel through cache mechanism?

Re: How to reuse TCP connection among RcfProtoChannel instances

Posted: Tue Jun 09, 2015 2:22 pm
by pateli
What is the status of this issue in the current release? Is the solution still to use multiple RcfProtoChannel's?

Re: How to reuse TCP connection among RcfProtoChannel instances

Posted: Wed Jun 10, 2015 7:25 am
by jarl
The behavior here has not changed - you still need to use multiple RcfProtoChannel objects (one for each thread).