Broadcast to multiple clients using NIO.
 

Posted By:   Ronald_Wildenberg
Posted On:   Tuesday, January 21, 2003 01:08 AM


Hi,


I have a question concerning a Java NIO chat application.
I'll post my code below; if anyone has time to take a look
at it, I'd appreciate it.


The problem: The server, on processing an 'isReadable' key,
reads whatever there is from the key's channel. When a chat
line from a client is fully read, this line needs to be sent
to all clients (including the sender). However, only the
first registered client receives all lines, the others
do not.


My idea on doing this is as follows: When a complete chat
line is read, call selector.keys() to retrieve all keys.
Remove the OP_ACCEPT key from (a copy of) the returned set.
Iterate over this set and write the chat line to each
key.channel().


I even print the channels I write to and clearly see that
if two clients are connected, the chat line is written to
two distinct channels.


Does anyone have an idea why this should not work (source
code below)? I use Java 1.4.1.


Thanks in advance,
Ronald Wildenberg.




Short code explanation: Each time a connection is accepted,
a callback class is attached to the new key. This callback
class is called whenever its key is in readable status. The
interesting stuff happens in the FlashChannelCallback.read()
method.




Server Code:


			
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

public class SocketServer {

    private int portNr = 2001;

    private Selector selector = null;

    private ServerSocketChannel selectableChannel = null;

    private InetSocketAddress localAddress = null;

    public SocketServer() {
    }

    public SocketServer(int portNr) {
        this.portNr = portNr;
    }

    public void initialize() throws IOException {

        selector = Selector.open();

        selectableChannel = ServerSocketChannel.open();
        selectableChannel.configureBlocking(false);

        InetAddress localHost = InetAddress.getLocalHost();
        localAddress = new InetSocketAddress(localHost, portNr);
        selectableChannel.socket().bind(localAddress);
    }

    public void finalize() throws IOException {
        selectableChannel.close();
        selector.close();
    }

    public void acceptConnections() throws IOException {

        int keysAdded = 0;

        SelectionKey acceptKey =
            selectableChannel.register(selector, SelectionKey.OP_ACCEPT);

        while ((keysAdded = acceptKey.selector().select()) > 0) {

            Set readyKeys = selector.selectedKeys();
            Iterator iter = readyKeys.iterator();
            while (iter.hasNext()) {

                SelectionKey key = (SelectionKey) iter.next();
                iter.remove();

                if (key.isAcceptable()) {
                    // New client: register it and attach its callback.
                    ServerSocketChannel keyChannel =
                        (ServerSocketChannel) key.channel();
                    SocketChannel channel = keyChannel.accept();
                    channel.configureBlocking(false);

                    SelectionKey readWriteKey = channel.register(
                        selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    ChannelCallback callback =
                        ChannelCallback.create(channel, this);
                    readWriteKey.attach(callback);
                }

                else if (key.isReadable()) {
                    try {
                        ((ChannelCallback) key.attachment()).read();
                    } catch (IOException e) {
                        key.channel().close();
                    }
                }

                else if (key.isWritable()) {
                    // Send the greeting once, then only listen for reads.
                    SocketChannel channel =
                        ((ChannelCallback) key.attachment()).getChannel();
                    try {
                        String connectionMessage = "Connection established with "
                            + localAddress + "...\n";
                        channel.write(ByteBuffer.wrap(
                            connectionMessage.getBytes("us-ascii")));
                        key.interestOps(SelectionKey.OP_READ);
                    } catch (IOException e) {
                        channel.close();
                    }
                }
            }
        }
    }

    public Set getChannelCallbacks() {

        Set channelCallbacks = new HashSet(13);
        Set keys = selector.keys();

        synchronized (keys) {
            for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
                SelectionKey key = (SelectionKey) iter.next();

                // Skip the server's own accept key; it has no callback.
                if (key.interestOps() != SelectionKey.OP_ACCEPT) {
                    Object attachment = key.attachment();
                    channelCallbacks.add(attachment);
                }
            }
        }
        return channelCallbacks;
    }

    public static void main(String[] args) {

        int portNr = 2001;

        try {
            String portNrString = args[0];
            portNr = Integer.valueOf(portNrString).intValue();
        } catch (ArrayIndexOutOfBoundsException e) {
            // Do nothing.
        } catch (NumberFormatException e) {
            System.err.println("Usage: java SocketServer ");
            return;
        }

        // The class has no setPortNr(); pass the port to the constructor.
        SocketServer socketServer = new SocketServer(portNr);

        try {
            socketServer.initialize();
            socketServer.acceptConnections();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}



Callback Code (attachment for each OP_READ | OP_WRITE key):


			
import java.io.IOException;
import java.nio.channels.SocketChannel;

public abstract class ChannelCallback {

    private int bufferSize = 8;

    private SocketChannel channel;

    private SocketServer socketServer = null;

    public static ChannelCallback create(
        SocketChannel channel, SocketServer socketServer)
    {
        ChannelCallback channelCallback = new FlashChannelCallback();
        channelCallback.setChannel(channel);
        channelCallback.setSocketServer(socketServer);

        return channelCallback;
    }

    public abstract void read() throws IOException;

    public SocketChannel getChannel() {
        return channel;
    }
    public void setChannel(SocketChannel channel) {
        this.channel = channel;
    }

    public int getBufferSize() {
        return bufferSize;
    }
    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    public SocketServer getSocketServer() {
        return socketServer;
    }
    public void setSocketServer(SocketServer socketServer) {
        this.socketServer = socketServer;
    }
}





Implementation of the callback for this application:


			
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;

public class FlashChannelCallback extends ChannelCallback {

    private StringBuffer currentIncoming;

    public FlashChannelCallback() {
        currentIncoming = new StringBuffer();
    }

    public void read() throws IOException {

        ByteBuffer byteBuffer = ByteBuffer.allocate(getBufferSize());
        getChannel().read(byteBuffer);
        byteBuffer.flip();

        Charset charset = Charset.forName("us-ascii");
        CharsetDecoder charsetDecoder = charset.newDecoder();
        CharBuffer charBuffer = charsetDecoder.decode(byteBuffer);
        String result = charBuffer.toString();

        currentIncoming.append(result);

        if (readComplete()) {
            execute();
        }
    }

    // A chat line is complete once a newline has arrived.
    public boolean readComplete() {
        return (currentIncoming.toString().indexOf("\n") >= 0);
    }

    public void execute() throws IOException {

        String responseString = "Received: " + currentIncoming;
        ByteBuffer responseBuffer =
            ByteBuffer.wrap(responseString.getBytes("us-ascii"));

        Set callbacks = getSocketServer().getChannelCallbacks();

        for (Iterator iter = callbacks.iterator(); iter.hasNext(); ) {
            ChannelCallback callback = (ChannelCallback) iter.next();
            callback.getChannel().write(responseBuffer);
        }

        currentIncoming.setLength(0);
    }
}

Re: Broadcast to multiple clients using NIO.

Posted By:   Ronald_Wildenberg  
Posted On:   Tuesday, January 21, 2003 01:59 AM

In the meantime I found the answer to my own problem, so thanks to anyone who has already looked into it.

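The problem was that in the FlashChannelCallback.execute() method I write the same ByteBuffer to every channel but never flip it back. The first write leaves the buffer's position at its limit, so every later write has nothing left to send.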
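
For anyone who wants to see the effect in isolation, here is a minimal sketch. It assumes in-memory channels created with Channels.newChannel() in place of the real SocketChannels, and the class and method names (BufferPositionDemo, writeTwiceWithoutReset) are made up for the illustration. It also uses StandardCharsets, which is newer than Java 1.4.1.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

class BufferPositionDemo {

    // Writes the same buffer to two in-memory channels without resetting it
    // and returns the byte count that each write reported.
    static int[] writeTwiceWithoutReset(ByteBuffer buffer) {
        WritableByteChannel first = Channels.newChannel(new ByteArrayOutputStream());
        WritableByteChannel second = Channels.newChannel(new ByteArrayOutputStream());
        try {
            // The first write consumes the buffer: position ends up at limit.
            int firstCount = first.write(buffer);
            // Nothing remains between position and limit for the second write.
            int secondCount = second.write(buffer);
            return new int[] { firstCount, secondCount };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteBuffer buffer =
            ByteBuffer.wrap("Received: hi\n".getBytes(StandardCharsets.US_ASCII));
        int[] counts = writeTwiceWithoutReset(buffer);
        System.out.println("first write: " + counts[0] + " bytes");
        System.out.println("second write: " + counts[1] + " bytes");
        System.out.println("remaining afterwards: " + buffer.remaining());
    }
}
```

The second write reports zero bytes, which matches what I saw: the channel is written to, but nothing arrives.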

The correct code snippet should be:

ChannelCallback callback = (ChannelCallback) iter.next();
callback.getChannel().write(responseBuffer);
responseBuffer.flip();
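
A note on the snippet above: flip() sets the limit to the current position, so it only restores the full message when the previous write sent every byte. An alternative, sketched below under that caveat, is to give each channel its own duplicate() of the buffer, which shares the bytes but keeps a separate position and limit. The BroadcastSketch class and its broadcast() and demo() methods are hypothetical names, the channels are in-memory stand-ins for the SocketChannels, and the code uses generics and StandardCharsets, so it needs a newer Java than 1.4.1.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class BroadcastSketch {

    // Hypothetical helper: offers the same message bytes to every channel.
    static void broadcast(ByteBuffer message, List<? extends WritableByteChannel> channels) {
        for (WritableByteChannel channel : channels) {
            // Each duplicate starts at the message's position and limit,
            // so one channel's write cannot exhaust the buffer for the next.
            ByteBuffer view = message.duplicate();
            try {
                // A non-blocking SocketChannel may accept fewer bytes than
                // view.remaining(); queuing the rest is left out of this sketch.
                channel.write(view);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Broadcasts text to two in-memory sinks and returns what each one received.
    static String[] demo(String text) {
        ByteArrayOutputStream sinkA = new ByteArrayOutputStream();
        ByteArrayOutputStream sinkB = new ByteArrayOutputStream();
        List<WritableByteChannel> channels = new ArrayList<>();
        channels.add(Channels.newChannel(sinkA));
        channels.add(Channels.newChannel(sinkB));
        broadcast(ByteBuffer.wrap(text.getBytes(StandardCharsets.US_ASCII)), channels);
        return new String[] {
            new String(sinkA.toByteArray(), StandardCharsets.US_ASCII),
            new String(sinkB.toByteArray(), StandardCharsets.US_ASCII)
        };
    }

    public static void main(String[] args) {
        String[] received = demo("Received: hi\n");
        System.out.print("A got: " + received[0]);
        System.out.print("B got: " + received[1]);
    }
}
```

Calling responseBuffer.rewind() after each write would be another option, with the same caveat about partial writes.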