Implemented stream packets

This commit is contained in:
UnlegitDqrk
2026-02-14 15:36:29 +01:00
parent 62ccedb2ed
commit 3b5f4f62f8
2 changed files with 43 additions and 41 deletions

View File

@@ -11,10 +11,9 @@ import org.openautonomousconnection.protocol.packets.v1_0_0.beta.web.stream.WebS
import org.openautonomousconnection.protocol.packets.v1_0_0.beta.web.stream.WebStreamStartPacket; import org.openautonomousconnection.protocol.packets.v1_0_0.beta.web.stream.WebStreamStartPacket;
import org.openautonomousconnection.protocol.side.client.ProtocolClient; import org.openautonomousconnection.protocol.side.client.ProtocolClient;
import org.openautonomousconnection.protocol.side.client.events.ConnectedToProtocolServerEvent; import org.openautonomousconnection.protocol.side.client.events.ConnectedToProtocolServerEvent;
import org.openautonomousconnection.protocol.versions.v1_0_0.beta.INSResponseStatus;
import org.openautonomousconnection.protocol.versions.v1_0_0.beta.INSRecord; import org.openautonomousconnection.protocol.versions.v1_0_0.beta.INSRecord;
import org.openautonomousconnection.protocol.versions.v1_0_0.beta.INSResponseStatus;
import javax.swing.*;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;

View File

@@ -10,7 +10,10 @@ import org.openautonomousconnection.protocol.versions.v1_0_0.beta.WebRequestMeth
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.net.URL; import java.net.URL;
import java.util.*; import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@@ -80,6 +83,33 @@ public final class OacWebRequestBroker {
return p; return p;
} }
/**
* Assembles the response body from received chunks in ascending seq order.
*
* <p>Best-effort UDP behavior: gaps are ignored.</p>
*/
private static void assembleBestEffortLocked(ResponseState st) {
st.body.reset();
if (st.chunkBuffer.isEmpty() || st.maxSeqSeen < 0) {
return;
}
for (int seq = 0; seq <= st.maxSeqSeen; seq++) {
byte[] chunk = st.chunkBuffer.get(seq);
if (chunk == null) continue; // gap accepted
st.body.write(chunk, 0, chunk.length);
}
}
private static void sleepSilently(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/** /**
* Attaches the client used to send INS/Web packets. * Attaches the client used to send INS/Web packets.
* *
@@ -195,10 +225,10 @@ public final class OacWebRequestBroker {
/** /**
* Handles a non-streamed WebResponsePacket. * Handles a non-streamed WebResponsePacket.
* *
* @param statusCode status code * @param statusCode status code
* @param contentType content-type * @param contentType content-type
* @param headers headers * @param headers headers
* @param body body * @param body body
*/ */
public void onWebResponse(int statusCode, String contentType, Map<String, String> headers, byte[] body) { public void onWebResponse(int statusCode, String contentType, Map<String, String> headers, byte[] body) {
ResponseState st = responseState; ResponseState st = responseState;
@@ -224,9 +254,9 @@ public final class OacWebRequestBroker {
/** /**
* Handles the beginning of a streamed response. * Handles the beginning of a streamed response.
* *
* @param statusCode status code * @param statusCode status code
* @param contentType content-type * @param contentType content-type
* @param headers headers * @param headers headers
* @param totalLength total length (may be -1) * @param totalLength total length (may be -1)
*/ */
public void onStreamStart(int statusCode, String contentType, Map<String, String> headers, long totalLength) { public void onStreamStart(int statusCode, String contentType, Map<String, String> headers, long totalLength) {
@@ -256,7 +286,7 @@ public final class OacWebRequestBroker {
* *
* <p>UDP best-effort: store by seq and assemble later; accept gaps.</p> * <p>UDP best-effort: store by seq and assemble later; accept gaps.</p>
* *
* @param seq chunk sequence number * @param seq chunk sequence number
* @param data chunk bytes * @param data chunk bytes
*/ */
public void onStreamChunk(int seq, byte[] data) { public void onStreamChunk(int seq, byte[] data) {
@@ -308,7 +338,7 @@ public final class OacWebRequestBroker {
* Waits for the current response (single-flight). * Waits for the current response (single-flight).
* *
* @param timeout timeout * @param timeout timeout
* @param unit unit * @param unit unit
* @return response * @return response
*/ */
public Response awaitResponse(long timeout, TimeUnit unit) { public Response awaitResponse(long timeout, TimeUnit unit) {
@@ -321,7 +351,7 @@ public final class OacWebRequestBroker {
long deadlineNanos = System.nanoTime() + unit.toNanos(timeout); long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
for (;;) { for (; ; ) {
synchronized (responseLock) { synchronized (responseLock) {
if (st.completed) { if (st.completed) {
break; break;
@@ -453,25 +483,6 @@ public final class OacWebRequestBroker {
} }
} }
/**
* Assembles the response body from received chunks in ascending seq order.
*
* <p>Best-effort UDP behavior: gaps are ignored.</p>
*/
private static void assembleBestEffortLocked(ResponseState st) {
st.body.reset();
if (st.chunkBuffer.isEmpty() || st.maxSeqSeen < 0) {
return;
}
for (int seq = 0; seq <= st.maxSeqSeen; seq++) {
byte[] chunk = st.chunkBuffer.get(seq);
if (chunk == null) continue; // gap accepted
st.body.write(chunk, 0, chunk.length);
}
}
private void failLocked(ResponseState st, String message) { private void failLocked(ResponseState st, String message) {
st.completed = true; st.completed = true;
st.success = false; st.success = false;
@@ -479,21 +490,13 @@ public final class OacWebRequestBroker {
st.done.countDown(); st.done.countDown();
} }
private static void sleepSilently(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/** /**
* Response DTO. * Response DTO.
* *
* @param statusCode status code * @param statusCode status code
* @param contentType content type * @param contentType content type
* @param headers headers * @param headers headers
* @param body body bytes * @param body body bytes
*/ */
public record Response(int statusCode, String contentType, Map<String, String> headers, byte[] body) { public record Response(int statusCode, String contentType, Map<String, String> headers, byte[] body) {
} }