Files
ict/client/ewsc/net/netlib/regstream_withname_d5k_unsyn.cpp
2026-01-23 08:57:55 +08:00

240 lines
4.9 KiB
C++

// CD5KNewServicesRegStream.cxx: implementation of the CD5KNewServicesRegStream class.
//
//////////////////////////////////////////////////////////////////////
#ifdef TH_WITH_D5K
#if defined(_UNIX_)
#include <signal.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#endif
#include "compat_process.h"
#include "compat_thread.h"
#include "netapi_d5k.h"
#ifdef WIN32
#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif
#endif
void CD5KNewServicesRegStreamCheckThread(void *arg)
{
CD5KNewServicesRegStream *srv = (CD5KNewServicesRegStream*)arg;
while(TRUE)
{
srv->CheckLink();
_delay_time(1000*50);
}
}
//接收数据线程
void CD5KNewServicesRegStreamRecvProcThread(void *arg)
{
CD5KNewServicesRegStream *srv = (CD5KNewServicesRegStream*)arg;
srv->RecvLoop();
}
CD5KNewServicesRegStream::CD5KNewServicesRegStream(reg_type_withname & regtype,void * parent)
{
m_recvThread = 0;
m_checkThread = 0;
m_bLinkOK = FALSE;
m_regtype = regtype;
m_pParent = parent;
}
//析构函数,如果CD5KNewServicesRegStream队列中已空,则终止监视线程并删除队列指针
CD5KNewServicesRegStream::~CD5KNewServicesRegStream()
{
TRACE("begin ~CD5KNewServicesRegStream\n");
UnRegister();
TRACE("end ~CD5KNewServicesRegStream\n");
killthread(m_checkThread);
killthread(m_recvThread);
TRACE("begin ~Close\n");
Close();
TRACE("end ~CD5KNewServicesRegStream\n");
}
void CD5KNewServicesRegStream::Reset()
{
printf("链路复位\n");
m_bLinkOK = FALSE;
Close();
}
void CD5KNewServicesRegStream::Close()
{
}
int CD5KNewServicesRegStream::UnRegister()
{
/*
txhead.cmdtype = m_regtype.cmdtype;
txhead.len = 0;
txhead.userdata1 = m_regtype.userdata1;
txhead.userdata2 = 0;//=0:取消注册
char val;
return TxStream(&txhead,&val);
*/
// m_oMsgBus.messageUnSubscribe(m_nSetID,m_szCtxName);
}
int CD5KNewServicesRegStream::DoCallBack(NET_HEAD * pHeader,char * pData)
{
if(m_pProcFunc == NULL)
{
return 0;
}
// printf(" %d %d\n",(int)pHeader->cmdtype,(int)pHeader->userdata1);
if(pHeader->cmdtype != m_regtype.cmdtype)
{
return 0;
}
if(pHeader->userdata1 != m_regtype.userdata1)
{
return 0;
}
m_pProcFunc(this,pHeader,pData);
return 1;
}
void CD5KNewServicesRegStream::CheckLink()
{
m_bLinkOK = LinkSrv();
}
BOOL CD5KNewServicesRegStream::TryLinkSrv()
{
m_bLinkOK = LinkSrv();
return m_bLinkOK;
}
/*
void CD5KNewServicesRegStream::LinkTo(const char * szCtxName,int set_id,int event_id)
{
}
*/
//检查并维护连接,
BOOL CD5KNewServicesRegStream::LinkSrv()
{
if( m_bLinkOK )
{
return TRUE;
}
Reset();
if(!LinkByPara(m_oServPara))
{
printf( "连接 %s错误\n",m_regtype.cmdtypename);
Close();
return FALSE;
}
printf( "连接 %s 成功 ,开始注册服务\n",m_regtype.cmdtypename);
NET_HEAD oHeader;
oHeader.cmdtype = m_regtype.cmdtype;
oHeader.userdata1 = m_regtype.userdata1;
oHeader.userdata2 = m_regtype.userdata2;
oHeader.len = 0;
FillNetHeadInfo(&oHeader);
XByteArray ba;
ba.Reset();
strcpy(oHeader.src.regname,m_regtype.cmdtypename);
oHeader.userdata1 = m_regtype.userdata1;
oHeader.userdata2 = m_regtype.userdata2;
oHeader.src.n_reg_type = 1;
int ret = serviceRequestAsync(m_oServiceInfo, (char *)&(oHeader), sizeof(NET_HEAD), 5, &m_oHandle);
if(0 > ret)
{
serviceRequestFree(m_oHandle);
printf("serviceRequestSync error : %s", _service_errdes);
return FALSE;
}
return TRUE;
}
void CD5KNewServicesRegStream::RecvLoop(void)
{
char *response_buffer;
int response_buffer_len;
int ret_code;
while(TRUE)
{
if(!m_bLinkOK)
{
Sleep(1000);
continue;
}
ret_code = serviceReqAsyncTest(m_oHandle, &response_buffer, &response_buffer_len);
if(0 > ret_code)
{
fprintf(stderr, "serviceReqAsyncTest error\n");
m_bLinkOK = FALSE;
Sleep(1000);
continue;
}
if(0 == ret_code)
{
printf("serviceReqAsyncTest, still waiting\n");
Sleep(2000);
continue;
}
else if(1 == ret_code)
{
printf("response buff len %d\n", response_buffer_len);
// int * pInt = (int *)response_buffer;
// nValue = *pInt;
// printf("nValue = %d !!!!!!!!!\n",nValue);
// nValue ++;
NET_HEAD oHead;
int nHeadLen = sizeof(oHead);
// printf("buflen:%d %d\n",response_buffer_len ,nHeadLen);
if(response_buffer_len >= nHeadLen)
{
memcpy(&oHead,response_buffer,nHeadLen);
DoCallBack(&oHead,(char *)response_buffer+nHeadLen);
}
serviceRequestFree(m_oHandle);
}
else if(2 == ret_code)
{
fprintf(stderr, "serviceReqAsyncTest error : %s",_service_errdes);
m_bLinkOK = FALSE;
continue;
}
}//while
}
void CD5KNewServicesRegStream::MakeStream()
{
Reset();
if( (m_checkThread=beginthread( (X_PROC) (CD5KNewServicesRegStreamCheckThread),0,this))==0)
{
printf("线程生成失败\n");
}
else
{
printf( "线程生成成功\n");
}
if( (m_checkThread=beginthread( (X_PROC) (CD5KNewServicesRegStreamRecvProcThread),0,this))==0)
{
printf("线程生成失败\n");
}
else
{
printf( "线程生成成功\n");
}
}
#endif