269 lines
5.5 KiB
C++
269 lines
5.5 KiB
C++
// CD5KMsgRegStream.cxx: implementation of the CD5KMsgRegStream class.
|
|
//
|
|
//////////////////////////////////////////////////////////////////////
|
|
#ifdef TH_WITH_D5K
|
|
|
|
#if defined(_UNIX_)
|
|
#include <signal.h>
|
|
#include <unistd.h>
|
|
#include <pthread.h>
|
|
#include <string.h>
|
|
#include <stdlib.h>
|
|
#include <stdio.h>
|
|
#endif
|
|
|
|
#include "compat_process.h"
|
|
#include "compat_thread.h"
|
|
#include "netapi_d5k.h"
|
|
|
|
#ifdef WIN32
|
|
#ifdef _DEBUG
|
|
#undef THIS_FILE
|
|
static char THIS_FILE[]=__FILE__;
|
|
#define new DEBUG_NEW
|
|
#endif
|
|
#endif
|
|
|
|
void CD5KMsgRegStreamCheckThread(void *arg)
|
|
{
|
|
CD5KMsgRegStream *srv = (CD5KMsgRegStream*)arg;
|
|
while(TRUE)
|
|
{
|
|
srv->CheckLink();
|
|
_delay_time(1000*50);
|
|
}
|
|
}
|
|
|
|
//接收数据线程
|
|
void CD5KMsgRegStreamRecvProcThread(void *arg)
|
|
{
|
|
#if defined(_UNIX_)
|
|
int oldtype,oldstate;
|
|
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,&oldtype);
|
|
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,&oldstate);
|
|
#endif
|
|
CD5KMsgRegStream *srv = (CD5KMsgRegStream*)arg;
|
|
srv->RecvLoop();
|
|
}
|
|
|
|
CD5KMsgRegStream::CD5KMsgRegStream(const char *name,reg_type regtype,int nPort)
|
|
{
|
|
m_recvThread = 0;
|
|
m_checkThread = 0;
|
|
m_bLinkOK = FALSE;
|
|
m_regtype = regtype;
|
|
}
|
|
|
|
//析构函数,如果CD5KMsgRegStream队列中已空,则终止监视线程并删除队列指针
|
|
CD5KMsgRegStream::~CD5KMsgRegStream()
|
|
{
|
|
TRACE("begin ~CD5KMsgRegStream\n");
|
|
UnRegister();
|
|
TRACE("end ~CD5KMsgRegStream\n");
|
|
|
|
killthread(m_checkThread);
|
|
killthread(m_recvThread);
|
|
TRACE("begin ~Close\n");
|
|
Close();
|
|
TRACE("end ~CD5KMsgRegStream\n");
|
|
}
|
|
|
|
void CD5KMsgRegStream::Reset()
|
|
{
|
|
printf("链路复位\n");
|
|
m_bLinkOK = FALSE;
|
|
Close();
|
|
}
|
|
void CD5KMsgRegStream::Close()
|
|
{
|
|
if(m_nProcKey > 0)
|
|
{
|
|
m_oMsgBus.messageExit(m_nProcKey);
|
|
}
|
|
}
|
|
int CD5KMsgRegStream::UnRegister()
|
|
{
|
|
/*
|
|
txhead.cmdtype = m_regtype.cmdtype;
|
|
txhead.len = 0;
|
|
txhead.userdata1 = m_regtype.userdata1;
|
|
txhead.userdata2 = 0;//=0:取消注册
|
|
char val;
|
|
return TxStream(&txhead,&val);
|
|
*/
|
|
m_oMsgBus.messageUnSubscribe(m_nSetID,m_szCtxName);
|
|
}
|
|
|
|
void CD5KMsgRegStream::CheckLink()
|
|
{
|
|
m_bLinkOK = LinkSrv();
|
|
}
|
|
|
|
BOOL CD5KMsgRegStream::TryLinkSrv()
|
|
{
|
|
m_bLinkOK = LinkSrv();
|
|
return m_bLinkOK;
|
|
}
|
|
|
|
/*
|
|
void CD5KMsgRegStream::LinkTo(const char * szCtxName,int set_id,int event_id)
|
|
{
|
|
|
|
}
|
|
*/
|
|
//检查并维护连接,
|
|
BOOL CD5KMsgRegStream::LinkSrv()
|
|
{
|
|
if( m_bLinkOK )
|
|
{
|
|
return TRUE;
|
|
}
|
|
|
|
Reset();
|
|
|
|
pid_t nCurrPID = getprocessid();
|
|
thread_t nCurrSID = getthreadid();
|
|
char szName[64];
|
|
time_t t_time = time(NULL);
|
|
|
|
sprintf(szName,"%d_%d_%d_%d",(int)nCurrPID,rand()%1000,t_time%10,nCurrSID%100000);
|
|
szName[19]= 0;
|
|
m_nProcKey = m_oMsgBus.messageInit("realtime","avc",szName);
|
|
if(m_nProcKey <=0)
|
|
{
|
|
return FALSE;
|
|
}
|
|
printf("begin message subscribe : %d \n",m_nSetID);
|
|
int ret_code = m_oMsgBus.messageSubscribe(m_nSetID,"realtime");
|
|
printf("end subscribe: %d\n",ret_code);
|
|
if(ret_code < 0)
|
|
{
|
|
m_bLinkOK = FALSE;
|
|
return FALSE;
|
|
}
|
|
m_bLinkOK = TRUE;
|
|
return TRUE;
|
|
}
|
|
void CD5KMsgRegStream::RecvLoop(void)
|
|
{
|
|
CD5KMsgHeaderInfo oHeaderInfo;
|
|
BYTE * pBuffer = NULL;
|
|
|
|
int ret_code;
|
|
Message oMsg; //消息
|
|
NET_HEAD rxhead;
|
|
int nHeaderLen = sizeof(oHeaderInfo);
|
|
int nTotalDataLen;
|
|
int nCurrPacketNum = 0;
|
|
int nDataLen = sizeof(oMsg.Msg_buf) - nHeaderLen;
|
|
int nRandCode = 0;
|
|
char * pRealData = NULL;
|
|
|
|
while(TRUE)
|
|
{
|
|
ret_code = m_oMsgBus.messageReceive(&oMsg);
|
|
|
|
if(oMsg.header.serv != m_nSetID)
|
|
{
|
|
continue;
|
|
}
|
|
if(oMsg.header.event != m_nEventID)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
if(oMsg.header.len < nHeaderLen)
|
|
{
|
|
printf("error msg : %d %d\n",oMsg.header.len,nHeaderLen);
|
|
continue;
|
|
}
|
|
memcpy(&oHeaderInfo,&(oMsg.Msg_buf),nHeaderLen);
|
|
|
|
|
|
if(pBuffer != NULL)
|
|
{
|
|
if(nRandCode != oHeaderInfo.m_nRandCode)
|
|
{ //存在非法包,则立刻停止上一包的接收,转入下一包的接收!
|
|
printf("存在非法包,则立刻停止上一包的接收,转入下一包的接收: %d -> %d\n",nRandCode,
|
|
(int)oHeaderInfo.m_nRandCode);
|
|
delete [] pBuffer;
|
|
pBuffer = NULL;
|
|
}
|
|
pRealData = oMsg.Msg_buf + nHeaderLen;
|
|
}
|
|
|
|
if(pBuffer == NULL)
|
|
{ //新包来了.
|
|
nTotalDataLen = (int)oHeaderInfo.m_nTotalLen;
|
|
pBuffer = new BYTE[nTotalDataLen];
|
|
nRandCode = oHeaderInfo.m_nRandCode;
|
|
nCurrPacketNum = 0;
|
|
pRealData = oMsg.Msg_buf + nHeaderLen;
|
|
printf("New Packet is comming %d %d %d\n",nTotalDataLen,(int)nRandCode,(int)oHeaderInfo.m_nCurrPos);
|
|
}
|
|
if(oHeaderInfo.m_nCurrPos == 0)
|
|
{
|
|
memcpy(&rxhead,pRealData,sizeof(rxhead));
|
|
pRealData += sizeof(rxhead);
|
|
}
|
|
nCurrPacketNum ++;
|
|
|
|
BYTE * pCurrBuffer = pBuffer+(int)oHeaderInfo.m_nCurrPos;
|
|
int nRealDataLen = nTotalDataLen - (int)oHeaderInfo.m_nCurrPos;
|
|
if(nRealDataLen > nDataLen)
|
|
{
|
|
nRealDataLen = nDataLen;
|
|
}
|
|
memcpy(pCurrBuffer,pRealData,nRealDataLen);
|
|
|
|
if(oHeaderInfo.m_nTotalPacket == nCurrPacketNum)
|
|
{
|
|
printf("%d %d begin %x\n",(int)oHeaderInfo.m_nTotalPacket,nCurrPacketNum,m_pProcFunc);
|
|
if( m_pProcFunc!=NULL )
|
|
{
|
|
m_pProcFunc(this,&rxhead,pBuffer);
|
|
}
|
|
delete [] pBuffer;
|
|
pBuffer = NULL;
|
|
}
|
|
}//while
|
|
}
|
|
|
|
void CD5KMsgRegStream::MakeStream()
|
|
{
|
|
Reset();
|
|
/*
|
|
txhead.cmdtype = m_regtype.cmdtype;
|
|
txhead.src.user[0] = 0;
|
|
txhead.userdata1 = m_regtype.userdata1;
|
|
txhead.userdata2 = 1;//=1:注册
|
|
txhead.len = 0;
|
|
*/
|
|
|
|
m_nSetID = m_regtype.cmdtype;
|
|
m_nEventID = m_regtype.userdata1 * 256 + m_regtype.userdata2;
|
|
printf("set_id = %d event_id = %d\n",m_nSetID,m_nEventID);
|
|
|
|
|
|
if( (m_checkThread=beginthread( (X_PROC) (CD5KMsgRegStreamCheckThread),0,this))==0)
|
|
{
|
|
printf("线程生成失败\n");
|
|
}
|
|
else
|
|
{
|
|
printf( "线程生成成功\n");
|
|
}
|
|
sleep(1);
|
|
// CD5KMsgRegStreamRecvProcThread(this);
|
|
if( (m_recvThread=beginthread( (X_PROC) (CD5KMsgRegStreamRecvProcThread),0,this))==0)
|
|
{
|
|
printf("线程生成失败\n");
|
|
}
|
|
else
|
|
{
|
|
printf( "线程生成成功\n");
|
|
}
|
|
}
|
|
|
|
#endif
|