00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 package com.realtime.crossfire.jxclient.server.socket;
00023
00024 import com.realtime.crossfire.jxclient.server.crossfire.Model;
00025 import com.realtime.crossfire.jxclient.util.DebugWriter;
00026 import com.realtime.crossfire.jxclient.util.EventListenerList2;
00027 import java.io.EOFException;
00028 import java.io.IOException;
00029 import java.net.InetSocketAddress;
00030 import java.net.SocketAddress;
00031 import java.net.SocketException;
00032 import java.nio.BufferOverflowException;
00033 import java.nio.ByteBuffer;
00034 import java.nio.ByteOrder;
00035 import java.nio.channels.SelectableChannel;
00036 import java.nio.channels.SelectionKey;
00037 import java.nio.channels.Selector;
00038 import java.nio.channels.SocketChannel;
00039 import java.nio.channels.UnresolvedAddressException;
00040 import java.util.Collection;
00041 import org.jetbrains.annotations.NotNull;
00042 import org.jetbrains.annotations.Nullable;
00043
00052 public class ClientSocket {
00053
00057 private static final int MAXIMUM_PACKET_SIZE = 65536;
00058
00062 @NotNull
00063 private final Model model;
00064
00069 @Nullable
00070 private final DebugWriter debugProtocol;
00071
00075 @NotNull
00076 private final EventListenerList2<ClientSocketListener> clientSocketListeners = new EventListenerList2<ClientSocketListener>(ClientSocketListener.class);
00077
00081 @NotNull
00082 private final Selector selector;
00083
00088 @NotNull
00089 private final Object syncConnect = new Object();
00090
00095 private boolean reconnect = true;
00096
00100 @NotNull
00101 private String reconnectReason = "disconnect";
00102
00106 private boolean reconnectIsError = false;
00107
00111 @Nullable
00112 private String host = null;
00113
00117 private int port = 0;
00118
00122 private boolean disconnectPending = false;
00123
00127 @NotNull
00128 private final byte[] packetHeader = new byte[2];
00129
00133 @Nullable
00134 private SelectableChannel selectableChannel = null;
00135
00140 @Nullable
00141 private SelectionKey selectionKey = null;
00142
00146 private int interestOps = 0;
00147
00151 @NotNull
00152 private final byte[] inputBuf = new byte[2+MAXIMUM_PACKET_SIZE];
00153
00157 @NotNull
00158 private final ByteBuffer inputBuffer = ByteBuffer.wrap(inputBuf);
00159
00165 private int inputLen = -1;
00166
00171 @NotNull
00172 private final Object syncOutput = new Object();
00173
00177 @NotNull
00178 private final ByteBuffer outputBuffer = ByteBuffer.allocate(2+MAXIMUM_PACKET_SIZE);
00179
00184 @Nullable
00185 private SocketChannel socketChannel = null;
00186
00190 private boolean isConnected = false;
00191
00195 @NotNull
00196 private final Thread thread = new Thread(new Runnable() {
00197
00198 @Override
00199 public void run() {
00200 process();
00201 }
00202
00203 }, "JXClient:ClientSocket");
00204
/**
 * Creates a new instance. The socket thread is not started by the
 * constructor; see {@link #start()}.
 * @param model the model to notify about disconnects
 * @param debugProtocol the writer for protocol debug messages or
 * <code>null</code> to not write any
 * @throws IOException if the selector cannot be opened
 */
public ClientSocket(@NotNull final Model model, @Nullable final DebugWriter debugProtocol) throws IOException {
    selector = Selector.open();
    this.debugProtocol = debugProtocol;
    this.model = model;
}
00217
00221 public void start() {
00222 if (debugProtocol != null) {
00223 debugProtocol.debugProtocolWrite("socket:start");
00224 }
00225 thread.start();
00226 }
00227
00232 public void stop() throws InterruptedException {
00233 if (debugProtocol != null) {
00234 debugProtocol.debugProtocolWrite("socket:stop");
00235 }
00236 thread.interrupt();
00237 thread.join();
00238 if (debugProtocol != null) {
00239 debugProtocol.debugProtocolWrite("socket:stopped");
00240 }
00241 }
00242
00247 public void addClientSocketListener(@NotNull final ClientSocketListener clientSocketListener) {
00248 clientSocketListeners.add(clientSocketListener);
00249 }
00250
00255 public void removeClientSocketListener(@NotNull final ClientSocketListener clientSocketListener) {
00256 clientSocketListeners.remove(clientSocketListener);
00257 }
00258
00264 public void connect(@NotNull final String host, final int port) {
00265 if (debugProtocol != null) {
00266 debugProtocol.debugProtocolWrite("socket:connect "+host+":"+port);
00267 }
00268 synchronized (syncConnect) {
00269 if (this.host == null || this.port == 0 || !this.host.equals(host) || this.port != port) {
00270 reconnect = true;
00271 reconnectReason = "connect";
00272 reconnectIsError = false;
00273 this.host = host;
00274 this.port = port;
00275 selector.wakeup();
00276 }
00277 }
00278 }
00279
00285 public void disconnect(@NotNull final String reason, final boolean isError) {
00286 if (debugProtocol != null) {
00287 debugProtocol.debugProtocolWrite("socket:disconnect: "+reason+(isError ? " [unexpected]" : ""));
00288 }
00289 synchronized (syncConnect) {
00290 if (host != null || port != 0) {
00291 reconnect = true;
00292 reconnectReason = reason;
00293 reconnectIsError = isError;
00294 host = null;
00295 port = 0;
00296 selector.wakeup();
00297 }
00298 }
00299 }
00300
00305 private void process() {
00306 while (!thread.isInterrupted()) {
00307 try {
00308 doReconnect();
00309 doConnect();
00310 updateWriteInterestOps();
00311 doTransceive();
00312 } catch (final EOFException ex) {
00313 final String tmp = ex.getMessage();
00314 final String message = tmp == null ? "EOF" : tmp;
00315 if (debugProtocol != null) {
00316 debugProtocol.debugProtocolWrite("socket:exception "+message, ex);
00317 }
00318 processDisconnect(message, false);
00319 } catch (final IOException ex) {
00320 final String tmp = ex.getMessage();
00321 final String message = tmp == null ? "I/O error" : tmp;
00322 if (debugProtocol != null) {
00323 debugProtocol.debugProtocolWrite("socket:exception "+message, ex);
00324 }
00325 processDisconnect(message, true);
00326 }
00327 }
00328 }
00329
00334 private void doConnect() throws IOException {
00335 final boolean notifyConnected;
00336 synchronized (syncOutput) {
00337 if (isConnected || socketChannel == null) {
00338 notifyConnected = false;
00339 } else {
00340 isConnected = socketChannel.finishConnect();
00341 if (isConnected) {
00342 interestOps = SelectionKey.OP_READ;
00343 updateInterestOps();
00344 notifyConnected = true;
00345 } else {
00346 notifyConnected = false;
00347 }
00348 }
00349 }
00350 if (notifyConnected) {
00351 for (final ClientSocketListener clientSocketListener : clientSocketListeners.getListeners()) {
00352 clientSocketListener.connected();
00353 }
00354 }
00355 }
00356
00361 private void doReconnect() throws IOException {
00362 assert Thread.currentThread() == thread;
00363
00364 final boolean doReconnect;
00365 final boolean doDisconnect;
00366 @Nullable final String disconnectReason;
00367 final boolean disconnectIsError;
00368 synchronized (syncConnect) {
00369 if (reconnect) {
00370 reconnect = false;
00371 if (host != null && port != 0) {
00372 doReconnect = true;
00373 doDisconnect = false;
00374 disconnectReason = "reconnect to "+host+":"+port;
00375 disconnectIsError = false;
00376 } else {
00377 doReconnect = false;
00378 doDisconnect = true;
00379 disconnectReason = reconnectReason;
00380 disconnectIsError = reconnectIsError;
00381 }
00382 } else {
00383 doReconnect = false;
00384 doDisconnect = false;
00385 disconnectReason = null;
00386 disconnectIsError = false;
00387 }
00388 }
00389 if (doReconnect) {
00390 assert disconnectReason != null;
00391 processDisconnect(disconnectReason, disconnectIsError);
00392 final String connectHost;
00393 final int connectPort;
00394 synchronized (syncConnect) {
00395 disconnectPending = true;
00396 connectHost = host;
00397 connectPort = port;
00398 }
00399 if (connectHost != null) {
00400 processConnect(connectHost, connectPort);
00401 }
00402 }
00403 if (doDisconnect) {
00404 processDisconnect(disconnectReason, disconnectIsError);
00405 }
00406 }
00407
00412 private void doTransceive() throws IOException {
00413 selector.select();
00414 final Collection<SelectionKey> selectedKeys = selector.selectedKeys();
00415 if (selectedKeys.remove(selectionKey) && isConnected) {
00416 processRead();
00417 processWrite();
00418 }
00419 assert selectedKeys.isEmpty();
00420 }
00421
00428 private void processConnect(@NotNull final String host, final int port) throws IOException {
00429 assert Thread.currentThread() == thread;
00430
00431 if (debugProtocol != null) {
00432 debugProtocol.debugProtocolWrite("socket:connecting to "+host+":"+port);
00433 }
00434 for (final ClientSocketListener clientSocketListener : clientSocketListeners.getListeners()) {
00435 clientSocketListener.connecting();
00436 }
00437
00438 final SocketAddress socketAddress = new InetSocketAddress(host, port);
00439 synchronized (syncOutput) {
00440 outputBuffer.clear();
00441 inputBuffer.clear();
00442 selectionKey = null;
00443 try {
00444 socketChannel = SocketChannel.open();
00445 selectableChannel = socketChannel.configureBlocking(false);
00446 try {
00447 isConnected = socketChannel.connect(socketAddress);
00448 } catch (final UnresolvedAddressException ex) {
00449
00450 throw new IOException("Cannot resolve address: "+socketAddress, ex);
00451 } catch (final IllegalArgumentException ex) {
00452 throw new IOException(ex.getMessage(), ex);
00453 }
00454 try {
00455 socketChannel.socket().setTcpNoDelay(true);
00456 } catch (final SocketException ex) {
00457 if (debugProtocol != null) {
00458 debugProtocol.debugProtocolWrite("socket:cannot set TCP_NODELAY option: "+ex.getMessage());
00459 }
00460 }
00461 interestOps = SelectionKey.OP_CONNECT;
00462 selectionKey = selectableChannel.register(selector, interestOps);
00463 } finally {
00464 if (selectionKey == null) {
00465 socketChannel = null;
00466 selectableChannel = null;
00467 isConnected = false;
00468 interestOps = 0;
00469 }
00470 }
00471 }
00472 }
00473
00479 private void processDisconnect(@NotNull final String reason, final boolean isError) {
00480 assert Thread.currentThread() == thread;
00481
00482 if (debugProtocol != null) {
00483 debugProtocol.debugProtocolWrite("socket:disconnecting: "+reason+(isError ? " [unexpected]" : ""));
00484 }
00485 final boolean notifyListeners;
00486 synchronized (syncConnect) {
00487 notifyListeners = disconnectPending;
00488 disconnectPending = false;
00489 }
00490 if (notifyListeners) {
00491 model.getGuiStateManager().disconnecting(reason, isError);
00492 for (final ClientSocketListener clientSocketListener : clientSocketListeners.getListeners()) {
00493 clientSocketListener.disconnecting(reason, isError);
00494 }
00495 }
00496
00497 try {
00498 synchronized (syncOutput) {
00499 if (selectionKey != null) {
00500 selectionKey.cancel();
00501 selectionKey = null;
00502 outputBuffer.clear();
00503
00504 try {
00505 if (socketChannel != null) {
00506 socketChannel.socket().shutdownOutput();
00507 }
00508 } catch (final IOException ignored) {
00509
00510 }
00511 try {
00512 if (socketChannel != null) {
00513 socketChannel.close();
00514 }
00515 } catch (final IOException ignored) {
00516
00517 }
00518 socketChannel = null;
00519 selectableChannel = null;
00520 inputBuffer.clear();
00521 }
00522 }
00523 } finally {
00524 if (notifyListeners) {
00525 model.getGuiStateManager().disconnected();
00526 for (final ClientSocketListener clientSocketListener : clientSocketListeners.getListeners()) {
00527 clientSocketListener.disconnected(reason);
00528 }
00529 }
00530 }
00531 }
00532
00537 private void processRead() throws IOException {
00538 synchronized (syncOutput) {
00539 if (socketChannel == null) {
00540 return;
00541 }
00542
00543 if (socketChannel.read(inputBuffer) == -1) {
00544 throw new EOFException();
00545 }
00546 }
00547 inputBuffer.flip();
00548 processReadCommand();
00549 inputBuffer.compact();
00550 }
00551
00555 private void processReadCommand() {
00556 while (true) {
00557 if (inputLen == -1) {
00558 if (inputBuffer.remaining() < 2) {
00559 break;
00560 }
00561
00562 inputLen = (inputBuffer.get()&0xFF)*0x100+(inputBuffer.get()&0xFF);
00563 }
00564
00565 if (inputBuffer.remaining() < inputLen) {
00566 break;
00567 }
00568
00569 final int start = inputBuffer.position();
00570 final int end = start+inputLen;
00571 inputBuffer.position(start+inputLen);
00572 inputLen = -1;
00573 final ByteBuffer packet = ByteBuffer.wrap(inputBuf, start, end-start);
00574 packet.order(ByteOrder.BIG_ENDIAN);
00575 try {
00576 for (final ClientSocketListener clientSocketListener : clientSocketListeners.getListeners()) {
00577 clientSocketListener.packetReceived(packet);
00578 }
00579 } catch (final UnknownCommandException ex) {
00580 disconnect(ex.getMessage(), true);
00581 break;
00582 }
00583 }
00584 }
00585
00593 public void writePacket(@NotNull final byte[] buf, final int len) {
00594 synchronized (syncOutput) {
00595 if (socketChannel == null) {
00596 return;
00597 }
00598
00599 packetHeader[0] = (byte)(len/0x100);
00600 packetHeader[1] = (byte)len;
00601 try {
00602 try {
00603 outputBuffer.put(packetHeader);
00604 outputBuffer.put(buf, 0, len);
00605 } catch (final BufferOverflowException ex) {
00606 throw new IOException("buffer overflow", ex);
00607 }
00608 } catch (final IOException ignored) {
00609
00610 try {
00611 socketChannel.close();
00612 } catch (final IOException ignored2) {
00613
00614 }
00615 return;
00616 }
00617 }
00618
00619 selector.wakeup();
00620 for (final ClientSocketListener clientSocketListener : clientSocketListeners.getListeners()) {
00621 clientSocketListener.packetSent(buf, len);
00622 }
00623 }
00624
00630 private void processWrite() throws IOException {
00631 synchronized (syncOutput) {
00632 if (outputBuffer.remaining() <= 0) {
00633 return;
00634 }
00635
00636 outputBuffer.flip();
00637 try {
00638 if (socketChannel != null) {
00639 socketChannel.write(outputBuffer);
00640 } else {
00641 outputBuffer.position(outputBuffer.limit());
00642 }
00643 } finally {
00644 outputBuffer.compact();
00645 }
00646 }
00647 }
00648
00653 private void updateWriteInterestOps() {
00654 synchronized (syncOutput) {
00655 final int newInterestOps;
00656 if (outputBuffer.position() > 0) {
00657 newInterestOps = interestOps|SelectionKey.OP_WRITE;
00658 } else {
00659 newInterestOps = interestOps&~SelectionKey.OP_WRITE;
00660 }
00661 if (interestOps != newInterestOps) {
00662 interestOps = newInterestOps;
00663 updateInterestOps();
00664 }
00665 }
00666 }
00667
00673 private void updateInterestOps() {
00674 if (debugProtocol != null) {
00675 debugProtocol.debugProtocolWrite("socket:set interest ops to "+interestOps);
00676 }
00677 assert Thread.holdsLock(syncOutput);
00678 if (selectionKey != null) {
00679 selectionKey.interestOps(interestOps);
00680 }
00681 }
00682
00683 }