Hi,
I am getting some unexpected behaviour when testing the distributed
resource recovery functionality in Java Edition.
As I understand it, a prepared but not committed or rolledback
transaction should stay in the recovery list. The X/Open standard
says:
"Two consecutive complete recovery scans return the same list of
transaction branches unless a transaction manager calls xa_commit(),
xa_forget(), xa_prepare(), or xa_rollback() for that resource manager,
or unless that resource manager heuristically completes some branches,
between the two recovery scans."
I don't really understand the heuristic part, so perhaps that is my
prolem, but as I read it a prepared transaction should come up again
and again until I do something with it.
Please see the attached test program. This contains code to prepare
but not complete a transaction, and code to count the number of
transactions pending recover. This gives me the following results:
$ java -cp .:je-3.2.44.jar XATest prepare ~/dbtest
$ java -cp .:je-3.2.44.jar XATest recover ~/dbtest
1
$ java -cp .:je-3.2.44.jar XATest recover ~/dbtest
1
$ java -cp .:je-3.2.44.jar XATest recover ~/dbtest
1
$ java -cp .:je-3.2.44.jar XATest recover ~/dbtest
0
So on the fourth call, I am getting no transactions to recover. The
zero result is repeated on further attempts. Surely this is not
correct behaviour? Any insight would be appreciated.
My main concern here is that a transaction has been lost somewhere in
the system which spoils the atomicity in a distributed system.
James
---- XATest.java:
import com.sleepycat.je.Database;
public class XATest {
private static String path;
public static Xid xid (final int format, final String global, final String branch) {
return new Xid () {
public byte[] getBranchQualifier () {
try { return branch.getBytes ("utf-8"); }
catch (UnsupportedEncodingException e) { throw new RuntimeException (e); }
}
public int getFormatId () {
return format;
}
public byte[] getGlobalTransactionId () {
try { return global.getBytes ("utf-8"); }
catch (UnsupportedEncodingException e) { throw new RuntimeException (e); }
}
@Override
public String toString () {
return global + ":" + branch;
}
@Override
public boolean equals (Object other) {
if (! (other instanceof Xid)) return false;
Xid xid = (Xid) other;
try {
return format == xid.getFormatId ()
&& global.getBytes ("utf-8") == xid.getGlobalTransactionId ()
&& branch.getBytes ("utf-8") == xid.getBranchQualifier ();
} catch (UnsupportedEncodingException e) { throw new RuntimeException (e); }
}
};
}
public static void prepare () throws Exception {
EnvironmentConfig envConfig = new EnvironmentConfig ();
envConfig.setTransactional (true);
envConfig.setAllowCreate (true);
XAEnvironment env = new XAEnvironment (new File (path), envConfig);
DatabaseConfig dbConfig = new DatabaseConfig ();
dbConfig.setTransactional (true);
dbConfig.setAllowCreate (true);
Database db = env.openDatabase (null, "test", dbConfig);
Xid xid = xid (0, "a", "b");
env.start (xid, XAResource.TMNOFLAGS);
DatabaseEntry keyEntry = new DatabaseEntry ("key".getBytes ("utf-8"));
DatabaseEntry valEntry = new DatabaseEntry ("val".getBytes ("utf-8"));
db.put (null, keyEntry, valEntry);
env.end (xid, XAResource.TMSUCCESS);
env.prepare (xid);
}
public static void recover () throws Exception {
EnvironmentConfig envConfig = new EnvironmentConfig ();
envConfig.setTransactional (true);
envConfig.setAllowCreate (true);
XAEnvironment env = new XAEnvironment (new File (path), envConfig);
Xid[] xids = env.recover (XAResource.TMNOFLAGS);
System.out.println (xids.length);
}
public static void main (String[] args) {
try {
path = args [1];
if (args [0].equals ("prepare"))
prepare ();
if (args [0].equals ("recover"))
recover ();
} catch (Exception e) {
throw e instanceof RuntimeException? (RuntimeException) e : new RuntimeException (e);
}
}
}