HI,
I have a multithreaded application which uses OCI. The application has a master thread and spwans 4 child threads that connect to the db and execute specific queries. The problem occurs while trying to execute the last PL/SQL command which includes some bfile operations. I had read that OCI does not mutex properly on LOB operations, but I do not get any error, the application just hangs. Attached is part of the code that has all OCI calls.
The culprit is the update_tables function. The application hangs in there when processing is very close in time for threads.
[] 20080411 10:40:52.255 Oracle NOTICE - [2000][id 1093229] Executing PL/SQL to update all tables
[] 20080411 10:40:52.259 Oracle NOTICE - [2000][id 1093230] Executing PL/SQL to update all tables
[] 20080411 10:40:52.321 Oracle NOTICE - [2000][id 1093231] Executing PL/SQL to update all tables
this is where is hangs.
//!This program is the server portion of the distributed job system.
#include "application.h"
#include "log4cwrapper.h"
//#include "sql.h"
#include <oci.h>
OCIEnv* env;
int main(int argc, char* argv[]){
OCIError* err;
OCIServer* srv;
OCISvcCtx* svc;
OCISession* ses;
sword r;
OCIStmt* initslc;
OCIStmt* comm;
OCIBind *bnd1hp = (OCIBind*) 0;
char stmt[]="commit";
char update_initial[] ="update CONV_IMAGE_JOBS set conv_attempts_cnt = :1";
FILE *log_fd_main;
int sockfd, logfd, pidfd, count = 0, oci_error = 0;
pthread_t job_thread; //!<Threads for image conversion
pthread_attr_t thread_attr; //!<Thread attribute
int *clientfd; //!<Non-negative file descriptor for new socket
pthread_mutexattr_t mattr; //!<Mutex attribute used to set the properties of the mutex
char masterpid[100];
log_fd_main = fopen(log_file, "a");
if (log_fd_main == NULL) {
printf(" ERROR - DjsMaster [0002] Failed to open log file.\n");
exit(1);
}
r=OCIEnvCreate( &env,
OCI_THREADED
, 0, 0, 0, 0, 0, 0);
if (r != OCI_SUCCESS) {
strcpy(err_msg,"Couldn't create environment (OCIEnvCreate)");
oci_error = 1;
cleanup();
}
OCIHandleAlloc(env, (dvoid**)&err, OCI_HTYPE_ERROR, 0, 0);
OCIHandleAlloc(env, (dvoid**)&srv, OCI_HTYPE_SERVER, 0, 0);
OCIHandleAlloc(env, (dvoid**)&svc, OCI_HTYPE_SVCCTX, 0, 0);
OCIHandleAlloc(env, (dvoid**)&ses, OCI_HTYPE_SESSION, 0, 0);
r=OCIServerAttach(srv, err, dbname, strlen((const char*)dbname), (ub4) OCI_DEFAULT);
if (r != OCI_SUCCESS) {
checkerrr(err, r, err_msg);
oci_error = 1;
cleanup();
}
OCIAttrSet(svc, OCI_HTYPE_SVCCTX, srv, 0, OCI_ATTR_SERVER, err);
OCIAttrSet(ses, OCI_HTYPE_SESSION, username,
strlen((const char*)username), OCI_ATTR_USERNAME, err);
OCIAttrSet(ses, OCI_HTYPE_SESSION, password,
strlen((const char*)password), OCI_ATTR_PASSWORD, err);
if ( (r=OCIAttrSet((dvoid *) svc, (ub4) OCI_HTYPE_SVCCTX,
(dvoid *) ses, (ub4) 0, (ub4) OCI_ATTR_SESSION, err)) ) {
checkerrr(err, r,err_msg);
oci_error = 1;
cleanup();
}
r=OCISessionBegin (svc, err, ses, OCI_CRED_RDBMS, OCI_DEFAULT);
if (r != OCI_SUCCESS) {
checkerrr(err, r,err_msg);
oci_error = 1;
cleanup();
}
if (OCIHandleAlloc((dvoid *) env, (dvoid **) &initslc,
(ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0)) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
}
if ((r=OCIStmtPrepare(initslc, err, (text*)(void*)update_initial,
(ub4) strlen((char *) update_initial),
(ub4) OCI_NTV_SYNTAX, (ub4) OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
OCIHandleFree((dvoid *) initslc, (ub4) OCI_HTYPE_STMT);
oci_error = 1;
cleanup();
}
int coun=0;
if ((r = OCIBindByPos(initslc, &bnd1hp, err, (ub4) 1, &coun, sizeof(count), SQLT_INT,
(dvoid *) 0, (ub2 *) 0, (ub2 *) 0, (ub4) 0, (ub4 *) 0, OCI_DEFAULT))){
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
}
if ((r = OCIStmtExecute(svc, initslc, err, (ub4) 1, (ub4) 0, (CONST OCISnapshot *) 0,
(OCISnapshot *) 0, (ub4) OCI_DEFAULT))){
checkerrr(err, r, (text*)err_msg);
OCIHandleFree((dvoid *) sh, (ub4) OCI_HTYPE_STMT);
oci_error = 1;
cleanup();
}
if ((r=OCIHandleAlloc((dvoid *) env, (dvoid **) &comm,
(ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
}
if ((r=OCIStmtPrepare(comm, err, (text*)(void*)stmt,
(ub4) strlen((char *) stmt), (ub4) OCI_NTV_SYNTAX,
(ub4) OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
oci_error = 1;
cleanup();
}
if ((r=OCIStmtExecute(svc, comm, err, (ub4) 1, (ub4) 0,
(CONST OCISnapshot *) 0, (OCISnapshot *) 0,
(ub4) OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
oci_error = 1;
cleanup();
}
OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
OCIHandleFree((dvoid *) initslc, (ub4) OCI_HTYPE_STMT);
if ((r = OCISessionEnd(svc, err, ses, OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
}
if ((r = OCIServerDetach(srv, err, OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
}
if ((r = OCIHandleFree((dvoid*) ses, (ub4) OCI_HTYPE_SESSION))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
}
if ((r = OCIHandleFree((dvoid*) svc, (ub4) OCI_HTYPE_SVCCTX))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
}
if ((r = OCIHandleFree((dvoid*) srv, (ub4) OCI_HTYPE_SERVER))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
}
if ((r = OCIHandleFree((dvoid*) err, (ub4) OCI_HTYPE_ERROR))) {
oci_error = 1;
cleanup();
}
if ( oci_error == 1) {
cleanup();
}
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd_main);
mylog_msg("DjsMaster", LOG4C_PRIORITY_DEBUG,
" [0000] daemon running, ready to accept connections,"
"waiting for to connect......");
fclose(log_fd_main);
while (1) {
/*!
Accept a connection from a slave.
*/
clientfd = malloc(sizeof(int));
if ((*clientfd = serv_accept(sockfd)) < 0) {
exit(1);
}
/*!
Fork off a child thread to process this slave.
*/
pthread_create(&job_thread, &thread_attr, jobfunction, clientfd);
//count++;
sleep(1);
}
}
//!Dump Oracle Error
void cleanup() {
log4c_appender_t* myappender;
myappender = log4c_appender_get("myappender");
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, fopen(log_file, "a"));
mylog_log("DjsMaster", LOG4C_PRIORITY_ERROR,
" [2001] Oracle error %s", err_msg);
}
void *jobfunction(void *arg) {
OCIError* err;
OCIServer* srv;
OCISvcCtx* svc;
OCISession* ses;
sword r;
OCIBind *bnd1hp = (OCIBind*) 0;
OCIBind *bnd2hp = (OCIBind*) 0;
OCIBind *bnd3hp = (OCIBind*) 0;
OCIDefine *def1hp = (OCIDefine *) 0;
OCIDefine *def2hp = (OCIDefine *) 0;
OCIDefine *def3hp = (OCIDefine *) 0;
OCIDefine *def4hp = (OCIDefine *) 0;
OCIDefine *def5hp = (OCIDefine *) 0;
OCIDefine *def6hp = (OCIDefine *) 0;
OCIDefine *def7hp = (OCIDefine *) 0;
OCIDefine *def8hp = (OCIDefine *) 0;
OCIDefine *def9hp = (OCIDefine *) 0;
char select_all[] =" select filename, filepath, image_system_id, quality_status_cd,"
" format_cd,global_id, id, conv_attempts_cnt, image_resolution"
" from TABLE_NAME where conv_attempts_cnt = 0 order by created_datetime ";
char select_count[] ="select conv_attempts_cnt, image_resolution from TABLE_NAME"
" where id = :1 ";
char update_single[] ="update TABLENAME set conv_attempts_cnt = :1 where id = :2 ";
char stmt[]="commit";
int oci_error = 0;
/*!
Variables to hold the values returned from SQL SELECT
*/
int id, global_id, format_cd, quality_status;
int resolution, image_id, conv_attempts_cnt;
char filename[101];
char filepath[201];
char filepath_full[MAX_BUF];
char tif_filename[101];
char return_filepath[MAX_BUF];
char conv_details[MAX_BUF];
char *hold_filename;
char *hold_filepath;
char *hold_tiffile;
char *progress;
char *tempstr_p;
char *jobnum_p;
char *returnname_p;
char *returnsize_p;
char tempstr[MAX_BUF];
int count = 0;
int success_flag = 0;
int reprocess_flag = 0;
char s_resolution[5];
char s_format[5];
struct stat filebuf;
int myfd, datafd, jobid, i, update, holdid;
char sockread[1024], sockwrite[1024],
jobnum[10], dataname[1024],
line_to_log[1024], databuf[1024];
int datasize, bufsize, returnsize, filesize;
FILE *log_fd;
log4c_appender_t* myappender;
myappender = log4c_appender_get("myappender");
log_fd = fopen(log_file, "a");
if (log_fd == NULL) {
perror(" ERROR - DjsMaster [1000] Failed to open log file.\n");
exit(1);
}
myfd = *((int *)arg);
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE,
" [1000] Starting jobs on %d.",
(int)pthread_self());
Readline_r(myfd, sockread, 255);
/*!
Continue to read requests received from the slaves.
*/
while (strcmp(sockread, SLAVE_DONE) != 0) {
OCIStmt* slcth;
OCIStmt* updth;
OCIStmt* slcth2;
OCIStmt* comm;
sleep(15);
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE,
" [1000] Continue to read request from %d, looping with %s ",
(int)pthread_self(), sockread);
if (strcmp(sockread, REQUEST_JOB) == 0) {
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE,
" [1000] %d requesting job. Connecting to database.",
(int)pthread_self());
if (oci_error == 1) {
if ((r = OCIServerDetach(srv, err, OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
cleanup();
}
if ((r = OCIHandleFree((dvoid*) ses, (ub4) OCI_HTYPE_SESSION))) {
checkerrr(err, r, (text*)err_msg);
cleanup();
}
if ((r = OCIHandleFree((dvoid*) svc, (ub4) OCI_HTYPE_SVCCTX))) {
checkerrr(err, r, (text*)err_msg);
cleanup();
}
if ((r = OCIHandleFree((dvoid*) srv, (ub4) OCI_HTYPE_SERVER))) {
checkerrr(err, r, (text*)err_msg);
cleanup();
}
if ((r = OCIHandleFree((dvoid*) err, (ub4) OCI_HTYPE_ERROR))) {
cleanup();
}
oci_error = 0;
}
OCIHandleAlloc(env, (dvoid**)&err, OCI_HTYPE_ERROR, 0, 0);
OCIHandleAlloc(env, (dvoid**)&srv, OCI_HTYPE_SERVER, 0, 0);
OCIHandleAlloc(env, (dvoid**)&svc, OCI_HTYPE_SVCCTX, 0, 0);
OCIHandleAlloc(env, (dvoid**)&ses, OCI_HTYPE_SESSION, 0, 0);
r=OCIServerAttach(srv, err, dbname,
strlen((const char*)dbname), (ub4) OCI_DEFAULT);
if (r != OCI_SUCCESS) {
checkerrr(err, r, err_msg);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
OCIAttrSet(svc, OCI_HTYPE_SVCCTX, srv, 0, OCI_ATTR_SERVER, err);
OCIAttrSet(ses, OCI_HTYPE_SESSION, username,
strlen((const char*)username), OCI_ATTR_USERNAME, err);
OCIAttrSet(ses, OCI_HTYPE_SESSION, password,
strlen((const char*)password), OCI_ATTR_PASSWORD, err);
if ( (r=OCIAttrSet((dvoid *) svc, (ub4) OCI_HTYPE_SVCCTX,
(dvoid *) ses, (ub4) 0, (ub4) OCI_ATTR_SESSION, err)) ) {
checkerrr(err, r,err_msg);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
r=OCISessionBegin (svc, err, ses, OCI_CRED_RDBMS, OCI_DEFAULT);
if (r != OCI_SUCCESS) {
checkerrr(err, r,err_msg);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if (OCIHandleAlloc((dvoid *) env, (dvoid **) &slcth,
(ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0)) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r=OCIStmtPrepare(slcth, err, (text*)(void*)select_all,
(ub4) strlen((char *) select_all),
(ub4) OCI_NTV_SYNTAX,
(ub4) OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
OCIHandleFree((dvoid *) slcth, (ub4) OCI_HTYPE_STMT);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r=OCIDefineByPos(slcth, &def1hp, err, 1,
filename, 100, SQLT_CHR, (dvoid *) 0,
(ub2 *) 0, (ub2 *) 0, OCI_DEFAULT))) {
checkerr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r=OCIDefineByPos(slcth, &def2hp, err, 2,
filepath, 100, SQLT_CHR, (dvoid *) 0,
(ub2 *) 0, (ub2 *) 0, OCI_DEFAULT))) {
checkerr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r=OCIDefineByPos(slcth, &def3hp, err, 3,
&image_id, sizeof(int), SQLT_INT, (dvoid *) 0,
(ub2 *) 0, (ub2 *) 0, OCI_DEFAULT))) {
checkerr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r=OCIDefineByPos(slcth, &def4hp, err, 4,
&quality_status, sizeof(int), SQLT_INT,
(dvoid *) 0, (ub2 *) 0, (ub2 *) 0, OCI_DEFAULT))) {
checkerr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r=OCIDefineByPos(slcth, &def5hp, err, 5,
&format_cd, sizeof(int), SQLT_INT, (dvoid *) 0,
(ub2 *) 0, (ub2 *) 0, OCI_DEFAULT))) {
checkerr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r=OCIDefineByPos(slcth, &def6hp, err, 6,
&global_id, sizeof(int), SQLT_INT, (dvoid *) 0,
(ub2 *) 0, (ub2 *) 0, OCI_DEFAULT))) {
checkerr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r=OCIDefineByPos(slcth, &def7hp, err, 7,
&id, sizeof(int), SQLT_INT, (dvoid *) 0,
(ub2 *) 0, (ub2 *) 0, OCI_DEFAULT))) {
checkerr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r=OCIDefineByPos(slcth, &def8hp, err, 8,
&conv_attempts_cnt, sizeof(int), SQLT_INT, (dvoid *) 0,
(ub2 *) 0, (ub2 *) 0, OCI_DEFAULT))) {
checkerr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
}
if ((r=OCIDefineByPos(slcth, &def9hp, err, 9,
&resolution, sizeof(int), SQLT_INT, (dvoid *) 0,
(ub2 *) 0, (ub2 *) 0, OCI_DEFAULT))) {
checkerr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r=OCIStmtExecute(svc, slcth, err, (ub4) 0, (ub4) 0,
(CONST OCISnapshot *) 0, (OCISnapshot *) 0,
(ub4) OCI_DEFAULT))){
checkerr(err, r, (text*)err_msg);
OCIHandleFree((dvoid *) slcth, (ub4) OCI_HTYPE_STMT);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
r = OCIStmtFetch(slcth, err, (ub4) 1, (ub4) OCI_FETCH_NEXT, (ub4) OCI_DEFAULT);
if (r == OCI_SUCCESS) {
success_flag = 1;
reprocess_flag = 0;
}
else if (r == OCI_SUCCESS_WITH_INFO) {
success_flag = 1;
reprocess_flag = 0;
}
else {
if (r != OCI_NO_DATA) {
checkerr(err, r, (text*)err_msg);
success_flag = 0;
}
success_flag = 0;
}
OCIHandleFree((dvoid *) slcth, (ub4) OCI_HTYPE_STMT);
/*!
No job in the table. Continue listening. Tell the slave
there are no current jobs to process.
*/
if (! (success_flag)) {
if ((r = OCISessionEnd(svc, err, ses, OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r = OCIServerDetach(srv, err, OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r = OCIHandleFree((dvoid*) ses, (ub4) OCI_HTYPE_SESSION))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r = OCIHandleFree((dvoid*) svc, (ub4) OCI_HTYPE_SVCCTX))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r = OCIHandleFree((dvoid*) srv, (ub4) OCI_HTYPE_SERVER))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r = OCIHandleFree((dvoid*) err, (ub4) OCI_HTYPE_ERROR))) {
oci_error = 1;
cleanup();
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE,
" [1000] No new jobs, writing to %d %s",
(int)pthread_self(), sockwrite);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
success_flag = 0;
continue;
}
/* New job is fetched, process it. */
else {
reprocess_flag = 0;
reprocess: log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("Oracle", LOG4C_PRIORITY_DEBUG,
" [2000][id %d] Fetching data for new job from databse", id);
mylog_log("Oracle", LOG4C_PRIORITY_DEBUG,
" [2000][id %d]Fetched job_id=%d, format=%d, resolution=%d, conv_attemtps=%d, imagefile=%s, imagepath=%s",
id, id, format_cd, resolution,
conv_attempts_cnt, filename, filepath);
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE,
" [1000][id %d] New job for %d. Job will be processed.",
id, (int)pthread_self());
holdid = id;
if ( reprocess_flag == 1 )
{
OCIHandleAlloc(env, (dvoid**)&err, OCI_HTYPE_ERROR, 0, 0);
OCIHandleAlloc(env, (dvoid**)&srv, OCI_HTYPE_SERVER, 0, 0);
OCIHandleAlloc(env, (dvoid**)&svc, OCI_HTYPE_SVCCTX, 0, 0);
OCIHandleAlloc(env, (dvoid**)&ses, OCI_HTYPE_SESSION, 0, 0);
r=OCIServerAttach(srv, err, dbname, strlen((const char*)dbname), (ub4) OCI_DEFAULT);
if (r != OCI_SUCCESS) {
checkerrr(err, r, err_msg);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
OCIAttrSet(svc, OCI_HTYPE_SVCCTX, srv, 0, OCI_ATTR_SERVER, err);
OCIAttrSet(ses, OCI_HTYPE_SESSION, username, strlen((const char*)username), OCI_ATTR_USERNAME, err);
OCIAttrSet(ses, OCI_HTYPE_SESSION, password, strlen((const char*)password), OCI_ATTR_PASSWORD, err);
if ( (r=OCIAttrSet((dvoid *) svc, (ub4) OCI_HTYPE_SVCCTX, (dvoid *) ses, (ub4) 0, (ub4) OCI_ATTR_SESSION, err)) ) {
checkerrr(err, r,err_msg);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
r=OCISessionBegin (svc, err, ses, OCI_CRED_RDBMS, OCI_DEFAULT);
if (r != OCI_SUCCESS) {
checkerrr(err, r,err_msg);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
}
if (OCIHandleAlloc((dvoid *) env, (dvoid **) &slcth2,
(ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0)) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r=OCIStmtPrepare(slcth2, err, (text*)(void*)select_count,
(ub4) strlen((char *) select_count),
(ub4) OCI_NTV_SYNTAX, (ub4) OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
OCIHandleFree((dvoid *) slcth2, (ub4) OCI_HTYPE_STMT);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r=OCIDefineByPos(slcth2, &def1hp, err, 1,
&conv_attempts_cnt, sizeof(int), SQLT_INT, (dvoid *) 0,
(ub2 *) 0, (ub2 *) 0, OCI_DEFAULT))) {
checkerr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r = OCIBindByPos(slcth2, &bnd1hp, err, (ub4) 1,
&holdid, sizeof(int), SQLT_INT,
(dvoid *) 0, (ub2 *) 0, (ub2 *) 0, (ub4) 0,
(ub4 *) 0, OCI_DEFAULT))){
checkerr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r = OCIStmtExecute(svc, slcth2, err, (ub4) 0, (ub4) 0,
(CONST OCISnapshot *) 0,
(OCISnapshot *) 0, (ub4) OCI_DEFAULT))){
checkerr(err, r, (text*)err_msg);
OCIHandleFree((dvoid *) slcth2, (ub4) OCI_HTYPE_STMT);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
r = OCIStmtFetch(slcth2, err, (ub4) 1,
(ub4) OCI_FETCH_NEXT, (ub4) OCI_DEFAULT);
if (r == OCI_SUCCESS) {
success_flag = 1;
}
else if (r == OCI_SUCCESS_WITH_INFO) {
success_flag = 1;
}
else {
if (r != OCI_NO_DATA) {
checkerr(err, r, (text*)err_msg);
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
OCIHandleFree((dvoid *) slcth2, (ub4) OCI_HTYPE_STMT);
count = conv_attempts_cnt;
count++;
/* Updating conv_attemtpts_cnt for the current job */
if (OCIHandleAlloc((dvoid *) env, (dvoid **) &updth,
(ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0)) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r=OCIStmtPrepare(updth, err, (text*)(void*)update_single,
(ub4) strlen((char *) update_single), (ub4) OCI_NTV_SYNTAX,
(ub4) OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
OCIHandleFree((dvoid *) updth, (ub4) OCI_HTYPE_STMT);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r = OCIBindByPos(updth, &bnd2hp, err, (ub4) 1, &count,
sizeof(count), SQLT_INT, (dvoid *) 0, (ub2 *) 0,
(ub2 *) 0, (ub4) 0, (ub4 *) 0, OCI_DEFAULT))){
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r = OCIBindByPos(updth, &bnd3hp, err, (ub4) 2, &holdid,
sizeof(holdid), SQLT_INT, (dvoid *) 0, (ub2 *) 0,
(ub2 *) 0, (ub4) 0, (ub4 *) 0, OCI_DEFAULT))){
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r = OCIStmtExecute(svc, updth, err, (ub4) 1, (ub4) 0,
(CONST OCISnapshot *) 0, (OCISnapshot *) 0,
(ub4) OCI_DEFAULT))){
checkerrr(err, r, (text*)err_msg);
OCIHandleFree((dvoid *) updth, (ub4) OCI_HTYPE_STMT);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r=OCIHandleAlloc((dvoid *) env, (dvoid **) &comm,
(ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r=OCIStmtPrepare(comm, err, (text*)(void*)stmt,
(ub4) strlen((char *) stmt),
(ub4) OCI_NTV_SYNTAX, (ub4) OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r=OCIStmtExecute(svc, comm, err, (ub4) 1,
(ub4) 0, (CONST OCISnapshot *) 0, (OCISnapshot *) 0,
(ub4) OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
OCIHandleFree((dvoid *) updth, (ub4) OCI_HTYPE_STMT);
if ((r = OCISessionEnd(svc, err, ses, OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r = OCIServerDetach(srv, err, OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r = OCIHandleFree((dvoid*) ses, (ub4) OCI_HTYPE_SESSION))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r = OCIHandleFree((dvoid*) svc, (ub4) OCI_HTYPE_SVCCTX))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r = OCIHandleFree((dvoid*) srv, (ub4) OCI_HTYPE_SERVER))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
if ((r = OCIHandleFree((dvoid*) err, (ub4) OCI_HTYPE_ERROR))) {
oci_error = 1;
cleanup();
success_flag = 0;
sprintf(sockwrite, "%s", NO_JOB);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
continue;
}
hold_filename = strtok_r(filename, " ", &progress);
strcpy(filename,hold_filename);
strcpy(tif_filename, filename);
hold_tiffile = strtok_r(tif_filename, ".", &progress);
strcpy(tif_filename, hold_tiffile);
strcat(tif_filename, ".tif");
hold_filepath = strtok_r(filepath, " ", &progress);
strcpy(filepath, hold_filepath);
strcpy(return_filepath, filepath);
strcat(return_filepath, "/");
strcat(return_filepath, filename);
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_DEBUG,
" [1000][id %d] Image %s located in %s will be converted to image filename %s ",
holdid, tif_filename, filepath, filename);
if (tif_filename == NULL) {
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_WARN,
" [1001][id %d] No file to convert. No file name associated with the job",
holdid);
sprintf(sockwrite, "%s", NO_JOB);
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE,
" [1000][id %d] No job to perform. Writing to %d %s",
holdid, (int)pthread_self(), sockwrite);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
if ( count <= max_failure ) {
success_flag = 1;
reprocess_flag = 1;
goto reprocess;
}
else {
success_flag = 0;
continue;
}
//continue;
}
strcpy(filepath_full, filepath);
strcat(filepath_full, "/");
strcat(filepath_full, tif_filename);
if (stat(filepath_full, &filebuf) < 0) {
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_WARN,
" [1002][id %d] Could not stat the file %s. Nothing to send to %d ",
holdid, filepath_full, (int)pthread_self());
sprintf (sockwrite, "%s", NO_JOB);
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE,
" [1000][id %d] No job to perform. Writing to %d %s",
holdid, (int)pthread_self(), sockwrite);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
Readline_r(myfd, sockread, 255);
if ( count <= max_failure ) {
success_flag = 1;
reprocess_flag = 1;
goto reprocess;
}
else {
success_flag = 0;
continue;
}
//continue;
}
datasize = (int)filebuf.st_size;
sprintf(s_resolution, "%d", resolution);
sprintf(s_format, "%d", format_cd);
strcpy(conv_details, "./conv.sh:");
strcat(conv_details, tif_filename);
strcat(conv_details, ":");
strcat(conv_details, s_resolution);
strcat(conv_details, ":");
strcat(conv_details, s_format);
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE,
" [1000][id %d] Writing to socket all conversion details for %d ",
holdid, (int)pthread_self());
sprintf(sockwrite, "%d=%s%%%s%%%d%%%s\n", id, conv_details,
tif_filename, datasize, return_filepath);
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_DEBUG, " [1000][id %d] Sending %s",
holdid, sockwrite);
Writen_r(myfd, sockwrite, (int)strlen(sockwrite));
if ((datafd = open(filepath_full, O_RDONLY)) < 0) {
close(myfd);
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_ERROR,
" [1003][id %d] Open of data file failed. FATAL exiting %d",
holdid, (int)pthread_self());
pthread_exit(NULL);
}
filesize = 0;
bufsize = Readn_r(datafd, databuf, MAX_BUF);
if (bufsize < 0) {
close(myfd);
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_ERROR,
" [1003][id %d] Read of data file failed. FATAL exiting %d",
holdid, (int)pthread_self());
pthread_exit(NULL);
}
filesize += bufsize;
while (bufsize > 0) {
net_send(myfd, databuf, bufsize);
bufsize = Readn_r(datafd, databuf, MAX_BUF);
if (bufsize < 0) {
close(myfd);
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_ERROR,
" [1003][id %d] Read of data file failed. FATAL exiting %d",
holdid, (int)pthread_self());
pthread_exit(NULL);
}
filesize += bufsize;
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_DEBUG,
" [1000][id %d] Sending file %s with size=%d to %d",
holdid, filepath_full, filesize, (int)pthread_self());
}
close(datafd);
}
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE,
" [1000][id %d] Reading next message from %d",
holdid, (int)pthread_self());
Readline_r(myfd, sockread, 255);
}
else if (strcmp (sockread, FAIL_JOB) == 0) {
/* We get a job failed from the slave */
Readline_r(myfd, sockread, 255);
strncpy(jobnum, sockread, 10);
id = atoi(jobnum);
Readline_r(myfd, sockread, 255);
if ( count <= max_failure) {
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_WARN,
" [1004][id %d] Job on %d failed. Will retry until conv_attemtps_cnt <= max_failure. Processing AGAIN",
id, (int)pthread_self());
success_flag = 1;
reprocess_flag = 1;
goto reprocess;
}
else {
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_WARN,
" [1005][id %d] Job on %d failed. Exceeded max_failure. Will NOT RETRY.",
id, (int)pthread_self());
success_flag = 0;
continue;
}
//continue;
}
else if (strcmp(sockread, FINISH_JOB) == 0) {
/* We get a clean job finish from the slave */
Readline_r(myfd, sockread, 255);
tempstr_p = tempstr;
jobnum_p = strtok_r(sockread, SEPARATOR3, &tempstr_p);
strcpy(jobnum, jobnum_p);
for (i = 0; i < (int)strlen(jobnum); i++) {
if (jobnum[i] == '\n') {
jobnum[i] = '\0';
}
}
jobid = atoi(jobnum);
returnname_p = strtok_r(NULL, SEPARATOR3, &tempstr_p);
returnsize_p = strtok_r(NULL, SEPARATOR3, &tempstr_p);
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE,
" [1000][id %d] Got a clean job finish from %d. Image conversion successful",
jobid, (int)pthread_self());
/* check if there is a file being returned */
if (returnname_p != NULL) {
/* We are getting a file back */
return_file = strdup(returnname_p);
returnsize = atoi(returnsize_p);
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_DEBUG,
" [1000][id %d] Getting back converted file %s : size of file %d",
jobid, returnname_p, returnsize);
filesize = 0;
if ((datafd = open(returnname_p, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR
| S_IWUSR | S_IRGRP | S_IROTH)) < 0) {
perror("Failure opening data file");
close(myfd);
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_ERROR,
" [1003][id %d] Failure opening converted image file. FATAL exiting %d",
jobid, (int)pthread_self());
pthread_exit(NULL);
}
while (filesize < returnsize) {
bufsize = net_recv(myfd, databuf, MAX_BUF);
if (bufsize < 0) {
perror("Error reading from the network");
close(myfd);
close(datafd);
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_ERROR,
" [1006][id %d] Error reading from the network. FATAL exiting %d",
jobid, (int)pthread_self());
pthread_exit(NULL);
}
Writen_r(datafd, databuf, bufsize);
filesize += bufsize;
}
close(datafd);
sprintf(sockwrite, "%s", FILE_DONE);
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE,
" [1000][id %d] Job processed successfully. File save to location. Wiriting to %d %s",
jobid, (int)pthread_self(), sockwrite);
Writen_r(myfd, sockwrite, strlen(sockwrite));
if (return_file != NULL) {
free(return_file);
}
}
else {
return_file = NULL;
}
if ( (update = update_tables (jobid)) < 0) {
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE,
" [1008][id %d] Error Updating all tables after conversion.", jobid);
Readline_r(myfd, sockread, 255);
if ( count <= max_failure) {
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_WARN,
" [1004][id %d] Job on %d failed."
"Will retry until conv_attemtps_cnt <= max_failure. Processing AGAIN",
jobid, (int)pthread_self());
success_flag = 1;
reprocess_flag = 1;
goto reprocess;
}
else {
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_WARN,
" [1005][id %d] Job on %d failed. Exceeded max_failure. Will NOT RETRY.",
jobid, (int)pthread_self());
success_flag = 0;
continue;
}
}
else {
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_NOTICE,
" [1000][id %d] Successful Update. Tables updated after conversion.", jobid);
Readline_r(myfd, sockread, 255);
success_flag = 0;
continue;
}
}
}
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("DjsMaster", LOG4C_PRIORITY_WARN, " [1007] Thread %d got slave finished", (int)pthread_self());
mylog_msg("DjsMaster", LOG4C_PRIORITY_WARN, " [1007] Exiting thread...");
fclose(log_fd);
close(myfd);
pthread_exit(NULL);
return NULL;
}
int update_tables (int job_id) {
OCIError* err;
OCIServer* srv;
OCISvcCtx* svc;
OCISession* ses;
sword r;
OCIBind *bnd1hp = (OCIBind*) 0;
int oci_error = 0;
FILE *log_fd;
OCIStmt* updth2;
OCIStmt* comm;
text stmt_comm[]="commit";
/* SQL Proc to update tables */
text stmt[] = "BEGIN P_POST_CONV_IMAGE(:1);END;"; ";
log4c_appender_t* myappender;
myappender = log4c_appender_get("myappender");
log_fd = fopen(log_file, "a");
log4c_appender_set_type(myappender, log4c_appender_type_get("stream"));
log4c_appender_set_udata(myappender, log_fd);
mylog_log("Oracle", LOG4C_PRIORITY_NOTICE, " [2000][id %d] Executing PL/SQL to update all tables", job_id);
OCIHandleAlloc(env, (dvoid**)&err, OCI_HTYPE_ERROR, 0, 0);
OCIHandleAlloc(env, (dvoid**)&srv, OCI_HTYPE_SERVER, 0, 0);
OCIHandleAlloc(env, (dvoid**)&svc, OCI_HTYPE_SVCCTX, 0, 0);
OCIHandleAlloc(env, (dvoid**)&ses, OCI_HTYPE_SESSION, 0, 0);
r=OCIServerAttach(srv, err, dbname, strlen((const char*)dbname), (ub4) OCI_DEFAULT);
if (r != OCI_SUCCESS) {
checkerrr(err, r, err_msg);
oci_error = 1;
//fclose(log_fd);
cleanup();
//return -1;
}
OCIAttrSet(svc, OCI_HTYPE_SVCCTX, srv, 0, OCI_ATTR_SERVER, err);
OCIAttrSet(ses, OCI_HTYPE_SESSION, username,
strlen((const char*)username), OCI_ATTR_USERNAME, err);
OCIAttrSet(ses, OCI_HTYPE_SESSION, password,
strlen((const char*)password), OCI_ATTR_PASSWORD, err);
if ( (r=OCIAttrSet((dvoid *) svc, (ub4) OCI_HTYPE_SVCCTX,
(dvoid *) ses, (ub4) 0, (ub4) OCI_ATTR_SESSION, err)) ) {
checkerrr(err, r, err_msg);
oci_error = 1;
//fclose(log_fd);
cleanup();
//return -1;
}
r=OCISessionBegin (svc, err, ses, OCI_CRED_RDBMS, OCI_DEFAULT);
if (r != OCI_SUCCESS) {
checkerrr(err, r, err_msg);
oci_error = 1;
//fclose(log_fd);
cleanup();
//return -1;
}
if (OCIHandleAlloc((dvoid *) env, (dvoid **) &updth2,
(ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0)) {
checkerrr(err, r, err_msg);
oci_error = 1;
//fclose(log_fd);
cleanup();
//return -1;
}
if ((r=OCIStmtPrepare(updth2, err, (text*)(void*)stmt,
(ub4) strlen((char *) stmt), (ub4) OCI_NTV_SYNTAX,
(ub4) OCI_DEFAULT))) {
checkerrr(err, r, err_msg);
oci_error = 1;
//fclose(log_fd);
cleanup();
//return -1;
}
if ((r = OCIBindByPos(updth2, &bnd1hp, err, (ub4) 1,
&job_id, sizeof(job_id), SQLT_INT,
(dvoid *) 0, (ub2 *) 0, (ub2 *) 0,
(ub4) 0, (ub4 *) 0, OCI_DEFAULT))){
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
//fclose(log_fd);
cleanup();
//return -1;
}
if ((r = OCIStmtExecute(svc, updth2, err, (ub4) 1, (ub4) 0, (CONST OCISnapshot *) 0,
(OCISnapshot *) 0, (ub4) OCI_DEFAULT))){
checkerrr(err, r, (text*)err_msg);
OCIHandleFree((dvoid *) updth2, (ub4) OCI_HTYPE_STMT);
oci_error = 1;
//fclose(log_fd);
cleanup();
//return -1;
}
if ((r=OCIHandleAlloc((dvoid *) env, (dvoid **) &comm,
(ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
//fclose(log_fd);
cleanup();
//return -1;
}
if ((r=OCIStmtPrepare(comm, err, (text*)(void*)stmt_comm,
(ub4) strlen((char *) stmt_comm),
(ub4) OCI_NTV_SYNTAX, (ub4) OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
oci_error = 1;
//fclose(log_fd);
cleanup();
//return -1;
}
if ((r=OCIStmtExecute(svc, comm, err, (ub4) 1,
(ub4) 0, (CONST OCISnapshot *) 0,
(OCISnapshot *) 0, (ub4) OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
oci_error = 1;
//fclose(log_fd);
cleanup();
//return -1;
}
OCIHandleFree((dvoid *) comm, (ub4) OCI_HTYPE_STMT);
OCIHandleFree((dvoid *) updth2, (ub4) OCI_HTYPE_STMT);
if ((r = OCISessionEnd(svc, err, ses, OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
//fclose(log_fd);
cleanup();
//return -1;
}
if ((r = OCIServerDetach(srv, err, OCI_DEFAULT))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
//fclose(log_fd);
cleanup();
//return -1;
}
if ((r = OCIHandleFree((dvoid*) ses, (ub4) OCI_HTYPE_SESSION))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
//fclose(log_fd);
cleanup();
//return -1;
}
if ((r = OCIHandleFree((dvoid*) svc, (ub4) OCI_HTYPE_SVCCTX))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
//fclose(log_fd);
cleanup();
//return -1;
}
if ((r = OCIHandleFree((dvoid*) srv, (ub4) OCI_HTYPE_SERVER))) {
checkerrr(err, r, (text*)err_msg);
oci_error = 1;
//fclose(log_fd);
cleanup();
//return -1;
}
if ((r = OCIHandleFree((dvoid*) err, (ub4) OCI_HTYPE_ERROR))) {
oci_error = 1;
//fclose(log_fd);
cleanup();
//return -1;
}
fclose(log_fd);
if (oci_error == 1) {
return -1;
}
else {
return 1;
}
}
and here is the stack trace
bash-3.00# pstack 4755
4755: ./
----------------- lwp# 1 / thread# 1 --------------------
fe6c4f08 accept (0, ffbff7a0, ffbff7b0, 1)
00013140 main (2, ffbff9b4, ffbff9c0, 2f490, fe560100, fe560140) +
d7c
00011d64 _start (0, 0, 0, 0, 0, 0) + 5c
----------------- lwp# 2 / thread# 2 -------------------- fe6c4a30
lwp_park (0, 0, 0)
fe9ae0c4 kpuhhfre (94c88, 12c18c, ff1c27b8, a580c, 1a18, 0) + 238
fea43188 kwfcinit (a20bc, 2f420, 5, fe4792c4, fe4792c0, 94c88) + 168
fe9828f4 kpuatch (0, 94c88, a20cc, 0, a1fbc, 0) + 848
fe96eb78 OCIServerAttach (a1f7c, 13e978, 2f420, 5, 0, 5) + 7c 00019b80
update_tables (10d433, 1000, fe6f3700, fe540200, fe6f1818,
ffffebb4) + 1b0
0001959c jobfunction (34b80, fe47c000, 0, 0, 138e8, 1) + 5cb4 fe6c4990
_lwp_start (0, 0, 0, 0, 0, 0)
----------------- lwp# 3 / thread# 3 -------------------- fe6c4a30
lwp_park (0, 0, 0)
fe9c74f8 kpufhndl0 (1604, 1604, ff21805c, ffffffff, 0, 0) + 2228
0001a224 update_tables (10d434, 1000, fe6f3700, fe540a00, fe6f1818,
ffffebb4) + 854
0001959c jobfunction (34b70, fe37c000, 0, 0, 138e8, 1) + 5cb4 fe6c4990
_lwp_start (0, 0, 0, 0, 0, 0)
----------------- lwp# 4 / thread# 4 -------------------- fe6c4a30
lwp_park (0, 0, 0) fe9ab6e0 kpuhhaloc (94c88, 550, fe279328, 1004000,
ff208a5e, fe6f3470)
+ 26c
fe9c46e0 kpughndl0 (2, 0, 0, f8e9dacb, f8e9d800, 0) + 3e8 00019ad0
update_tables (10d435, 1000, fe6f3700, fe541200, fe6f1818,
ffffebb4) + 100
0001959c jobfunction (34e20, fe27c000, 0, 0, 138e8, 1) + 5cb4 fe6c4990
_lwp_start (0, 0, 0, 0, 0, 0)
----------------- lwp# 5 / thread# 5 -------------------- fe6c4a30
lwp_park (0, 0, 0) fe9ab6e0 kpuhhaloc (94c88, 550, fe179328, 1004000,
ff208a5e, fe6f3470)
+ 26c
fe9c46e0 kpughndl0 (2, 0, 0, f8e9dacb, f8e9d800, 0) + 3e8 00019ad0
update_tables (10d436, 1000, fe6f3700, fe541a00, fe6f1818,
ffffebb4) + 100
0001959c jobfunction (34ee0, fe17c000, 0, 0, 138e8, 1) + 5cb4 fe6c4990
_lwp_start (0, 0, 0, 0, 0, 0)
I have tried creating a new environment handle in update_tables function and free it on return from the function, but that did not resolve the issue as well.
Any help or suggestion is appreciated
Thanks