Skip to Main Content

DevOps, CI/CD and Automation

Announcement

For appeals, questions and feedback about Oracle Forums, please email oracle-forums-moderators_us@oracle.com. Technical questions should be asked in the appropriate category. Thank you!

OCI crash while executing PL/SQL

581207Apr 23 2008 — edited Apr 29 2008
HI,

I have a multithreaded application which uses OCI. The application has a master thread and spawns 4 child threads that connect to the db and execute specific queries. The problem occurs while trying to execute the last PL/SQL command, which includes some bfile operations. I had read that OCI does not mutex properly on LOB operations, but I do not get any error; the application just hangs. Attached is the part of the code that has all the OCI calls.
The culprit is the update_tables function. The application hangs in there when processing is very close in time for threads.

[] 20080411 10:40:52.255 Oracle NOTICE - [2000][id 1093229] Executing PL/SQL to update all tables
[] 20080411 10:40:52.259 Oracle NOTICE - [2000][id 1093230] Executing PL/SQL to update all tables
[] 20080411 10:40:52.321 Oracle NOTICE - [2000][id 1093231] Executing PL/SQL to update all tables

This is where it hangs.
//!This program is the server portion of the distributed job system.

#include "application.h"
#include "log4cwrapper.h"
//#include "sql.h"
#include <oci.h>

/*!
   Process-wide OCI environment handle. main() creates it once with
   OCIEnvCreate() in OCI_THREADED mode; main() and jobfunction() both pass
   it as the parent for every OCIHandleAlloc() call they make.
*/
OCIEnv*		  env;


int main(int argc, char* argv[]){
   	
	
	OCIError*     err;
	OCIServer*    srv;
	OCISvcCtx*    svc;
	OCISession*   ses;
	sword r;
	OCIStmt*	  initslc;
	OCIStmt*	  comm;
	OCIBind 	*bnd1hp = (OCIBind*) 0;
	char stmt[]="commit";
	char update_initial[] ="update CONV_IMAGE_JOBS set conv_attempts_cnt = :1";

	FILE *log_fd_main;
	int sockfd, logfd, pidfd, count = 0, oci_error = 0;
	pthread_t job_thread;			//!<Threads for image conversion
    pthread_attr_t thread_attr;		//!<Thread attribute 
    int *clientfd;					//!<Non-negative file descriptor for new socket
    pthread_mutexattr_t mattr;		//!<Mutex attribute used to set the properties of the mutex
	char masterpid[100];
	
  
	log_fd_main = fopen(log_file, "a");
	if (log_fd_main == NULL) {
		printf(" ERROR - DjsMaster [0002] Failed to open log file.\n");
        exit(1);
    }
    r=OCIEnvCreate( &env, 
          OCI_THREADED
         , 0, 0, 0, 0, 0, 0); 

    if (r != OCI_SUCCESS) {
		strcpy(err_msg,"Couldn't create environment (OCIEnvCreate)");
		oci_error = 1;
		cleanup();
    }
    
	OCIHandleAlloc(env, (dvoid**)&err, OCI_HTYPE_ERROR,   0, 0);
	OCIHandleAlloc(env, (dvoid**)&srv, OCI_HTYPE_SERVER,  0, 0);
	OCIHandleAlloc(env, (dvoid**)&svc, OCI_HTYPE_SVCCTX,  0, 0);
	OCIHandleAlloc(env, (dvoid**)&ses, OCI_HTYPE_SESSION, 0, 0);

	r=OCIServerAttach(srv, err, dbname, strlen((const char*)dbname), (ub4) OCI_DEFAULT);
	if (r != OCI_SUCCESS) {
		checkerrr(err, r, err_msg);
		oci_error = 1;
		cleanup();
	}

	OCIAttrSet(svc, OCI_HTYPE_SVCCTX, srv, 0, OCI_ATTR_SERVER,  err);
	OCIAttrSet(ses, OCI_HTYPE_SESSION, username, 
            strlen((const char*)username), OCI_ATTR_USERNAME, err); 
	OCIAttrSet(ses, OCI_HTYPE_SESSION, password, 
            strlen((const char*)password), OCI_ATTR_PASSWORD, err); 

	if ( (r=OCIAttrSet((dvoid *) svc, (ub4) OCI_HTYPE_SVCCTX, 
            (dvoid *) ses, (ub4) 0, (ub4) OCI_ATTR_SESSION, err)) ) {
		checkerrr(err, r,err_msg);
		oci_error = 1;
		cleanup();
	}

	r=OCISessionBegin (svc, err, ses, OCI_CRED_RDBMS, OCI_DEFAULT);
	if (r != OCI_SUCCESS) {
		checkerrr(err, r,err_msg);
		oci_error = 1;
		cleanup();
	}

	if (OCIHandleAlloc((dvoid *) env, (dvoid **) &initslc, 
            (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0)) {
		checkerrr(err, r, (text*)err_msg);
		oci_error = 1;
		cleanup();
	}
	if ((r=OCIStmtPrepare(initslc, err, (text*)(void*)update_initial,
            (ub4) strlen((char *) update_initial),
            (ub4) OCI_NTV_SYNTAX, (ub4) OCI_DEFAULT))) {
		checkerrr(err, r, (text*)err_msg);
		OCIHandleFree((dvoid *) initslc, (ub4) OCI_HTYPE_STMT);
		oci_error = 1;
		cleanup();
	}
	int coun=0;
	if ((r = OCIBindByPos(initslc, &bnd1hp, err, (ub4) 1, &coun, sizeof(count), SQLT_INT,
                             (dvoid *) 0, (ub2 *) 0, (ub2 *) 0, (ub4) 0, (ub4 *) 0, OCI_DEFAULT))){
      	checkerrr(err, r, (text*)err_msg);
        oci_error = 1;
		cleanup();
    }
	if ((r = OCIStmtExecute(svc, initslc, err, (ub4) 1, (ub4) 0, (CONST OCISnapshot *) 0,
                               (OCISnapshot *) 0, (ub4) OCI_DEFAULT))){
                 
	     checkerrr(err, r, (text*)err_msg);
         OCIHandleFree((dvoid *) sh, (ub4) OCI_HTYPE_STMT);
         oci_error = 1;
		 cleanup();
    }

	if ((r=OCIHandleAlloc((dvoid *) env, (dvoid **) &comm, 
            (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0))) {
		checkerrr(err, r, (text*)err_msg);
		oci_error = 1;
		cleanup();
	}

	if ((r=OCIStmtPrepare(comm, err, (text*)(void*)stmt, 
            (ub4) strlen((char *) stmt), (ub4) OCI_NTV_SYNTAX, 
            (ub4) OCI_DEFAULT))) {
		checkerrr(err, r, (text*)err_msg);
		OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
		oci_error = 1;
		cleanup();
	}
	if ((r=OCIStmtExecute(svc, comm, err, (ub4) 1, (ub4) 0, 
            (CONST OCISnapshot *) 0, (OCISnapshot *) 0, 
            (ub4) OCI_DEFAULT))) {
		checkerrr(err, r, (text*)err_msg);
		OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
		oci_error = 1;
		cleanup();
	}

	OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
	OCIHandleFree((dvoid *) initslc, (ub4) OCI_HTYPE_STMT);
	if ((r = OCISessionEnd(svc, err, ses, OCI_DEFAULT))) {
		checkerrr(err, r, (text*)err_msg);
		oci_error = 1;
		cleanup();
    }
    if ((r = OCIServerDetach(srv, err, OCI_DEFAULT))) {
		checkerrr(err, r, (text*)err_msg);
		oci_error = 1;
		cleanup();
    }
	if ((r = OCIHandleFree((dvoid*) ses, (ub4) OCI_HTYPE_SESSION))) {
		checkerrr(err, r, (text*)err_msg);
		oci_error = 1;
		cleanup();
    }
    if ((r = OCIHandleFree((dvoid*) svc, (ub4) OCI_HTYPE_SVCCTX))) {
		checkerrr(err, r, (text*)err_msg);
		oci_error = 1;
		cleanup();
    }
    if ((r = OCIHandleFree((dvoid*) srv, (ub4) OCI_HTYPE_SERVER))) {
		checkerrr(err, r, (text*)err_msg);
		oci_error = 1;
		cleanup();
    }
    if ((r = OCIHandleFree((dvoid*) err, (ub4) OCI_HTYPE_ERROR))) {
		oci_error = 1;
		cleanup();
    }
	
	if ( oci_error == 1) {
		cleanup();
	}
	
   	log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
	log4c_appender_set_udata(myappender, log_fd_main);
	mylog_msg("DjsMaster", LOG4C_PRIORITY_DEBUG,
      " [0000]  daemon running, ready to accept connections,"
      "waiting for to connect......");	
	fclose(log_fd_main);
	while (1) {
		/*!  
		   Accept a connection from a slave.  
		*/
        clientfd = malloc(sizeof(int));
        if ((*clientfd = serv_accept(sockfd)) < 0) {
           	exit(1);
      	}
		
		/*!  
		   Fork off a child thread to process this slave.  
		*/
        pthread_create(&job_thread, &thread_attr, jobfunction, clientfd);
	    //count++;
      	sleep(1);
    }
}


//!Dump Oracle Error
void cleanup() {
    
	log4c_appender_t* myappender;
	myappender = log4c_appender_get("myappender");
	log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
	log4c_appender_set_udata(myappender, fopen(log_file, "a"));
	mylog_log("DjsMaster", LOG4C_PRIORITY_ERROR, 
      " [2001] Oracle error %s", err_msg);
}


void *jobfunction(void *arg) {

	OCIError*     err;
	OCIServer*    srv;
	OCISvcCtx*    svc;
	OCISession*   ses;
	sword r;
	OCIBind 	*bnd1hp = (OCIBind*) 0;
	OCIBind 	*bnd2hp = (OCIBind*) 0;
	OCIBind 	*bnd3hp = (OCIBind*) 0;
	OCIDefine *def1hp = (OCIDefine *) 0;
	OCIDefine *def2hp = (OCIDefine *) 0;
	OCIDefine *def3hp = (OCIDefine *) 0;
	OCIDefine *def4hp = (OCIDefine *) 0;
	OCIDefine *def5hp = (OCIDefine *) 0;
	OCIDefine *def6hp = (OCIDefine *) 0;
	OCIDefine *def7hp = (OCIDefine *) 0;
	OCIDefine *def8hp = (OCIDefine *) 0;
	OCIDefine *def9hp = (OCIDefine *) 0;
	char select_all[] =" select filename, filepath, image_system_id, quality_status_cd,"
                      " format_cd,global_id, id, conv_attempts_cnt, image_resolution"
				          " from TABLE_NAME where conv_attempts_cnt = 0 order by created_datetime ";
	char select_count[] ="select conv_attempts_cnt, image_resolution from TABLE_NAME"
                        " where id = :1 ";
	char update_single[] ="update TABLENAME set conv_attempts_cnt = :1 where id = :2 ";
	char stmt[]="commit";
	int oci_error = 0;

	/*!
	   Variables to hold the values returned from SQL SELECT 
	*/
	int id, global_id, format_cd, quality_status;
	int resolution, image_id, conv_attempts_cnt;
    
	char filename[101];				   
   char filepath[201];				   
	char filepath_full[MAX_BUF];	   
	char tif_filename[101];			   
	char return_filepath[MAX_BUF];	
	char conv_details[MAX_BUF];		
	char *hold_filename;
	char *hold_filepath;
	char *hold_tiffile;
   char *progress;
	char *tempstr_p;
   char *jobnum_p;
   char *returnname_p;
	char *returnsize_p;
	char tempstr[MAX_BUF];

	int count = 0;			   
	int success_flag = 0;	
   int reprocess_flag = 0; 
	char s_resolution[5];	
	char s_format[5];		   
	struct stat filebuf;	  
	
	int myfd, datafd, jobid, i, update, holdid;
	char sockread[1024], sockwrite[1024], 
        jobnum[10], dataname[1024], 
        line_to_log[1024], databuf[1024];
	int datasize, bufsize, returnsize, filesize;
	FILE *log_fd;

	log4c_appender_t* myappender;
	myappender = log4c_appender_get("myappender");
	log_fd = fopen(log_file, "a");
	if (log_fd == NULL) {
		perror(" ERROR - DjsMaster [1000] Failed to open log file.\n");
        exit(1);
    }
	myfd = *((int *)arg);		
   log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
	log4c_appender_set_udata(myappender, log_fd);
	mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE, 
            " [1000] Starting jobs on %d.", 
            (int)pthread_self());	
	    
	Readline_r(myfd, sockread, 255);
    
	/*!
	   Continue to read requests received from the slaves.  
	*/
	while (strcmp(sockread, SLAVE_DONE) != 0) {
		OCIStmt* slcth;			
		OCIStmt* updth;			
		OCIStmt* slcth2;		  
		OCIStmt*	  comm;

		sleep(15);
      log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
		log4c_appender_set_udata(myappender, log_fd);
		mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE, 
         " [1000] Continue to read request from %d, looping with %s ", 
         (int)pthread_self(), sockread);	
				
        if (strcmp(sockread, REQUEST_JOB) == 0) {
		
			log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
			log4c_appender_set_udata(myappender, log_fd);
			mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE, 
            " [1000] %d  requesting job. Connecting to database.", 
            (int)pthread_self());	
	      if (oci_error == 1) {
         
            if ((r = OCIServerDetach(srv, err, OCI_DEFAULT))) {
				checkerrr(err, r, (text*)err_msg);
				cleanup();
			   }
			   if ((r = OCIHandleFree((dvoid*) ses, (ub4) OCI_HTYPE_SESSION))) {
				   checkerrr(err, r, (text*)err_msg);
				   cleanup();
				}
			   if ((r = OCIHandleFree((dvoid*) svc, (ub4) OCI_HTYPE_SVCCTX))) {
				   checkerrr(err, r, (text*)err_msg);
				   cleanup();
				   
			   }
			   if ((r = OCIHandleFree((dvoid*) srv, (ub4) OCI_HTYPE_SERVER))) {
			   	checkerrr(err, r, (text*)err_msg);
				   cleanup();
				}
			   if ((r = OCIHandleFree((dvoid*) err, (ub4) OCI_HTYPE_ERROR))) {
				   cleanup();
			   }
            oci_error = 0;
         }
			
			OCIHandleAlloc(env, (dvoid**)&err, OCI_HTYPE_ERROR,   0, 0);
			OCIHandleAlloc(env, (dvoid**)&srv, OCI_HTYPE_SERVER,  0, 0);
			OCIHandleAlloc(env, (dvoid**)&svc, OCI_HTYPE_SVCCTX,  0, 0);
			OCIHandleAlloc(env, (dvoid**)&ses, OCI_HTYPE_SESSION, 0, 0);

			r=OCIServerAttach(srv, err, dbname, 
               strlen((const char*)dbname), (ub4) OCI_DEFAULT);
			if (r != OCI_SUCCESS) {
				checkerrr(err, r, err_msg);
				oci_error = 1;
				cleanup();
				sprintf(sockwrite, "%s", NO_JOB);
				Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				Readline_r(myfd, sockread, 255);
				continue;
			}

			OCIAttrSet(svc, OCI_HTYPE_SVCCTX, srv, 0, OCI_ATTR_SERVER,  err);
			
         OCIAttrSet(ses, OCI_HTYPE_SESSION, username, 
               strlen((const char*)username), OCI_ATTR_USERNAME, err); 
			
         OCIAttrSet(ses, OCI_HTYPE_SESSION, password, 
               strlen((const char*)password), OCI_ATTR_PASSWORD, err); 

			if ( (r=OCIAttrSet((dvoid *) svc, (ub4) OCI_HTYPE_SVCCTX, 
               (dvoid *) ses, (ub4) 0, (ub4) OCI_ATTR_SESSION, err)) ) {
				checkerrr(err, r,err_msg);
				oci_error = 1;
				cleanup();
				sprintf(sockwrite, "%s", NO_JOB);
				Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				Readline_r(myfd, sockread, 255);
				continue;
			}

			r=OCISessionBegin (svc, err, ses, OCI_CRED_RDBMS, OCI_DEFAULT);
			if (r != OCI_SUCCESS) {
				checkerrr(err, r,err_msg);
				oci_error = 1;
				cleanup();
				sprintf(sockwrite, "%s", NO_JOB);
				Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				Readline_r(myfd, sockread, 255);
				continue;
			}
			
			if (OCIHandleAlloc((dvoid *) env, (dvoid **) &slcth, 
               (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0)) {
				checkerrr(err, r, (text*)err_msg);
				oci_error = 1;
				cleanup();
				sprintf(sockwrite, "%s", NO_JOB);
				Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				Readline_r(myfd, sockread, 255);
				continue;
			}
			if ((r=OCIStmtPrepare(slcth, err, (text*)(void*)select_all, 
                  (ub4) strlen((char *) select_all), 
                  (ub4) OCI_NTV_SYNTAX, 
                  (ub4) OCI_DEFAULT))) {
				checkerrr(err, r, (text*)err_msg);
				OCIHandleFree((dvoid *) slcth, (ub4) OCI_HTYPE_STMT);
				oci_error = 1;
				cleanup();
				sprintf(sockwrite, "%s", NO_JOB);
				Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				Readline_r(myfd, sockread, 255);
				continue;
			}
			
			if ((r=OCIDefineByPos(slcth, &def1hp, err, 1, 
                  filename, 100, SQLT_CHR, (dvoid *) 0, 
                  (ub2   *) 0, (ub2   *) 0, OCI_DEFAULT))) {
				checkerr(err, r, (text*)err_msg);
				oci_error = 1;
				cleanup();
				sprintf(sockwrite, "%s", NO_JOB);
				Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				Readline_r(myfd, sockread, 255);
				continue;
			}

			if ((r=OCIDefineByPos(slcth, &def2hp, err, 2, 
                  filepath, 100, SQLT_CHR, (dvoid *) 0, 
                  (ub2   *) 0, (ub2   *) 0, OCI_DEFAULT))) {
				checkerr(err, r, (text*)err_msg);
				oci_error = 1;
				cleanup();
				sprintf(sockwrite, "%s", NO_JOB);
				Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				Readline_r(myfd, sockread, 255);
				continue;
			}

			if ((r=OCIDefineByPos(slcth, &def3hp, err, 3, 
                  &image_id, sizeof(int), SQLT_INT, (dvoid *) 0, 
                  (ub2   *) 0, (ub2   *) 0, OCI_DEFAULT))) {
				checkerr(err, r, (text*)err_msg);
				oci_error = 1;
				cleanup();
				sprintf(sockwrite, "%s", NO_JOB);
				Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				Readline_r(myfd, sockread, 255);
				continue;
			}
			
			if ((r=OCIDefineByPos(slcth, &def4hp, err, 4, 
                  &quality_status, sizeof(int), SQLT_INT, 
                  (dvoid *) 0, (ub2   *) 0, (ub2   *) 0, OCI_DEFAULT))) {
				checkerr(err, r, (text*)err_msg);
				oci_error = 1;
				cleanup();
				sprintf(sockwrite, "%s", NO_JOB);
				Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				Readline_r(myfd, sockread, 255);
				continue;
			}

			if ((r=OCIDefineByPos(slcth, &def5hp, err, 5, 
                  &format_cd, sizeof(int), SQLT_INT, (dvoid *) 0, 
                  (ub2   *) 0, (ub2   *) 0, OCI_DEFAULT))) {
				checkerr(err, r, (text*)err_msg);
				oci_error = 1;
				cleanup();
				sprintf(sockwrite, "%s", NO_JOB);
				Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				Readline_r(myfd, sockread, 255);
				continue;
			}

			if ((r=OCIDefineByPos(slcth, &def6hp, err, 6, 
                  &global_id, sizeof(int), SQLT_INT, (dvoid *) 0, 
                  (ub2   *) 0, (ub2   *) 0, OCI_DEFAULT))) {
				checkerr(err, r, (text*)err_msg);
				oci_error = 1;
				cleanup();
				sprintf(sockwrite, "%s", NO_JOB);
				Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				Readline_r(myfd, sockread, 255);
				continue;
			}
			
			if ((r=OCIDefineByPos(slcth, &def7hp, err, 7, 
                  &id, sizeof(int), SQLT_INT, (dvoid *) 0, 
                  (ub2   *) 0, (ub2   *) 0, OCI_DEFAULT))) {
				checkerr(err, r, (text*)err_msg);
				oci_error = 1;
				cleanup();
				sprintf(sockwrite, "%s", NO_JOB);
				Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				Readline_r(myfd, sockread, 255);
				continue;
			}
			
			if ((r=OCIDefineByPos(slcth, &def8hp, err, 8, 
                  &conv_attempts_cnt, sizeof(int), SQLT_INT, (dvoid *) 0, 
                  (ub2   *) 0, (ub2   *) 0, OCI_DEFAULT))) {
				checkerr(err, r, (text*)err_msg);
				oci_error = 1;
				cleanup();
			}
			
			if ((r=OCIDefineByPos(slcth, &def9hp, err, 9, 
                  &resolution, sizeof(int), SQLT_INT, (dvoid *) 0, 
                  (ub2   *) 0, (ub2   *) 0, OCI_DEFAULT))) {
				checkerr(err, r, (text*)err_msg);
				oci_error = 1;
				cleanup();
				sprintf(sockwrite, "%s", NO_JOB);
				Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				Readline_r(myfd, sockread, 255);
				continue;
			}

			if ((r=OCIStmtExecute(svc, slcth, err, (ub4) 0, (ub4) 0, 
                  (CONST OCISnapshot *) 0, (OCISnapshot *) 0, 
                  (ub4) OCI_DEFAULT))){
				checkerr(err, r, (text*)err_msg);
				OCIHandleFree((dvoid *) slcth, (ub4) OCI_HTYPE_STMT);
				oci_error = 1;
				cleanup();
				sprintf(sockwrite, "%s", NO_JOB);
				Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				Readline_r(myfd, sockread, 255);
				continue;
			}
			
			r = OCIStmtFetch(slcth, err, (ub4) 1, (ub4) OCI_FETCH_NEXT, (ub4) OCI_DEFAULT);

			if (r == OCI_SUCCESS) {
				success_flag = 1;
            reprocess_flag = 0;
			}
			else if (r == OCI_SUCCESS_WITH_INFO) {
				success_flag = 1;
            reprocess_flag = 0;
			}
			else {
				if (r != OCI_NO_DATA) {
					checkerr(err, r, (text*)err_msg);
					success_flag = 0;
				}
				success_flag = 0;
			}

			OCIHandleFree((dvoid *) slcth, (ub4) OCI_HTYPE_STMT);
				
			/*!
			    No job in the table. Continue listening. Tell the slave
			    there are no current jobs to process.  
			*/
        	if (! (success_flag)) {
            if ((r = OCISessionEnd(svc, err, ses, OCI_DEFAULT))) {
				   checkerrr(err, r, (text*)err_msg);
				   oci_error = 1;
				   cleanup();
				   sprintf(sockwrite, "%s", NO_JOB);
				   Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				   Readline_r(myfd, sockread, 255);
				   continue;
			   }
            if ((r = OCIServerDetach(srv, err, OCI_DEFAULT))) {
				   checkerrr(err, r, (text*)err_msg);
				   oci_error = 1;
				   cleanup();
				   sprintf(sockwrite, "%s", NO_JOB);
				   Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				   Readline_r(myfd, sockread, 255);
				   continue;
			   }
			   if ((r = OCIHandleFree((dvoid*) ses, (ub4) OCI_HTYPE_SESSION))) {
				   checkerrr(err, r, (text*)err_msg);
				   oci_error = 1;
				   cleanup();
				   sprintf(sockwrite, "%s", NO_JOB);
				   Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				   Readline_r(myfd, sockread, 255);
				   continue;
			   }
			   if ((r = OCIHandleFree((dvoid*) svc, (ub4) OCI_HTYPE_SVCCTX))) {
				   checkerrr(err, r, (text*)err_msg);
				   oci_error = 1;
				   cleanup();
				   sprintf(sockwrite, "%s", NO_JOB);
				   Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				   Readline_r(myfd, sockread, 255);
				   continue;
			   }
			   if ((r = OCIHandleFree((dvoid*) srv, (ub4) OCI_HTYPE_SERVER))) {
				   checkerrr(err, r, (text*)err_msg);
				   oci_error = 1;
				   cleanup();
				   sprintf(sockwrite, "%s", NO_JOB);
				   Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				   Readline_r(myfd, sockread, 255);
				   continue;
			   }
			   if ((r = OCIHandleFree((dvoid*) err, (ub4) OCI_HTYPE_ERROR))) {
				   oci_error = 1;
				   cleanup();
				   sprintf(sockwrite, "%s", NO_JOB);
				   Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				   Readline_r(myfd, sockread, 255);
				   continue;
			   }
				success_flag = 0;
				sprintf(sockwrite, "%s", NO_JOB);
		      log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
				log4c_appender_set_udata(myappender, log_fd);
				mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE, 
                  " [1000] No new jobs, writing to %d %s", 
                  (int)pthread_self(), sockwrite);	
		
				Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
            Readline_r(myfd, sockread, 255);
				success_flag = 0;
            continue;
			}
			
			/*  New job is fetched, process it.  */
		  	else {
            reprocess_flag = 0;
reprocess:	log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
				log4c_appender_set_udata(myappender, log_fd);
				mylog_log("Oracle", LOG4C_PRIORITY_DEBUG,
               " [2000][id %d] Fetching data for new job from databse", id);
				mylog_log("Oracle", LOG4C_PRIORITY_DEBUG, 
               " [2000][id %d]Fetched job_id=%d, format=%d, resolution=%d, conv_attemtps=%d, imagefile=%s, imagepath=%s",
               id, id, format_cd, resolution, 
               conv_attempts_cnt, filename, filepath);
				
				log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
				log4c_appender_set_udata(myappender, log_fd);
				mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE, 
               " [1000][id %d] New job for %d. Job will be processed.", 
               id, (int)pthread_self());	
				holdid = id;
            if ( reprocess_flag == 1 )
            {

				   OCIHandleAlloc(env, (dvoid**)&err, OCI_HTYPE_ERROR,   0, 0);
				   OCIHandleAlloc(env, (dvoid**)&srv, OCI_HTYPE_SERVER,  0, 0);
				   OCIHandleAlloc(env, (dvoid**)&svc, OCI_HTYPE_SVCCTX,  0, 0);
				   OCIHandleAlloc(env, (dvoid**)&ses, OCI_HTYPE_SESSION, 0, 0);

				   r=OCIServerAttach(srv, err, dbname, strlen((const char*)dbname), (ub4) OCI_DEFAULT);
				   if (r != OCI_SUCCESS) {
					   checkerrr(err, r, err_msg);
					   oci_error = 1;
					   cleanup();
					   success_flag = 0;
					   sprintf(sockwrite, "%s", NO_JOB);
					   Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					   Readline_r(myfd, sockread, 255);
					   continue;
				   }
           
				   OCIAttrSet(svc, OCI_HTYPE_SVCCTX, srv, 0, OCI_ATTR_SERVER,  err);
				   OCIAttrSet(ses, OCI_HTYPE_SESSION, username, strlen((const char*)username), OCI_ATTR_USERNAME, err); 
				   OCIAttrSet(ses, OCI_HTYPE_SESSION, password, strlen((const char*)password), OCI_ATTR_PASSWORD, err); 

				   if ( (r=OCIAttrSet((dvoid *) svc, (ub4) OCI_HTYPE_SVCCTX, (dvoid *) ses, (ub4) 0, (ub4) OCI_ATTR_SESSION, err)) ) {
					   checkerrr(err, r,err_msg);
					   oci_error = 1;
					   cleanup();
					   success_flag = 0;
					   sprintf(sockwrite, "%s", NO_JOB);
					   Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					   Readline_r(myfd, sockread, 255);
					   continue;
				   }

				   r=OCISessionBegin (svc, err, ses, OCI_CRED_RDBMS, OCI_DEFAULT);
				   if (r != OCI_SUCCESS) {
					   checkerrr(err, r,err_msg);
					   oci_error = 1;
					   cleanup();
					   success_flag = 0;
					   sprintf(sockwrite, "%s", NO_JOB);
					   Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					   Readline_r(myfd, sockread, 255);
					   continue;
				   }
            }
			
				if (OCIHandleAlloc((dvoid *) env, (dvoid **) &slcth2, 
                  (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0)) {
					checkerrr(err, r, (text*)err_msg);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}
				if ((r=OCIStmtPrepare(slcth2, err, (text*)(void*)select_count, 
                  (ub4) strlen((char *) select_count), 
                  (ub4) OCI_NTV_SYNTAX, (ub4) OCI_DEFAULT))) {
				
					checkerrr(err, r, (text*)err_msg);
					OCIHandleFree((dvoid *) slcth2, (ub4) OCI_HTYPE_STMT);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}
			
				if ((r=OCIDefineByPos(slcth2, &def1hp, err, 1, 
                     &conv_attempts_cnt, sizeof(int), SQLT_INT, (dvoid *) 0, 
                     (ub2   *) 0, (ub2   *) 0, OCI_DEFAULT))) {
					checkerr(err, r, (text*)err_msg);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}

				if ((r = OCIBindByPos(slcth2, &bnd1hp, err, (ub4) 1, 
                     &holdid, sizeof(int), SQLT_INT,
                     (dvoid *) 0, (ub2 *) 0, (ub2 *) 0, (ub4) 0, 
                     (ub4 *) 0, OCI_DEFAULT))){
					checkerr(err, r, (text*)err_msg);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}
        
				if ((r = OCIStmtExecute(svc, slcth2, err, (ub4) 0, (ub4) 0, 
                     (CONST OCISnapshot *) 0,
                     (OCISnapshot *) 0, (ub4) OCI_DEFAULT))){
					checkerr(err, r, (text*)err_msg);
					OCIHandleFree((dvoid *) slcth2, (ub4) OCI_HTYPE_STMT);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}
				r = OCIStmtFetch(slcth2, err, (ub4) 1, 
                  (ub4) OCI_FETCH_NEXT, (ub4) OCI_DEFAULT);

				if (r == OCI_SUCCESS) {
					success_flag = 1;
				}
				else if (r == OCI_SUCCESS_WITH_INFO) {
					success_flag = 1;
				}	
				else {
					if (r != OCI_NO_DATA) {
						checkerr(err, r, (text*)err_msg);
						success_flag = 0;
						sprintf(sockwrite, "%s", NO_JOB);
						Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
						Readline_r(myfd, sockread, 255);
						continue;
					}
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}

				OCIHandleFree((dvoid *) slcth2, (ub4) OCI_HTYPE_STMT);
	
				count = conv_attempts_cnt;
				count++;

				/*   Updating conv_attemtpts_cnt for the current job  */
				if (OCIHandleAlloc((dvoid *) env, (dvoid **) &updth, 
                  (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0)) {
					checkerrr(err, r, (text*)err_msg);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}
				if ((r=OCIStmtPrepare(updth, err, (text*)(void*)update_single,
                  (ub4) strlen((char *) update_single), (ub4) OCI_NTV_SYNTAX, 
                  (ub4) OCI_DEFAULT))) {
					checkerrr(err, r, (text*)err_msg);
					OCIHandleFree((dvoid *) updth, (ub4) OCI_HTYPE_STMT);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}
	
				if ((r = OCIBindByPos(updth, &bnd2hp, err, (ub4) 1, &count, 
                     sizeof(count), SQLT_INT, (dvoid *) 0, (ub2 *) 0, 
                     (ub2 *) 0, (ub4) 0, (ub4 *) 0, OCI_DEFAULT))){
      			checkerrr(err, r, (text*)err_msg);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}
				if ((r = OCIBindByPos(updth, &bnd3hp, err, (ub4) 2, &holdid, 
                     sizeof(holdid), SQLT_INT, (dvoid *) 0, (ub2 *) 0,
                     (ub2 *) 0, (ub4) 0, (ub4 *) 0, OCI_DEFAULT))){
      			checkerrr(err, r, (text*)err_msg);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}

				if ((r = OCIStmtExecute(svc, updth, err, (ub4) 1, (ub4) 0, 
                     (CONST OCISnapshot *) 0, (OCISnapshot *) 0,
                     (ub4) OCI_DEFAULT))){
                 
					checkerrr(err, r, (text*)err_msg);
					OCIHandleFree((dvoid *) updth, (ub4) OCI_HTYPE_STMT);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}

				if ((r=OCIHandleAlloc((dvoid *) env, (dvoid **) &comm, 
                     (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0))) {
					checkerrr(err, r, (text*)err_msg);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}

				if ((r=OCIStmtPrepare(comm, err, (text*)(void*)stmt, 
                     (ub4) strlen((char *) stmt), 
                     (ub4) OCI_NTV_SYNTAX, (ub4) OCI_DEFAULT))) {
					checkerrr(err, r, (text*)err_msg);
					OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}
				if ((r=OCIStmtExecute(svc, comm, err, (ub4) 1, 
                  (ub4) 0, (CONST OCISnapshot *) 0, (OCISnapshot *) 0, 
                  (ub4) OCI_DEFAULT))) {
					checkerrr(err, r, (text*)err_msg);
					OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}

				OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
				OCIHandleFree((dvoid *) updth, (ub4) OCI_HTYPE_STMT);
				if ((r = OCISessionEnd(svc, err, ses, OCI_DEFAULT))) {
					checkerrr(err, r, (text*)err_msg);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}
				if ((r = OCIServerDetach(srv, err, OCI_DEFAULT))) {
					checkerrr(err, r, (text*)err_msg);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}
				if ((r = OCIHandleFree((dvoid*) ses, (ub4) OCI_HTYPE_SESSION))) {
					checkerrr(err, r, (text*)err_msg);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}
				if ((r = OCIHandleFree((dvoid*) svc, (ub4) OCI_HTYPE_SVCCTX))) {
					checkerrr(err, r, (text*)err_msg);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}
				if ((r = OCIHandleFree((dvoid*) srv, (ub4) OCI_HTYPE_SERVER))) {
					checkerrr(err, r, (text*)err_msg);
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}
				if ((r = OCIHandleFree((dvoid*) err, (ub4) OCI_HTYPE_ERROR))) {
					oci_error = 1;
					cleanup();
					success_flag = 0;
					sprintf(sockwrite, "%s", NO_JOB);
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					continue;
				}
				
								
				hold_filename = strtok_r(filename, " ", &progress);
				strcpy(filename,hold_filename);
				strcpy(tif_filename, filename);
		      hold_tiffile = strtok_r(tif_filename, ".", &progress);
				strcpy(tif_filename, hold_tiffile);
				strcat(tif_filename, ".tif");
				hold_filepath = strtok_r(filepath, " ", &progress);
				strcpy(filepath, hold_filepath);
				strcpy(return_filepath, filepath);
				strcat(return_filepath, "/");
				strcat(return_filepath, filename);
				log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
				log4c_appender_set_udata(myappender, log_fd);
				mylog_log("DjsMaster", LOG4C_PRIORITY_DEBUG,
               " [1000][id %d]  Image %s located in %s will be converted to image filename %s ",
               holdid, tif_filename, filepath, filename);	
				
				if (tif_filename == NULL) {
					log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
					log4c_appender_set_udata(myappender, log_fd);
					mylog_log("DjsMaster", LOG4C_PRIORITY_WARN, 
                  " [1001][id %d]  No file to convert. No file name associated with the job", 
                  holdid);	
					
               sprintf(sockwrite, "%s", NO_JOB);
               log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
					log4c_appender_set_udata(myappender, log_fd);
					mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE, 
                  " [1000][id %d] No job to perform. Writing to %d %s", 
                  holdid, (int)pthread_self(), sockwrite);	
					Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
					Readline_r(myfd, sockread, 255);
					if ( count <= max_failure ) {
						success_flag = 1;
                  reprocess_flag = 1;
						goto reprocess;
					}
					else {
						success_flag = 0;
						continue;

					}
					//continue;
                }
				
				
				strcpy(filepath_full, filepath);
				strcat(filepath_full, "/");
				strcat(filepath_full, tif_filename);
								
				
				if (stat(filepath_full, &filebuf) < 0) {
               log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
				   log4c_appender_set_udata(myappender, log_fd);
					mylog_log("DjsMaster", LOG4C_PRIORITY_WARN, 
                  " [1002][id %d]  Could not stat the file %s. Nothing to send to %d ", 
                  holdid, filepath_full, (int)pthread_self());	
			   	sprintf (sockwrite, "%s", NO_JOB);
               log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
					log4c_appender_set_udata(myappender, log_fd);
					mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE, 
                  " [1000][id %d] No job to perform. Writing to %d %s", 
                  holdid, (int)pthread_self(), sockwrite);	
				   Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
               Readline_r(myfd, sockread, 255);
					if ( count <= max_failure ) {
						success_flag = 1;
                  reprocess_flag = 1;
						goto reprocess;
					}
					else {
						success_flag = 0;
						continue;

					}
                    //continue;
                }

				datasize = (int)filebuf.st_size;
				sprintf(s_resolution, "%d", resolution);
				sprintf(s_format, "%d", format_cd);
				strcpy(conv_details, "./conv.sh:");
				strcat(conv_details, tif_filename);
				strcat(conv_details, ":");
				strcat(conv_details, s_resolution);
				strcat(conv_details, ":");
				strcat(conv_details, s_format);
				log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
				log4c_appender_set_udata(myappender, log_fd);
				mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE, 
               " [1000][id %d] Writing to socket all conversion details for %d ", 
               holdid, (int)pthread_self());	
					
				sprintf(sockwrite, "%d=%s%%%s%%%d%%%s\n", id, conv_details, 
                  tif_filename, datasize, return_filepath);
                
				log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
				log4c_appender_set_udata(myappender, log_fd);
				mylog_log("DjsMaster", LOG4C_PRIORITY_DEBUG, " [1000][id %d] Sending %s", 
                  holdid, sockwrite);	
					
				Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
				if ((datafd = open(filepath_full, O_RDONLY)) < 0) {
               close(myfd);
	            log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
					log4c_appender_set_udata(myappender, log_fd);
					mylog_log("DjsMaster", LOG4C_PRIORITY_ERROR, 
                  " [1003][id %d] Open of data file failed. FATAL exiting %d", 
                  holdid, (int)pthread_self());	
				   pthread_exit(NULL);
				}
                filesize = 0;
                bufsize = Readn_r(datafd, databuf, MAX_BUF);
                if (bufsize < 0) {
                  close(myfd);
	               log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
					   log4c_appender_set_udata(myappender, log_fd);
					   mylog_log("DjsMaster", LOG4C_PRIORITY_ERROR, 
                     " [1003][id %d] Read of data file failed. FATAL exiting %d", 
                     holdid, (int)pthread_self());	
				      pthread_exit(NULL);

				}
                filesize += bufsize;
                while (bufsize > 0) {
                	net_send(myfd, databuf, bufsize);
                	bufsize = Readn_r(datafd, databuf, MAX_BUF);
                	if (bufsize < 0) {
                  	close(myfd);
		           		log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
						   log4c_appender_set_udata(myappender, log_fd);
						   mylog_log("DjsMaster", LOG4C_PRIORITY_ERROR, 
                        " [1003][id %d] Read of data file failed. FATAL exiting %d", 
                        holdid, (int)pthread_self());	
				       	pthread_exit(NULL);
						}
					filesize += bufsize;
					log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
					log4c_appender_set_udata(myappender, log_fd);
					mylog_log("DjsMaster", LOG4C_PRIORITY_DEBUG, 
                  " [1000][id %d] Sending file %s with size=%d to %d", 
                  holdid, filepath_full, filesize, (int)pthread_self());	
				       	
                }
                close(datafd);
			}

			log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
			log4c_appender_set_udata(myappender, log_fd);
			mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE, 
            " [1000][id %d] Reading next message from %d", 
            holdid, (int)pthread_self());	
		    Readline_r(myfd, sockread, 255);
        } 
		

	  	else if (strcmp (sockread, FAIL_JOB) == 0) {
			/* We get a job failed from the slave */
         Readline_r(myfd, sockread, 255);
         strncpy(jobnum, sockread, 10);
			id = atoi(jobnum);
			Readline_r(myfd, sockread, 255);
			if ( count <= max_failure) {
				log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
				log4c_appender_set_udata(myappender, log_fd);
				mylog_log("DjsMaster", LOG4C_PRIORITY_WARN, 
               " [1004][id %d] Job on %d failed. Will retry until conv_attemtps_cnt <= max_failure. Processing AGAIN",
               id, (int)pthread_self());	
				success_flag = 1;
            reprocess_flag = 1;
				goto reprocess;
			}
			else {
				log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
				log4c_appender_set_udata(myappender, log_fd);
				mylog_log("DjsMaster", LOG4C_PRIORITY_WARN, 
               " [1005][id %d] Job on %d failed. Exceeded max_failure. Will NOT RETRY.", 
               id, (int)pthread_self());	
				success_flag = 0;
				continue;
			}
			//continue;
		}
		else if (strcmp(sockread, FINISH_JOB) == 0) {
            
         /* We get a clean job finish from the slave */
         Readline_r(myfd, sockread, 255);
         tempstr_p = tempstr;
         jobnum_p = strtok_r(sockread, SEPARATOR3, &tempstr_p);
         strcpy(jobnum, jobnum_p);
			for (i = 0; i < (int)strlen(jobnum); i++) {
           	if (jobnum[i] == '\n') {
              		jobnum[i] = '\0';
           	}
         }
			jobid = atoi(jobnum);
         returnname_p = strtok_r(NULL, SEPARATOR3, &tempstr_p);
         returnsize_p = strtok_r(NULL, SEPARATOR3, &tempstr_p);
			log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
			log4c_appender_set_udata(myappender, log_fd);
			mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE, 
            " [1000][id %d] Got a clean job finish from %d. Image conversion successful", 
            jobid, (int)pthread_self());	
			/* check if there is a file being returned */
            if (returnname_p != NULL) {
            	/* We are getting a file back */
            	return_file = strdup(returnname_p);
            	returnsize = atoi(returnsize_p);
            	log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
			   	log4c_appender_set_udata(myappender, log_fd);
				   mylog_log("DjsMaster", LOG4C_PRIORITY_DEBUG, 
                  " [1000][id %d] Getting back converted file %s : size of file %d", 
                  jobid, returnname_p, returnsize);	
				   filesize = 0;
            	if ((datafd = open(returnname_p, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR 
							       | S_IWUSR | S_IRGRP | S_IROTH)) < 0) {
               	perror("Failure opening data file");
               	close(myfd);
	           		log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
					   log4c_appender_set_udata(myappender, log_fd);
					   mylog_log("DjsMaster", LOG4C_PRIORITY_ERROR, 
                     " [1003][id %d] Failure opening converted image file. FATAL exiting %d", 
                     jobid, (int)pthread_self());	
				      pthread_exit(NULL);
            	}
            	while (filesize < returnsize) {
                	bufsize = net_recv(myfd, databuf, MAX_BUF);
                	if (bufsize < 0) {
                  	perror("Error reading from the network");
                  	close(myfd);
                  	close(datafd);
		           		log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
						   log4c_appender_set_udata(myappender, log_fd);
						   mylog_log("DjsMaster", LOG4C_PRIORITY_ERROR, 
                        " [1006][id %d] Error reading from the network. FATAL exiting %d", 
                        jobid, (int)pthread_self());	
				    	   pthread_exit(NULL);
                	}
                	Writen_r(datafd, databuf, bufsize);
                	filesize += bufsize;
               }
               close(datafd);
               sprintf(sockwrite, "%s", FILE_DONE);
               log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
				   log4c_appender_set_udata(myappender, log_fd);
				   mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE, 
                  " [1000][id %d] Job processed successfully. File save to location. Wiriting to %d %s", 
                  jobid, (int)pthread_self(), sockwrite);	
				   Writen_r(myfd, sockwrite, strlen(sockwrite));
                if (return_file != NULL) {
                	free(return_file);
                }
            } 
			else {
            	return_file = NULL;
            }
        if ( (update = update_tables (jobid)) < 0) {
			   log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
			   log4c_appender_set_udata(myappender, log_fd);
			   mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE, 
               " [1008][id %d] Error Updating all tables after conversion.", jobid);	
			   Readline_r(myfd, sockread, 255);
			   if ( count <= max_failure) {
				   log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
				   log4c_appender_set_udata(myappender, log_fd);
				   mylog_log("DjsMaster", LOG4C_PRIORITY_WARN, 
                  " [1004][id %d] Job on %d failed."
                  "Will retry until conv_attemtps_cnt <= max_failure. Processing AGAIN",
                  jobid, (int)pthread_self());	
				   success_flag = 1;
               reprocess_flag = 1;
				   goto reprocess;
			   }
			   else {
				   log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
				   log4c_appender_set_udata(myappender, log_fd);
				   mylog_log("DjsMaster", LOG4C_PRIORITY_WARN, 
                  " [1005][id %d] Job on %d failed. Exceeded max_failure. Will NOT RETRY.", 
                  jobid, (int)pthread_self());	
				   success_flag = 0;
				   continue;
			   }
		  }
		  else {
			   log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
			   log4c_appender_set_udata(myappender, log_fd);
			   mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE, 
               " [1000][id %d] Successful Update. Tables updated after conversion.", jobid);	
			   Readline_r(myfd, sockread, 255);
			   success_flag = 0;
			   continue;
        }
      }
    }
    
   log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
	log4c_appender_set_udata(myappender, log_fd);
	mylog_log("DjsMaster", LOG4C_PRIORITY_WARN, " [1007] Thread %d got slave finished", (int)pthread_self());	
	mylog_msg("DjsMaster", LOG4C_PRIORITY_WARN, " [1007] Exiting thread...");				    	
	fclose(log_fd);
	close(myfd);
   pthread_exit(NULL);
	return NULL;
}

int update_tables (int job_id) {
	
	OCIError*     err;
	OCIServer*    srv;
	OCISvcCtx*    svc;
	OCISession*   ses;
	sword r;
	OCIBind 	*bnd1hp = (OCIBind*) 0;
	int oci_error = 0;
   
	FILE *log_fd;
	OCIStmt* updth2;
	OCIStmt* comm;

	text stmt_comm[]="commit";
	/*  SQL Proc to update tables  */
	text stmt[] = "BEGIN P_POST_CONV_IMAGE(:1);END;";									";
	log4c_appender_t* myappender;
	myappender = log4c_appender_get("myappender");
	log_fd = fopen(log_file, "a");
	log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
	log4c_appender_set_udata(myappender, log_fd);
	mylog_log("Oracle", LOG4C_PRIORITY_NOTICE, " [2000][id %d] Executing PL/SQL to update all tables", job_id);	
	
	OCIHandleAlloc(env, (dvoid**)&err, OCI_HTYPE_ERROR,   0, 0);
	OCIHandleAlloc(env, (dvoid**)&srv, OCI_HTYPE_SERVER,  0, 0);
	OCIHandleAlloc(env, (dvoid**)&svc, OCI_HTYPE_SVCCTX,  0, 0);
	OCIHandleAlloc(env, (dvoid**)&ses, OCI_HTYPE_SESSION, 0, 0);
	r=OCIServerAttach(srv, err, dbname, strlen((const char*)dbname), (ub4) OCI_DEFAULT);
	if (r != OCI_SUCCESS) {
		checkerrr(err, r, err_msg);
		oci_error = 1;
		//fclose(log_fd);
		cleanup();
		//return -1;
		
	}

	OCIAttrSet(svc, OCI_HTYPE_SVCCTX, srv, 0, OCI_ATTR_SERVER,  err);
	OCIAttrSet(ses, OCI_HTYPE_SESSION, username, 
         strlen((const char*)username), OCI_ATTR_USERNAME, err); 
	OCIAttrSet(ses, OCI_HTYPE_SESSION, password, 
         strlen((const char*)password), OCI_ATTR_PASSWORD, err); 

	if ( (r=OCIAttrSet((dvoid *) svc, (ub4) OCI_HTYPE_SVCCTX,
         (dvoid *) ses, (ub4) 0, (ub4) OCI_ATTR_SESSION, err)) ) {
		checkerrr(err, r, err_msg);
		oci_error = 1;
		//fclose(log_fd);
		cleanup();
		//return -1;
	}

	r=OCISessionBegin (svc, err, ses, OCI_CRED_RDBMS, OCI_DEFAULT);
	if (r != OCI_SUCCESS) {
		checkerrr(err, r, err_msg);
		oci_error = 1;
		//fclose(log_fd);
		cleanup();
		//return -1;
	}
			
	if (OCIHandleAlloc((dvoid *) env, (dvoid **) &updth2, 
         (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0)) {
		checkerrr(err, r, err_msg);
		oci_error = 1;
		//fclose(log_fd);
		cleanup();
		//return -1;
	}
	if ((r=OCIStmtPrepare(updth2, err, (text*)(void*)stmt, 
         (ub4) strlen((char *) stmt), (ub4) OCI_NTV_SYNTAX,
         (ub4) OCI_DEFAULT))) {
		checkerrr(err, r, err_msg);
		oci_error = 1;
		//fclose(log_fd);
		cleanup();
		//return -1;
	}

	if ((r = OCIBindByPos(updth2, &bnd1hp, err, (ub4) 1, 
            &job_id, sizeof(job_id), SQLT_INT,
            (dvoid *) 0, (ub2 *) 0, (ub2 *) 0, 
            (ub4) 0, (ub4 *) 0, OCI_DEFAULT))){
      	checkerrr(err, r, (text*)err_msg);
        oci_error = 1;
		//fclose(log_fd);
		cleanup();
		//return -1;
    }
	if ((r = OCIStmtExecute(svc, updth2, err, (ub4) 1, (ub4) 0, (CONST OCISnapshot *) 0,
                               (OCISnapshot *) 0, (ub4) OCI_DEFAULT))){
                 
	     checkerrr(err, r, (text*)err_msg);
         OCIHandleFree((dvoid *) updth2, (ub4) OCI_HTYPE_STMT);
         oci_error = 1;
		 //fclose(log_fd);
		 cleanup();
		 //return -1;
    }

	if ((r=OCIHandleAlloc((dvoid *) env, (dvoid **) &comm, 
            (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0))) {
		checkerrr(err, r, (text*)err_msg);
		oci_error = 1;
		//fclose(log_fd);
		cleanup();
		//return -1;
	}

	if ((r=OCIStmtPrepare(comm, err, (text*)(void*)stmt_comm, 
            (ub4) strlen((char *) stmt_comm),
            (ub4) OCI_NTV_SYNTAX, (ub4) OCI_DEFAULT))) {
		checkerrr(err, r, (text*)err_msg);
		OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
		oci_error = 1;
		//fclose(log_fd);
		cleanup();
		//return -1;
	}
	if ((r=OCIStmtExecute(svc, comm, err, (ub4) 1, 
            (ub4) 0, (CONST OCISnapshot *) 0, 
            (OCISnapshot *) 0, (ub4) OCI_DEFAULT))) {
		checkerrr(err, r, (text*)err_msg);
		OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
		oci_error = 1;
		//fclose(log_fd);
		cleanup();
		//return -1;
	}

	OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
	OCIHandleFree((dvoid *) updth2, (ub4) OCI_HTYPE_STMT);
	if ((r = OCISessionEnd(svc, err, ses, OCI_DEFAULT))) {
		checkerrr(err, r, (text*)err_msg);
		oci_error = 1;
		//fclose(log_fd);
		cleanup();
		//return -1;
    }
    if ((r = OCIServerDetach(srv, err, OCI_DEFAULT))) {
		checkerrr(err, r, (text*)err_msg);
		oci_error = 1;
		//fclose(log_fd);
		cleanup();
		//return -1;
    }
	if ((r = OCIHandleFree((dvoid*) ses, (ub4) OCI_HTYPE_SESSION))) {
		checkerrr(err, r, (text*)err_msg);
		oci_error = 1;
		//fclose(log_fd);
		cleanup();
		//return -1;
    }
    if ((r = OCIHandleFree((dvoid*) svc, (ub4) OCI_HTYPE_SVCCTX))) {
		checkerrr(err, r, (text*)err_msg);
		oci_error = 1;
		//fclose(log_fd);
		cleanup();
		//return -1;
    }
    if ((r = OCIHandleFree((dvoid*) srv, (ub4) OCI_HTYPE_SERVER))) {
		checkerrr(err, r, (text*)err_msg);
		oci_error = 1;
		//fclose(log_fd);
		cleanup();
		//return -1;
    }
    if ((r = OCIHandleFree((dvoid*) err, (ub4) OCI_HTYPE_ERROR))) {
		oci_error = 1;
		//fclose(log_fd);
		cleanup();
		//return -1;
    }		
	
	fclose(log_fd);
   if (oci_error == 1) {
      return -1;
   }
   else {
      return 1;
   }
}
and here is the stack trace

bash-3.00# pstack 4755
4755: ./
----------------- lwp# 1 / thread# 1 --------------------
fe6c4f08 accept (0, ffbff7a0, ffbff7b0, 1)
00013140 main (2, ffbff9b4, ffbff9c0, 2f490, fe560100, fe560140) +
d7c
00011d64 _start (0, 0, 0, 0, 0, 0) + 5c
----------------- lwp# 2 / thread# 2 -------------------- fe6c4a30
lwp_park (0, 0, 0)
fe9ae0c4 kpuhhfre (94c88, 12c18c, ff1c27b8, a580c, 1a18, 0) + 238
fea43188 kwfcinit (a20bc, 2f420, 5, fe4792c4, fe4792c0, 94c88) + 168
fe9828f4 kpuatch (0, 94c88, a20cc, 0, a1fbc, 0) + 848
fe96eb78 OCIServerAttach (a1f7c, 13e978, 2f420, 5, 0, 5) + 7c 00019b80
update_tables (10d433, 1000, fe6f3700, fe540200, fe6f1818,
ffffebb4) + 1b0
0001959c jobfunction (34b80, fe47c000, 0, 0, 138e8, 1) + 5cb4 fe6c4990
_lwp_start (0, 0, 0, 0, 0, 0)
----------------- lwp# 3 / thread# 3 -------------------- fe6c4a30
lwp_park (0, 0, 0)
fe9c74f8 kpufhndl0 (1604, 1604, ff21805c, ffffffff, 0, 0) + 2228
0001a224 update_tables (10d434, 1000, fe6f3700, fe540a00, fe6f1818,
ffffebb4) + 854
0001959c jobfunction (34b70, fe37c000, 0, 0, 138e8, 1) + 5cb4 fe6c4990
_lwp_start (0, 0, 0, 0, 0, 0)
----------------- lwp# 4 / thread# 4 -------------------- fe6c4a30
lwp_park (0, 0, 0) fe9ab6e0 kpuhhaloc (94c88, 550, fe279328, 1004000,
ff208a5e, fe6f3470)
+ 26c
fe9c46e0 kpughndl0 (2, 0, 0, f8e9dacb, f8e9d800, 0) + 3e8 00019ad0
update_tables (10d435, 1000, fe6f3700, fe541200, fe6f1818,
ffffebb4) + 100
0001959c jobfunction (34e20, fe27c000, 0, 0, 138e8, 1) + 5cb4 fe6c4990
_lwp_start (0, 0, 0, 0, 0, 0)
----------------- lwp# 5 / thread# 5 -------------------- fe6c4a30
lwp_park (0, 0, 0) fe9ab6e0 kpuhhaloc (94c88, 550, fe179328, 1004000,
ff208a5e, fe6f3470)
+ 26c
fe9c46e0 kpughndl0 (2, 0, 0, f8e9dacb, f8e9d800, 0) + 3e8 00019ad0
update_tables (10d436, 1000, fe6f3700, fe541a00, fe6f1818,
ffffebb4) + 100
0001959c jobfunction (34ee0, fe17c000, 0, 0, 138e8, 1) + 5cb4 fe6c4990
_lwp_start (0, 0, 0, 0, 0, 0)

I have tried creating a new environment handle in the update_tables function and freeing it on return from the function, but that did not resolve the issue either.
Any help or suggestion is appreciated
Thanks
Comments
Locked Post
New comments cannot be posted to this locked post.
Post Details
Locked on May 27 2008
Added on Apr 23 2008
4 comments
2,116 views