I have a problem with my chat library. I use Java NIO as follows:
1. Create a selector.
2. Create a ServerSocketChannel and make it non-blocking.
3. After accepting connections, I keep them so I can write data to them during the chat session.
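To make the setup concrete, steps 1 and 2 look roughly like the sketch below. It is a simplified illustration, not my exact code: the class name SetupSketch, the method openServer and the ephemeral loopback port are made up for the example, and registering the server channel for OP_ACCEPT is what the main loop further down relies on.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical helper, for illustration only.
final class SetupSketch {

    /**
     * Opens a non-blocking server channel on an ephemeral loopback port
     * and registers it with the given selector for OP_ACCEPT.
     */
    static SelectionKey openServer(Selector selector) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        // A channel must be non-blocking before it can be registered.
        server.configureBlocking(false);
        // Port 0 lets the OS pick a free port (an assumption for this sketch).
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        return server.register(selector, SelectionKey.OP_ACCEPT);
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open()) {
            SelectionKey key = openServer(selector);
            System.out.println("Listening on "
                    + ((ServerSocketChannel) key.channel()).getLocalAddress());
            key.channel().close();
        }
    }
}
```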
Of course, I set OP_WRITE interest on a channel's key only when I actually have something to write:
    aSelectionKey.interestOps(SelectionKey.OP_WRITE);
    aSelectionKey.selector().wakeup();
After writing, I try to clear all interest ops:
    aSelectionKey.interestOps(0);
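Here is a small self-contained sketch of that arm/clear cycle, in case it helps to reproduce the behaviour in isolation. The class InterestOpsSketch and its methods are hypothetical names for this example only; it uses a loopback connection instead of my real chat clients. On a healthy selector I would expect one ready key while OP_WRITE is set and none after the interest set is cleared.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, for illustration only.
final class InterestOpsSketch {

    /** Requests write readiness and wakes a selector that may be blocked in select(). */
    static void requestWrite(SelectionKey key) {
        key.interestOps(SelectionKey.OP_WRITE);
        key.selector().wakeup();
    }

    /** Clears every interest op so the key is not selected again until re-armed. */
    static void clearInterest(SelectionKey key) {
        key.interestOps(0);
    }

    /** Returns {ready count while OP_WRITE is set, ready count after clearing}. */
    static int[] demo() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                accepted.configureBlocking(false);
                // Register with an empty interest set, as in acceptSocket().
                SelectionKey key = accepted.register(selector, 0);

                requestWrite(key);
                int readyWhileArmed = selector.select(1000);
                selector.selectedKeys().clear();

                clearInterest(key);
                int readyAfterClear = selector.selectNow();
                return new int[] {readyWhileArmed, readyAfterClear};
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int[] result = demo();
        System.out.println("ready while armed: " + result[0]
                + ", ready after clear: " + result[1]);
    }
}
```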
Everything works fine until, at some unpredictable point, my program starts using 100% CPU. I found that _selector.select() no longer blocks as it did before, so I end up with an infinite loop that never waits, sleeps or blocks.
Here is the main loop:
    while (true) {
        try {
            if (_selector.select() > 0) {
                updateLastLoopCounters();
                Set<SelectionKey> selectionKeys = _selector.selectedKeys();
                _lastSelectorsCount = selectionKeys.size();
                try {
                    for (SelectionKey selectionKey : selectionKeys) {
                        if (selectionKey.isValid()) {
                            if (selectionKey.isAcceptable()) {
                                _selectorsAcceptCount++;
                                acceptSocket(selectionKey);
                            } else if (selectionKey.isWritable()) {
                                _selectorsWriteCount++;
                                writeDataToSocket(selectionKey);
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    if (_currentChannel != null) {
                        try {
                            _currentChannel.close();
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                    }
                } finally {
                    selectionKeys.clear();
                }
            } else {
                _loopCounter++;
            }
        } catch (Exception e) {
            e.printStackTrace();
            if (_currentChannel != null) {
                try {
                    _currentChannel.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }
And here are my accept and write methods:
    private void acceptSocket(SelectionKey aSelectionKey) throws IOException {
        StringBuilder builder = new StringBuilder();
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) aSelectionKey.channel();
        _currentChannel = serverSocketChannel.accept();
        _currentChannel.configureBlocking(false);
        SelectionKey socketSelectionKey = _currentChannel.register(_selector, 0);
        builder.delete(0, builder.length());
        try {
            Utils.readFromChannel(_currentChannel, builder, JSocketerStarter.US_ASCII);
        } catch (InterruptedIOException e) {
            _currentChannel.close();
            _currentChannel = null;
            return;
        }
        String httpRequest = builder.toString();
        int headerStartIndex = httpRequest.indexOf("\r\n");
        if (headerStartIndex == -1) {
            return;
        }
        String get = httpRequest.substring(0, headerStartIndex).trim();
        if (!get.startsWith("GET") && !get.startsWith("get")) {
            _socketer.log(LOG_PREFIX, "Writers can accept only HTTP GET request", LogType.ERROR);
        } else {
            callControl(get, httpRequest, headerStartIndex, socketSelectionKey);
        }
    }

    private void writeDataToSocket(SelectionKey aSelectionKey) throws IOException {
        boolean cantContinueWriting = false;
        try {
            SocketChannel writeChannel = (SocketChannel) aSelectionKey.channel();
            if (!writeChannel.isConnected()) {
                aSelectionKey.interestOps(0);
                return;
            }
            Object attachment = aSelectionKey.attachment();
            if (attachment instanceof PreparedBuffers) {
                PreparedBuffers buffers = (PreparedBuffers) attachment;
                if (!buffers.isEmpty()) {
                    ByteBuffer buffer = buffers.getFirst();
                    if (buffer.capacity() == 0) {
                        writeChannel.close();
                    } else {
                        while (buffer.hasRemaining()) {
                            try {
                                writeChannel.write(buffer);
                            } catch (IOException e) {
                                cantContinueWriting = true;
                                buffers.getWritersControl().writingFailed(buffers, e);
                                aSelectionKey.attach(e);
                                break;
                            }
                            buffer.compact();
                            buffer.flip();
                        }
                        if (!buffer.hasRemaining() && !buffers.isEmpty()) {
                            _socketer.log(LOG_PREFIX, "Message sent through socket: " + buffers.getFirstKey() + '\n', LogType.DEBUG);
                            buffers.removeFirst();
                        }
                    }
                } else {
                    aSelectionKey.interestOps(0);
                }
            } else if (attachment instanceof Queue) {
                Queue<ByteBuffer> buffers = (Queue<ByteBuffer>) attachment;
                if (!buffers.isEmpty()) {
                    ByteBuffer buffer = buffers.peek();
                    if (buffer.capacity() == 0) {
                        writeChannel.close();
                    } else {
                        while (buffer.hasRemaining()) {
                            try {
                                writeChannel.write(buffer);
                            } catch (IOException e) {
                                cantContinueWriting = true;
                                writeChannel.close();
                                break;
                            }
                            buffer.compact();
                            buffer.flip();
                        }
                        if (!buffer.hasRemaining() && !buffers.isEmpty()) {
                            buffers.poll();
                        }
                    }
                } else {
                    aSelectionKey.interestOps(0);
                }
            }
        } finally {
            if (cantContinueWriting) {
                System.out.println("Cant continue writing to a channel due to strange error");
                aSelectionKey.interestOps(0);
            }
        }
    }
Why can this happen?
What should I do with this spinning selector? Do I have to rebuild it? If so, how can I do that correctly?
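In case it clarifies what I mean by "rebuilding", the sketch below is roughly what I have in mind: open a fresh selector, move every valid key (with its interest ops and attachment) over to it, then close the old one. The class SelectorRebuildSketch and the method rebuild are hypothetical names, it assumes it is only ever called from the selector thread, and I have not confirmed that this actually fixes the spinning.

```java
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical helper, for illustration only; not a confirmed fix.
final class SelectorRebuildSketch {

    /**
     * Re-registers every valid key of oldSelector on a new selector,
     * preserving interest ops and attachments, then closes oldSelector.
     * Assumes no other thread touches the keys while this runs.
     */
    static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey oldKey : oldSelector.keys()) {
            if (!oldKey.isValid()) {
                continue; // cancelled or closed; nothing to migrate
            }
            SelectableChannel channel = oldKey.channel();
            // Read state before cancelling; interestOps() throws on a cancelled key.
            int interestOps = oldKey.interestOps();
            Object attachment = oldKey.attachment();
            oldKey.cancel();
            channel.register(newSelector, interestOps, attachment);
        }
        oldSelector.close();
        return newSelector;
    }
}
```

In the main loop this would be used as `_selector = SelectorRebuildSketch.rebuild(_selector);`, for example after select() has returned 0 many times in a row without blocking.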
Thanks.