Skip to Main Content

Java APIs

Announcement

For appeals, questions and feedback about Oracle Forums, please email oracle-forums-moderators_us@oracle.com. Technical questions should be asked in the appropriate category. Thank you!

NIO Socket Channels, select loops.

843790Aug 11 2006 — edited Aug 16 2006
I've got a problem with my library for a chat. I use NIO:

1. creating selector
2. creating ServerSocket, make it non-blocking
3. after accepting connections I store them to output data during chat-session.

Of course, I set up a WRITE key for a channel only when I need to write something.

aSelectionKey.interestOps(SelectionKey.OP_WRITE);
aSelectionKey.selector().wakeup();

After writing I try to clear all interestingOps

aSelectionKey.interestOps(0);

Everything goes ok until some strange time, when my program begins to fire up CPU with 100% usage. I found, that _selector.select() does not block as before, so, I've got infinite loop without wait/sleep/block.

Here is main loop
        while (true) {
            try {
                if (_selector.select() > 0) {
                    updateLastLoopCounters();

                    Set<SelectionKey> selectionKeys = _selector.selectedKeys();

                    _lastSelectorsCount = selectionKeys.size();

                    try {
                        for (SelectionKey selectionKey : selectionKeys) {
                            if (selectionKey.isValid()) {
                                if (selectionKey.isAcceptable()) {
                                    _selectorsAcceptCount++;
                                    acceptSocket(selectionKey);
                                } else if (selectionKey.isWritable()) {
                                    _selectorsWriteCount++;
                                    writeDataToSocket(selectionKey);
                                }
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();

                        if (_currentChannel != null) {
                            try {
                                _currentChannel.close();
                            } catch (IOException e1) {
                                e1.printStackTrace();
                            }
                        }
                    } finally {
                        selectionKeys.clear();
                    }
                } else {
                    _loopCounter++;
                }
            } catch (Exception e) {
                e.printStackTrace();

                if (_currentChannel != null) {
                    try {
                        _currentChannel.close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
                }
            }
        }
And here is my Accept/Write methods:
    private void acceptSocket(SelectionKey aSelectionKey) throws IOException {
        StringBuilder builder = new StringBuilder();

        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) aSelectionKey.channel();

        _currentChannel = serverSocketChannel.accept();
        _currentChannel.configureBlocking(false);

        SelectionKey socketSelectionKey = _currentChannel.register(_selector, 0);

        builder.delete(0, builder.length());
        try {
            Utils.readFromChannel(_currentChannel, builder, JSocketerStarter.US_ASCII);
        } catch (InterruptedIOException e) {
            _currentChannel.close();
            _currentChannel = null;
            return;
        }

        String httpRequest = builder.toString();
        int headerStartIndex = httpRequest.indexOf("\r\n");

        if (headerStartIndex == -1) {
            return;
        }

        String get = httpRequest.substring(0, headerStartIndex).trim();

        if (!get.startsWith("GET") && !get.startsWith("get")) {
            _socketer.log(LOG_PREFIX, "Writers can accept only HTTP GET request", LogType.ERROR);
        } else {
            callControl(get, httpRequest, headerStartIndex, socketSelectionKey);
        }
    }
    private void writeDataToSocket(SelectionKey aSelectionKey) throws IOException {
        boolean cantContinueWriting = false;

        try {
            SocketChannel writeChannel = (SocketChannel) aSelectionKey.channel();

            if (!writeChannel.isConnected()) {
                aSelectionKey.interestOps(0);
                return;
            }

            Object attachment = aSelectionKey.attachment();

            if (attachment instanceof PreparedBuffers) {
                PreparedBuffers buffers = (PreparedBuffers) attachment;

                if (!buffers.isEmpty()) {
                    ByteBuffer buffer = buffers.getFirst();

                    if (buffer.capacity() == 0) {
                        writeChannel.close();
                    } else {
                        while (buffer.hasRemaining()) {
                            try {
                                writeChannel.write(buffer);
                            } catch (IOException e) {
                                cantContinueWriting = true;
                                buffers.getWritersControl().writingFailed(buffers, e);
                                aSelectionKey.attach(e);
                                break;
                            }
                            buffer.compact();
                            buffer.flip();
                        }

                        if (!buffer.hasRemaining() && !buffers.isEmpty()) {
                            _socketer.log(LOG_PREFIX, "Message sent through socket: " + buffers.getFirstKey() + '\n', LogType.DEBUG);
                            buffers.removeFirst();
                        }
                    }
                } else {
                    aSelectionKey.interestOps(0);
                }
            } else if (attachment instanceof Queue) {
                Queue<ByteBuffer> buffers = (Queue<ByteBuffer>) attachment;

                if (!buffers.isEmpty()) {
                    ByteBuffer buffer = buffers.peek();

                    if (buffer.capacity() == 0) {
                        writeChannel.close();
                    } else {
                        while (buffer.hasRemaining()) {
                            try {
                                writeChannel.write(buffer);
                            } catch (IOException e) {
                                cantContinueWriting = true;
                                writeChannel.close();
                                break;
                            }
                            buffer.compact();
                            buffer.flip();
                        }

                        if (!buffer.hasRemaining() && !buffers.isEmpty()) {
                            buffers.poll();
                        }
                    }
                } else {
                    aSelectionKey.interestOps(0);
                }
            }
        } finally {
            if (cantContinueWriting) {
                System.out.println("Cant continue writing to a channel due to strange error");
                aSelectionKey.interestOps(0);
            }
        }
    }
Why can this situation happen?

What have I to do with this looped selector? Do I have to rebuild it? If yes - how can I do this correctly?

Thanks.
Comments
Locked Post
New comments cannot be posted to this locked post.
Post Details
Locked on Sep 13 2006
Added on Aug 11 2006
15 comments
316 views