publish/subscribe has a bug

RCF support and general discussion.
Post Reply
wulixue
Posts: 8
Joined: Sun Dec 20, 2015 6:26 pm

publish/subscribe has a bug

Post by wulixue »

I found a bug while using publish/subscribe.
The problem occurs randomly. The error reported is: 0xC0000008: An invalid handle was specified. It is raised at the end of the call to PublishCompletionInfo::notifyCompletion(), when the win_mutex unlock() is executed and ::LeaveCriticalSection(&crit_section_) fails. According to the Microsoft documentation there are two main causes for this error: either another object or thread has already released the critical section, or the critical section has been corrupted.

Can you help me resolve it?
Thanks!!

jarl
Posts: 238
Joined: Mon Oct 03, 2011 4:53 am
Contact:

Re: publish/subscribe has a bug

Post by jarl »

Are you able to send us some code that will reproduce the problem? Otherwise it will be hard to make any progress.
Kind Regards

Jarl Lindrud
Delta V Software
http://www.deltavsoft.com

wulixue
Posts: 8
Joined: Sun Dec 20, 2015 6:26 pm

Re: publish/subscribe has a bug-- code of publisher

Post by wulixue »

//Define a interface
//IDataPush.hpp
#ifndef _IDATAPUSH_OF_SBZVMS_
#define _IDATAPUSH_OF_SBZVMS_
#include <boost/tuple/tuple.hpp>
#include <RCF/RCF.hpp>
#include <RCF/Idl.hpp>
#include <SF/vector.hpp>
#include <SF/tuple.hpp>
#include <SF/list.hpp>

// RCF interface IDATAPUSH_OF_SBZVMS (second macro argument is its string name).
// All three methods return void; this is the interface used by both the
// publisher (CPublisher) and the subscriber (DataPushImpl) in this code.
RCF_BEGIN(IDATAPUSH_OF_SBZVMS, "IDATAPUSH_OF_SBZVMS")
// HD video data: one RCF::ByteBuffer reference parameter.
RCF_METHOD_V1(void, push_hd_video_data, RCF::ByteBuffer &);
// SD video data: one RCF::ByteBuffer reference parameter.
RCF_METHOD_V1(void, push_sd_video_data, RCF::ByteBuffer &);
// Alarm data: five int parameters (x, y, width, height, type in the implementations).
RCF_METHOD_V5(void, push_alarm_data, int, int, int, int, int);
RCF_END(IDATAPUSH_OF_SBZVMS);

#endif // !_IDATAPUSH_OF_SBZVMS_

//publisher
#pragma once
#include <IDataPush.hpp>
#include "RCFNetService.h"

// Publisher-side wrapper: owns three RCF publishers for the
// IDATAPUSH_OF_SBZVMS interface (HD video, SD video, alarm data) and
// exposes one push method per stream.
class CPublisher
{
public:
CPublisher();
virtual ~CPublisher();
// Initialize the three publishers on netService's RCF server; ipAddress is
// embedded in each topic name.
void init_publisher_ptr(CRCFNetService *netService,std::string &ipAddress);
// HD video data publisher pointer
boost::shared_ptr<RCF::Publisher<IDATAPUSH_OF_SBZVMS>> hdVideoDataPusherPtr;
// SD video data publisher pointer
boost::shared_ptr<RCF::Publisher<IDATAPUSH_OF_SBZVMS>> sdVideoDataPusherPtr;
// Alarm data publisher pointer
boost::shared_ptr<RCF::Publisher<IDATAPUSH_OF_SBZVMS>> alarmDataPusherPtr;
// Push HD video data
void push_hd_video_data(BYTE *pBuffer, DWORD dwBufSize);
// Push SD video data
void push_sd_video_data(BYTE *pBuffer, DWORD dwBufSize);
// Push intelligent-analysis data
// Alarm trigger; this method is called by CRuleAnalysisEngine
// iType: 0 (display); 1 (display and alarm); 2 (indicate learning in progress); 3 (reserved); 4 (reserved)
void push_alarm_data(int wX, int wY, int wWidth, int wHeight, int iType);
private:

};

#include "stdafx.h"
#include "Publisher.h"

// Nothing to set up here: the publisher pointers start out empty and are
// filled in by init_publisher_ptr().
CPublisher::CPublisher() = default;

// No explicit cleanup; the publisher smart pointers release themselves.
CPublisher::~CPublisher() = default;

// Create the HD-video, SD-video and alarm publishers on netService's RCF server.
// Topic names are "topic_name_" + ipAddress + "_hd_video" / "_sd_video" / "_alarm".
// netService is dereferenced without a null check.
void CPublisher::init_publisher_ptr(CRCFNetService *netService, std::string &ipAddress)
{
    const std::string topicPrefix = "topic_name_" + ipAddress;

    RCF::PublisherParms hdParms;
    hdParms.setTopicName(topicPrefix + "_hd_video");
    hdVideoDataPusherPtr = netService->server.createPublisher<IDATAPUSH_OF_SBZVMS>(hdParms);

    RCF::PublisherParms sdParms;
    sdParms.setTopicName(topicPrefix + "_sd_video");
    sdVideoDataPusherPtr = netService->server.createPublisher<IDATAPUSH_OF_SBZVMS>(sdParms);

    RCF::PublisherParms alarmParms;
    alarmParms.setTopicName(topicPrefix + "_alarm");
    alarmDataPusherPtr = netService->server.createPublisher<IDATAPUSH_OF_SBZVMS>(alarmParms);
}

// Publish one HD video buffer through hdVideoDataPusherPtr.
// Does nothing when the publisher has not been created; an RCF::Exception
// raised while publishing is swallowed.
void CPublisher::push_hd_video_data(BYTE *pBuffer, DWORD dwBufSize)
{
    if (hdVideoDataPusherPtr)
    {
        try
        {
            // ByteBuffer is built from the caller's pointer and length.
            RCF::ByteBuffer payload(reinterpret_cast<char *>(pBuffer), static_cast<size_t>(dwBufSize));
            hdVideoDataPusherPtr->publish().push_hd_video_data(payload);
        }
        catch (const RCF::Exception &)
        {
            // Best effort: the failed publish is dropped (logging is disabled).
        }
    }
}
// Publish one SD video buffer through sdVideoDataPusherPtr.
// Does nothing when the publisher has not been created; an RCF::Exception
// raised while publishing is swallowed.
void CPublisher::push_sd_video_data(BYTE *pBuffer, DWORD dwBufSize)
{
    if (sdVideoDataPusherPtr)
    {
        try
        {
            // ByteBuffer is built from the caller's pointer and length.
            RCF::ByteBuffer payload(reinterpret_cast<char *>(pBuffer), static_cast<size_t>(dwBufSize));
            sdVideoDataPusherPtr->publish().push_sd_video_data(payload);
        }
        catch (const RCF::Exception &)
        {
            // Best effort: the failed publish is dropped (logging is disabled).
        }
    }
}

//推送智能分析数据
void CPublisher::push_alarm_data(int wX, int wY, int wWidth, int wHeight, int isAlarm)
{
//////LOGGING(L"push_alarm_data CPublisher SubscriberCount():%d , TopicName:%S\n",
// dataPusherPtr->getSubscriberCount(),
// dataPusherPtr->getTopicName().c_str());
if (!alarmDataPusherPtr)
{
return;
}

try
{
alarmDataPusherPtr->publish().push_alarm_data(wX, wY, wWidth, wHeight, isAlarm);
}
catch (const RCF::Exception & e)
{
////LOGGING(L"push_alarm_data Caught exception:%S", e.getError().getErrorString().c_str());
return;
}
}

#pragma once
#include "ServerOprationImpl.h"
// Server-side network service: owns the RCF runtime initializer and the
// RcfServer, and forwards incoming ISERVER_OPRATION_OF_SBZVMS requests
// (via oprationImpl's callbacks) to the device layer.
class CRCFNetService
{
public:
CRCFNetService();
virtual ~CRCFNetService();

// RCF runtime init/deinit
RCF::RcfInitDeinit rcfInit;
// The RCF server
RCF::RcfServer server;
// API: play the alarm audio of the device with the given IP
void play_alarm_audio(const std::string &ipaddress);
// API: PTZ control
void ipc_ptz_control(const std::string &ipaddress, const int &lcommand, const int &wStop);
// Forward audio data
void forwarding_audio_data(const RCF::ByteBuffer &buffer, const std::string &ipaddress);
// Open/close the voice talk channel; when iCommand=1 the return value is the audio encoding type
// Audio encoding type: 0- G722, 1- G711_U, 2- G711_A, 5- MP2L2, 6- G726, 7- AAC, 8- PCM
int operate_talk_channel(const std::string &ipaddress, const int &iCommand);

private:

// Server-side servant bound to the RCF server.
// NOTE(review): declared after `server`, so it is destroyed before `server`.
ServerOprationImpl oprationImpl;
};

#include "stdafx.h"
#include "RCFNetService.h"
#include "DeviceSupervisor.h"
#include "DeviceBase.h"


// Configure and start the RCF server on 0.0.0.0:8898 with oprationImpl bound
// as the ISERVER_OPRATION_OF_SBZVMS servant.
CRCFNetService::CRCFNetService()
{
    //RCF::enableLogging(RCF::LogToDebugWindow(), 3);
    RCF::setDefaultMaxMessageLength(5 * 1024 * 1024);

    // hardware_concurrency() may return 0 when the value is unavailable;
    // keep the pool's maximum at least equal to its minimum of 1.
    const unsigned hwThreads = boost::thread::hardware_concurrency();
    const unsigned maxThreads = (hwThreads > 0) ? hwThreads : 1u;
    RCF::ThreadPoolPtr threadPoolPtr(new RCF::ThreadPool(1, maxThreads));
    server.setThreadPool(threadPoolPtr);

    // Listening endpoint and servant binding.
    server.addEndpoint(RCF::TcpEndpoint("0.0.0.0", 8898));
    server.bind<ISERVER_OPRATION_OF_SBZVMS>(oprationImpl);

    // Wire the servant's callbacks BEFORE the server starts accepting calls,
    // so no request can reach a still-unbound boost::function.
    //// Callback to verify that a topic name exists (disabled)
    //oprationImpl.detect_topic_name = boost::bind(&CRCFNetService::detect_topic_name,this,_1);
    oprationImpl.play_device_alarm_audio = boost::bind(&CRCFNetService::play_alarm_audio, this, _1);
    oprationImpl.device_ptz_control = boost::bind(&CRCFNetService::ipc_ptz_control, this, _1, _2, _3);
    oprationImpl.forwarding_audio_data_to_ipc = boost::bind(&CRCFNetService::forwarding_audio_data, this, _1, _2);
    oprationImpl.control_ipc_talk_channel = boost::bind(&CRCFNetService::operate_talk_channel, this, _1, _2);

    // Start the network service last.
    server.start();
}

// Stop the server explicitly before any member is destroyed.
// oprationImpl is declared after `server`, so it is destroyed first; without
// this stop a server thread could still dispatch a call into it.
CRCFNetService::~CRCFNetService()
{
    try
    {
        server.stop();
    }
    catch (const RCF::Exception &)
    {
        // A destructor must not throw; nothing useful can be done here.
    }
}

//播放指定IP设备告警音频API
void CRCFNetService::play_alarm_audio(const std::string &ipaddress)
{
CDeviceSupervisor &ds = singleton<CDeviceSupervisor>::get_mutable_instance();
ds.get_device_ptr(ipaddress)->play_alarm_audio_async();
};
//云台控制API
void CRCFNetService::ipc_ptz_control(const std::string &ipaddress, const int &lcommand, const int &wStop)
{
CDeviceSupervisor &ds = singleton<CDeviceSupervisor>::get_mutable_instance();
ds.get_device_ptr(ipaddress)->ipc_ptz_control(lcommand, wStop);
};

// Forward an audio buffer to the device registered under `ipaddress`
// by calling its audio_forwarding(). The lookup result is not checked.
void CRCFNetService::forwarding_audio_data(const RCF::ByteBuffer &buffer, const std::string &ipaddress)
{
    CDeviceSupervisor &supervisor = singleton<CDeviceSupervisor>::get_mutable_instance();
    auto &&device = supervisor.get_device_ptr(ipaddress);
    device->audio_forwarding(buffer);
}

//操作控制声音对讲通道,iCommand=1时,返回值为音频编码类型
//音频编码类型:0- G722,1- G711_U,2- G711_A,5- MP2L2,6- G726,7- AAC,8- PCM
int CRCFNetService::operate_talk_channel(const std::string &ipaddress, const int &iCommand)
{
//
CDeviceSupervisor &ds = singleton<CDeviceSupervisor>::get_mutable_instance();
if (iCommand == 1)
{
return ds.get_device_ptr(ipaddress)->open_audio_channel();

}
else
{
return ds.get_device_ptr(ipaddress)->close_audio_channel();
}
};

wulixue
Posts: 8
Joined: Sun Dec 20, 2015 6:26 pm

Re: publish/subscribe has a bug -- code of subscriber

Post by wulixue »

#pragma once
#include <IDataPush.hpp>
#include "DataService.h"
// Subscriber-side implementation of the IDATAPUSH_OF_SBZVMS interface.
// Incoming pushes are forwarded to externally bound boost::function receivers.
// The class also builds the RCF::SubscriptionParms for each stream and holds
// the resulting subscription pointers.
class DataPushImpl
{
public:
    /************************** IDataPush interface implementation *************************/
    // HD data push API. An unbound boost::function throws when invoked, so
    // the push is dropped while no receiver is bound.
    void push_hd_video_data(RCF::ByteBuffer &buffer)
    {
        if (hdVideoDataRecerver)
        {
            hdVideoDataRecerver(buffer.getPtr(), buffer.getLength());
        }
    }
    // SD data push API (same guard as above).
    void push_sd_video_data(RCF::ByteBuffer &buffer)
    {
        if (sdVideoDataRecerver)
        {
            sdVideoDataRecerver(buffer.getPtr(), buffer.getLength());
        }
    }
    // Intelligent-analysis (alarm) data push API (same guard as above).
    void push_alarm_data(int wX, int wY, int wWidth, int wHeight, int isAlarm)
    {
        if (alarmDataRecerver)
        {
            alarmDataRecerver(wX, wY, wWidth, wHeight, isAlarm);
        }
    }

    /************************** IDataPush interface implementation *************************/
    // HD video receiver; bound by the external application as a callback.
    boost::function<void(const char*, const size_t)> hdVideoDataRecerver;
    // SD video receiver; bound by the external application as a callback.
    boost::function<void(const char*, const size_t)> sdVideoDataRecerver;
    // Alarm data receiver; bound by the external application as a callback.
    boost::function<void(int, int, int, int,int)> alarmDataRecerver;
    // IP address embedded in the topic names.
    std::string ipaddress = "";
    // Topic name produced by the most recent getSubscriptionParms() call.
    std::string topic_name = "";

    // Set the IP address used to build topic names.
    void setIPAddress(const std::string &ipaddress)
    {
        this->ipaddress = ipaddress;
    }

    // Fill in and return the subscription parameters for `service`:
    // publisher endpoint (cms_server_ip from CDataService, port 8898),
    // disconnect callback and topic name "topic_name_<ip>_<suffix>".
    // Also records the topic name in topic_name. For an unknown service the
    // HD parameters are returned unmodified.
    RCF::SubscriptionParms & getSubscriptionParms(SERVICE_TYPE service)
    {
        // Read the CMS server IP from the data service.
        CDataService &dataService = singleton<CDataService>::get_mutable_instance();
        std::string cms_server_ip;
        dataService.getValue("cms_server_ip", cms_server_ip);

        switch (service)
        {
        case SERVICE_HD_VIDEO:
            return configureParms(hd_parms, cms_server_ip, "_hd_video",
                                  &DataPushImpl::onHDSubscriptionDisconnected);
        case SERVICE_SD_VIDEO:
            return configureParms(sd_parms, cms_server_ip, "_sd_video",
                                  &DataPushImpl::onSDSubscriptionDisconnected);
        case SERVICE_ALARM_DATA:
            return configureParms(alarm_parms, cms_server_ip, "_alarm",
                                  &DataPushImpl::onAlarmSubscriptionDisconnected);
        default:
            return hd_parms;
        }
    }

    // TRUE when the HD subscription exists and reports itself connected.
    BOOL getHDSubscriptionConnected()
    {
        return (hd_data_sub_ptr && hd_data_sub_ptr->isConnected()) ? TRUE : FALSE;
    }

    // TRUE when the SD subscription exists and reports itself connected.
    BOOL getSDSubscriptionConnected()
    {
        return (sd_data_sub_ptr && sd_data_sub_ptr->isConnected()) ? TRUE : FALSE;
    }

    // TRUE when the alarm subscription exists and reports itself connected.
    BOOL getAlarmSubscriptionConnected()
    {
        return (alarm_data_sub_ptr && alarm_data_sub_ptr->isConnected()) ? TRUE : FALSE;
    }

    // HD video subscription pointer
    RCF::SubscriptionPtr hd_data_sub_ptr;
    // SD video subscription pointer
    RCF::SubscriptionPtr sd_data_sub_ptr;
    // Alarm subscription pointer
    RCF::SubscriptionPtr alarm_data_sub_ptr;

private:

    // Member-function type of the three disconnect handlers below.
    typedef void (DataPushImpl::*DisconnectHandler)(RCF::RcfSession &);

    // Shared setup for one stream: endpoint, disconnect callback, topic name.
    RCF::SubscriptionParms & configureParms(RCF::SubscriptionParms &parms,
                                            const std::string &cmsServerIp,
                                            const char *topicSuffix,
                                            DisconnectHandler onDisconnect)
    {
        const std::string topicName = "topic_name_" + ipaddress + topicSuffix;
        parms.setPublisherEndpoint(RCF::TcpEndpoint(cmsServerIp, 8898));
        parms.setOnSubscriptionDisconnect(boost::bind(onDisconnect, this, _1));
        parms.setTopicName(topicName);
        topic_name = topicName;
        return parms;
    }

    // HD video subscription parameters
    RCF::SubscriptionParms hd_parms;
    // HD subscription disconnect callback. The pointer is only assigned once
    // createSubscription() has returned, so it may still be empty here.
    void onHDSubscriptionDisconnected(RCF::RcfSession &session)
    {
        if (hd_data_sub_ptr)
        {
            hd_data_sub_ptr->close();
        }
    }

    // SD video subscription parameters
    RCF::SubscriptionParms sd_parms;
    // SD subscription disconnect callback (same empty-pointer guard).
    void onSDSubscriptionDisconnected(RCF::RcfSession &session)
    {
        if (sd_data_sub_ptr)
        {
            sd_data_sub_ptr->close();
        }
    }

    // Alarm subscription parameters
    RCF::SubscriptionParms alarm_parms;
    // Alarm subscription disconnect callback (same empty-pointer guard).
    void onAlarmSubscriptionDisconnected(RCF::RcfSession &session)
    {
        if (alarm_data_sub_ptr)
        {
            alarm_data_sub_ptr->close();
        }
    }
};

#pragma once
#include <RCF/RCF.hpp>
#include "DataPushImpl.h"

// Client-side network facade: issues ISERVER_OPRATION_OF_SBZVMS calls to the
// CMS server and hosts a local RcfServer on which subscriptions are created.
class CRCFNetClient
{
public:
CRCFNetClient();
virtual ~CRCFNetClient();

// Database query API
void query(const std::string &, std::vector<std::vector<std::string>> &);
// Open/close the voice talk channel; when iCommand=1 the return value is the audio encoding type
// Audio encoding type: 0- G722, 1- G711_U, 2- G711_A, 5- MP2L2, 6- G726, 7- AAC, 8- PCM
int operate_talk_channel(const std::string &ipaddress, const int &iCommand);
// Forward audio to the IPC through the server
void forwarding_audio_data_to_ipc(const std::string &ipaddress, char *buf, const DWORD dwSize);
// PTZ control API
void ipc_ptz_control(const std::string &, const int &, const int &);
//// Verify that a topic name exists (disabled)
//BOOL is_exist_topic_name(const std::string &topicName);

// RCF runtime init/deinit
RCF::RcfInitDeinit rcfInit;
// The RCF server used to create subscriptions
RCF::RcfServer server;
// Subscribe to the HD video, SD video or intelligent-analysis data service
BOOL subscriber(SERVICE_TYPE service, DataPushImpl &data_push_impl);
// Get the connection state of the subscription for a topic name
BOOL getSubscriberConnected(char *topicName);
private:

// Map from topic name to subscription pointer
std::map<std::string, RCF::SubscriptionPtr> subscription_map;

// CMS server IP, read from CDataService in the constructor
std::string cms_server_ip;

void onSubscriptionDisconnected(RCF::RcfSession & session);
};

// RCFNetClient.cpp : 定义 DLL 应用程序的导出函数。
//
#include "stdafx.h"
#include "RCFNetClient.h"
#include "IServerOpration.hpp"

// Constructor of the exported class.
// Reads the CMS server IP from CDataService, then configures and starts the
// local RCF server on which subscriptions are later created.
CRCFNetClient::CRCFNetClient()
{
    // Read the address straight into the member; the original temporary
    // string and the never-used RcfClient local have been removed.
    CDataService &dataService = singleton<CDataService>::get_mutable_instance();
    dataService.getValue("cms_server_ip", cms_server_ip);

    // Network environment setup
    //rcfInit.isRootInstance();
    //RCF::enableLogging();
    RCF::setDefaultMaxMessageLength(1024 * 1024);
    server.addEndpoint(RCF::TcpEndpoint(-1));

    // Thread pool: 1 to 30 threads
    RCF::ThreadPoolPtr threadPoolPtr(new RCF::ThreadPool(1, 30));
    server.setThreadPool(threadPoolPtr);
    //RCF::disableLogging();

    // Start the network service
    server.start();
}

// Destructor of the exported class; no explicit cleanup is performed.
CRCFNetClient::~CRCFNetClient() = default;

////验证topic name是否存在
//BOOL CRCFNetClient::is_exist_topic_name(const std::string &topicName)
//{
// BOOL isExist = FALSE;
// try
// {
// RcfClient<ISERVER_OPRATION_OF_SBZVMS> client(RCF::TcpEndpoint(CMSServerIPAddress, 8898));
// if (client.is_exist_topic_name(topicName))
// {
// isExist = TRUE;
// }
// }
// catch (const RCF::Exception & e)
// {
// TRACE(L"RcfClient<ISERVER_OPRATION_OF_SBZVMS> query exception:%S\n", e.getError().getErrorString());
// }
//
// return isExist;
//}

// Database query API: sends `sql` to the CMS server (port 8898) and lets the
// remote call fill query_result. An RCF::Exception is traced and swallowed.
void CRCFNetClient::query(const std::string &sql, std::vector<std::vector<std::string>> &query_result)
{
    try
    {
        RcfClient<ISERVER_OPRATION_OF_SBZVMS> client(RCF::TcpEndpoint(cms_server_ip, 8898));
        client.query(sql, query_result);
    }
    catch (const RCF::Exception &e)
    {
        // %S needs a C string; passing the std::string object itself through
        // varargs is undefined behavior, hence c_str().
        TRACE(L"RcfClient<ISERVER_OPRATION_OF_SBZVMS> query exception:%S", e.getError().getErrorString().c_str());
    }
}
// Open/close the voice talk channel of the device at `ipaddress` via the CMS server.
// Per the original note, with iCommand=1 the return value is the audio encoding type:
// 0- G722, 1- G711_U, 2- G711_A, 5- MP2L2, 6- G726, 7- AAC, 8- PCM.
// Returns 0xFF when the remote call throws an RCF::Exception.
int CRCFNetClient::operate_talk_channel(const std::string &ipaddress, const int &iCommand)
{
    try
    {
        RcfClient<ISERVER_OPRATION_OF_SBZVMS> client(RCF::TcpEndpoint(cms_server_ip, 8898));
        return client.operate_talk_channel(ipaddress, iCommand);
    }
    catch (const RCF::Exception &e)
    {
        // %S needs a C string (c_str()); the message now names this function
        // instead of the copy-pasted "play_alarm_audio".
        TRACE(L"RcfClient<ISERVER_OPRATION_OF_SBZVMS> operate_talk_channel exception:%S", e.getError().getErrorString().c_str());
        return 0xFF;
    }
}

// Forward an audio buffer to the IPC at `ipaddress` through the CMS server.
// `buf`/`dwSize` are wrapped in an RCF::ByteBuffer for the remote call.
// An RCF::Exception is traced and swallowed.
void CRCFNetClient::forwarding_audio_data_to_ipc(const std::string &ipaddress, char *buf, DWORD dwSize)
{
    try
    {
        RcfClient<ISERVER_OPRATION_OF_SBZVMS> client(RCF::TcpEndpoint(cms_server_ip, 8898));
        RCF::ByteBuffer byteBuffer(buf, static_cast<size_t>(dwSize));
        client.forwarding_audio_data(byteBuffer, ipaddress);
    }
    catch (const RCF::Exception &e)
    {
        // %S needs a C string (c_str()); the message now names the call that
        // actually failed instead of the copy-pasted "play_alarm_audio".
        TRACE(L"RcfClient<ISERVER_OPRATION_OF_SBZVMS> forwarding_audio_data exception:%S", e.getError().getErrorString().c_str());
    }
}
// PTZ control API: forwards the command and speed for the device at
// `ipaddress` to the CMS server. An RCF::Exception is traced and swallowed.
void CRCFNetClient::ipc_ptz_control(const std::string &ipaddress, const int &lcommand, const int &speed)
{
    try
    {
        RcfClient<ISERVER_OPRATION_OF_SBZVMS> client(RCF::TcpEndpoint(cms_server_ip, 8898));
        client.ipc_ptz_control(ipaddress, lcommand, speed);
    }
    catch (const RCF::Exception &e)
    {
        // %S needs a C string; passing the std::string object itself through
        // varargs is undefined behavior, hence c_str().
        TRACE(L"RcfClient<ISERVER_OPRATION_OF_SBZVMS> ipc_ptz_control exception:%S", e.getError().getErrorString().c_str());
    }
}


// Subscribe data_push_impl to the HD video, SD video or intelligent-analysis
// data service selected by `service`.
// The subscription is created on the local `server` with the parameters
// supplied by data_push_impl.getSubscriptionParms(service) and stored in the
// matching *_data_sub_ptr member of data_push_impl.
// Returns TRUE when the new subscription reports itself connected; FALSE for
// an unknown service, when createSubscription throws, or when not connected.
BOOL CRCFNetClient::subscriber(SERVICE_TYPE service, DataPushImpl &data_push_impl)
{
    // Called for every service value, as before (it also updates topic_name).
    RCF::SubscriptionParms &parms = data_push_impl.getSubscriptionParms(service);

    // Select which subscription pointer of data_push_impl receives the result.
    RCF::SubscriptionPtr *slot = nullptr;
    switch (service)
    {
    case SERVICE_HD_VIDEO:
        slot = &data_push_impl.hd_data_sub_ptr;
        break;
    case SERVICE_SD_VIDEO:
        slot = &data_push_impl.sd_data_sub_ptr;
        break;
    case SERVICE_ALARM_DATA:
        slot = &data_push_impl.alarm_data_sub_ptr;
        break;
    default:
        return FALSE;
    }

    try
    {
        *slot = server.createSubscription<IDATAPUSH_OF_SBZVMS>(data_push_impl, parms);
    }
    catch (const RCF::Exception &e)
    {
        // Both %S arguments must be C strings; topic_name was previously
        // passed as a std::string object, which is undefined behavior.
        TRACE(L"createSubscription exception:%S, topicName:%S\n",
              e.getError().getErrorString().c_str(),
              data_push_impl.topic_name.c_str());
        return FALSE;
    }

    return (*slot && (*slot)->isConnected()) ? TRUE : FALSE;
}

// Get the connection state of the subscription registered under topicName.
// Returns FALSE when topicName is null, when no entry exists in
// subscription_map, or when the stored subscription pointer is empty;
// otherwise TRUE/FALSE according to isConnected().
BOOL CRCFNetClient::getSubscriberConnected(char *topicName)
{
    // Constructing std::string from a null pointer is undefined behavior.
    if (topicName == nullptr)
    {
        return FALSE;
    }

    // Single lookup instead of two identical find() calls.
    const auto entry = subscription_map.find(std::string(topicName));
    if (entry == subscription_map.end() || !entry->second)
    {
        return FALSE;
    }
    return entry->second->isConnected() ? TRUE : FALSE;
}

// Subscription-disconnect handler: disconnects the session it is handed.
void CRCFNetClient::onSubscriptionDisconnected(RCF::RcfSession &session)
{
    session.disconnect();
}

wulixue
Posts: 8
Joined: Sun Dec 20, 2015 6:26 pm

Re: publish/subscribe has a bug

Post by wulixue »

The program runs normally at first; the problem usually appears after about 30 minutes, but the timing is random and can be anywhere from 30 minutes to 3 days. I checked with memory analysis tools and they report no memory leak.

Post Reply