Hi
I use message-driven bean to asynchronously send newsletters. To avoid restarting process from the beginning in case of any error, I save the recipients in database table, and remove row by row after email has been sent. To not mark my mail server as spammer I send it in chunks of 90 per minute. My recipients list is quite big, about 30k addresses. The code works perfect, but only few minutes... Then transaction is timed out, deleting of already sent recipients is not commited and the process begins again! That of course results in delivering the same message multiple times to the same recipients.
I tried setting autocommit to false and forcing commit before thread going sleep, but I kept getting error "cannot commit managed transaction".
Can anybody help?
Michal
int sentCount = 0;
con = locator.getPGDataSource().getConnection();
PreparedStatement stmt = con.prepareStatement("SELECT * " +
"FROM mail_recipient " +
"WHERE task_id = ? " +
"ORDER BY id ASC " +
"LIMIT " + CHUNK_SIZE + " ");
stmt.setInt(1, Integer.parseInt(id));
ResultSet rs = stmt.executeQuery();
StringBuffer sentIds = new StringBuffer();
while(rs.next()) {
if(!checkInProgressStatus(file))
return;
email = rs.getString("email");
String from_address = getMailProperty(account, "mail");
String display_name = getMailProperty(account, "display_name");
InternetAddress inetAddr = new InternetAddress(from_address,
display_name);
Mail mail = new Mail(inetAddr,
getMailProperty(account, "username"),
getMailProperty(account, "password"));
try {
mail.sendEmail(email, subject, content);
} catch (MessagingException ex) {
log.error("Cannot send to: " + email + " - user " + rs.getInt("customer_id"));
}
sentIds.append(rs.getInt("id") + ",");
if(++sentCount % CHUNK_SIZE == 0) {
try {
try {
rs.close();
stmt.close();
con.close();
} catch (SQLException ex) {
}
Connection con2 = null;
con2 = locator.getPGDataSource().getConnection();
PreparedStatement stmt2 =
con2.prepareStatement("DELETE FROM mail_recipient " +
"WHERE id IN(" +
sentIds.substring(0, sentIds.length()-1) + ")");
stmt2.executeUpdate();
sentIds = new StringBuffer();
stmt2 = con2.prepareStatement("SELECT COUNT(*) " +
"FROM mail_recipient " +
"WHERE task_id = ?");
stmt2.setInt(1, Integer.parseInt(id));
ResultSet rs2 = stmt2.executeQuery();
rs2.next();
int count = rs2.getInt(1);
log.error(count + "");
taskNode.setAttribute("recipients_left", count + "");
saveXML(doc, file);
try {
rs2.close();
stmt2.close();
con2.close();
} catch (SQLException ex) {
}
Thread.sleep(60*1000);
} catch (InterruptedException ex) {
}
con = locator.getPGDataSource().getConnection();
stmt = con.prepareStatement("SELECT * " +
"FROM mail_recipient " +
"WHERE task_id = ? " +
"ORDER BY id ASC " +
"LIMIT " + CHUNK_SIZE + " ");
stmt.setInt(1, Integer.parseInt(id));
rs = stmt.executeQuery();
}
}