/* Procedures for inter processor */
/* communication for simulations */
/* P.J. Franaszczuk, JHU 2001 */
//#define DEB
#include "clust_cn.h"
extern unsigned short PORT; // for version with command line parameter port
extern char *buf;
int nproc=0; /* number of processors to use */
int this_proc=-1; /* number of the processor for this host */
char *this_ip="000.000.000.000"; /* IP of this machine */
char **proc_ip; /* table of ip addresses of processors in dot notation */
char **proc_name; /* table of names of processors */
char *this_name=NULL; /* name of this processor */
int *conn_sock=NULL; /* array of sockets descriptors */
int orig_sock=-1; /* socket for accept */
extern char *path; /* working path */
struct hostent *host; /* temp pointer needed by gethostip*/
struct CONN cn_all; /* all connections for this host */
struct CONN cn_inp; /* input conn.sockets fo host */
struct CONN cn_out; /* out conn.sockets for host */
int nfd; /* largest file descriptor to check in select */
fd_set fdr0; /* descriptor set for select */
int *buf_in; //was short
void close_all(int); // defined in netclust.c
void close_socks()
{
int i;
if(conn_sock!=NULL)
for(i=0;i<nproc;i++)
if(conn_sock[i]>-1)
{ /*short buf[2]={-1,0};
send(conn_sock[i],buf,2*sizeof(short),0); */
close(conn_sock[i]);
conn_sock[i]=-1;
}
if(orig_sock>-1){close(orig_sock); orig_sock=-1;}
fflush(stderr);
if(this_proc==0)
{
printf("\n Terminated \n");
fflush(stdout);
}
}
void init_path(char *argv0)
{
char *c, buf[400];
FILE* fil;
int pid;
this_name=NULL;
if((c=gethostip(this_name))==NULL)
{
char erbuf[100];
sprintf(erbuf, "gethostip failed for this");
print_err(ERROR,"init_path",erbuf,NULL,NULL);
}
this_ip=strdup(c);
this_name=strdup(host->h_name);
pid=getpid();
fil=fopen(strcat(strcpy(buf,"netclust"),".pid"),"w");
if(fil==NULL)
print_err(FERROR,"init_path","pid",buf,fil);
fprintf(fil,"%i",pid);
fclose(fil);
}
void print_err(enum error_flag f, char* procnam, char * text, char* fname,FILE*fil )
{
char buf[1000];
switch(f)
{
case INFO:
fprintf(stderr,"%s\t%s: %s\n",this_name,procnam,text);fflush(stderr);
return;
case DEBUG:
#ifdef DEB
// fprintf(stderr,"%s\tDEBUG:%s: %s\n",this_name,procnam,text);fflush(stderr);
#endif
return;
case WARNING:
fprintf(stderr,"%s\tWARNING:%s: %s\n",this_name,procnam,text);fflush(stderr);
return;
case ERROR:
fprintf(stderr,"%s\tERROR:%s: %s\n",this_name,procnam,text);
break;
case PERROR:
sprintf(buf,"%s\tERROR:%s: %s",this_name,procnam,text);
perror(buf);
break;
case FERROR:
if(fil==NULL)
fprintf(stderr,"%s\tERROR:%s: %s file: %s\n",this_name,procnam,text,fname);
else if(feof(fil))
fprintf(stderr,"%s\tEOF:%s: %s file: %s\n",this_name,procnam,text,fname);
else if(ferror(fil))
fprintf(stderr,"%s\tFERROR:%s: %s file: %s\n",this_name,procnam,text,fname);
else
fprintf(stderr,"%s\tERROR:%s: %s file: %s\n",this_name,procnam,text,fname);
break;;
}
// close_socks();exit(3);
close_all(-1);
return;
}
char* file_name(char *fname,char*name,char *ext)
{ char buf[400];
/* create name of file path is global */
strcat(strcpy(buf,path),name);
if(ext)sprintf(fname,"%s%s.%i",buf,ext,this_proc);
else
strcpy(fname,buf);
return fname;
}
int compint(const void * i, const void * j)
{ return (*(int*)i-*(int*)j);}
int read_conn(char *name)
{
/* reads file with connections CONNTAB_NAME
format of the file:
first line no of processors to use
next lines: n nc: c1 c2 c3
where: n proc no(starting with 0)
nc number of connections for this processor
c1, c2... c(nc) processors connected to this one
reads also nodes_table from file IPTAB_NAME
*/
FILE* fil;
int i,j,l,*k;
struct CONN *cn;
char ** ip, **in, fname[250],buf[100], *c;
char *procname="read_conn";
struct CONN *tab_conn;/* list of connection struct. for all processors */
/* read ip table */
strcat(strcpy(buf,name),IPTAB_NAME);
if((fil=fopen(file_name(fname,buf,NULL),"r"))==NULL)
print_err(FERROR,procname,"fopen",fname,fil);
if(fscanf(fil,"%i\n",&nproc)!=1)
print_err(FERROR,procname,"fscanf(nproc)",fname,fil);
if(nproc<1)
print_err(FERROR,procname,"nproc < 1",fname,fil);
proc_ip=ip=(char**)malloc(sizeof(char*)*nproc);
if(proc_ip==NULL)print_err(ERROR,procname,"malloc(proc_ip)",NULL,NULL);
proc_name=in=(char**)malloc(sizeof(char*)*nproc);
if(proc_name==NULL)print_err(ERROR,procname,"malloc(proc_name)",NULL,NULL);
for(i=0;i<nproc;i++,ip++,in++)
{
if(fscanf(fil,"%s",buf)!=1 )
{
char erbuf[100];
sprintf(erbuf, "fscanf:line %i",i);
print_err(FERROR,procname,erbuf,fname,fil);
}
*in=strdup(buf);
if( *in==NULL )print_err(ERROR,procname,"strdup(*in)",NULL,NULL);
*in=strdup(buf);
if((c=gethostip(*in))==NULL)
{
char erbuf[100];
sprintf(erbuf, "gethostip failed for %s",*in);
print_err(ERROR,procname,erbuf,NULL,NULL);
}
*ip=strdup(c);
if( *ip==NULL )print_err(ERROR,procname,"strdup(*ip)",NULL,NULL);
for(j=0;j<i;j++)
if(!strcmp(proc_ip[j],*ip))
{
char erbuf[100];
sprintf(erbuf, "repeated node %s in line %i and %i",*ip,j,i);
print_err(FERROR,procname,erbuf,fname,fil);
}
}
fclose(fil);
for(i=0;i<nproc;i++)
if(!strcmp(this_ip,proc_ip[i]))break;
if(i==nproc)
{
char erbuf[100];
sprintf(erbuf,"%s not in the table",this_name);
print_err(ERROR,procname,erbuf,NULL,NULL);
}
this_proc=i;
/* read connections */
strcat(strcpy(buf,name),CONNTAB_NAME);
if((fil=fopen(file_name(fname,buf,NULL),"r"))==NULL)
print_err(FERROR,procname,"fopen",fname,fil);
if(fscanf(fil,"%i\n",&i)!=1 || nproc!=i)
print_err(FERROR,procname,"fscanf(nproc)",fname,fil);
if(i<1)
{
print_err(WARNING,procname,"no interprocessor connections",NULL,NULL);
cn_inp.nconn=cn_out.nconn=cn_all.nconn=0;
cn_inp.lconn=cn_out.lconn=cn_all.lconn=NULL;
return 0;
}
tab_conn = cn= (struct CONN *)malloc( sizeof( struct CONN )*nproc);
if(cn==NULL) print_err(ERROR,procname,"malloc(tab_conn)",NULL,NULL);
for(i=0;i<nproc;i++,cn++)
{
char col=0;
if(fscanf(fil,"%i %i%c",&j,&cn->nconn,&col)!=3
|| cn->nconn < 0
|| cn->nconn > nproc
|| col!=':'
|| i!=j)
{
char erbuf[100];
sprintf(erbuf, "fscanf: proc no %i(%i)",i,j);
print_err(FERROR,procname,erbuf,fname,fil);
}
else
if(cn->nconn>0){
cn->lconn=k=(int*)malloc(sizeof(int)*cn->nconn);
if(k==NULL)print_err(ERROR,procname,"malloc(cn->lconn)",NULL,NULL);
for(j=0;j<cn->nconn;j++,k++)
{
if(fscanf(fil,"%i",k)!=1 || *k<0 || *k==i)
{ char erbuf[100];
sprintf(erbuf, "read proc no %i(%i)",i,j);
print_err(FERROR,procname,erbuf,fname,fil);
}
}
fscanf(fil,"%[^\n]\n",buf); /* skip to end of line */
qsort(cn->lconn,cn->nconn,sizeof(int),compint);
}
else cn->lconn=NULL;
}
fclose(fil);
cn_inp.lconn=(int*)malloc(sizeof(int)*nproc);
if(cn_inp.lconn==NULL)print_err(ERROR,procname,"malloc(cn_inp)",NULL,NULL);
/* find inputs to this */
for(i=0,l=0,cn=tab_conn;i<nproc;i++,cn++)
{
if(i!=this_proc)
{
for(j=0;j<cn->nconn;j++)
if(cn->lconn[j]==this_proc)
{
cn_inp.lconn[l++]=i;
break;
}
free(cn->lconn);
}
else
{
cn_out.nconn=cn->nconn;
cn_out.lconn=cn->lconn;
}
}
if(l==0 && nproc>1)
print_err(ERROR,procname,"no input connections to this",NULL,NULL);
cn_inp.nconn=l;
free(tab_conn);
if(cn_out.nconn)
{ int ** b; //was short
cn_out.buf=b=(int**)malloc(sizeof(int*)*cn_out.nconn); //was cn_out.buf=b=(short**)malloc(sizeof(short*)*cn_out.nconn)
if(b==NULL)print_err(ERROR,procname,"malloc(cn_out.buf)",NULL,NULL);
for(i=0;i<cn_out.nconn;i++,b++)
if((*b=(int*)malloc(PACKETBUFFER+2))==NULL) //was if((*b=(short*)malloc(PACKETBUFFER+2))==NULL)
print_err(ERROR,procname,"malloc(*cn_out.buf)",NULL,NULL);
}
if(cn_inp.nconn)
{ /* buffers for input */
int ** b; //was short
buf_in=(int*)malloc(PACKETBUFFER*nproc+2); //was buf_in=(short*)malloc(PACKETBUFFER*nproc+2);
if(buf_in==NULL)print_err(ERROR,procname,"malloc(buf_in)",NULL,NULL);
cn_inp.buf=b=(int**)malloc(sizeof(int*)*cn_inp.nconn); //was cn_inp.buf=b=(short**)malloc(sizeof(short*)*cn_inp.nconn)
if(b==NULL)print_err(ERROR,procname,"malloc(cn_inp.buf)",NULL,NULL);
for(i=0;i<cn_inp.nconn;i++,b++){
if((*b=(int*)malloc(PACKETBUFFER*nproc+2))==NULL) //was if((*b=(short*)malloc(PACKETBUFFER*nproc+2))==NULL)
print_err(ERROR,procname,"malloc(*cn_inp.buf)",NULL,NULL);
*b[0]=0;}
}
/* find all connections (inp & out) */
cn_all.lconn=(int*)malloc(sizeof(int)*(nproc-1));
if(cn_all.lconn==NULL)print_err(ERROR,procname,"malloc(cn_all)",NULL,NULL);
for(i=0,j=0,l=0;i<cn_inp.nconn && j<cn_out.nconn;)
{
if(cn_inp.lconn[i]<cn_out.lconn[j])
cn_all.lconn[l++]=cn_inp.lconn[i++];
else if(cn_inp.lconn[i]==cn_out.lconn[j])
{ cn_all.lconn[l++]=cn_inp.lconn[i++];j++;}
else
cn_all.lconn[l++]=cn_out.lconn[j++];
}
while(i<cn_inp.nconn)cn_all.lconn[l++]=cn_inp.lconn[i++];
while(j<cn_out.nconn)cn_all.lconn[l++]=cn_out.lconn[j++];
cn_all.nconn=l;
/*
fprintf(stderr,"\ncn_all:\n nconn=%i: ", cn_all.nconn);
for(i=0;i<cn_all.nconn;i++)
fprintf(stderr,"%i,",cn_all.lconn[i]);
fprintf(stderr,"\ncn_inp:\n nconn=%i: ", cn_inp.nconn);
for(i=0;i<cn_inp.nconn;i++)
fprintf(stderr,"%i,",cn_inp.lconn[i]);
fprintf(stderr,"\ncn_out:\n nconn=%i: ", cn_out.nconn);
for(i=0;i<cn_out.nconn;i++)
fprintf(stderr,"%i,",cn_out.lconn[i]);
fprintf(stderr,"\n");
*/
return nproc;
}
char * gethostip(char* name)
{
/* return IP of named host as string
and host structure in host
if name == NULL returns this host IP
side effect midifies global struct host
*/
char buf[100];
size_t len=100;
struct in_addr in;
if(name==NULL)
{
if(gethostname(buf,len)<0)
{
return NULL;
}
host=gethostbyname(buf);
}
else
host=gethostbyname(name);
if(host){
memcpy(&in.s_addr,*host->h_addr_list, sizeof(in.s_addr));
return inet_ntoa(in);
}
else
{
return NULL;
}
}
void proc_conn()
{
char *pname="proc_conn";
int i,j,iconn,icc,nconn,mconn,*l,flag=1; /* flag for ioctl */
struct sockaddr_in conn_adr, serv_adr;
int conn_len=sizeof(conn_adr);
fd_set fdset,fdacc, fdex;
struct timeval tim = {10,0}; /* timeout for select*/
int nrep = 20; /* repetition for connect */
if((conn_sock =(int*)malloc(sizeof(int)*nproc))==NULL)
print_err(ERROR,pname,"memory:conn_sock",NULL,NULL);
for(i=0;i<nproc;conn_sock[i++]=-1);
if(this_proc < nproc-1)
{
memset(&serv_adr, 0, conn_len); /* Clear it out */
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY); /* accept any */
serv_adr.sin_port = htons(PORT);
if ((orig_sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) /* SOCKET */
print_err(PERROR,pname,"orig_socket",NULL,NULL);
if (ioctl(orig_sock, FIONBIO, &flag) < 0 ) /* ioctl */
print_err(ERROR,pname,"orig_ioctl",NULL,NULL);
if (bind(orig_sock,(struct sockaddr*)&serv_adr,sizeof(serv_adr))<0) /* BIND */
print_err(PERROR,pname,"bind",NULL,NULL);
if (listen(orig_sock,nproc-1)<0) /* listen */
print_err(PERROR,pname,"bind",NULL,NULL);
nfd=orig_sock+1;
}
memset(&conn_adr, 0, sizeof(conn_adr)); /* Clear it out */
conn_adr.sin_family = AF_INET;
conn_adr.sin_port = htons( PORT );
FD_ZERO(&fdset); /* set for connect */
FD_ZERO(&fdacc); /* set for access */
FD_ZERO(&fdex); /* set for exceptions */
/* open sockets for processors to connect to (less < this; inp&out!! ) */
i=0;
while(i < cn_all.nconn && ((j=cn_all.lconn[i]) < this_proc))
{
if((conn_sock[j]=socket(AF_INET,SOCK_STREAM,0)) < 0 )
print_err(PERROR,pname,"conn_socket",NULL,NULL);
if ((ioctl(conn_sock[j], FIONBIO, &flag)) < 0 )
print_err(PERROR,pname,"conn_ioctl",NULL,NULL);
nfd=conn_sock[j]+1;
i++;
}
iconn=i; /* no of connections to make with connect*/
nconn=cn_all.nconn-i; /* no of connections to accept */
for(i=0;i<nfd;i++) /* exceptions */
FD_SET(i,&fdex);
if(nconn>0)
{
nconn=try_accept(orig_sock,nconn);
if(nconn>0)
FD_SET(orig_sock,&fdacc);
}
mconn=iconn;
for(i=0;i<mconn;i++)
{
if(try_connect(cn_all.lconn[i],&conn_adr))
iconn--;
else
FD_SET(conn_sock[cn_all.lconn[i]],&fdset);
}
/* check if all connections made and no errors */
icc=0;
while(iconn>0 || nconn>0)
{
int j;
fd_set fds,fdr,fde;
memcpy(&fds,&fdset,sizeof(fdset));
memcpy(&fdr,&fdacc,sizeof(fdacc));
memcpy(&fde,&fdex,sizeof(fdacc));
if((i=select(nfd,&fdr,&fds,&fde,&tim))<0)
print_err(PERROR,pname,"select",NULL,NULL);
if(i==0)
{
char erbuf[100];
sprintf(erbuf," select timeout iconn=%i, nconn=%i",iconn,nconn);
print_err(ERROR,pname,erbuf,NULL,NULL);
}
for(j=0;j<nfd;j++)
if(FD_ISSET(j,&fde))
{ char erbuf[100];
sprintf( erbuf,"exception in fd %d",j);
print_err(PERROR,pname,erbuf,NULL,NULL);
}
if(nconn && FD_ISSET(orig_sock,&fdr))
{
nconn=try_accept(orig_sock,nconn);
i--;
}
for(j=0;j<mconn && i>0;j++)
{
char erbuf[100];
int er_so,er_len=sizeof(int);
int jj=cn_all.lconn[j];
if(!FD_ISSET(conn_sock[jj],&fds))continue;
sprintf(erbuf,"connection to IP %s",proc_ip[jj]);
if(getsockopt(conn_sock[jj],SOL_SOCKET,SO_ERROR,&er_so,&er_len))
{
strcat(erbuf,":getsockopt");
print_err(PERROR,pname,erbuf,NULL,NULL);
}
else if(er_so)
{
if(er_so==ECONNREFUSED)
{
close(conn_sock[jj]);
if((conn_sock[jj]=socket(AF_INET,SOCK_STREAM,0)) < 0 )
print_err(PERROR,pname,"reconn_socket",NULL,NULL);
if ((ioctl(conn_sock[jj], FIONBIO, &flag)) < 0 )
print_err(PERROR,pname,"reconn_ioctl",NULL,NULL);
if(try_connect(jj,&conn_adr))
{
iconn--;
FD_CLR(conn_sock[jj],&fdset);
icc=0;
}
else
{
// if(icc++>tim.tv_sec)
if(icc++>nrep)
{
strcat(erbuf,"\n\tconnect repetition limit");
print_err(ERROR,pname,erbuf,NULL,NULL);
}
}
/* strcat(erbuf,"getsockopt:ECONREFUSED");
print_err(DEBUG,pname,erbuf,NULL,NULL); */
continue;
}
else
{
errno=er_so;
strcat(erbuf,":getsockopt\n\tSO_ERROR");
print_err(PERROR,pname,erbuf,NULL,NULL);
}
}
FD_CLR(conn_sock[jj],&fdset);
i--;iconn--;icc=0;
strcat(erbuf," successful");
print_err(DEBUG,pname,erbuf,NULL,NULL);
}
sleep(1);
}
if(orig_sock>-1) close(orig_sock); /* no longer needed */
/* set mask for select for future reading */
FD_ZERO(&fdr0);
nfd=-1;
l=cn_inp.lconn;
for(i=0;i<cn_inp.nconn;i++,l++)
{
FD_SET(conn_sock[*l],&fdr0);
if(conn_sock[*l]>nfd)nfd=conn_sock[*l];
}
nfd++;
}
int try_accept(int orig_sock, int nconn)
{
char *pname="try_accept";
struct sockaddr_in conn_adr;
int conn_len=sizeof(conn_adr),new_sock;
memset(&conn_adr, 0, sizeof(conn_adr));
print_err(DEBUG,pname,"entry",NULL,NULL);
while(nconn>0) {
if((new_sock=accept(orig_sock,(struct sockaddr*)&conn_adr,&conn_len))<0) /*accept */
{
if(errno!=EWOULDBLOCK)
print_err(PERROR,pname,"accept",NULL,NULL);
return nconn;
}
else
{ char erbuf[100];
int j;
char * ip = inet_ntoa(conn_adr.sin_addr); /* get client ip */
for(j=0;j<nproc;j++)
if(!strcmp(ip,proc_ip[j]))break;
if(j==nproc)
{
sprintf(erbuf,"accepted IP %s not in the table",ip);
print_err(ERROR,pname,erbuf,NULL,NULL);
}
else if(j<this_proc)
{
sprintf(erbuf,"accepted IP %s < this",ip);
print_err(ERROR,pname,erbuf,NULL,NULL);
}
sprintf(erbuf,"accepted connection from %s",ip);
print_err(DEBUG,pname,erbuf,NULL,NULL);
conn_sock[j]=new_sock;
nconn--;
}
}
return 0;
}
int try_connect(int i, struct sockaddr_in *conn_adr)
{
char * pname="try_connect";
if(!inet_aton(proc_ip[i],&(conn_adr->sin_addr))) /* IP address */
print_err(ERROR,pname,"inet_aton",NULL,NULL);
if(connect(conn_sock[i],(struct sockaddr *)conn_adr, sizeof(struct sockaddr)) < 0)
{ char erbuf[100];
if(errno!=EINPROGRESS)
{
sprintf(erbuf,"connect to %s",proc_ip[i]);
print_err(PERROR,pname,erbuf,NULL,NULL);
}
sprintf(erbuf,"trying to connect to %s",proc_ip[i]);
print_err(DEBUG,pname,erbuf,NULL,NULL);
return 0;
}
else
{ char erbuf[100];
sprintf(erbuf,"connected to %s",proc_ip[i]);
print_err(DEBUG,pname,erbuf,NULL,NULL);
return 1;
}
}