diff --git a/pom.xml b/pom.xml
index ff21d27..d6b79f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,8 +46,6 @@
- 1.8
- 1.8
UTF-8
diff --git a/src/main/java/com/alterdekim/game/component/RTMPListener.java b/src/main/java/com/alterdekim/game/component/RTMPListener.java
index 83e2c59..e212460 100644
--- a/src/main/java/com/alterdekim/game/component/RTMPListener.java
+++ b/src/main/java/com/alterdekim/game/component/RTMPListener.java
@@ -13,6 +13,7 @@ import java.io.OutputStream;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class RTMPListener extends Thread {
@@ -29,7 +30,7 @@ public class RTMPListener extends Thread {
private ConnectionState connectionState;
- private final Map connectionProcessors = new HashMap<>();
+ private final Map connectionProcessors = new ConcurrentHashMap<>();
public RTMPListener(Socket sock, RTMPServer parent, Integer uid, GameServer gameServer) {
try {
diff --git a/src/main/java/com/alterdekim/game/component/rtmp/ConnectedProcessor.java b/src/main/java/com/alterdekim/game/component/rtmp/ConnectedProcessor.java
index 4afbe8a..8415abb 100644
--- a/src/main/java/com/alterdekim/game/component/rtmp/ConnectedProcessor.java
+++ b/src/main/java/com/alterdekim/game/component/rtmp/ConnectedProcessor.java
@@ -3,13 +3,11 @@ package com.alterdekim.game.component.rtmp;
import com.alterdekim.game.component.GameServer;
import com.alterdekim.game.message.*;
import com.alterdekim.game.message.amf.*;
-import com.alterdekim.game.message.serializer.BinaryMessageSerializer;
+import com.alterdekim.game.utils.RTMPUtils;
import com.alterdekim.game.utils.StringUtils;
-import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import org.javatuples.Pair;
import java.io.IOException;
import java.io.InputStream;
@@ -19,23 +17,19 @@ import java.util.*;
@Slf4j
public class ConnectedProcessor extends ConnectionProcessor {
-
- private final Long startTimestamp;
private ConnectedState state;
private final List message;
- private Integer expectedMessageLength;
- private Integer lastStreamId;
- private byte[] previousHeader;
+ private byte lastStreamId;
private final GameServer gameServer;
private Long playerId;
+ private final RTMPSession rtmpSessionInfo;
public ConnectedProcessor(InputStream inputStream, OutputStream outputStream, Socket sock, GameServer gameServer) {
super(inputStream, outputStream, sock);
this.state = ConnectedState.WAIT_FOR_MESSAGE;
this.message = new ArrayList<>();
this.gameServer = gameServer;
- this.previousHeader = new byte[11];
- this.startTimestamp = System.currentTimeMillis();
+ this.rtmpSessionInfo = new RTMPSession();
}
@Override
@@ -45,43 +39,87 @@ public class ConnectedProcessor extends ConnectionProcessor {
@Override
public ConnectionState process() throws IOException {
- int b = this.getInputStream().read();
+ byte[] b1 = new byte[1]; // this was left as is due to underlying conversion between int and byte types.
+ int b = this.getInputStream().read(b1);
if (b == -1) {
close(); return ConnectionState.CONNECTED;
}
- if( this.state == ConnectedState.WAIT_FOR_MESSAGE && b != 0x03 && b != 0x43 && b != 0x83 && b != 0x02 && b != 0xC3 ) {
- log.info("process() WAIT_FOR_MESSAGE not 03,43,83");
- return ConnectionState.CONNECTED;
- }
this.state = switch(state) {
- case WAIT_FOR_MESSAGE -> processWaitForMessage(parseChunkHeader(b));
- case LISTENS_FOR_MESSAGE -> processListensForMessage((byte) b);
+ case WAIT_FOR_MESSAGE -> processWaitForMessage(b1[0]);
+ case LISTENS_FOR_MESSAGE -> processListensForMessage(b1[0]);
};
return ConnectionState.CONNECTED;
}
- private ConnectedState processWaitForMessage(Pair packetHeader) {
+ private ConnectedState processWaitForMessage(byte b) {
try {
- byte[] header = switch (packetHeader.getValue0()) {
- case Full -> new byte[11];
- case WithoutStreamId -> new byte[7];
- case OnlyTimestamp -> new byte[3];
- default -> new byte[0];
- };
- if(this.getInputStream().read(header) == -1) return ConnectedState.WAIT_FOR_MESSAGE;
- if(header.length == 11) this.lastStreamId = BinaryMessageSerializer.deserializeInteger(Arrays.copyOfRange(header, 7, 11));
- this.message.clear();
- this.expectedMessageLength = BinaryMessageSerializer.deserializeInteger(
- Arrays.copyOfRange( header.length >= 7 ? header : this.previousHeader, 3, 6)
- );
- if( header.length >= 7 ) this.previousHeader = header;
- if( header.length == 11 && packetHeader.getValue1() == 2 ) {
- byte[] b = new byte[4];
- if( this.getInputStream().read(b) == -1 ) return ConnectedState.WAIT_FOR_MESSAGE;
- this.getOutputStream().write(new byte[] {0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x03, 0x00, 0x00, 0x00, 0x00, b[0], b[1], b[2], b[3]});
- this.getOutputStream().flush();
+ RTMPHeader header = new RTMPHeader();
+
+ header.setChunkType(RTMPHeader.ChunkType.fromByte((byte) ((0xC0 & b) >> 6)));
+ header.setChunkStreamId((byte) (b & 0x3F));
+
+ if( header.getChunkStreamId() != ChunkStreamInfo.CONTROL_CHANNEL &&
+ header.getChunkStreamId() != ChunkStreamInfo.RTMP_COMMAND_CHANNEL &&
+ header.getChunkStreamId() != ChunkStreamInfo.RTMP_STREAM_CHANNEL ) {
return ConnectedState.WAIT_FOR_MESSAGE;
}
+
+ switch (header.getChunkType()) {
+ case TYPE_0_FULL -> { // b00 = 12 byte header (full header)
+ // Read bytes 1-3: Absolute timestamp
+ header.setAbsoluteTimestamp(RTMPUtils.readUnsignedInt24(this.getInputStream()));
+ header.setTimestampDelta(0);
+ // Read bytes 4-6: Packet length
+ header.setPacketLength(RTMPUtils.readUnsignedInt24(this.getInputStream()));
+ // Read byte 7: Message type ID
+ byte[] b1 = new byte[1];
+ this.getInputStream().read(b1);
+ header.setMessageType(RTMPHeader.MessageType.valueOf(b1[0]));
+ // Read bytes 8-11: Message stream ID (apparently little-endian order)
+ byte[] messageStreamIdBytes = new byte[4];
+ this.getInputStream().read(messageStreamIdBytes);
+ header.setMessageStreamId(RTMPUtils.toUnsignedInt32LittleEndian(messageStreamIdBytes));
+ }
+ case TYPE_1_RELATIVE_LARGE -> { // b01 = 8 bytes - like type 0. not including message stream ID (4 last bytes)
+ // Read bytes 1-3: Timestamp delta
+ header.setTimestampDelta(RTMPUtils.readUnsignedInt24(this.getInputStream()));
+ // Read bytes 4-6: Packet length
+ header.setPacketLength(RTMPUtils.readUnsignedInt24(this.getInputStream()));
+ // Read byte 7: Message type ID
+ byte[] b1 = new byte[1];
+ this.getInputStream().read(b1);
+ header.setMessageType(RTMPHeader.MessageType.valueOf(b1[0]));
+ RTMPHeader prevHeader = rtmpSessionInfo.getLastHeader(header.getChunkStreamId());
+ try {
+ header.setMessageStreamId(prevHeader.getMessageStreamId());
+ header.setAbsoluteTimestamp(prevHeader.getAbsoluteTimestamp() + header.getTimestampDelta());
+ } catch (NullPointerException ex) {
+ header.setMessageStreamId(0);
+ header.setAbsoluteTimestamp(header.getTimestampDelta());
+ }
+ }
+ case TYPE_2_RELATIVE_TIMESTAMP_ONLY -> { // b10 = 4 bytes - Basic Header and timestamp (3 bytes) are included
+ // Read bytes 1-3: Timestamp delta
+ header.setTimestampDelta(RTMPUtils.readUnsignedInt24(this.getInputStream()));
+ RTMPHeader prevHeader = rtmpSessionInfo.getLastHeader(header.getChunkStreamId());
+ header.setPacketLength(prevHeader.getPacketLength());
+ header.setMessageType(prevHeader.getMessageType());
+ header.setMessageStreamId(prevHeader.getMessageStreamId());
+ header.setAbsoluteTimestamp(prevHeader.getAbsoluteTimestamp() + header.getTimestampDelta());
+ }
+ case TYPE_3_RELATIVE_SINGLE_BYTE -> { // b11 = 1 byte: basic header only
+ RTMPHeader prevHeader = rtmpSessionInfo.getLastHeader(header.getChunkStreamId());
+ header.setTimestampDelta(prevHeader.getTimestampDelta());
+ header.setAbsoluteTimestamp(prevHeader.getAbsoluteTimestamp() + header.getTimestampDelta());
+ header.setPacketLength(prevHeader.getPacketLength());
+ header.setMessageType(prevHeader.getMessageType());
+ header.setMessageStreamId(prevHeader.getMessageStreamId());
+ }
+ }
+ this.message.clear();
+ this.lastStreamId = header.getChunkStreamId();
+ this.rtmpSessionInfo.chunkStreams.put(this.lastStreamId, header);
+
return ConnectedState.LISTENS_FOR_MESSAGE;
} catch (IOException e) {
log.error("ConnectedProcessor.processWaitForMessage()", e);
@@ -90,13 +128,13 @@ public class ConnectedProcessor extends ConnectionProcessor {
}
private ConnectedState processListensForMessage(byte b) {
- if(this.message.size() >= this.expectedMessageLength) {
+ if(this.message.size() >= this.rtmpSessionInfo.getLastHeader(this.lastStreamId).getPacketLength()) {
this.processMessage();
return ConnectedState.WAIT_FOR_MESSAGE;
}
- if( b == (byte) 0xC3 ) { this.expectedMessageLength--; return ConnectedState.LISTENS_FOR_MESSAGE; }
+ if( b == (byte) 0xC3 ) { this.rtmpSessionInfo.getLastHeader(this.lastStreamId).setPacketLength(this.rtmpSessionInfo.getLastHeader(this.lastStreamId).getPacketLength()-1); return ConnectedState.LISTENS_FOR_MESSAGE; }
this.message.add(b);
- if(this.message.size() >= this.expectedMessageLength) {
+ if(this.message.size() >= this.rtmpSessionInfo.getLastHeader(this.lastStreamId).getPacketLength()) {
this.processMessage();
return ConnectedState.WAIT_FOR_MESSAGE;
}
@@ -104,6 +142,24 @@ public class ConnectedProcessor extends ConnectionProcessor {
}
private void processMessage() {
+ switch (this.lastStreamId) {
+ case ChunkStreamInfo.CONTROL_CHANNEL -> processControlChannel();
+ case ChunkStreamInfo.RTMP_COMMAND_CHANNEL -> processCommandChannel();
+ case ChunkStreamInfo.RTMP_STREAM_CHANNEL -> processStreamChannel();
+ default -> log.warn("Unknown channel id: {}", this.lastStreamId);
+ }
+ }
+
+ private void processControlChannel() {
+ byte[] bytes = SerializerUtils.bytesToPrimitive(message);
+ log.info("ConnectedProcessor.processMessage() controlMessage: {}", StringUtils.bytesToHex(bytes));
+ }
+
+ private void processCommandChannel() {
+ if( this.rtmpSessionInfo.getLastHeader(this.lastStreamId).getMessageType() != RTMPHeader.MessageType.COMMAND_AMF0 ) {
+ log.info("ConnectedProcessor.processMessage() type: {}", this.rtmpSessionInfo.getLastHeader(this.lastStreamId).getMessageType());
+ return;
+ }
byte[] bytes = SerializerUtils.bytesToPrimitive(message);
log.info("ConnectedProcessor.processMessage() messageString: {}", StringUtils.bytesToHex(bytes));
try {
@@ -113,6 +169,7 @@ public class ConnectedProcessor extends ConnectionProcessor {
mr.get(0).getValue().equals("connect") ) {
this.playerId = Long.parseLong((String) mr.get(3).getValue());
this.gameServer.onConnect(this.playerId, (String) mr.get(5).getValue(), this);
+
this.getOutputStream().write( StringUtils.hexStringToByteArray("020000000000040500000000002625A0020000000000050600000000002625A00202000000000004010000000000001000030000000000F214000000000200075F726573756C74003FF0000000000000030006666D7356657202000E464D532F342C302C302C31313231000C6361706162696C697469657300406FE0000000000000046D6F6465003FF00000000000000000090300056C6576656C0200067374617475730004636F646502001D4E6574436F6E6E656374696F6E2E436F6E6E6563742E53756363657373000B6465736372697074696F6E020015436F6E6E656374696F6E207375636365656465642E000E6F626A656374456E636F64696E670000000000000000000004646174610800000000000776657273696F6E02000A342C302C302C31313231000009000009") );
this.getOutputStream().flush();
return;
@@ -123,9 +180,8 @@ public class ConnectedProcessor extends ConnectionProcessor {
}
}
- // returns chunk type and stream id
- private Pair parseChunkHeader(int b) {
- return Pair.with(ChunkHeaderType.fromInt((b & 0xC0) >> 6), b & 0x3F);
+ private void processStreamChannel() {
+ log.warn("There's stream message.");
}
@Override
@@ -146,7 +202,7 @@ public class ConnectedProcessor extends ConnectionProcessor {
cnt++;
}
}
- l.addAll(int2bytes((int) ((System.currentTimeMillis() - this.startTimestamp) / 1000L), 3));
+ l.addAll(int2bytes((int) ((System.currentTimeMillis() - this.rtmpSessionInfo.getLastHeader(this.lastStreamId).getAbsoluteTimestamp()) / 1000L), 3));
l.addAll(int2bytes(data.size() - cnt, 3));
l.add((byte) 0x14);
l.addAll(int2bytes(this.lastStreamId, 4));
@@ -168,52 +224,27 @@ public class ConnectedProcessor extends ConnectionProcessor {
super.close();
}
- @AllArgsConstructor
- private enum ChunkHeaderType {
- Full(0),
- WithoutStreamId(1),
- OnlyTimestamp(2),
- UsePrevious(3),
- Unknown(4);
-
- private final int type;
-
- private static ChunkHeaderType fromInt(int b) {
- for( ChunkHeaderType c : values() ) {
- if( c.type == b ) return c;
- }
- return Unknown;
- }
- }
-
private class RTMPSession {
- private int windowBytesRead;
-
- @Getter
- private int acknowledgementWindowSize = Integer.MAX_VALUE;
- private int totalBytesRead = 0;
@Getter
@Setter
private int chunkSize = 128;
- private Map chunkStreams = new HashMap<>();
+ private Map chunkStreams = new HashMap<>();
- public ChunkStreamInfo getChunkStreamInfo(byte chunkStreamId) {
- ChunkStreamInfo chunkStreamInfo = chunkStreams.get(chunkStreamId);
+ public RTMPHeader getLastHeader(byte chunkStreamId) {
+ RTMPHeader chunkStreamInfo = chunkStreams.get(chunkStreamId);
if (chunkStreamInfo == null) {
- chunkStreamInfo = new ChunkStreamInfo();
+ chunkStreamInfo = new RTMPHeader();
chunkStreams.put(chunkStreamId, chunkStreamInfo);
}
return chunkStreamInfo;
}
}
- private class ChunkStreamInfo {
- private static final byte RTMP_STREAM_CHANNEL = 0x08;
- private static final byte RTMP_COMMAND_CHANNEL = 0x03;
- private static final byte CONTROL_CHANNEL = 0x02;
-
-
+ private interface ChunkStreamInfo {
+ byte RTMP_STREAM_CHANNEL = 0x08;
+ byte RTMP_COMMAND_CHANNEL = 0x03;
+ byte CONTROL_CHANNEL = 0x02;
}
}
diff --git a/src/main/java/com/alterdekim/game/component/rtmp/RTMPHeader.java b/src/main/java/com/alterdekim/game/component/rtmp/RTMPHeader.java
new file mode 100644
index 0000000..3140038
--- /dev/null
+++ b/src/main/java/com/alterdekim/game/component/rtmp/RTMPHeader.java
@@ -0,0 +1,182 @@
+package com.alterdekim.game.component.rtmp;
+
+import lombok.*;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.InputStream;
+
+@ToString
+@Getter
+@Setter
+@NoArgsConstructor
+public class RTMPHeader {
+ private ChunkType chunkType;
+ private byte chunkStreamId;
+ private int absoluteTimestamp;
+ private int timestampDelta = -1;
+ private int packetLength;
+ private MessageType messageType;
+ private int messageStreamId;
+
+ @Getter
+ @RequiredArgsConstructor
+ public enum ChunkType {
+
+ /**
+ * Full 12-byte RTMP chunk header
+ */
+ TYPE_0_FULL((byte) 0x00, 12),
+ /**
+ * Relative 8-byte RTMP chunk header (message stream ID is not included)
+ */
+ TYPE_1_RELATIVE_LARGE((byte) 0x01, 8),
+ /**
+ * Relative 4-byte RTMP chunk header (only timestamp delta)
+ */
+ TYPE_2_RELATIVE_TIMESTAMP_ONLY((byte) 0x02, 4),
+ /**
+ * Relative 1-byte RTMP chunk header (no "real" header, just the 1-byte indicating chunk header type & chunk stream ID)
+ */
+ TYPE_3_RELATIVE_SINGLE_BYTE((byte) 0x03, 1);
+ /**
+ * The byte value of this chunk header type
+ */
+ private final byte value;
+ /**
+ * The full size (in bytes) of this RTMP header (including the basic header byte)
+ */
+ private final int size;
+
+ public static ChunkType fromByte(byte b) {
+ for( ChunkType c : values() ) {
+ if( c.value == b ) return c;
+ }
+ return TYPE_3_RELATIVE_SINGLE_BYTE;
+ }
+ }
+
+ @Slf4j
+ @Getter
+ @RequiredArgsConstructor
+ public enum MessageType {
+
+ /**
+ * Protocol control message 1
+ * Set Chunk Size, is used to notify the peer a new maximum chunk size to use.
+ */
+ SET_CHUNK_SIZE((byte) 0x01),
+ /**
+ * Protocol control message 2
+ * Abort Message, is used to notify the peer if it is waiting for chunks
+ * to complete a message, then to discard the partially received message
+ * over a chunk stream and abort processing of that message.
+ */
+ ABORT((byte) 0x02),
+ /**
+ * Protocol control message 3
+ * The client or the server sends the acknowledgment to the peer after
+ * receiving bytes equal to the window size. The window size is the
+ * maximum number of bytes that the sender sends without receiving
+ * acknowledgment from the receiver.
+ */
+ ACKNOWLEDGEMENT((byte) 0x03),
+ /**
+ * Protocol control message 4
+ * The client or the server sends this message to notify the peer about
+ * the user control events. This message carries Event type and Event
+ * data.
+ * Also known as a PING message in some RTMP implementations.
+ */
+ USER_CONTROL_MESSAGE((byte) 0x04),
+ /**
+ * Protocol control message 5
+ * The client or the server sends this message to inform the peer which
+ * window size to use when sending acknowledgment.
+ * Also known as ServerBW ("server bandwidth") in some RTMP implementations.
+ */
+ WINDOW_ACKNOWLEDGEMENT_SIZE((byte) 0x05),
+ /**
+ * Protocol control message 6
+ * The client or the server sends this message to update the output
+ * bandwidth of the peer. The output bandwidth value is the same as the
+ * window size for the peer.
+ * Also known as ClientBW ("client bandwidth") in some RTMP implementations.
+ */
+ SET_PEER_BANDWIDTH((byte) 0x06),
+ /**
+ * RTMP audio packet (0x08)
+ * The client or the server sends this message to send audio data to the peer.
+ */
+ AUDIO((byte) 0x08),
+ /**
+ * RTMP video packet (0x09)
+ * The client or the server sends this message to send video data to the peer.
+ */
+ VIDEO((byte) 0x09),
+ /**
+ * RTMP message type 0x0F
+ * The client or the server sends this message to send Metadata or any
+ * user data to the peer. Metadata includes details about the data (audio, video etc.)
+ * like creation time, duration, theme and so on.
+ * This is the AMF3-encoded version.
+ */
+ DATA_AMF3((byte) 0x0F),
+ /**
+ * RTMP message type 0x10
+ * A shared object is a Flash object (a collection of name value pairs)
+ * that are in synchronization across multiple clients, instances, and
+ * so on.
+ * This is the AMF3 version: kMsgContainerEx=16 for AMF3.
+ */
+ SHARED_OBJECT_AMF3((byte) 0x10),
+ /**
+ * RTMP message type 0x11
+ * Command messages carry the AMF-encoded commands between the client
+ * and the server.
+ * A command message consists of command name, transaction ID, and command object that
+ * contains related parameters.
+ * This is the AMF3-encoded version.
+ */
+ COMMAND_AMF3((byte) 0x11),
+ /**
+ * RTMP message type 0x12
+ * The client or the server sends this message to send Metadata or any
+ * user data to the peer. Metadata includes details about the data (audio, video etc.)
+ * like creation time, duration, theme and so on.
+ * This is the AMF0-encoded version.
+ */
+ DATA_AMF0((byte) 0x12),
+ /**
+ * RTMP message type 0x14
+ * Command messages carry the AMF-encoded commands between the client
+ * and the server.
+ * A command message consists of command name, transaction ID, and command object that
+ * contains related parameters.
+ * This is the common AMF0 version, also known as INVOKE in some RTMP implementations.
+ */
+ COMMAND_AMF0((byte) 0x14),
+ /**
+ * RTMP message type 0x13
+ * A shared object is a Flash object (a collection of name value pairs)
+ * that are in synchronization across multiple clients, instances, and
+ * so on.
+ * This is the AMF0 version: kMsgContainer=19 for AMF0.
+ */
+ SHARED_OBJECT_AMF0((byte) 0x13),
+ /**
+ * RTMP message type 0x16
+ * An aggregate message is a single message that contains a list of sub-messages.
+ */
+ AGGREGATE_MESSAGE((byte) 0x16);
+
+ private final byte value;
+
+ public static MessageType valueOf(byte b) {
+ for( MessageType c : values() ) {
+ if( c.value == b ) return c;
+ }
+ log.warn("gosh im tired: {}", b);
+ return MessageType.ABORT;
+ }
+ }
+}
diff --git a/src/main/java/com/alterdekim/game/utils/RTMPUtils.java b/src/main/java/com/alterdekim/game/utils/RTMPUtils.java
new file mode 100644
index 0000000..1dc340d
--- /dev/null
+++ b/src/main/java/com/alterdekim/game/utils/RTMPUtils.java
@@ -0,0 +1,23 @@
+package com.alterdekim.game.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class RTMPUtils {
+ public static int readUnsignedInt32(InputStream in) throws IOException {
+ return ((in.read() & 0xff) << 24) | ((in.read() & 0xff) << 16) | ((in.read() & 0xff) << 8) | (in.read() & 0xff);
+ }
+
+ public static int readUnsignedInt24(InputStream in) throws IOException {
+ byte[] b = new byte[3]; in.read(b);
+ return ((b[0] & 0xff) << 16) | ((b[1] & 0xff) << 8) | (b[2] & 0xff);
+ }
+
+ public static int readUnsignedInt16(InputStream in) throws IOException {
+ return ((in.read() & 0xff) << 8) | (in.read() & 0xff);
+ }
+
+ public static int toUnsignedInt32LittleEndian(byte[] bytes) {
+ return ((bytes[3] & 0xff) << 24) | ((bytes[2] & 0xff) << 16) | ((bytes[1] & 0xff) << 8) | (bytes[0] & 0xff);
+ }
+}