Files
ict/client/ewsc/net/netlib/regstream_withname.cpp
2026-01-23 08:57:55 +08:00

409 lines
7.8 KiB
C++

// RegStreamWithName.cxx: implementation of the RegStreamWithName class.
//
//////////////////////////////////////////////////////////////////////
#if defined(_UNIX_)
#include <signal.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#endif
#include "netapi.h"
#include "regstream_withname.h"
#ifdef WIN32
#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif
#endif
void RegStreamWithNameCheckThread(void *arg)
{
RegStreamWithName *srv = (RegStreamWithName*)arg;
while(TRUE)
{
srv->CheckLink();
_delay_time(1000*50);
}
}
//接收数据线程
void RegStreamWithNameRecvProcThread(void *arg)
{
#if defined(_UNIX_)
int oldtype,oldstate;
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,&oldtype);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,&oldstate);
#endif
RegStreamWithName *srv = (RegStreamWithName*)arg;
srv->RecvLoop();
}
// Default constructor: produces an idle, unconnected stream.
// Thread handles are 0, the link is marked down, the server name is empty,
// the port is 0 and there is no owner object.
RegStreamWithName::RegStreamWithName():Tcpip()
{
#if defined(WIN32)
    AFX_MANAGE_STATE(AfxGetStaticModuleState());
#endif
    // Connection target: nothing configured yet.
    m_szServer[0] = '\0';
    m_nPort = 0;

    // Link and worker-thread state.
    m_bLinkOK = FALSE;
    m_checkThread = 0;
    m_recvThread = 0;

    // Owner pointer (added by wangbin, 4/24/2009).
    m_pParent = NULL;
}
// Builds an idle stream for a given registration type and owner.
//   regtype - registration descriptor; copied into m_regtype.
//   parent  - opaque owner pointer; stored unchanged in m_pParent.
// Thread handles start at 0, the link is marked down and the server name is
// empty.
RegStreamWithName::RegStreamWithName(reg_type_withname & regtype,void * parent):Tcpip()
{
#if defined(WIN32)
    AFX_MANAGE_STATE(AfxGetStaticModuleState());
#endif
    m_recvThread = 0;
    m_checkThread = 0;
    m_bLinkOK = FALSE;
    m_regtype = regtype;
    m_szServer[0] = 0;
    // Fix: this overload previously left m_nPort indeterminate although
    // LinkSrv() reads it for its "begin LinkTo" message. Use the same
    // default (0) as the other constructors that take no port.
    m_nPort = 0;
    // add by wangbin [4/24/2009]
    m_pParent = parent;
}
// Builds an idle stream aimed at a named server and port.
//   name    - NUL-terminated server name; NULL yields an empty name.
//   regtype - registration descriptor; copied into m_regtype.
//   nPort   - port number stored in m_nPort.
//   parent  - opaque owner pointer; stored unchanged in m_pParent.
// NOTE(review): the name is copied with strcpy(), so it must fit in
// m_szServer — the buffer size is not visible from this file.
RegStreamWithName::RegStreamWithName(const char *name,reg_type_withname & regtype,int nPort,void * parent):Tcpip()
{
#if defined(WIN32)
    AFX_MANAGE_STATE(AfxGetStaticModuleState());
#endif
    // Connection target.
    strcpy(m_szServer,(name!=NULL) ? name : "");
    m_nPort = nPort;

    // Registration descriptor and owner (owner added by wangbin, 4/24/2009).
    m_regtype = regtype;
    m_pParent = parent;

    // Link and worker-thread state.
    m_bLinkOK = FALSE;
    m_checkThread = 0;
    m_recvThread = 0;
}
// Builds an idle stream that will connect through a named service.
//   regtype    - registration descriptor; copied into m_regtype.
//   szServName - NUL-terminated service name copied into
//                m_oServPara.m_szServName; NULL yields an empty name.
//   parent     - opaque owner pointer; stored unchanged in m_pParent.
// NOTE(review): the name is copied with strcpy(), so it must fit in
// m_oServPara.m_szServName — the buffer size is not visible from this file.
RegStreamWithName::RegStreamWithName(reg_type_withname &regtype,const char * szServName,void * parent):Tcpip()
{
#if defined(WIN32)
    // Same module-state guard the other constructors use.
    AFX_MANAGE_STATE(AfxGetStaticModuleState());
#endif
    // Fix: the thread handles were never initialised here, so the destructor
    // passed indeterminate values to killthread().
    m_recvThread = 0;
    m_checkThread = 0;
    m_bLinkOK = FALSE;
    m_regtype = regtype;
    // Fix: tolerate a NULL service name instead of crashing inside strcpy().
    if(szServName!=NULL)
        strcpy(m_oServPara.m_szServName,szServName);
    else
        m_oServPara.m_szServName[0] = 0;
    m_nPort = 0;
    m_szServer[0] = 0;
    // Fix: the parent argument was silently dropped, leaving m_pParent
    // indeterminate; store it as every other overload does.
    m_pParent = parent;
}
// Builds an idle stream that will connect through a named service with an
// explicit application and context number.
//   srv_name - NUL-terminated service name copied into
//              m_oServPara.m_szServName; NULL yields an empty name.
//   nAppNo   - stored in m_oServPara.m_nAppNo.
//   nCtxNo   - stored in m_oServPara.m_nCtxNo.
//   regtype  - registration descriptor; copied into m_regtype.
//   parent   - opaque owner pointer; stored unchanged in m_pParent.
// NOTE(review): the name is copied with strcpy(), so it must fit in
// m_oServPara.m_szServName — the buffer size is not visible from this file.
RegStreamWithName::RegStreamWithName(const char *srv_name,const int nAppNo,const int nCtxNo,reg_type_withname & regtype,void * parent):Tcpip()
{
#if defined(WIN32)
    AFX_MANAGE_STATE(AfxGetStaticModuleState());
#endif
    m_recvThread = 0;
    m_checkThread = 0;
    m_bLinkOK = FALSE;
    m_regtype = regtype;
    m_szServer[0] = 0;
    m_nPort = 0;
    // Fix: guard against a NULL service name, matching the NULL handling of
    // the (name, regtype, nPort, parent) constructor.
    if(srv_name!=NULL)
        strcpy(m_oServPara.m_szServName,srv_name);
    else
        m_oServPara.m_szServName[0] = 0;
    m_oServPara.m_nAppNo = nAppNo;
    m_oServPara.m_nCtxNo = nCtxNo;
    // add by wangbin [4/24/2009]
    m_pParent = parent;
}
// Destructor. Tears the stream down in this order: send the unregister
// message, kill the check thread, kill the receive thread, close the
// connection. TRACE calls bracket the steps.
// (Original note, translated: "if the RegStreamWithName queue is empty,
// terminate the monitor thread and delete the queue pointer". No such queue
// is visible in the code shown here, so that note looks stale.)
// NOTE(review): UnRegister() runs while both worker threads may still be
// active, and killthread() is called even if MakeStream() never started
// them (handles are then 0) — confirm both are safe.
RegStreamWithName::~RegStreamWithName()
{
TRACE("begin ~RegStreamWithName\n");
// Best-effort unregister; the return value is ignored.
UnRegister();
TRACE("end ~RegStreamWithName\n");
// Stop the workers before closing the connection they use.
killthread(m_checkThread);
killthread(m_recvThread);
TRACE("begin ~Close\n");
Close();
TRACE("end ~RegStreamWithName\n");
}
// Re-targets the stream at a different server name and resets the link.
//   name - NUL-terminated server name; NULL is treated as an empty name,
//          matching the (name, regtype, nPort, parent) constructor.
// NOTE(review): the name is copied with strcpy(), so it must fit in
// m_szServer — the buffer size is not visible from this file.
void RegStreamWithName::operator = (char *name)
{
#if defined(WIN32)
    AFX_MANAGE_STATE(AfxGetStaticModuleState());
#endif
    // Fix: a NULL name used to crash inside strcpy().
    if(name!=NULL)
        strcpy(m_szServer,name);
    else
        m_szServer[0] = 0;
    // Mark the link down and close it (see Reset()).
    Reset();
}
// Drops the current link: logs a "link reset" line to stdout, marks the link
// as down, then closes the connection via Close().
void RegStreamWithName::Reset()
{
    printf("链路复位\n");

    // Clear the flag before closing so RecvLoop() and LinkSrv() see the
    // link as down.
    m_bLinkOK = FALSE;
    Close();
}
// Sends a cancel-registration message for this stream's registration type.
// The header carries m_regtype's cmdtype and userdata1, userdata2 = 0,
// n_reg_type = 0 and a zero payload length.
// Returns whatever TxStream() returns.
int RegStreamWithName::UnRegister()
{
    // Identify what is being unregistered.
    txhead.cmdtype = m_regtype.cmdtype;
    txhead.userdata1 = m_regtype.userdata1;

    // Mark the message as a cancellation (userdata2 = 0) with no payload.
    txhead.userdata2 = 0;
    txhead.src.n_reg_type = 0;
    txhead.len = 0;

    // TxStream() needs a data pointer even though len is 0; the byte's
    // content is never set.
    char placeholder;
    return TxStream(&txhead,&placeholder);
}
// Runs one link-maintenance pass: calls LinkSrv() and records its result in
// m_bLinkOK. Called repeatedly by the check thread.
void RegStreamWithName::CheckLink()
{
    const bool linkUp = LinkSrv();
    m_bLinkOK = linkUp;
}
// Same as CheckLink(), but also reports the outcome.
// Returns the value of m_bLinkOK after it has been updated from LinkSrv().
bool RegStreamWithName::TryLinkSrv()
{
    m_bLinkOK = LinkSrv();
    return m_bLinkOK ? true : false;
}
//检查并维护连接,
bool RegStreamWithName::LinkSrv()
{
/*
if(m_szServer[0]==0)
return FALSE;
*/
if( m_bLinkOK )
return TRUE;
Reset();
printf("begin LinkTo %s %d\n",m_szServer,m_nPort);
if(!LinkByPara(m_oServPara))
{
printf( "连接 %s错误",m_szServer);
Close();
return FALSE;
}
XByteArray ba;
ba.Reset();
strcpy(txhead.src.regname,m_regtype.cmdtypename);
txhead.userdata1 = m_regtype.userdata1;
txhead.userdata2 = m_regtype.userdata2;
txhead.src.n_reg_type = 1;
if(!TxHead(&txhead))
{
printf("发送报文头失败\n");
Close();
return FALSE;
}
else
{
printf("发送报文头成功\n");
}
if(m_regtype.cmdtypename[0] != 0)
{
// add by wangbin 将名称 写回服务器... [4/24/2009]
if(!TxData(&ba,ba.m_nLength))
{
printf("发送报文数据失败\n");
Close();
return FALSE;
}
else
{
printf("发送报文数据成功\n");
}
// end add [4/24/2009]
}
return TRUE;
}
void RegStreamWithName::RecvLoop(void)
{
fd_set rd;
struct timeval tv;
void* pBuffer = NULL;
while(TRUE)
{
tv.tv_sec = 0;
tv.tv_usec = 10;
if(!m_bLinkOK)
{
printf("等待建立流连接...\n");
_delay_time(1000*100);
continue;
}
FD_ZERO(&rd);
if(socket_id<0) continue;
FD_SET(socket_id,&rd);
int nb = select(socket_id+1,&rd,0,0,&tv);
if(nb<0)
{
int err=_get_last_error();
if(err==EINTR || err==EWOULDBLOCK
#if !defined(WIN32)
|| err==EAGAIN
#endif
)
continue;
Reset();
continue;
}
else if(nb==0)
continue;
if(!FD_ISSET(socket_id,&rd))
continue;
memset(&rxhead,0,sizeof(rxhead));
if(!RxHead(&rxhead))
{
PutDbg(NET_DBGTYPE_STREAM," RecvLoop RxHead fail");
Reset();
continue;
}
pBuffer = NULL;
if(rxhead.len>0)
{
if(!RxData(ResizeMem(rxhead.len),rxhead.len))
{
PutDbg(NET_DBGTYPE_STREAM," RecvLoop RxData fail");
Reset();
continue;
}
pBuffer = GetMemPtr();
}
if( m_pProcFunc!=NULL )
m_pProcFunc(this,&rxhead,pBuffer);
}//while
}
// Starts the stream: resets the link, prepares the registration header
// (userdata2 = 1, n_reg_type = 1, empty user, zero length) and launches the
// check thread and the receive thread. Each launch logs success or failure
// to stdout; a failed launch is not reported to the caller.
// NOTE(review): calling this twice would start a second pair of threads and
// overwrite the stored handles — confirm callers invoke it only once.
void RegStreamWithName::MakeStream()
{
    Reset();

    // Registration header template.
    txhead.cmdtype = m_regtype.cmdtype;
    txhead.userdata1 = m_regtype.userdata1;
    txhead.userdata2 = 1;           // 1 = register
    txhead.src.n_reg_type = 1;
    txhead.src.user[0] = 0;
    txhead.len = 0;

    // Link-supervision thread.
    m_checkThread = beginthread( (X_PROC) (RegStreamWithNameCheckThread),0,this);
    if(m_checkThread==0)
        printf("线程生成失败\n");
    else
        printf( "线程生成成功\n");

    // Data-receiving thread.
    m_recvThread = beginthread( (X_PROC) (RegStreamWithNameRecvProcThread),0,this);
    if(m_recvThread==0)
        printf("线程生成失败\n");
    else
        printf( "线程生成成功\n");
}
#if 0
//////////////////////////////////////////////////////////////////////////
// RegStream variant that supports registering several data types.
// Author: Tang Lei, 2006/11/10
// This definition sits inside an "#if 0" region, so it is not compiled.
//
// Constructor: forwards name, regtype and nPort to the RegStreamWithName
// base, then records regtype as the first entry of the registration list.
RegStreamWithNameMulti::RegStreamWithNameMulti(char *name,reg_type_withname regtype,int nPort)
: RegStreamWithName(name,regtype,nPort)
{
AddRegType(regtype);
}
// Destructor (inside an "#if 0" region, so not compiled).
// Deletes every reg_type_withname stored in m_arrRegType; the entries are
// allocated with new in AddRegType().
RegStreamWithNameMulti::~RegStreamWithNameMulti()
{
for( int i=0;i<m_arrRegType.GetSize();i++ )
{
reg_type_withname* pp = (reg_type_withname*)m_arrRegType.GetAt(i);
delete pp;
}
}
// Multi-type version of LinkSrv() (inside an "#if 0" region, so not
// compiled).
// Returns TRUE at once if m_bLinkOK is set. Otherwise resets the link,
// connects with LinkTo(m_szServer, m_nPort) and sends one registration
// header per entry in m_arrRegType. Returns FALSE (after Close()) if the
// connect or any header send fails.
bool RegStreamWithNameMulti::LinkSrv()
{
/*
if(m_szServer[0]==0)
return FALSE;
*/
if( m_bLinkOK )
return TRUE;
Reset();
if(!LinkTo(m_szServer,m_nPort))
{
printf( "连接 %s错误\n",m_szServer);
Close();
return FALSE;
}
// Register every recorded type in turn.
for( int i=0;i<m_arrRegType.GetSize();i++ )
{
reg_type_withname* pp = (reg_type_withname*)m_arrRegType.GetAt(i);
txhead.cmdtype = pp->cmdtype;
txhead.src.user[0] = 0;
txhead.userdata1 = pp->userdata1;
txhead.userdata2 = 1;//=1: register
txhead.src.n_reg_type = 1;
txhead.len = 0;
if(!TxHead(&txhead))
{
printf("发送报文头失败\n");
Close();
return FALSE;
}
else
printf("发送报文头成功\n");
// Short pause between consecutive registration headers.
Sleep(10);
}
return TRUE;
}
// Appends a registration type to the list (inside an "#if 0" region, so not
// compiled).
//   regtype - descriptor to record; a heap copy is stored in m_arrRegType
//             and released by the destructor.
void RegStreamWithNameMulti::AddRegType(reg_type_withname regtype)
{
reg_type_withname* pp= new reg_type_withname;
*pp=regtype;
m_arrRegType.Add(pp);
}
// Multi-type version of UnRegister() (inside an "#if 0" region, so not
// compiled).
// Sends one cancel header (userdata2 = 0, n_reg_type = 0, len = 0) per entry
// in m_arrRegType.
// Returns the number of entries processed, or 0 as soon as a send fails.
int RegStreamWithNameMulti::UnRegister()
{
int i;
for( i=0;i<m_arrRegType.GetSize();i++ )
{
reg_type_withname* pp = (reg_type_withname*)m_arrRegType.GetAt(i);
txhead.cmdtype = pp->cmdtype;
txhead.src.user[0] = 0;
txhead.userdata1 = pp->userdata1;
txhead.userdata2 = 0;//=0: cancel
txhead.src.n_reg_type = 0;
txhead.len = 0;
if(!TxHead(&txhead))
{
printf("发送报文头失败");
return 0;
}
// Sleep(10);
}
return i;
}
#endif