Crossfire JXClient, Trunk  R20561
ClientSocket.java
Go to the documentation of this file.
1 /*
2  * This file is part of JXClient, the Fullscreen Java Crossfire Client.
3  *
4  * JXClient is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * JXClient is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with JXClient; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17  *
18  * Copyright (C) 2005-2008 Yann Chachkoff.
19  * Copyright (C) 2006-2011 Andreas Kirschbaum.
20  */
21 
22 package com.realtime.crossfire.jxclient.server.socket;
23 
27 import java.io.EOFException;
28 import java.io.IOException;
29 import java.net.InetSocketAddress;
30 import java.net.SocketAddress;
31 import java.net.SocketException;
32 import java.nio.BufferOverflowException;
33 import java.nio.ByteBuffer;
34 import java.nio.ByteOrder;
35 import java.nio.channels.SelectableChannel;
36 import java.nio.channels.SelectionKey;
37 import java.nio.channels.Selector;
38 import java.nio.channels.SocketChannel;
39 import java.nio.channels.UnresolvedAddressException;
40 import java.util.Collection;
41 import org.jetbrains.annotations.NotNull;
42 import org.jetbrains.annotations.Nullable;
43 
53 public class ClientSocket {
54 
58  private static final int MAXIMUM_PACKET_SIZE = 65536;
59 
63  @NotNull
64  private final Model model;
65 
70  @Nullable
71  private final DebugWriter debugProtocol;
72 
76  @NotNull
78 
82  @NotNull
83  private final Selector selector;
84 
89  @NotNull
90  private final Object syncConnect = new Object();
91 
96  private boolean reconnect = true;
97 
101  @NotNull
102  private String reconnectReason = "disconnect";
103 
107  private boolean reconnectIsError;
108 
112  @Nullable
113  private String host;
114 
118  private int port;
119 
123  private boolean disconnectPending;
124 
128  @NotNull
129  private final byte[] packetHeader = new byte[2];
130 
134  @Nullable
135  private SelectableChannel selectableChannel;
136 
141  @Nullable
142  private SelectionKey selectionKey;
143 
147  private int interestOps;
148 
152  @NotNull
153  private final byte[] inputBuf = new byte[2+MAXIMUM_PACKET_SIZE];
154 
158  @NotNull
159  private final ByteBuffer inputBuffer = ByteBuffer.wrap(inputBuf);
160 
166  private int inputLen = -1;
167 
172  @NotNull
173  private final Object syncOutput = new Object();
174 
178  @NotNull
179  private final ByteBuffer outputBuffer = ByteBuffer.allocate(2+MAXIMUM_PACKET_SIZE);
180 
185  @Nullable
186  private SocketChannel socketChannel;
187 
191  private boolean isConnected;
192 
196  @NotNull
197  private final Thread thread = new Thread(this::process, "JXClient:ClientSocket");
198 
206  public ClientSocket(@NotNull final Model model, @Nullable final DebugWriter debugProtocol) throws IOException {
207  this.model = model;
208  this.debugProtocol = debugProtocol;
209  selector = Selector.open();
210  }
211 
215  public void start() {
216  if (debugProtocol != null) {
217  debugProtocol.debugProtocolWrite("socket:start");
218  }
219  thread.start();
220  }
221 
226  public void stop() throws InterruptedException {
227  if (debugProtocol != null) {
228  debugProtocol.debugProtocolWrite("socket:stop");
229  }
230  thread.interrupt();
231  try {
232  selector.close();
233  } catch (final IOException ex) {
234  if (debugProtocol != null) {
235  debugProtocol.debugProtocolWrite("close failed: "+ex.getMessage());
236  }
237  }
238  thread.join();
239  if (debugProtocol != null) {
240  debugProtocol.debugProtocolWrite("socket:stopped");
241  }
242  }
243 
248  public void addClientSocketListener(@NotNull final ClientSocketListener clientSocketListener) {
249  clientSocketListeners.add(clientSocketListener);
250  }
251 
256  public void removeClientSocketListener(@NotNull final ClientSocketListener clientSocketListener) {
257  clientSocketListeners.remove(clientSocketListener);
258  }
259 
265  public void connect(@NotNull final String host, final int port) {
266  if (debugProtocol != null) {
267  debugProtocol.debugProtocolWrite("socket:connect "+host+":"+port);
268  }
269  synchronized (syncConnect) {
270  if (this.host == null || this.port == 0 || !this.host.equals(host) || this.port != port) {
271  reconnect = true;
272  reconnectReason = "connect";
273  reconnectIsError = false;
274  this.host = host;
275  this.port = port;
276  selector.wakeup();
277  }
278  }
279  }
280 
286  public void disconnect(@NotNull final String reason, final boolean isError) {
287  if (debugProtocol != null) {
288  debugProtocol.debugProtocolWrite("socket:disconnect: "+reason+(isError ? " [unexpected]" : ""));
289  }
290  synchronized (syncConnect) {
291  if (host != null || port != 0) {
292  reconnect = true;
293  reconnectReason = reason;
294  reconnectIsError = isError;
295  host = null;
296  port = 0;
297  selector.wakeup();
298  }
299  }
300  }
301 
306  private void process() {
307  while (!thread.isInterrupted()) {
308  try {
309  doReconnect();
310  doConnect();
312  doTransceive();
313  } catch (final EOFException ex) {
314  final String tmp = ex.getMessage();
315  final String message = tmp == null ? "EOF" : tmp;
316  if (debugProtocol != null) {
317  debugProtocol.debugProtocolWrite("socket:exception "+message, ex);
318  }
319  processDisconnect(message, false);
320  } catch (final IOException ex) {
321  final String tmp = ex.getMessage();
322  final String message = tmp == null ? "I/O error" : tmp;
323  if (debugProtocol != null) {
324  debugProtocol.debugProtocolWrite("socket:exception "+message, ex);
325  }
326  processDisconnect(message, true);
327  }
328  }
329  }
330 
335  private void doConnect() throws IOException {
336  final boolean notifyConnected;
337  synchronized (syncOutput) {
338  if (isConnected || socketChannel == null) {
339  notifyConnected = false;
340  } else {
341  isConnected = socketChannel.finishConnect();
342  if (isConnected) {
343  interestOps = SelectionKey.OP_READ;
345  notifyConnected = true;
346  } else {
347  notifyConnected = false;
348  }
349  }
350  }
351  if (notifyConnected) {
352  for (final ClientSocketListener clientSocketListener : clientSocketListeners) {
353  clientSocketListener.connected();
354  }
355  }
356  }
357 
362  private void doReconnect() throws IOException {
363  assert Thread.currentThread() == thread;
364 
365  final boolean doReconnect;
366  final boolean doDisconnect;
367  @Nullable final String disconnectReason;
368  final boolean disconnectIsError;
369  synchronized (syncConnect) {
370  if (reconnect) {
371  reconnect = false;
372  if (host != null && port != 0) {
373  doReconnect = true;
374  doDisconnect = false;
375  disconnectReason = "reconnect to "+host+":"+port;
376  disconnectIsError = false;
377  } else {
378  doReconnect = false;
379  doDisconnect = true;
380  disconnectReason = reconnectReason;
381  disconnectIsError = reconnectIsError;
382  }
383  } else {
384  doReconnect = false;
385  doDisconnect = false;
386  disconnectReason = null;
387  disconnectIsError = false;
388  }
389  }
390  if (doReconnect) {
391  assert disconnectReason != null;
392  processDisconnect(disconnectReason, disconnectIsError);
393  final String connectHost;
394  final int connectPort;
395  synchronized (syncConnect) {
396  disconnectPending = true;
397  connectHost = host;
398  connectPort = port;
399  }
400  if (connectHost != null) {
401  processConnect(connectHost, connectPort);
402  }
403  }
404  if (doDisconnect) {
405  processDisconnect(disconnectReason, disconnectIsError);
406  }
407  }
408 
413  private void doTransceive() throws IOException {
414  selector.select();
415  final Collection<SelectionKey> selectedKeys = selector.selectedKeys();
416  if (selectedKeys.remove(selectionKey) && isConnected) {
417  processRead();
418  processWrite();
419  }
420  assert selectedKeys.isEmpty();
421  }
422 
429  private void processConnect(@NotNull final String host, final int port) throws IOException {
430  assert Thread.currentThread() == thread;
431 
432  if (debugProtocol != null) {
433  debugProtocol.debugProtocolWrite("socket:connecting to "+host+":"+port);
434  }
435  for (final ClientSocketListener clientSocketListener : clientSocketListeners) {
436  clientSocketListener.connecting();
437  }
438 
439  final SocketAddress socketAddress = new InetSocketAddress(host, port);
440  synchronized (syncOutput) {
441  outputBuffer.clear();
442  inputBuffer.clear();
443  selectionKey = null;
444  try {
445  socketChannel = SocketChannel.open();
446  selectableChannel = socketChannel.configureBlocking(false);
447  try {
448  isConnected = socketChannel.connect(socketAddress);
449  } catch (final UnresolvedAddressException ex) {
450  //noinspection ObjectToString
451  throw new IOException("Cannot resolve address: "+socketAddress, ex);
452  } catch (final IllegalArgumentException ex) {
453  throw new IOException(ex.getMessage(), ex);
454  }
455  try {
456  socketChannel.socket().setTcpNoDelay(true);
457  } catch (final SocketException ex) {
458  if (debugProtocol != null) {
459  debugProtocol.debugProtocolWrite("socket:cannot set TCP_NODELAY option: "+ex.getMessage());
460  }
461  }
462  interestOps = SelectionKey.OP_CONNECT;
463  selectionKey = selectableChannel.register(selector, interestOps);
464  } finally {
465  if (selectionKey == null) {
466  socketChannel = null;
467  selectableChannel = null;
468  isConnected = false;
469  interestOps = 0;
470  }
471  }
472  }
473  }
474 
480  private void processDisconnect(@NotNull final String reason, final boolean isError) {
481  assert Thread.currentThread() == thread;
482 
483  if (debugProtocol != null) {
484  debugProtocol.debugProtocolWrite("socket:disconnecting: "+reason+(isError ? " [unexpected]" : ""));
485  }
486  final boolean notifyListeners;
487  synchronized (syncConnect) {
488  notifyListeners = disconnectPending;
489  disconnectPending = false;
490  }
491  if (notifyListeners) {
492  model.getGuiStateManager().disconnecting(reason, isError);
493  for (final ClientSocketListener clientSocketListener : clientSocketListeners) {
494  clientSocketListener.disconnecting(reason, isError);
495  }
496  }
497 
498  try {
499  synchronized (syncOutput) {
500  if (selectionKey != null) {
501  selectionKey.cancel();
502  selectionKey = null;
503  outputBuffer.clear();
504 
505  try {
506  if (socketChannel != null) {
507  socketChannel.socket().shutdownOutput();
508  }
509  } catch (final IOException ignored) {
510  // ignore
511  }
512  try {
513  if (socketChannel != null) {
514  socketChannel.close();
515  }
516  } catch (final IOException ignored) {
517  // ignore
518  }
519  socketChannel = null;
520  selectableChannel = null;
521  inputBuffer.clear();
522  }
523  }
524  } finally {
525  if (notifyListeners) {
527  for (final ClientSocketListener clientSocketListener : clientSocketListeners) {
528  clientSocketListener.disconnected(reason);
529  }
530  }
531  }
532  }
533 
538  private void processRead() throws IOException {
539  synchronized (syncOutput) {
540  if (socketChannel == null) {
541  return;
542  }
543 
544  if (socketChannel.read(inputBuffer) == -1) {
545  throw new EOFException("EOF");
546  }
547  }
548  inputBuffer.flip();
550  inputBuffer.compact();
551  }
552 
556  private void processReadCommand() {
557  while (true) {
558  if (inputLen == -1) {
559  if (inputBuffer.remaining() < 2) {
560  break;
561  }
562 
563  inputLen = (inputBuffer.get()&0xFF)*0x100+(inputBuffer.get()&0xFF);
564  }
565 
566  if (inputBuffer.remaining() < inputLen) {
567  break;
568  }
569 
570  final int start = inputBuffer.position();
571  final int end = start+inputLen;
572  inputBuffer.position(start+inputLen);
573  inputLen = -1;
574  final ByteBuffer packet = ByteBuffer.wrap(inputBuf, start, end-start);
575  packet.order(ByteOrder.BIG_ENDIAN);
576  try {
577  for (final ClientSocketListener clientSocketListener : clientSocketListeners) {
578  clientSocketListener.packetReceived(packet);
579  }
580  } catch (final UnknownCommandException ex) {
581  disconnect(ex.getMessage(), true);
582  break;
583  }
584  }
585  }
586 
594  public void writePacket(@NotNull final byte[] buf, final int len) {
595  synchronized (syncOutput) {
596  if (socketChannel == null) {
597  return;
598  }
599 
600  packetHeader[0] = (byte)(len/0x100);
601  packetHeader[1] = (byte)len;
602  try {
603  try {
604  outputBuffer.put(packetHeader);
605  outputBuffer.put(buf, 0, len);
606  } catch (final BufferOverflowException ex) {
607  throw new IOException("buffer overflow", ex);
608  }
609  } catch (final IOException ignored) {
610  //noinspection UnusedCatchParameter
611  try {
612  socketChannel.close();
613  } catch (final IOException ignore) {
614  // ignore
615  }
616  return;
617  }
618  }
619 
620  selector.wakeup();
621  for (final ClientSocketListener clientSocketListener : clientSocketListeners) {
622  clientSocketListener.packetSent(buf, len);
623  }
624  }
625 
631  private void processWrite() throws IOException {
632  synchronized (syncOutput) {
633  if (outputBuffer.remaining() <= 0) {
634  return;
635  }
636 
637  outputBuffer.flip();
638  try {
639  if (socketChannel == null) {
640  outputBuffer.position(outputBuffer.limit());
641  } else {
642  socketChannel.write(outputBuffer);
643  }
644  } finally {
645  outputBuffer.compact();
646  }
647  }
648  }
649 
654  private void updateWriteInterestOps() {
655  synchronized (syncOutput) {
656  final int newInterestOps;
657  //noinspection IfMayBeConditional
658  if (outputBuffer.position() > 0) {
659  newInterestOps = interestOps|SelectionKey.OP_WRITE;
660  } else {
661  newInterestOps = interestOps&~SelectionKey.OP_WRITE;
662  }
663  if (interestOps != newInterestOps) {
664  interestOps = newInterestOps;
666  }
667  }
668  }
669 
674  private void updateInterestOps() {
675  if (debugProtocol != null) {
676  debugProtocol.debugProtocolWrite("socket:set interest ops to "+interestOps);
677  }
678  assert Thread.holdsLock(syncOutput);
679  if (selectionKey != null) {
680  selectionKey.interestOps(interestOps);
681  }
682  }
683 
684 }
boolean isConnected
Whether socketChannel is connected.
void updateWriteInterestOps()
Updates interestOps&#39;s OP_WRITE according to whether outputBuffer has pending data.
void processReadCommand()
Parses data from inputBuffer into commands.
void disconnect(@NotNull final String reason, final boolean isError)
Terminates the connection.
void processConnect(@NotNull final String host, final int port)
Connects the socket.
boolean reconnectIsError
Only valid if reconnect is set.
void doConnect()
Processes pending connect requests.
int interestOps
The currently set interest ops for selectionKey.
final Object syncOutput
Synchronization object for outputBuffer, selectionKey, interestOps, and socketChannel.
void processRead()
Reads data from the socket and parses the data into commands.
void debugProtocolWrite(@NotNull final CharSequence str)
Writes a message to the debug protocol.
void disconnected()
Called after the connection has been closed.
void addClientSocketListener(@NotNull final ClientSocketListener clientSocketListener)
Adds a ClientSocketListener to be notified.
Writer debug information to a log file.
SelectionKey selectionKey
The SelectionKey registered to selectableChannel.
void processWrite()
Writes some pending data to the socket.
final Model model
The Model instance that is updated.
final EventListenerList2< ClientSocketListener > clientSocketListeners
The ClientSocketListeners to notify.
void updateInterestOps()
Updates selectionKey&#39;s interest ops to match interestOps.
final byte [] packetHeader
A buffer for sending packets.
String reconnectReason
Only valid if reconnect is set.
SelectableChannel selectableChannel
The SelectableChannel of socketChannel.
Combines all model classes that are updated.
Definition: Model.java:44
SocketChannel socketChannel
The SocketChannel when connected.
void disconnecting(@NotNull final String reason, final boolean isError)
Called when the connection is being teared down.
An UnknownCommandException is generated whenever an unknown message packet is received from the serve...
void add(@NotNull final T listener)
Adds a listener.
Interface for listeners interested in ClientSocket related events.
void doReconnect()
Processes pending re- or disconnect requests.
void writePacket(@NotNull final byte[] buf, final int len)
Writes a packet.
void processDisconnect(@NotNull final String reason, final boolean isError)
Disconnects the socket.
final Object syncConnect
Synchronization object for reconnect, host, port, and disconnectPending.
ClientSocket(@NotNull final Model model, @Nullable final DebugWriter debugProtocol)
Creates a new instance.
void doTransceive()
Processes pending data to receive of transmit.
void connect(@NotNull final String host, final int port)
Connects to a server.
final Thread thread
The Thread used to operate the socket.
void removeClientSocketListener(@NotNull final ClientSocketListener clientSocketListener)
Removes a ClientSocketListener to be notified.
boolean reconnect
Set if host or port has changed and thus a reconnect is needed.
final Selector selector
The Selector used for waiting.
final DebugWriter debugProtocol
The appender to write state changes to.
void process()
Reads/writes data from/to the socket.
void remove(@NotNull final T listener)
Removes a listener.
static final int MAXIMUM_PACKET_SIZE
The maximum payload size of a Crossfire protocol packet.