RTMP update

This commit is contained in:
Michael Wain 2025-02-08 01:17:50 +03:00
parent 679ed915bf
commit 0a39a65fc6
5 changed files with 314 additions and 79 deletions

View File

@ -46,8 +46,6 @@
</parent>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

View File

@ -13,6 +13,7 @@ import java.io.OutputStream;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class RTMPListener extends Thread {
@ -29,7 +30,7 @@ public class RTMPListener extends Thread {
private ConnectionState connectionState;
private final Map<ConnectionState, ConnectionProcessor> connectionProcessors = new HashMap<>();
private final Map<ConnectionState, ConnectionProcessor> connectionProcessors = new ConcurrentHashMap<>();
public RTMPListener(Socket sock, RTMPServer parent, Integer uid, GameServer gameServer) {
try {

View File

@ -3,13 +3,11 @@ package com.alterdekim.game.component.rtmp;
import com.alterdekim.game.component.GameServer;
import com.alterdekim.game.message.*;
import com.alterdekim.game.message.amf.*;
import com.alterdekim.game.message.serializer.BinaryMessageSerializer;
import com.alterdekim.game.utils.RTMPUtils;
import com.alterdekim.game.utils.StringUtils;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.javatuples.Pair;
import java.io.IOException;
import java.io.InputStream;
@ -19,23 +17,19 @@ import java.util.*;
@Slf4j
public class ConnectedProcessor extends ConnectionProcessor {
private final Long startTimestamp;
private ConnectedState state;
private final List<Byte> message;
private Integer expectedMessageLength;
private Integer lastStreamId;
private byte[] previousHeader;
private byte lastStreamId;
private final GameServer gameServer;
private Long playerId;
private final RTMPSession rtmpSessionInfo;
public ConnectedProcessor(InputStream inputStream, OutputStream outputStream, Socket sock, GameServer gameServer) {
super(inputStream, outputStream, sock);
this.state = ConnectedState.WAIT_FOR_MESSAGE;
this.message = new ArrayList<>();
this.gameServer = gameServer;
this.previousHeader = new byte[11];
this.startTimestamp = System.currentTimeMillis();
this.rtmpSessionInfo = new RTMPSession();
}
@Override
@ -45,43 +39,87 @@ public class ConnectedProcessor extends ConnectionProcessor {
@Override
public ConnectionState process() throws IOException {
int b = this.getInputStream().read();
byte[] b1 = new byte[1]; // this was left as is due to underlying conversion between int and byte types.
int b = this.getInputStream().read(b1);
if (b == -1) {
close(); return ConnectionState.CONNECTED;
}
if( this.state == ConnectedState.WAIT_FOR_MESSAGE && b != 0x03 && b != 0x43 && b != 0x83 && b != 0x02 && b != 0xC3 ) {
log.info("process() WAIT_FOR_MESSAGE not 03,43,83");
return ConnectionState.CONNECTED;
}
this.state = switch(state) {
case WAIT_FOR_MESSAGE -> processWaitForMessage(parseChunkHeader(b));
case LISTENS_FOR_MESSAGE -> processListensForMessage((byte) b);
case WAIT_FOR_MESSAGE -> processWaitForMessage(b1[0]);
case LISTENS_FOR_MESSAGE -> processListensForMessage(b1[0]);
};
return ConnectionState.CONNECTED;
}
private ConnectedState processWaitForMessage(Pair<ChunkHeaderType, Integer> packetHeader) {
private ConnectedState processWaitForMessage(byte b) {
try {
byte[] header = switch (packetHeader.getValue0()) {
case Full -> new byte[11];
case WithoutStreamId -> new byte[7];
case OnlyTimestamp -> new byte[3];
default -> new byte[0];
};
if(this.getInputStream().read(header) == -1) return ConnectedState.WAIT_FOR_MESSAGE;
if(header.length == 11) this.lastStreamId = BinaryMessageSerializer.deserializeInteger(Arrays.copyOfRange(header, 7, 11));
this.message.clear();
this.expectedMessageLength = BinaryMessageSerializer.deserializeInteger(
Arrays.copyOfRange( header.length >= 7 ? header : this.previousHeader, 3, 6)
);
if( header.length >= 7 ) this.previousHeader = header;
if( header.length == 11 && packetHeader.getValue1() == 2 ) {
byte[] b = new byte[4];
if( this.getInputStream().read(b) == -1 ) return ConnectedState.WAIT_FOR_MESSAGE;
this.getOutputStream().write(new byte[] {0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x03, 0x00, 0x00, 0x00, 0x00, b[0], b[1], b[2], b[3]});
this.getOutputStream().flush();
RTMPHeader header = new RTMPHeader();
header.setChunkType(RTMPHeader.ChunkType.fromByte((byte) ((0xC0 & b) >> 6)));
header.setChunkStreamId((byte) (b & 0x3F));
if( header.getChunkStreamId() != ChunkStreamInfo.CONTROL_CHANNEL &&
header.getChunkStreamId() != ChunkStreamInfo.RTMP_COMMAND_CHANNEL &&
header.getChunkStreamId() != ChunkStreamInfo.RTMP_STREAM_CHANNEL ) {
return ConnectedState.WAIT_FOR_MESSAGE;
}
switch (header.getChunkType()) {
case TYPE_0_FULL -> { // b00 = 12 byte header (full header)
// Read bytes 1-3: Absolute timestamp
header.setAbsoluteTimestamp(RTMPUtils.readUnsignedInt24(this.getInputStream()));
header.setTimestampDelta(0);
// Read bytes 4-6: Packet length
header.setPacketLength(RTMPUtils.readUnsignedInt24(this.getInputStream()));
// Read byte 7: Message type ID
byte[] b1 = new byte[1];
this.getInputStream().read(b1);
header.setMessageType(RTMPHeader.MessageType.valueOf(b1[0]));
// Read bytes 8-11: Message stream ID (apparently little-endian order)
byte[] messageStreamIdBytes = new byte[4];
this.getInputStream().read(messageStreamIdBytes);
header.setMessageStreamId(RTMPUtils.toUnsignedInt32LittleEndian(messageStreamIdBytes));
}
case TYPE_1_RELATIVE_LARGE -> { // b01 = 8 bytes - like type 0. not including message stream ID (4 last bytes)
// Read bytes 1-3: Timestamp delta
header.setTimestampDelta(RTMPUtils.readUnsignedInt24(this.getInputStream()));
// Read bytes 4-6: Packet length
header.setPacketLength(RTMPUtils.readUnsignedInt24(this.getInputStream()));
// Read byte 7: Message type ID
byte[] b1 = new byte[1];
this.getInputStream().read(b1);
header.setMessageType(RTMPHeader.MessageType.valueOf(b1[0]));
RTMPHeader prevHeader = rtmpSessionInfo.getLastHeader(header.getChunkStreamId());
try {
header.setMessageStreamId(prevHeader.getMessageStreamId());
header.setAbsoluteTimestamp(prevHeader.getAbsoluteTimestamp() + header.getTimestampDelta());
} catch (NullPointerException ex) {
header.setMessageStreamId(0);
header.setAbsoluteTimestamp(header.getTimestampDelta());
}
}
case TYPE_2_RELATIVE_TIMESTAMP_ONLY -> { // b10 = 4 bytes - Basic Header and timestamp (3 bytes) are included
// Read bytes 1-3: Timestamp delta
header.setTimestampDelta(RTMPUtils.readUnsignedInt24(this.getInputStream()));
RTMPHeader prevHeader = rtmpSessionInfo.getLastHeader(header.getChunkStreamId());
header.setPacketLength(prevHeader.getPacketLength());
header.setMessageType(prevHeader.getMessageType());
header.setMessageStreamId(prevHeader.getMessageStreamId());
header.setAbsoluteTimestamp(prevHeader.getAbsoluteTimestamp() + header.getTimestampDelta());
}
case TYPE_3_RELATIVE_SINGLE_BYTE -> { // b11 = 1 byte: basic header only
RTMPHeader prevHeader = rtmpSessionInfo.getLastHeader(header.getChunkStreamId());
header.setTimestampDelta(prevHeader.getTimestampDelta());
header.setAbsoluteTimestamp(prevHeader.getAbsoluteTimestamp() + header.getTimestampDelta());
header.setPacketLength(prevHeader.getPacketLength());
header.setMessageType(prevHeader.getMessageType());
header.setMessageStreamId(prevHeader.getMessageStreamId());
}
}
this.message.clear();
this.lastStreamId = header.getChunkStreamId();
this.rtmpSessionInfo.chunkStreams.put(this.lastStreamId, header);
return ConnectedState.LISTENS_FOR_MESSAGE;
} catch (IOException e) {
log.error("ConnectedProcessor.processWaitForMessage()", e);
@ -90,13 +128,13 @@ public class ConnectedProcessor extends ConnectionProcessor {
}
private ConnectedState processListensForMessage(byte b) {
if(this.message.size() >= this.expectedMessageLength) {
if(this.message.size() >= this.rtmpSessionInfo.getLastHeader(this.lastStreamId).getPacketLength()) {
this.processMessage();
return ConnectedState.WAIT_FOR_MESSAGE;
}
if( b == (byte) 0xC3 ) { this.expectedMessageLength--; return ConnectedState.LISTENS_FOR_MESSAGE; }
if( b == (byte) 0xC3 ) { this.rtmpSessionInfo.getLastHeader(this.lastStreamId).setPacketLength(this.rtmpSessionInfo.getLastHeader(this.lastStreamId).getPacketLength()-1); return ConnectedState.LISTENS_FOR_MESSAGE; }
this.message.add(b);
if(this.message.size() >= this.expectedMessageLength) {
if(this.message.size() >= this.rtmpSessionInfo.getLastHeader(this.lastStreamId).getPacketLength()) {
this.processMessage();
return ConnectedState.WAIT_FOR_MESSAGE;
}
@ -104,6 +142,24 @@ public class ConnectedProcessor extends ConnectionProcessor {
}
private void processMessage() {
switch (this.lastStreamId) {
case ChunkStreamInfo.CONTROL_CHANNEL -> processControlChannel();
case ChunkStreamInfo.RTMP_COMMAND_CHANNEL -> processCommandChannel();
case ChunkStreamInfo.RTMP_STREAM_CHANNEL -> processStreamChannel();
default -> log.warn("Unknown channel id: {}", this.lastStreamId);
}
}
private void processControlChannel() {
byte[] bytes = SerializerUtils.bytesToPrimitive(message);
log.info("ConnectedProcessor.processMessage() controlMessage: {}", StringUtils.bytesToHex(bytes));
}
private void processCommandChannel() {
if( this.rtmpSessionInfo.getLastHeader(this.lastStreamId).getMessageType() != RTMPHeader.MessageType.COMMAND_AMF0 ) {
log.info("ConnectedProcessor.processMessage() type: {}", this.rtmpSessionInfo.getLastHeader(this.lastStreamId).getMessageType());
return;
}
byte[] bytes = SerializerUtils.bytesToPrimitive(message);
log.info("ConnectedProcessor.processMessage() messageString: {}", StringUtils.bytesToHex(bytes));
try {
@ -113,6 +169,7 @@ public class ConnectedProcessor extends ConnectionProcessor {
mr.get(0).getValue().equals("connect") ) {
this.playerId = Long.parseLong((String) mr.get(3).getValue());
this.gameServer.onConnect(this.playerId, (String) mr.get(5).getValue(), this);
this.getOutputStream().write( StringUtils.hexStringToByteArray("020000000000040500000000002625A0020000000000050600000000002625A00202000000000004010000000000001000030000000000F214000000000200075F726573756C74003FF0000000000000030006666D7356657202000E464D532F342C302C302C31313231000C6361706162696C697469657300406FE0000000000000046D6F6465003FF00000000000000000090300056C6576656C0200067374617475730004636F646502001D4E6574436F6E6E656374696F6E2E436F6E6E6563742E53756363657373000B6465736372697074696F6E020015436F6E6E656374696F6E207375636365656465642E000E6F626A656374456E636F64696E670000000000000000000004646174610800000000000776657273696F6E02000A342C302C302C31313231000009000009") );
this.getOutputStream().flush();
return;
@ -123,9 +180,8 @@ public class ConnectedProcessor extends ConnectionProcessor {
}
}
// returns chunk type and stream id
private Pair<ChunkHeaderType, Integer> parseChunkHeader(int b) {
return Pair.with(ChunkHeaderType.fromInt((b & 0xC0) >> 6), b & 0x3F);
private void processStreamChannel() {
log.warn("There's stream message.");
}
@Override
@ -146,7 +202,7 @@ public class ConnectedProcessor extends ConnectionProcessor {
cnt++;
}
}
l.addAll(int2bytes((int) ((System.currentTimeMillis() - this.startTimestamp) / 1000L), 3));
l.addAll(int2bytes((int) ((System.currentTimeMillis() - this.rtmpSessionInfo.getLastHeader(this.lastStreamId).getAbsoluteTimestamp()) / 1000L), 3));
l.addAll(int2bytes(data.size() - cnt, 3));
l.add((byte) 0x14);
l.addAll(int2bytes(this.lastStreamId, 4));
@ -168,52 +224,27 @@ public class ConnectedProcessor extends ConnectionProcessor {
super.close();
}
@AllArgsConstructor
private enum ChunkHeaderType {
Full(0),
WithoutStreamId(1),
OnlyTimestamp(2),
UsePrevious(3),
Unknown(4);
private final int type;
private static ChunkHeaderType fromInt(int b) {
for( ChunkHeaderType c : values() ) {
if( c.type == b ) return c;
}
return Unknown;
}
}
private class RTMPSession {
private int windowBytesRead;
@Getter
private int acknowledgementWindowSize = Integer.MAX_VALUE;
private int totalBytesRead = 0;
@Getter
@Setter
private int chunkSize = 128;
private Map<Byte, ChunkStreamInfo> chunkStreams = new HashMap<>();
private Map<Byte, RTMPHeader> chunkStreams = new HashMap<>();
public ChunkStreamInfo getChunkStreamInfo(byte chunkStreamId) {
ChunkStreamInfo chunkStreamInfo = chunkStreams.get(chunkStreamId);
public RTMPHeader getLastHeader(byte chunkStreamId) {
RTMPHeader chunkStreamInfo = chunkStreams.get(chunkStreamId);
if (chunkStreamInfo == null) {
chunkStreamInfo = new ChunkStreamInfo();
chunkStreamInfo = new RTMPHeader();
chunkStreams.put(chunkStreamId, chunkStreamInfo);
}
return chunkStreamInfo;
}
}
private class ChunkStreamInfo {
private static final byte RTMP_STREAM_CHANNEL = 0x08;
private static final byte RTMP_COMMAND_CHANNEL = 0x03;
private static final byte CONTROL_CHANNEL = 0x02;
private interface ChunkStreamInfo {
byte RTMP_STREAM_CHANNEL = 0x08;
byte RTMP_COMMAND_CHANNEL = 0x03;
byte CONTROL_CHANNEL = 0x02;
}
}

View File

@ -0,0 +1,182 @@
package com.alterdekim.game.component.rtmp;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import java.io.InputStream;
@ToString
@Getter
@Setter
@NoArgsConstructor
public class RTMPHeader {
private ChunkType chunkType;
private byte chunkStreamId;
private int absoluteTimestamp;
private int timestampDelta = -1;
private int packetLength;
private MessageType messageType;
private int messageStreamId;
@Getter
@RequiredArgsConstructor
public enum ChunkType {
/**
* Full 12-byte RTMP chunk header
*/
TYPE_0_FULL((byte) 0x00, 12),
/**
* Relative 8-byte RTMP chunk header (message stream ID is not included)
*/
TYPE_1_RELATIVE_LARGE((byte) 0x01, 8),
/**
* Relative 4-byte RTMP chunk header (only timestamp delta)
*/
TYPE_2_RELATIVE_TIMESTAMP_ONLY((byte) 0x02, 4),
/**
* Relative 1-byte RTMP chunk header (no "real" header, just the 1-byte indicating chunk header type & chunk stream ID)
*/
TYPE_3_RELATIVE_SINGLE_BYTE((byte) 0x03, 1);
/**
* The byte value of this chunk header type
*/
private final byte value;
/**
* The full size (in bytes) of this RTMP header (including the basic header byte)
*/
private final int size;
public static ChunkType fromByte(byte b) {
for( ChunkType c : values() ) {
if( c.value == b ) return c;
}
return TYPE_3_RELATIVE_SINGLE_BYTE;
}
}
@Slf4j
@Getter
@RequiredArgsConstructor
public enum MessageType {
/**
* Protocol control message 1
* Set Chunk Size, is used to notify the peer a new maximum chunk size to use.
*/
SET_CHUNK_SIZE((byte) 0x01),
/**
* Protocol control message 2
* Abort Message, is used to notify the peer if it is waiting for chunks
* to complete a message, then to discard the partially received message
* over a chunk stream and abort processing of that message.
*/
ABORT((byte) 0x02),
/**
* Protocol control message 3
* The client or the server sends the acknowledgment to the peer after
* receiving bytes equal to the window size. The window size is the
* maximum number of bytes that the sender sends without receiving
* acknowledgment from the receiver.
*/
ACKNOWLEDGEMENT((byte) 0x03),
/**
* Protocol control message 4
* The client or the server sends this message to notify the peer about
* the user control events. This message carries Event type and Event
* data.
* Also known as a PING message in some RTMP implementations.
*/
USER_CONTROL_MESSAGE((byte) 0x04),
/**
* Protocol control message 5
* The client or the server sends this message to inform the peer which
* window size to use when sending acknowledgment.
* Also known as ServerBW ("server bandwidth") in some RTMP implementations.
*/
WINDOW_ACKNOWLEDGEMENT_SIZE((byte) 0x05),
/**
* Protocol control message 6
* The client or the server sends this message to update the output
* bandwidth of the peer. The output bandwidth value is the same as the
* window size for the peer.
* Also known as ClientBW ("client bandwidth") in some RTMP implementations.
*/
SET_PEER_BANDWIDTH((byte) 0x06),
/**
* RTMP audio packet (0x08)
* The client or the server sends this message to send audio data to the peer.
*/
AUDIO((byte) 0x08),
/**
* RTMP video packet (0x09)
* The client or the server sends this message to send video data to the peer.
*/
VIDEO((byte) 0x09),
/**
* RTMP message type 0x0F
* The client or the server sends this message to send Metadata or any
* user data to the peer. Metadata includes details about the data (audio, video etc.)
* like creation time, duration, theme and so on.
* This is the AMF3-encoded version.
*/
DATA_AMF3((byte) 0x0F),
/**
* RTMP message type 0x10
* A shared object is a Flash object (a collection of name value pairs)
* that are in synchronization across multiple clients, instances, and
* so on.
* This is the AMF3 version: kMsgContainerEx=16 for AMF3.
*/
SHARED_OBJECT_AMF3((byte) 0x10),
/**
* RTMP message type 0x11
* Command messages carry the AMF-encoded commands between the client
* and the server.
* A command message consists of command name, transaction ID, and command object that
* contains related parameters.
* This is the AMF3-encoded version.
*/
COMMAND_AMF3((byte) 0x11),
/**
* RTMP message type 0x12
* The client or the server sends this message to send Metadata or any
* user data to the peer. Metadata includes details about the data (audio, video etc.)
* like creation time, duration, theme and so on.
* This is the AMF0-encoded version.
*/
DATA_AMF0((byte) 0x12),
/**
* RTMP message type 0x14
* Command messages carry the AMF-encoded commands between the client
* and the server.
* A command message consists of command name, transaction ID, and command object that
* contains related parameters.
* This is the common AMF0 version, also known as INVOKE in some RTMP implementations.
*/
COMMAND_AMF0((byte) 0x14),
/**
* RTMP message type 0x13
* A shared object is a Flash object (a collection of name value pairs)
* that are in synchronization across multiple clients, instances, and
* so on.
* This is the AMF0 version: kMsgContainer=19 for AMF0.
*/
SHARED_OBJECT_AMF0((byte) 0x13),
/**
* RTMP message type 0x16
* An aggregate message is a single message that contains a list of sub-messages.
*/
AGGREGATE_MESSAGE((byte) 0x16);
private final byte value;
public static MessageType valueOf(byte b) {
for( MessageType c : values() ) {
if( c.value == b ) return c;
}
log.warn("gosh im tired: {}", b);
return MessageType.ABORT;
}
}
}

View File

@ -0,0 +1,23 @@
package com.alterdekim.game.utils;
import java.io.IOException;
import java.io.InputStream;
public class RTMPUtils {
public static int readUnsignedInt32(InputStream in) throws IOException {
return ((in.read() & 0xff) << 24) | ((in.read() & 0xff) << 16) | ((in.read() & 0xff) << 8) | (in.read() & 0xff);
}
public static int readUnsignedInt24(InputStream in) throws IOException {
byte[] b = new byte[3]; in.read(b);
return ((b[0] & 0xff) << 16) | ((b[1] & 0xff) << 8) | (b[2] & 0xff);
}
public static int readUnsignedInt16(InputStream in) throws IOException {
return ((in.read() & 0xff) << 8) | (in.read() & 0xff);
}
public static int toUnsignedInt32LittleEndian(byte[] bytes) {
return ((bytes[3] & 0xff) << 24) | ((bytes[2] & 0xff) << 16) | ((bytes[1] & 0xff) << 8) | (bytes[0] & 0xff);
}
}