Files
ict/client/ewsc/net/netlib/regstream_d5k.cpp
2026-01-23 08:57:55 +08:00

269 lines
5.5 KiB
C++

// CD5KMsgRegStream.cxx: implementation of the CD5KMsgRegStream class.
//
//////////////////////////////////////////////////////////////////////
#ifdef TH_WITH_D5K
#if defined(_UNIX_)
#include <signal.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#endif
#include "compat_process.h"
#include "compat_thread.h"
#include "netapi_d5k.h"
#ifdef WIN32
#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif
#endif
void CD5KMsgRegStreamCheckThread(void *arg)
{
CD5KMsgRegStream *srv = (CD5KMsgRegStream*)arg;
while(TRUE)
{
srv->CheckLink();
_delay_time(1000*50);
}
}
//接收数据线程
void CD5KMsgRegStreamRecvProcThread(void *arg)
{
#if defined(_UNIX_)
int oldtype,oldstate;
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,&oldtype);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,&oldstate);
#endif
CD5KMsgRegStream *srv = (CD5KMsgRegStream*)arg;
srv->RecvLoop();
}
CD5KMsgRegStream::CD5KMsgRegStream(const char *name,reg_type regtype,int nPort)
{
m_recvThread = 0;
m_checkThread = 0;
m_bLinkOK = FALSE;
m_regtype = regtype;
}
//析构函数,如果CD5KMsgRegStream队列中已空,则终止监视线程并删除队列指针
CD5KMsgRegStream::~CD5KMsgRegStream()
{
TRACE("begin ~CD5KMsgRegStream\n");
UnRegister();
TRACE("end ~CD5KMsgRegStream\n");
killthread(m_checkThread);
killthread(m_recvThread);
TRACE("begin ~Close\n");
Close();
TRACE("end ~CD5KMsgRegStream\n");
}
void CD5KMsgRegStream::Reset()
{
printf("链路复位\n");
m_bLinkOK = FALSE;
Close();
}
void CD5KMsgRegStream::Close()
{
if(m_nProcKey > 0)
{
m_oMsgBus.messageExit(m_nProcKey);
}
}
int CD5KMsgRegStream::UnRegister()
{
/*
txhead.cmdtype = m_regtype.cmdtype;
txhead.len = 0;
txhead.userdata1 = m_regtype.userdata1;
txhead.userdata2 = 0;//=0:取消注册
char val;
return TxStream(&txhead,&val);
*/
m_oMsgBus.messageUnSubscribe(m_nSetID,m_szCtxName);
}
void CD5KMsgRegStream::CheckLink()
{
m_bLinkOK = LinkSrv();
}
BOOL CD5KMsgRegStream::TryLinkSrv()
{
m_bLinkOK = LinkSrv();
return m_bLinkOK;
}
/*
void CD5KMsgRegStream::LinkTo(const char * szCtxName,int set_id,int event_id)
{
}
*/
//检查并维护连接,
BOOL CD5KMsgRegStream::LinkSrv()
{
if( m_bLinkOK )
{
return TRUE;
}
Reset();
pid_t nCurrPID = getprocessid();
thread_t nCurrSID = getthreadid();
char szName[64];
time_t t_time = time(NULL);
sprintf(szName,"%d_%d_%d_%d",(int)nCurrPID,rand()%1000,t_time%10,nCurrSID%100000);
szName[19]= 0;
m_nProcKey = m_oMsgBus.messageInit("realtime","avc",szName);
if(m_nProcKey <=0)
{
return FALSE;
}
printf("begin message subscribe : %d \n",m_nSetID);
int ret_code = m_oMsgBus.messageSubscribe(m_nSetID,"realtime");
printf("end subscribe: %d\n",ret_code);
if(ret_code < 0)
{
m_bLinkOK = FALSE;
return FALSE;
}
m_bLinkOK = TRUE;
return TRUE;
}
void CD5KMsgRegStream::RecvLoop(void)
{
CD5KMsgHeaderInfo oHeaderInfo;
BYTE * pBuffer = NULL;
int ret_code;
Message oMsg; //消息
NET_HEAD rxhead;
int nHeaderLen = sizeof(oHeaderInfo);
int nTotalDataLen;
int nCurrPacketNum = 0;
int nDataLen = sizeof(oMsg.Msg_buf) - nHeaderLen;
int nRandCode = 0;
char * pRealData = NULL;
while(TRUE)
{
ret_code = m_oMsgBus.messageReceive(&oMsg);
if(oMsg.header.serv != m_nSetID)
{
continue;
}
if(oMsg.header.event != m_nEventID)
{
continue;
}
if(oMsg.header.len < nHeaderLen)
{
printf("error msg : %d %d\n",oMsg.header.len,nHeaderLen);
continue;
}
memcpy(&oHeaderInfo,&(oMsg.Msg_buf),nHeaderLen);
if(pBuffer != NULL)
{
if(nRandCode != oHeaderInfo.m_nRandCode)
{ //存在非法包,则立刻停止上一包的接收,转入下一包的接收!
printf("存在非法包,则立刻停止上一包的接收,转入下一包的接收: %d -> %d\n",nRandCode,
(int)oHeaderInfo.m_nRandCode);
delete [] pBuffer;
pBuffer = NULL;
}
pRealData = oMsg.Msg_buf + nHeaderLen;
}
if(pBuffer == NULL)
{ //新包来了.
nTotalDataLen = (int)oHeaderInfo.m_nTotalLen;
pBuffer = new BYTE[nTotalDataLen];
nRandCode = oHeaderInfo.m_nRandCode;
nCurrPacketNum = 0;
pRealData = oMsg.Msg_buf + nHeaderLen;
printf("New Packet is comming %d %d %d\n",nTotalDataLen,(int)nRandCode,(int)oHeaderInfo.m_nCurrPos);
}
if(oHeaderInfo.m_nCurrPos == 0)
{
memcpy(&rxhead,pRealData,sizeof(rxhead));
pRealData += sizeof(rxhead);
}
nCurrPacketNum ++;
BYTE * pCurrBuffer = pBuffer+(int)oHeaderInfo.m_nCurrPos;
int nRealDataLen = nTotalDataLen - (int)oHeaderInfo.m_nCurrPos;
if(nRealDataLen > nDataLen)
{
nRealDataLen = nDataLen;
}
memcpy(pCurrBuffer,pRealData,nRealDataLen);
if(oHeaderInfo.m_nTotalPacket == nCurrPacketNum)
{
printf("%d %d begin %x\n",(int)oHeaderInfo.m_nTotalPacket,nCurrPacketNum,m_pProcFunc);
if( m_pProcFunc!=NULL )
{
m_pProcFunc(this,&rxhead,pBuffer);
}
delete [] pBuffer;
pBuffer = NULL;
}
}//while
}
void CD5KMsgRegStream::MakeStream()
{
Reset();
/*
txhead.cmdtype = m_regtype.cmdtype;
txhead.src.user[0] = 0;
txhead.userdata1 = m_regtype.userdata1;
txhead.userdata2 = 1;//=1:注册
txhead.len = 0;
*/
m_nSetID = m_regtype.cmdtype;
m_nEventID = m_regtype.userdata1 * 256 + m_regtype.userdata2;
printf("set_id = %d event_id = %d\n",m_nSetID,m_nEventID);
if( (m_checkThread=beginthread( (X_PROC) (CD5KMsgRegStreamCheckThread),0,this))==0)
{
printf("线程生成失败\n");
}
else
{
printf( "线程生成成功\n");
}
sleep(1);
// CD5KMsgRegStreamRecvProcThread(this);
if( (m_recvThread=beginthread( (X_PROC) (CD5KMsgRegStreamRecvProcThread),0,this))==0)
{
printf("线程生成失败\n");
}
else
{
printf( "线程生成成功\n");
}
}
#endif