hi ,
In my application i want to use the file from one system to another system.
i am using stream reader to get the file over the network , its working fine for small file,
but i want to access file size exceed 10 MB then i faced the problem. Its get very slow the file transfer over the network.
so i am try to use java NIO for transfer file,
Using NIO , While i am make server and client both are same system then the file tranfer is 10MB file in 10 seconds , but i am making server and client are different machine then its take so long to transfer file ie (10 MB file in 3 minutes).
I want to reduce the time . If any chance to reduced the file transfer time then please suggest me.
my code is
Server Code :
public class NioServer implements Runnable {
// The host:port combination to listen on
private InetAddress hostAddress;
private int port;
// The channel on which we'll accept connections
private ServerSocketChannel serverChannel;
// The selector we'll be monitoring
private Selector selector;
// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(10000);
private EchoWorker worker;
// A list of PendingChange instances
private List pendingChanges = new LinkedList();
// Maps a SocketChannel to a list of ByteBuffer instances
private Map pendingData = new HashMap();
public NioServer(InetAddress hostAddress, int port, EchoWorker worker) throws IOException {
this.hostAddress = hostAddress;
this.port = port;
this.selector = this.initSelector();
this.worker = worker;
}
public void send(SocketChannel socket, byte[] data) {
System.out.println("Server Send ");
synchronized (this.pendingChanges) {
// Indicate we want the interest ops set changed
this.pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));
// And queue the data we want written
synchronized (this.pendingData) {
List queue = (List) this.pendingData.get(socket);
if (queue == null) {
queue = new ArrayList();
this.pendingData.put(socket, queue);
}
queue.add(ByteBuffer.wrap(data));
}
}
// Finally, wake up our selecting thread so it can make the required changes
this.selector.wakeup();
}
public void run() {
while (true) {
try {
// Process any pending changes
synchronized (this.pendingChanges) {
Iterator changes = this.pendingChanges.iterator();
while (changes.hasNext()) {
ChangeRequest change = (ChangeRequest) changes.next();
switch (change.type) {
case ChangeRequest.CHANGEOPS:
SelectionKey key = change.socket.keyFor(this.selector);
key.interestOps(change.ops);
}
}
this.pendingChanges.clear();
}
// Wait for an event one of the registered channels
this.selector.select();
// Iterate over the set of keys for which events are available
Iterator selectedKeys = this.selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = (SelectionKey) selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
// Check what event is available and deal with it
if (key.isAcceptable()) {
this.accept(key);
} else if (key.isReadable()) {
this.read(key);
} else if (key.isWritable()) {
this.write(key);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void accept(SelectionKey key) throws IOException {
System.out.println("Server Accept ");
// For an accept to be pending the channel must be a server socket channel.
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// Accept the connection and make it non-blocking
SocketChannel socketChannel = serverSocketChannel.accept();
Socket socket = socketChannel.socket();
socketChannel.configureBlocking(false);
// Register the new SocketChannel with our Selector, indicating
// we'd like to be notified when there's data waiting to be read
socketChannel.register(this.selector, SelectionKey.OP_READ);
}
private void read(SelectionKey key) throws IOException {
System.out.println("server Read : ");
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
readBuffer.clear();
// readFully( readBuffer , socketChannel ) ;
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(readBuffer);
} catch (IOException e) {
// The remote forcibly closed the connection, cancel
// the selection key and close the channel.
key.cancel();
socketChannel.close();
return;
}
if (numRead == -1) {
// Remote entity shut the socket down cleanly. Do the
// same from our end and cancel the channel.
key.channel().close();
key.cancel();
return;
}
// Hand the data off to our worker thread
this.worker.processData(this, socketChannel, this.readBuffer.array(), numRead);
}
private void write(SelectionKey key) throws IOException {
System.out.println("Server Write ");
SocketChannel socketChannel = (SocketChannel) key.channel();
synchronized (this.pendingData) {
List queue = (List) this.pendingData.get(socketChannel);
// Write until there's not more data ...
while (!queue.isEmpty()) {
ByteBuffer buf = (ByteBuffer) queue.get(0);
socketChannel.write(buf);
if (buf.remaining() > 0) {
System.out.println( "buf.remaining() " + buf.remaining() ) ;
// ... or the socket's buffer fills up
break;
}
queue.remove(0);
}
if (queue.isEmpty()) {
// We wrote away all data, so we're no longer interested
// in writing on this socket. Switch back to waiting for
// data.
key.interestOps(SelectionKey.OP_READ);
}
}
}
private Selector initSelector() throws IOException {
// Create a new selector
Selector socketSelector = SelectorProvider.provider().openSelector();
// Create a new non-blocking server socket channel
this.serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
// Bind the server socket to the specified address and port
InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
serverChannel.socket().bind(isa);
// Register the server socket channel, indicating an interest in
// accepting new connections
serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
return socketSelector;
}
private static void readFully(ByteBuffer buf, SocketChannel socket) throws IOException
{
int len = buf.limit() - buf.position();
while (len > 0)
{
len -= socket.read(buf);
}
}
public static void main(String[] args) {
try {
EchoWorker worker = new EchoWorker();
new Thread(worker).start();
new Thread(new NioServer(null, 9090, worker)).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
Client Code :
public class NioClient implements Runnable {
// The host:port combination to connect to
private InetAddress hostAddress;
private int port;
// The selector we'll be monitoring
private Selector selector;
// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate( 10596 ) ;
// A list of PendingChange instances
private List pendingChanges = new LinkedList();
// Maps a SocketChannel to a list of ByteBuffer instances
private Map pendingData = new HashMap();
private byte[] bufferByteA = null ;
// Maps a SocketChannel to a RspHandler
private Map rspHandlers = Collections.synchronizedMap(new HashMap());
public NioClient(InetAddress hostAddress, int port) throws IOException {
this.hostAddress = hostAddress;
this.port = port;
this.selector = this.initSelector();
}
public void send(byte[] data, RspHandler handler) throws IOException {
// Start a new connection
SocketChannel socket = this.initiateConnection();
// Register the response handler
this.rspHandlers.put(socket, handler);
// And queue the data we want written
synchronized (this.pendingData) {
List queue = (List) this.pendingData.get(socket);
if (queue == null) {
queue = new ArrayList();
this.pendingData.put(socket, queue);
}
queue.add(ByteBuffer.wrap(data));
}
// Finally, wake up our selecting thread so it can make the required changes
this.selector.wakeup();
}
public void run()
{
while (true)
{
try
{
// Process any pending changes
synchronized (this.pendingChanges)
{
Iterator changes = this.pendingChanges.iterator();
while (changes.hasNext())
{
ChangeRequest change = (ChangeRequest) changes.next();
switch (change.type)
{
case ChangeRequest.CHANGEOPS:
SelectionKey key = change.socket.keyFor(this.selector);
key.interestOps(change.ops);
break;
case ChangeRequest.REGISTER:
change.socket.register(this.selector, change.ops);
break;
}
}
this.pendingChanges.clear();
}
// Wait for an event one of the registered channels
this.selector.select();
// Iterate over the set of keys for which events are available
Iterator selectedKeys = this.selector.selectedKeys().iterator();
while (selectedKeys.hasNext())
{
System.out.println( " ----run 5 " ) ;
SelectionKey key = (SelectionKey) selectedKeys.next();
selectedKeys.remove();
if (!key.isValid())
{
continue;
}
// Check what event is available and deal with it
if (key.isConnectable())
{
this.finishConnection(key);
}
else if (key.isReadable())
{
this.read(key);
}
else if (key.isWritable())
{
this.write(key);
}
}
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
private void read(SelectionKey key) throws IOException {
System.out.println( "---------read 1 " ) ;
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
this.readBuffer.clear();
System.out.println( "---------read 2 " + readBuffer.capacity()) ;
readBuffer = ByteBuffer.allocate( bufferByteA.length ) ;
// Attempt to read off the channel
// int numRead;
try {
// numRead = socketChannel.read(this.readBuffer);
readFully( readBuffer , socketChannel ) ;
} catch (IOException e) {
// The remote forcibly closed the connection, cancel
// the selection key and close the channel.
key.cancel();
socketChannel.close();
return;
}
//
// if (numRead == -1) {
// // Remote entity shut the socket down cleanly. Do the
// // same from our end and cancel the channel.
// key.channel().close();
// key.cancel();
// return;
// }
// Handle the response
this.handleResponse(socketChannel, this.readBuffer.array(), readBuffer.capacity() );
}
private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException {
// Make a correctly sized copy of the data before handing it
// to the client
byte[] rspData = new byte[numRead];
// Look up the handler for this channel
RspHandler handler = (RspHandler) this.rspHandlers.get(socketChannel);
// And pass the response to it
if (handler.handleResponse(rspData)) {
// The handler has seen enough, close the connection
socketChannel.close();
socketChannel.keyFor(this.selector).cancel();
}
}
private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
readBuffer.flip() ;
List queue = null ;
synchronized (this.pendingData) {
queue = (List) this.pendingData.get(socketChannel);
writeFully( readBuffer , socketChannel ) ;
//
// Write until there's not more data ...
while (!queue.isEmpty()) {
// ByteBuffer buf = (ByteBuffer) queue.get(0);
// socketChannel.write(buf);
// writeFully( buf , socketChannel ) ;
// if (buf.remaining() > 0) {
// // ... or the socket's buffer fills up
// break;
// }
queue.remove(0);
}
//
if (queue.isEmpty()) {
// We wrote away all data, so we're no longer interested
// in writing on this socket. Switch back to waiting for
// data.
key.interestOps(SelectionKey.OP_READ);
}
}
}
private void finishConnection(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Finish the connection. If the connection operation failed
// this will raise an IOException.
try {
socketChannel.finishConnect();
} catch (IOException e) {
// Cancel the channel's registration with our selector
System.out.println(e);
key.cancel();
return;
}
// Register an interest in writing on this channel
key.interestOps(SelectionKey.OP_WRITE);
}
private SocketChannel initiateConnection() throws IOException {
// Create a non-blocking socket channel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
// Kick off connection establishment
socketChannel.connect(new InetSocketAddress(this.hostAddress, this.port));
// socketChannel.finishConnect() ;
// Queue a channel registration since the caller is not the
// selecting thread. As part of the registration we'll register
// an interest in connection events. These are raised when a channel
// is ready to complete connection establishment.
synchronized(this.pendingChanges) {
this.pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
}
return socketChannel;
}
private Selector initSelector() throws IOException {
// Create a new selector
return SelectorProvider.provider().openSelector();
}
public static void main(String[] args) {
try {
NioClient client = new NioClient(InetAddress.getByName("healsoft1"), 9090);
Thread t = new Thread(client);
t.setDaemon(true);
t.start();
RspHandler handler = new RspHandler();
client.readBytesFromFile( handler ) ;
} catch (Exception e) {
e.printStackTrace();
}
}
private void readBytesFromFile( RspHandler handler ) throws IOException
{
File file = new File( "Y:/output.txt") ;
bufferByteA = getBytesFromFile( file ) ;
readBuffer = ByteBuffer.allocate(bufferByteA.length ) ;
readBuffer.put( bufferByteA , 0 , bufferByteA.length ) ;
send(bufferByteA , handler);
handler.waitForResponse();
}
private static void readFully(ByteBuffer buf, SocketChannel socket) throws IOException
{
System.out.println( "readFully : " ) ;
int len = buf.limit() - buf.position();
int count = 0 ;
while (len > 0)
{
len -= socket.read(buf);
}
}
private void writeFully(ByteBuffer buf , SocketChannel socketChannel) throws IOException
{
System.out.println( "writeFully : " ) ;
int len = buf.limit() - buf.position() ;
SocketChannel socket = socketChannel ;
socket.open();
while (len > 0)
{
len -= socket.write(buf);
}
}
private static byte[] getBytesFromFile(File file) throws IOException
{
InputStream is = new FileInputStream(file);
// Get the size of the file
long length = file.length();
/*
* You cannot create an array using a long type. It needs to be an int
* type. Before converting to an int type, check to ensure that file is
* not loarger than Integer.MAX_VALUE;
*/
if (length > Integer.MAX_VALUE)
{
System.out.println("File is too large to process");
return null;
}
// Create the byte array to hold the data
byte[] bytes = new byte[(int)length];
// Read in the bytes
int offset = 0;
int numRead = 0;
while ( (offset < bytes.length)
&&
( (numRead=is.read(bytes, offset, bytes.length-offset)) >= 0) )
{
offset += numRead;
}
// Ensure all the bytes have been read in
if (offset < bytes.length)
{
throw new IOException("Could not completely read file " + file.getName());
}
is.close();
return bytes;
}
public static String printTimeWithMilliSec(long l )
{
Date date = new Date( l ) ;
SimpleDateFormat f = new SimpleDateFormat("HH:mm:ss SSS");
return f.format(date);
}
}