Skip to Main Content

Java APIs

Announcement

For appeals, questions and feedback about Oracle Forums, please email oracle-forums-moderators_us@oracle.com. Technical questions should be asked in the appropriate category. Thank you!

how to increase the speed of network file transfer

843790Jul 30 2008 — edited Aug 5 2008
hi ,

In my application i want to use the file from one system to another system.
i am using stream reader to get the file over the network , its working fine for small file,
but i want to access file size exceed 10 MB then i faced the problem. Its get very slow the file transfer over the network.
so i am try to use java NIO for transfer file,

Using NIO , While i am make server and client both are same system then the file tranfer is 10MB file in 10 seconds , but i am making server and client are different machine then its take so long to transfer file ie (10 MB file in 3 minutes).
I want to reduce the time . If any chance to reduced the file transfer time then please suggest me.

my code is

Server Code :
public class NioServer implements Runnable {
  // The host:port combination to listen on
  private InetAddress hostAddress;
  private int port;

  // The channel on which we'll accept connections
  private ServerSocketChannel serverChannel;

  // The selector we'll be monitoring
  private Selector selector;

  // The buffer into which we'll read data when it's available
  private ByteBuffer readBuffer = ByteBuffer.allocate(10000);

  private EchoWorker worker;

  // A list of PendingChange instances
  private List pendingChanges = new LinkedList();

  // Maps a SocketChannel to a list of ByteBuffer instances
  private Map pendingData = new HashMap();

  public NioServer(InetAddress hostAddress, int port, EchoWorker worker) throws IOException {
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();
    this.worker = worker;
  }

  public void send(SocketChannel socket, byte[] data) {
    
    System.out.println("Server Send ");
    synchronized (this.pendingChanges) {
      // Indicate we want the interest ops set changed
      this.pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));

      // And queue the data we want written
      synchronized (this.pendingData) {
        List queue = (List) this.pendingData.get(socket);
        if (queue == null) {
          queue = new ArrayList();
          this.pendingData.put(socket, queue);
        }
        queue.add(ByteBuffer.wrap(data));
      }
    }

    // Finally, wake up our selecting thread so it can make the required changes
    this.selector.wakeup();
  }

  public void run() {
    while (true) {
      try {
        // Process any pending changes
        synchronized (this.pendingChanges) {
          Iterator changes = this.pendingChanges.iterator();
          while (changes.hasNext()) {
            ChangeRequest change = (ChangeRequest) changes.next();
            switch (change.type) {
            case ChangeRequest.CHANGEOPS:
              SelectionKey key = change.socket.keyFor(this.selector);
              key.interestOps(change.ops);
            }
          }
          this.pendingChanges.clear();
        }

        // Wait for an event one of the registered channels
        this.selector.select();

        // Iterate over the set of keys for which events are available
        Iterator selectedKeys = this.selector.selectedKeys().iterator();
        while (selectedKeys.hasNext()) {
          SelectionKey key = (SelectionKey) selectedKeys.next();
          selectedKeys.remove();

          if (!key.isValid()) {
            continue;
          }

          // Check what event is available and deal with it
          if (key.isAcceptable()) {
            this.accept(key);
          } else if (key.isReadable()) {
            this.read(key);
          } else if (key.isWritable()) {
            this.write(key);
          }
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  private void accept(SelectionKey key) throws IOException {
    System.out.println("Server Accept ");
    // For an accept to be pending the channel must be a server socket channel.
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

    // Accept the connection and make it non-blocking
    SocketChannel socketChannel = serverSocketChannel.accept();
    Socket socket = socketChannel.socket();
    socketChannel.configureBlocking(false);

    // Register the new SocketChannel with our Selector, indicating
    // we'd like to be notified when there's data waiting to be read
    socketChannel.register(this.selector, SelectionKey.OP_READ);
  }

  private void read(SelectionKey key) throws IOException {
    System.out.println("server Read : ");
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    readBuffer.clear();

//    readFully( readBuffer , socketChannel ) ;
    // Attempt to read off the channel
    int numRead;
    try {
      numRead = socketChannel.read(readBuffer);
      
    } catch (IOException e) {
      // The remote forcibly closed the connection, cancel
      // the selection key and close the channel.
      key.cancel();
      socketChannel.close();
      return;
    }

    if (numRead == -1) {
      // Remote entity shut the socket down cleanly. Do the
      // same from our end and cancel the channel.
      key.channel().close();
      key.cancel();
      return;
    }

    // Hand the data off to our worker thread
    this.worker.processData(this, socketChannel, this.readBuffer.array(), numRead);
  }

  private void write(SelectionKey key) throws IOException {
    System.out.println("Server Write ");
    SocketChannel socketChannel = (SocketChannel) key.channel();

    synchronized (this.pendingData) {
      List queue = (List) this.pendingData.get(socketChannel);

      // Write until there's not more data ...
      while (!queue.isEmpty()) {
        ByteBuffer buf = (ByteBuffer) queue.get(0);
        socketChannel.write(buf);
        if (buf.remaining() > 0) {
          System.out.println( "buf.remaining() " + buf.remaining() ) ;
          // ... or the socket's buffer fills up
          break;
        }
        queue.remove(0);
      }

      if (queue.isEmpty()) {
        // We wrote away all data, so we're no longer interested
        // in writing on this socket. Switch back to waiting for
        // data.
        key.interestOps(SelectionKey.OP_READ);
      }
    }
  }

  private Selector initSelector() throws IOException {
    // Create a new selector
    Selector socketSelector = SelectorProvider.provider().openSelector();

    // Create a new non-blocking server socket channel
    this.serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);

    // Bind the server socket to the specified address and port
    InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
    serverChannel.socket().bind(isa);

    // Register the server socket channel, indicating an interest in 
    // accepting new connections
    serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

    return socketSelector;
  }
  
  
  private static void readFully(ByteBuffer buf, SocketChannel socket) throws IOException
  {
    int len = buf.limit() - buf.position();
    while (len > 0)
    {
      len -= socket.read(buf);
    }
  }

  public static void main(String[] args) {
    try {
      EchoWorker worker = new EchoWorker();
      new Thread(worker).start();
      new Thread(new NioServer(null, 9090, worker)).start();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
Client Code :
public class NioClient implements Runnable {
  // The host:port combination to connect to
  private InetAddress hostAddress;
  private int port;

  // The selector we'll be monitoring
  private Selector selector;

  // The buffer into which we'll read data when it's available
  private ByteBuffer readBuffer = ByteBuffer.allocate( 10596 ) ;

  // A list of PendingChange instances
  private List pendingChanges = new LinkedList();

  // Maps a SocketChannel to a list of ByteBuffer instances
  private Map pendingData = new HashMap();
  
  private byte[] bufferByteA = null ;
  
  // Maps a SocketChannel to a RspHandler
  private Map rspHandlers = Collections.synchronizedMap(new HashMap());
  
  public NioClient(InetAddress hostAddress, int port) throws IOException {
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();
  }

  public void send(byte[] data, RspHandler handler) throws IOException {
    // Start a new connection
    SocketChannel socket = this.initiateConnection();
    
    // Register the response handler
    this.rspHandlers.put(socket, handler);
    
    // And queue the data we want written
    synchronized (this.pendingData) {
      List queue = (List) this.pendingData.get(socket);
      if (queue == null) {
        queue = new ArrayList();
        this.pendingData.put(socket, queue);
      }
      queue.add(ByteBuffer.wrap(data));
    }

    // Finally, wake up our selecting thread so it can make the required changes
    this.selector.wakeup();
  }

  public void run()
  {
   
    while (true)
    {
      try
      {
        
        // Process any pending changes
        synchronized (this.pendingChanges)
        {
          Iterator changes = this.pendingChanges.iterator();
          while (changes.hasNext())
          {
            ChangeRequest change = (ChangeRequest) changes.next();
            switch (change.type)
            {
              case ChangeRequest.CHANGEOPS:
                
                SelectionKey key = change.socket.keyFor(this.selector);
                key.interestOps(change.ops);
                break;
              case ChangeRequest.REGISTER:
                
                change.socket.register(this.selector, change.ops);
                break;
            }
          }
          this.pendingChanges.clear();
        }
       
        // Wait for an event one of the registered channels
        this.selector.select();
        
        // Iterate over the set of keys for which events are available
        Iterator selectedKeys = this.selector.selectedKeys().iterator();
       
        while (selectedKeys.hasNext())
        {
        System.out.println( " ----run 5 " ) ;
          SelectionKey key = (SelectionKey) selectedKeys.next();
          
          selectedKeys.remove();
          
          if (!key.isValid())
          {
            continue;
          }
          
          // Check what event is available and deal with it
          if (key.isConnectable())
          {
            
            this.finishConnection(key);
          }
          else if (key.isReadable())
          {
           
            this.read(key);
           
          }
          else if (key.isWritable())
          {
            
            this.write(key);
          
          }
        }
      }
      catch (Exception e)
      {
        e.printStackTrace();
      }
    }
  }

  private void read(SelectionKey key) throws IOException {
    System.out.println( "---------read 1 " ) ;
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    this.readBuffer.clear();
    
    System.out.println( "---------read 2 " + readBuffer.capacity()) ;
     readBuffer = ByteBuffer.allocate( bufferByteA.length  ) ;
     
    // Attempt to read off the channel
//    int numRead;
    try {
//      numRead = socketChannel.read(this.readBuffer);
      readFully( readBuffer , socketChannel ) ;
    } catch (IOException e) {
      // The remote forcibly closed the connection, cancel
      // the selection key and close the channel.
      key.cancel();
      socketChannel.close();
      return;
    }
//
//    if (numRead == -1) {
//      // Remote entity shut the socket down cleanly. Do the
//      // same from our end and cancel the channel.
//      key.channel().close();
//      key.cancel();
//      return;
//    }

    // Handle the response
    this.handleResponse(socketChannel, this.readBuffer.array(), readBuffer.capacity() );
  }

  private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException {
    // Make a correctly sized copy of the data before handing it
    // to the client
    byte[] rspData = new byte[numRead];
  
    // Look up the handler for this channel
    RspHandler handler = (RspHandler) this.rspHandlers.get(socketChannel);
    
    // And pass the response to it
    if (handler.handleResponse(rspData)) {
      // The handler has seen enough, close the connection
      socketChannel.close();
      socketChannel.keyFor(this.selector).cancel();
    }
  }

  private void write(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    readBuffer.flip() ;
    List queue = null ;
    synchronized (this.pendingData) {
      queue = (List) this.pendingData.get(socketChannel);
     
      writeFully( readBuffer , socketChannel ) ;
//
      // Write until there's not more data ...
      while (!queue.isEmpty()) {
//        ByteBuffer buf = (ByteBuffer) queue.get(0);
//        socketChannel.write(buf);
//        writeFully( buf , socketChannel ) ;
//        if (buf.remaining() > 0) {
//          // ... or the socket's buffer fills up
//          break;
//        }
        queue.remove(0);
      }
//
      if (queue.isEmpty()) {
        // We wrote away all data, so we're no longer interested
        // in writing on this socket. Switch back to waiting for
        // data.
        key.interestOps(SelectionKey.OP_READ);
      }
    }
  }

  private void finishConnection(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();
  
    // Finish the connection. If the connection operation failed
    // this will raise an IOException.
    try {
      socketChannel.finishConnect();
    } catch (IOException e) {
      // Cancel the channel's registration with our selector
      System.out.println(e);
      key.cancel();
      return;
    }
  
    // Register an interest in writing on this channel
    key.interestOps(SelectionKey.OP_WRITE);
  }

  private SocketChannel initiateConnection() throws IOException {
    // Create a non-blocking socket channel
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);
  
    // Kick off connection establishment
    socketChannel.connect(new InetSocketAddress(this.hostAddress, this.port));
  
//    socketChannel.finishConnect() ;
    
    // Queue a channel registration since the caller is not the 
    // selecting thread. As part of the registration we'll register
    // an interest in connection events. These are raised when a channel
    // is ready to complete connection establishment.
    synchronized(this.pendingChanges) {
      this.pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
    }
    
    return socketChannel;
  }

  private Selector initSelector() throws IOException {
    // Create a new selector
    return SelectorProvider.provider().openSelector();
  }

  public static void main(String[] args) {
    try {
      NioClient client = new NioClient(InetAddress.getByName("healsoft1"), 9090);
      Thread t = new Thread(client);
      t.setDaemon(true);
      t.start();
      RspHandler handler = new RspHandler();
      client.readBytesFromFile( handler ) ;


    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  private void readBytesFromFile( RspHandler handler ) throws IOException
  {
    File file = new File( "Y:/output.txt") ;
    bufferByteA = getBytesFromFile( file ) ;
    readBuffer = ByteBuffer.allocate(bufferByteA.length ) ;

    readBuffer.put( bufferByteA , 0 , bufferByteA.length ) ;

    send(bufferByteA , handler);
   
    handler.waitForResponse();
  }
  
  private static void readFully(ByteBuffer buf, SocketChannel socket) throws IOException
  {
    System.out.println( "readFully  : " ) ;
    int len = buf.limit() - buf.position();
    int count = 0 ; 
    while (len > 0)
    {

      len -= socket.read(buf);
    }    
      
  }
  
  private void writeFully(ByteBuffer buf , SocketChannel socketChannel) throws IOException
  {
    System.out.println( "writeFully  : " ) ;
    int len = buf.limit() - buf.position() ;
    SocketChannel socket = socketChannel ;
    socket.open();
    while (len > 0)
    {
      
      len -= socket.write(buf);
    }
  }
  
  private static byte[] getBytesFromFile(File file) throws IOException
  {
    InputStream is = new FileInputStream(file);
    
    // Get the size of the file
    long length = file.length();
    
        /*
         * You cannot create an array using a long type. It needs to be an int
         * type. Before converting to an int type, check to ensure that file is
         * not loarger than Integer.MAX_VALUE;
         */
    if (length > Integer.MAX_VALUE)
    {
      System.out.println("File is too large to process");
      return null;
    }
    
    // Create the byte array to hold the data
    byte[] bytes = new byte[(int)length];
    
    // Read in the bytes
    int offset = 0;
    int numRead = 0;
    while ( (offset < bytes.length)
    &&
            ( (numRead=is.read(bytes, offset, bytes.length-offset)) >= 0) )
         {
            offset += numRead;
          }
    
    // Ensure all the bytes have been read in
    if (offset < bytes.length)
    {
      throw new IOException("Could not completely read file " + file.getName());
    }
    
    is.close();
    return bytes;
    
  }
  public static String printTimeWithMilliSec(long l )
  {
    Date date = new Date( l ) ;
    SimpleDateFormat f = new SimpleDateFormat("HH:mm:ss SSS");
    return f.format(date);
    
  }

}
Comments
Locked Post
New comments cannot be posted to this locked post.
Post Details
Locked on Sep 2 2008
Added on Jul 30 2008
4 comments
575 views