I'm using a distributed cache with a write-behind cache store (see the config below). I found that when I do something like myCache.put(key, value, ttl), the entry survives the specified ttl. I tried doing the same with a distributed cache with a write-through cachestore and there everything does happen correctly.
Is this sort of operation not permitted in caches containing a write-behind cachestore? If not wouldn't it be better to throw an UnsupportedOperationException.
I created a small test to simulate this. I added values to the cache with a TTL of 1 to 10 seconds and found that the 10 second entries stayed in the cache.
Configuration used:
<?xml version="1.0"?>
<!DOCTYPE cache-config SYSTEM "cache-config.dtd">
<cache-config>
<caching-scheme-mapping>
<cache-mapping>
<cache-name>TTL_TEST</cache-name>
<scheme-name>testScheme</scheme-name>
</cache-mapping>
</caching-scheme-mapping>
<caching-schemes>
<distributed-scheme>
<scheme-name>testScheme</scheme-name>
<service-name>testService</service-name>
<backing-map-scheme>
<read-write-backing-map-scheme>
<internal-cache-scheme>
<local-scheme>
<service-name>testBackLocalService</service-name>
</local-scheme>
</internal-cache-scheme>
<cachestore-scheme>
<class-scheme>
<scheme-name>testBackStore</scheme-name>
<class-name>TTLTestServer$TestCacheStore</class-name>
</class-scheme>
</cachestore-scheme>
<write-delay>3s</write-delay>
</read-write-backing-map-scheme>
</backing-map-scheme>
<local-storage>true</local-storage>
<autostart>true</autostart>
</distributed-scheme>
</caching-schemes>
</cache-config>
Code of test:
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StopWatch;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.collect.Lists;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.net.cache.CacheStore;
@Test
public class TTLTestServer
{
private static final int RETRIES = 5;
private static final Logger logger = LoggerFactory.getLogger( TTLTestServer.class );
private NamedCache m_cache;
/**
* List of Time-To-Lives in seconds to check
*/
private final List<Integer> m_listOfTTLs = Lists.newArrayList(1, 3, 5, 10);
/**
* Test is done in separate threads to speed up the test
*/
private final ExecutorService m_executorService = Executors.newCachedThreadPool();
@BeforeClass
public void setup()
{
logger.info("Getting the cache");
m_cache = CacheFactory.getCache("TTL_TEST");
}
public static class TestCacheStore implements CacheStore
{
public void erase(Object arg0)
{}
public void eraseAll(Collection arg0)
{}
public void store(Object arg0, Object arg1)
{}
public void storeAll(Map arg0)
{}
public Object load(Object arg0)
{return null;}
public Map loadAll(Collection arg0)
{return null;}
}
public void testTTL() throws InterruptedException, ExecutionException
{
logger.info("Starting TTL test");
List<Future<StopWatch>> futures = Lists.newArrayList();
for (final Integer ttl : m_listOfTTLs)
{
futures.add(m_executorService.submit(new Callable()
{
public Object call() throws Exception
{
StopWatch stopWatch= new StopWatch("TTL=" + ttl);
for (int retry = 0; retry < RETRIES; retry++)
{
logger.info("Adding a value in cache for TTL={} in try={}", ttl, retry+1);
stopWatch.start("Retry="+retry);
m_cache.put(ttl, null, ttl*1000);
waitUntilNotInCacheAnymore(ttl, retry);
stopWatch.stop();
}
return stopWatch;
}
private void waitUntilNotInCacheAnymore(final Integer ttl, final int currentTry) throws InterruptedException
{
DateTime startTime = new DateTime();
long maxMillisToWait = ttl*2*1000; //wait max 2 times the time of the ttl
while(m_cache.containsKey(ttl) )
{
Duration timeTaken = new Duration(startTime, new DateTime());
if(timeTaken.getMillis() > maxMillisToWait)
{
throw new RuntimeException("Already waiting " + timeTaken + " for ttl=" + ttl + " and retry=" + currentTry);
}
Thread.sleep(1000);
}
}
}));
}
logger.info("Waiting until all futures are finished");
m_executorService.shutdown();
logger.info("Getting results from futures");
for (Future<StopWatch> future : futures)
{
StopWatch sw = future.get();
logger.info(sw.prettyPrint());
}
}
}
Failure message:
FAILED: testTTL
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Already waiting PT20.031S for ttl=10 and retry=0
at java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source)
at java.util.concurrent.FutureTask.get(Unknown Source)
at TTLTestServer.testTTL(TTLTestServer.java:159)
Caused by: java.lang.RuntimeException: Already waiting PT20.031S for ttl=10 and retry=0
at TTLTestServer$1.waitUntilNotInCacheAnymore(TTLTestServer.java:139)
at TTLTestServer$1.call(TTLTestServer.java:122)
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
I'm using Coherence 3.4.2.
Best regards
Jan