Implemented stream packets
This commit is contained in:
2
pom.xml
2
pom.xml
@@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
<groupId>org.openautonomousconnection</groupId>
|
<groupId>org.openautonomousconnection</groupId>
|
||||||
<artifactId>InfoNameLib</artifactId>
|
<artifactId>InfoNameLib</artifactId>
|
||||||
<version>1.0.0-BETA.1.0</version>
|
<version>1.0.0-BETA.1.1</version>
|
||||||
<organization>
|
<organization>
|
||||||
<name>Open Autonomous Connection</name>
|
<name>Open Autonomous Connection</name>
|
||||||
<url>https://open-autonomous-connection.org/</url>
|
<url>https://open-autonomous-connection.org/</url>
|
||||||
|
|||||||
@@ -0,0 +1,7 @@
|
|||||||
|
package org.openautonomousconnection.infonamelib;
|
||||||
|
|
||||||
|
public abstract class LibClientImpl {
|
||||||
|
|
||||||
|
public abstract void serverConnectionFailed(Exception exception);
|
||||||
|
|
||||||
|
}
|
||||||
@@ -126,7 +126,6 @@ public final class OacWebHttpURLConnection extends HttpURLConnection {
|
|||||||
// Snapshot headers/body at send time.
|
// Snapshot headers/body at send time.
|
||||||
Map<String, String> carryHeaders = new LinkedHashMap<>(requestHeaders);
|
Map<String, String> carryHeaders = new LinkedHashMap<>(requestHeaders);
|
||||||
|
|
||||||
// ---- SESSION HEADER INJECTION (the core fix for your "header resets") ----
|
|
||||||
// Each navigation creates a new connection, so we re-add the session for every request.
|
// Each navigation creates a new connection, so we re-add the session for every request.
|
||||||
String session = sessionJar.get();
|
String session = sessionJar.get();
|
||||||
if (session != null && !session.isBlank() && headerValue(carryHeaders, "session") == null) {
|
if (session != null && !session.isBlank() && headerValue(carryHeaders, "session") == null) {
|
||||||
|
|||||||
@@ -11,9 +11,10 @@ import org.openautonomousconnection.protocol.packets.v1_0_0.beta.web.stream.WebS
|
|||||||
import org.openautonomousconnection.protocol.packets.v1_0_0.beta.web.stream.WebStreamStartPacket;
|
import org.openautonomousconnection.protocol.packets.v1_0_0.beta.web.stream.WebStreamStartPacket;
|
||||||
import org.openautonomousconnection.protocol.side.client.ProtocolClient;
|
import org.openautonomousconnection.protocol.side.client.ProtocolClient;
|
||||||
import org.openautonomousconnection.protocol.side.client.events.ConnectedToProtocolServerEvent;
|
import org.openautonomousconnection.protocol.side.client.events.ConnectedToProtocolServerEvent;
|
||||||
import org.openautonomousconnection.protocol.versions.v1_0_0.beta.INSRecord;
|
|
||||||
import org.openautonomousconnection.protocol.versions.v1_0_0.beta.INSResponseStatus;
|
import org.openautonomousconnection.protocol.versions.v1_0_0.beta.INSResponseStatus;
|
||||||
|
import org.openautonomousconnection.protocol.versions.v1_0_0.beta.INSRecord;
|
||||||
|
|
||||||
|
import javax.swing.*;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
@@ -30,100 +31,115 @@ public final class OacWebPacketListener extends EventListener {
|
|||||||
|
|
||||||
private final OacWebRequestBroker broker;
|
private final OacWebRequestBroker broker;
|
||||||
private final ProtocolClient client;
|
private final ProtocolClient client;
|
||||||
|
private final LibClientImpl impl;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new listener bound to the given broker.
|
* Creates a new listener bound to the given broker.
|
||||||
*
|
*
|
||||||
* @param broker broker instance
|
* @param broker broker instance
|
||||||
|
* @param client protocol client
|
||||||
*/
|
*/
|
||||||
public OacWebPacketListener(OacWebRequestBroker broker, ProtocolClient client) {
|
public OacWebPacketListener(OacWebRequestBroker broker, ProtocolClient client, LibClientImpl impl) {
|
||||||
this.broker = Objects.requireNonNull(broker, "broker");
|
this.broker = Objects.requireNonNull(broker, "broker");
|
||||||
this.client = Objects.requireNonNull(client, "client");
|
this.client = Objects.requireNonNull(client, "client");
|
||||||
|
this.impl = Objects.requireNonNull(impl, "impl");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Notifies the broker that the server connection is established.
|
||||||
|
*
|
||||||
|
* @param event connected event
|
||||||
|
*/
|
||||||
@Listener(priority = EventPriority.HIGHEST)
|
@Listener(priority = EventPriority.HIGHEST)
|
||||||
public void onConnected(ConnectedToProtocolServerEvent event) {
|
public void onConnected(ConnectedToProtocolServerEvent event) {
|
||||||
OacWebRequestBroker.get().notifyServerConnected();
|
broker.notifyServerConnected();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles packets coming from INS and the web server side.
|
||||||
|
*
|
||||||
|
* @param event packet event
|
||||||
|
*/
|
||||||
@Listener(priority = EventPriority.HIGHEST)
|
@Listener(priority = EventPriority.HIGHEST)
|
||||||
public void onPacketRead(C_PacketReadEvent event) {
|
public void onPacketRead(C_PacketReadEvent event) {
|
||||||
Object p = event.getPacket();
|
Object p = event.getPacket();
|
||||||
|
|
||||||
if (p instanceof INSResponsePacket resp) {
|
if (p instanceof INSResponsePacket resp) {
|
||||||
INSResponseStatus status = resp.getStatus();
|
onInsResponse(resp);
|
||||||
List<INSRecord> records = resp.getRecords();
|
|
||||||
|
|
||||||
if (status != INSResponseStatus.OK) {
|
|
||||||
broker.invalidateCurrentInfoName();
|
|
||||||
throw new IllegalStateException("INS resolution failed: " + status);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (records == null || records.isEmpty() || records.getFirst() == null || records.getFirst().value == null) {
|
|
||||||
broker.invalidateCurrentInfoName();
|
|
||||||
throw new IllegalStateException("INS resolution returned no usable records");
|
|
||||||
}
|
|
||||||
|
|
||||||
String host = records.getFirst().value.trim();
|
|
||||||
if (host.isEmpty()) {
|
|
||||||
broker.invalidateCurrentInfoName();
|
|
||||||
throw new IllegalStateException("INS record value is empty");
|
|
||||||
}
|
|
||||||
|
|
||||||
String hostname;
|
|
||||||
int port;
|
|
||||||
|
|
||||||
if (!host.contains(":")) {
|
|
||||||
hostname = host;
|
|
||||||
|
|
||||||
if (records.getFirst().port == 0) port = 1028;
|
|
||||||
else port = records.getFirst().port;
|
|
||||||
} else {
|
|
||||||
String[] split = host.split(":", 2);
|
|
||||||
hostname = split[0].trim();
|
|
||||||
String p1 = split[1].trim();
|
|
||||||
if (hostname.isEmpty() || p1.isEmpty()) {
|
|
||||||
broker.invalidateCurrentInfoName();
|
|
||||||
throw new IllegalStateException("Invalid INS host:port value: " + host);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
port = Integer.parseInt(p1);
|
|
||||||
} catch (NumberFormatException e) {
|
|
||||||
broker.invalidateCurrentInfoName();
|
|
||||||
throw new IllegalStateException("Invalid port in INS record: " + host, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Thread t = new Thread(() -> connectServer(hostname, port), "oac-web-server-connect");
|
|
||||||
t.setDaemon(true);
|
|
||||||
t.start();
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (p instanceof WebResponsePacket resp) {
|
if (p instanceof WebResponsePacket resp) {
|
||||||
onWebResponse(resp);
|
broker.onWebResponse(resp.getStatusCode(), resp.getContentType(), resp.getHeaders(), resp.getBody());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (p instanceof WebStreamStartPacket start) {
|
if (p instanceof WebStreamStartPacket start) {
|
||||||
onStreamStart(start);
|
broker.onStreamStart(start.getStatusCode(), start.getContentType(), start.getHeaders(), start.getTotalLength());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (p instanceof WebStreamChunkPacket chunk) {
|
if (p instanceof WebStreamChunkPacket chunk) {
|
||||||
onStreamChunk(chunk);
|
broker.onStreamChunk(chunk.getSeq(), chunk.getData());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (p instanceof WebStreamEndPacket end) {
|
if (p instanceof WebStreamEndPacket end) {
|
||||||
onStreamEnd(end);
|
broker.onStreamEnd(end.isOk());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void onInsResponse(INSResponsePacket resp) {
|
||||||
|
INSResponseStatus status = resp.getStatus();
|
||||||
|
List<INSRecord> records = resp.getRecords();
|
||||||
|
|
||||||
|
if (status != INSResponseStatus.OK) {
|
||||||
|
broker.invalidateCurrentInfoName();
|
||||||
|
throw new IllegalStateException("INS resolution failed: " + status);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (records == null || records.isEmpty() || records.getFirst() == null || records.getFirst().value == null) {
|
||||||
|
broker.invalidateCurrentInfoName();
|
||||||
|
throw new IllegalStateException("INS resolution returned no usable records");
|
||||||
|
}
|
||||||
|
|
||||||
|
String host = records.getFirst().value.trim();
|
||||||
|
if (host.isEmpty()) {
|
||||||
|
broker.invalidateCurrentInfoName();
|
||||||
|
throw new IllegalStateException("INS record value is empty");
|
||||||
|
}
|
||||||
|
|
||||||
|
String hostname;
|
||||||
|
int port;
|
||||||
|
|
||||||
|
if (!host.contains(":")) {
|
||||||
|
hostname = host;
|
||||||
|
|
||||||
|
if (records.getFirst().port == 0) port = 1028;
|
||||||
|
else port = records.getFirst().port;
|
||||||
|
} else {
|
||||||
|
String[] split = host.split(":", 2);
|
||||||
|
hostname = split[0].trim();
|
||||||
|
String p1 = split[1].trim();
|
||||||
|
if (hostname.isEmpty() || p1.isEmpty()) {
|
||||||
|
broker.invalidateCurrentInfoName();
|
||||||
|
throw new IllegalStateException("Invalid INS host:port value: " + host);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
port = Integer.parseInt(p1);
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
broker.invalidateCurrentInfoName();
|
||||||
|
throw new IllegalStateException("Invalid port in INS record: " + host, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread t = new Thread(() -> connectServer(hostname, port), "oac-web-server-connect");
|
||||||
|
t.setDaemon(true);
|
||||||
|
t.start();
|
||||||
|
}
|
||||||
|
|
||||||
private void connectServer(String hostname, int port) {
|
private void connectServer(String hostname, int port) {
|
||||||
try {
|
try {
|
||||||
// Ensure the server connection object exists
|
|
||||||
client.buildServerConnection(null, client.getProtocolBridge().getProtocolValues().ssl);
|
client.buildServerConnection(null, client.getProtocolBridge().getProtocolValues().ssl);
|
||||||
|
|
||||||
if (client.getClientServerConnection() != null && client.getClientServerConnection().isConnected()) {
|
if (client.getClientServerConnection() != null && client.getClientServerConnection().isConnected()) {
|
||||||
@@ -133,43 +149,7 @@ public final class OacWebPacketListener extends EventListener {
|
|||||||
client.getClientServerConnection().connect(hostname, port);
|
client.getClientServerConnection().connect(hostname, port);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
broker.invalidateCurrentInfoName();
|
broker.invalidateCurrentInfoName();
|
||||||
e.printStackTrace();
|
impl.serverConnectionFailed(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Handles a non-streamed WebResponsePacket.
|
|
||||||
*
|
|
||||||
* @param resp response packet
|
|
||||||
*/
|
|
||||||
private void onWebResponse(WebResponsePacket resp) {
|
|
||||||
broker.onWebResponse(resp.getStatusCode(), resp.getContentType(), resp.getHeaders(), resp.getBody());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Handles the beginning of a streamed response.
|
|
||||||
*
|
|
||||||
* @param start stream start packet
|
|
||||||
*/
|
|
||||||
private void onStreamStart(WebStreamStartPacket start) {
|
|
||||||
broker.onStreamStart(start.getStatusCode(), start.getContentType(), start.getHeaders(), start.getTotalLength());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Handles a chunk of a streamed response.
|
|
||||||
*
|
|
||||||
* @param chunk chunk packet
|
|
||||||
*/
|
|
||||||
private void onStreamChunk(WebStreamChunkPacket chunk) {
|
|
||||||
broker.onStreamChunk(chunk.getSeq(), chunk.getData());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Handles stream end.
|
|
||||||
*
|
|
||||||
* @param end stream end packet
|
|
||||||
*/
|
|
||||||
private void onStreamEnd(WebStreamEndPacket end) {
|
|
||||||
broker.onStreamEnd(end.isOk());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
@@ -10,10 +10,7 @@ import org.openautonomousconnection.protocol.versions.v1_0_0.beta.WebRequestMeth
|
|||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.util.Arrays;
|
import java.util.*;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@@ -21,6 +18,13 @@ import java.util.concurrent.TimeUnit;
|
|||||||
* Central broker that translates {@code web://} URLs into OAC protocol traffic.
|
* Central broker that translates {@code web://} URLs into OAC protocol traffic.
|
||||||
*
|
*
|
||||||
* <p>Protocol limitation: no correlation id -> single-flight (one in-flight request at a time).</p>
|
* <p>Protocol limitation: no correlation id -> single-flight (one in-flight request at a time).</p>
|
||||||
|
*
|
||||||
|
* <p>UDP streaming semantics (best-effort):</p>
|
||||||
|
* <ul>
|
||||||
|
* <li>Chunks may arrive out of order or be lost.</li>
|
||||||
|
* <li>We accept gaps and assemble what we have after {@code WebStreamEndPacket}.</li>
|
||||||
|
* <li>We wait a short grace window after stream end to allow late UDP packets.</li>
|
||||||
|
* </ul>
|
||||||
*/
|
*/
|
||||||
public final class OacWebRequestBroker {
|
public final class OacWebRequestBroker {
|
||||||
|
|
||||||
@@ -29,6 +33,11 @@ public final class OacWebRequestBroker {
|
|||||||
private static final long CONNECT_TIMEOUT_SECONDS = 10;
|
private static final long CONNECT_TIMEOUT_SECONDS = 10;
|
||||||
private static final long RESPONSE_TIMEOUT_SECONDS = 25;
|
private static final long RESPONSE_TIMEOUT_SECONDS = 25;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Grace time after receiving WebStreamEndPacket to allow late UDP packets (reordering).
|
||||||
|
*/
|
||||||
|
private static final long UDP_END_GRACE_MILLIS = 150;
|
||||||
|
|
||||||
private final Object responseLock = new Object();
|
private final Object responseLock = new Object();
|
||||||
|
|
||||||
private volatile ProtocolClient client;
|
private volatile ProtocolClient client;
|
||||||
@@ -66,7 +75,6 @@ public final class OacWebRequestBroker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (query != null && !query.isBlank()) {
|
if (query != null && !query.isBlank()) {
|
||||||
// Keep query for server-side fallback parsing
|
|
||||||
return p + "?" + query;
|
return p + "?" + query;
|
||||||
}
|
}
|
||||||
return p;
|
return p;
|
||||||
@@ -153,7 +161,6 @@ public final class OacWebRequestBroker {
|
|||||||
throw new IllegalArgumentException("Missing InfoName in URL: " + url);
|
throw new IllegalArgumentException("Missing InfoName in URL: " + url);
|
||||||
}
|
}
|
||||||
|
|
||||||
// IMPORTANT FIX: include query in the path, so the server can read it as fallback.
|
|
||||||
String path = normalizePathWithQuery(url.getPath(), url.getQuery());
|
String path = normalizePathWithQuery(url.getPath(), url.getQuery());
|
||||||
|
|
||||||
beginNewResponse();
|
beginNewResponse();
|
||||||
@@ -168,6 +175,9 @@ public final class OacWebRequestBroker {
|
|||||||
sendWebRequest(client, path, method, headers, body);
|
sendWebRequest(client, path, method, headers, body);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called by packet listener when server connection is established.
|
||||||
|
*/
|
||||||
public void notifyServerConnected() {
|
public void notifyServerConnected() {
|
||||||
CountDownLatch latch = connectionLatch;
|
CountDownLatch latch = connectionLatch;
|
||||||
if (latch != null) {
|
if (latch != null) {
|
||||||
@@ -175,10 +185,21 @@ public final class OacWebRequestBroker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invalidates cached InfoName, forcing a new INS resolution on next request.
|
||||||
|
*/
|
||||||
public synchronized void invalidateCurrentInfoName() {
|
public synchronized void invalidateCurrentInfoName() {
|
||||||
currentInfoName = null;
|
currentInfoName = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles a non-streamed WebResponsePacket.
|
||||||
|
*
|
||||||
|
* @param statusCode status code
|
||||||
|
* @param contentType content-type
|
||||||
|
* @param headers headers
|
||||||
|
* @param body body
|
||||||
|
*/
|
||||||
public void onWebResponse(int statusCode, String contentType, Map<String, String> headers, byte[] body) {
|
public void onWebResponse(int statusCode, String contentType, Map<String, String> headers, byte[] body) {
|
||||||
ResponseState st = responseState;
|
ResponseState st = responseState;
|
||||||
if (st == null) return;
|
if (st == null) return;
|
||||||
@@ -191,6 +212,7 @@ public final class OacWebRequestBroker {
|
|||||||
st.headers = safeHeaders(headers);
|
st.headers = safeHeaders(headers);
|
||||||
|
|
||||||
byte[] b = (body == null) ? new byte[0] : body;
|
byte[] b = (body == null) ? new byte[0] : body;
|
||||||
|
st.body.reset();
|
||||||
st.body.write(b, 0, b.length);
|
st.body.write(b, 0, b.length);
|
||||||
|
|
||||||
st.completed = true;
|
st.completed = true;
|
||||||
@@ -199,6 +221,14 @@ public final class OacWebRequestBroker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles the beginning of a streamed response.
|
||||||
|
*
|
||||||
|
* @param statusCode status code
|
||||||
|
* @param contentType content-type
|
||||||
|
* @param headers headers
|
||||||
|
* @param totalLength total length (may be -1)
|
||||||
|
*/
|
||||||
public void onStreamStart(int statusCode, String contentType, Map<String, String> headers, long totalLength) {
|
public void onStreamStart(int statusCode, String contentType, Map<String, String> headers, long totalLength) {
|
||||||
ResponseState st = responseState;
|
ResponseState st = responseState;
|
||||||
if (st == null) return;
|
if (st == null) return;
|
||||||
@@ -209,12 +239,26 @@ public final class OacWebRequestBroker {
|
|||||||
st.statusCode = statusCode;
|
st.statusCode = statusCode;
|
||||||
st.contentType = safeContentType(contentType);
|
st.contentType = safeContentType(contentType);
|
||||||
st.headers = safeHeaders(headers);
|
st.headers = safeHeaders(headers);
|
||||||
st.totalLength = Math.max(0, totalLength);
|
st.totalLength = totalLength;
|
||||||
|
|
||||||
st.streamStarted = true;
|
st.streamStarted = true;
|
||||||
|
st.maxSeqSeen = -1;
|
||||||
|
st.endReceived = false;
|
||||||
|
st.endReceivedAtMillis = 0L;
|
||||||
|
|
||||||
|
// Streaming body will be assembled on end (best-effort UDP)
|
||||||
|
st.body.reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles a streamed chunk.
|
||||||
|
*
|
||||||
|
* <p>UDP best-effort: store by seq and assemble later; accept gaps.</p>
|
||||||
|
*
|
||||||
|
* @param seq chunk sequence number
|
||||||
|
* @param data chunk bytes
|
||||||
|
*/
|
||||||
public void onStreamChunk(int seq, byte[] data) {
|
public void onStreamChunk(int seq, byte[] data) {
|
||||||
ResponseState st = responseState;
|
ResponseState st = responseState;
|
||||||
if (st == null) return;
|
if (st == null) return;
|
||||||
@@ -228,11 +272,20 @@ public final class OacWebRequestBroker {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (seq < 0) return;
|
||||||
|
|
||||||
st.chunkBuffer.put(seq, Arrays.copyOf(data, data.length));
|
st.chunkBuffer.put(seq, Arrays.copyOf(data, data.length));
|
||||||
flushChunksLocked(st);
|
if (seq > st.maxSeqSeen) st.maxSeqSeen = seq;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles stream end.
|
||||||
|
*
|
||||||
|
* <p>UDP best-effort: do not complete immediately; allow late UDP packets and assemble after grace.</p>
|
||||||
|
*
|
||||||
|
* @param ok end status
|
||||||
|
*/
|
||||||
public void onStreamEnd(boolean ok) {
|
public void onStreamEnd(boolean ok) {
|
||||||
ResponseState st = responseState;
|
ResponseState st = responseState;
|
||||||
if (st == null) return;
|
if (st == null) return;
|
||||||
@@ -244,17 +297,20 @@ public final class OacWebRequestBroker {
|
|||||||
st.streamStarted = true;
|
st.streamStarted = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
flushChunksLocked(st);
|
|
||||||
|
|
||||||
st.completed = true;
|
|
||||||
st.success = ok;
|
st.success = ok;
|
||||||
if (!ok) {
|
st.endReceived = true;
|
||||||
st.errorMessage = "Streaming failed";
|
st.endReceivedAtMillis = System.currentTimeMillis();
|
||||||
}
|
// completion + assembly happens in awaitResponse() after grace window
|
||||||
st.done.countDown();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Waits for the current response (single-flight).
|
||||||
|
*
|
||||||
|
* @param timeout timeout
|
||||||
|
* @param unit unit
|
||||||
|
* @return response
|
||||||
|
*/
|
||||||
public Response awaitResponse(long timeout, TimeUnit unit) {
|
public Response awaitResponse(long timeout, TimeUnit unit) {
|
||||||
Objects.requireNonNull(unit, "unit");
|
Objects.requireNonNull(unit, "unit");
|
||||||
|
|
||||||
@@ -263,14 +319,43 @@ public final class OacWebRequestBroker {
|
|||||||
throw new IllegalStateException("No in-flight request");
|
throw new IllegalStateException("No in-flight request");
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
|
||||||
boolean ok = st.done.await(timeout, unit);
|
|
||||||
if (!ok) {
|
for (;;) {
|
||||||
|
synchronized (responseLock) {
|
||||||
|
if (st.completed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Non-stream response already completed by onWebResponse()
|
||||||
|
if (!st.streamStarted && st.done.getCount() == 0) {
|
||||||
|
st.completed = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (st.endReceived) {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
if (now - st.endReceivedAtMillis >= UDP_END_GRACE_MILLIS) {
|
||||||
|
// Assemble best-effort body from received chunks
|
||||||
|
assembleBestEffortLocked(st);
|
||||||
|
|
||||||
|
st.completed = true;
|
||||||
|
|
||||||
|
if (!st.success) {
|
||||||
|
st.errorMessage = (st.errorMessage == null) ? "Streaming failed" : st.errorMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
st.done.countDown();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (System.nanoTime() >= deadlineNanos) {
|
||||||
throw new IllegalStateException("Timeout while waiting for Web response");
|
throw new IllegalStateException("Timeout while waiting for Web response");
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
sleepSilently(10);
|
||||||
throw new IllegalStateException("Interrupted while waiting for Web response", e);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized (responseLock) {
|
synchronized (responseLock) {
|
||||||
@@ -368,13 +453,22 @@ public final class OacWebRequestBroker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void flushChunksLocked(ResponseState st) {
|
/**
|
||||||
for (; ; ) {
|
* Assembles the response body from received chunks in ascending seq order.
|
||||||
byte[] next = st.chunkBuffer.remove(st.nextExpectedSeq);
|
*
|
||||||
if (next == null) break;
|
* <p>Best-effort UDP behavior: gaps are ignored.</p>
|
||||||
|
*/
|
||||||
|
private static void assembleBestEffortLocked(ResponseState st) {
|
||||||
|
st.body.reset();
|
||||||
|
|
||||||
st.body.write(next, 0, next.length);
|
if (st.chunkBuffer.isEmpty() || st.maxSeqSeen < 0) {
|
||||||
st.nextExpectedSeq++;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int seq = 0; seq <= st.maxSeqSeen; seq++) {
|
||||||
|
byte[] chunk = st.chunkBuffer.get(seq);
|
||||||
|
if (chunk == null) continue; // gap accepted
|
||||||
|
st.body.write(chunk, 0, chunk.length);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -385,11 +479,31 @@ public final class OacWebRequestBroker {
|
|||||||
st.done.countDown();
|
st.done.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void sleepSilently(long millis) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(millis);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Response DTO.
|
||||||
|
*
|
||||||
|
* @param statusCode status code
|
||||||
|
* @param contentType content type
|
||||||
|
* @param headers headers
|
||||||
|
* @param body body bytes
|
||||||
|
*/
|
||||||
public record Response(int statusCode, String contentType, Map<String, String> headers, byte[] body) {
|
public record Response(int statusCode, String contentType, Map<String, String> headers, byte[] body) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* In-flight state for a single request (single-flight).
|
||||||
|
*/
|
||||||
private static final class ResponseState {
|
private static final class ResponseState {
|
||||||
private final CountDownLatch done = new CountDownLatch(1);
|
private final CountDownLatch done = new CountDownLatch(1);
|
||||||
|
|
||||||
private final ByteArrayOutputStream body = new ByteArrayOutputStream(64 * 1024);
|
private final ByteArrayOutputStream body = new ByteArrayOutputStream(64 * 1024);
|
||||||
private final Map<Integer, byte[]> chunkBuffer = new HashMap<>();
|
private final Map<Integer, byte[]> chunkBuffer = new HashMap<>();
|
||||||
|
|
||||||
@@ -399,7 +513,11 @@ public final class OacWebRequestBroker {
|
|||||||
|
|
||||||
private boolean streamStarted;
|
private boolean streamStarted;
|
||||||
private long totalLength;
|
private long totalLength;
|
||||||
private int nextExpectedSeq = 0;
|
|
||||||
|
private int maxSeqSeen = -1;
|
||||||
|
|
||||||
|
private boolean endReceived;
|
||||||
|
private long endReceivedAtMillis;
|
||||||
|
|
||||||
private boolean completed;
|
private boolean completed;
|
||||||
private boolean success;
|
private boolean success;
|
||||||
|
|||||||
@@ -1,237 +0,0 @@
|
|||||||
package org.openautonomousconnection.infonamelib;
|
|
||||||
|
|
||||||
import org.openautonomousconnection.protocol.versions.v1_0_0.beta.WebRequestMethod;
|
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.net.URL;
|
|
||||||
import java.net.URLConnection;
|
|
||||||
import java.util.*;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* URLConnection implementation that maps "web://" URLs to OAC WebRequestPacket/WebResponsePacket.
|
|
||||||
*
|
|
||||||
* <p>This implementation supports:</p>
|
|
||||||
* <ul>
|
|
||||||
* <li>GET requests via {@link #getInputStream()}</li>
|
|
||||||
* <li>POST requests via {@link #getOutputStream()} (e.g. HTML form submits)</li>
|
|
||||||
* <li>Request headers via {@link #setRequestProperty(String, String)}</li>
|
|
||||||
* <li>Redirect following (301/302/303/307/308) with session propagation</li>
|
|
||||||
* </ul>
|
|
||||||
*
|
|
||||||
* <p>Important: the underlying protocol has no request IDs, so the broker enforces single-flight.</p>
|
|
||||||
*/
|
|
||||||
public final class OacWebURLConnection extends URLConnection {
|
|
||||||
|
|
||||||
private static final int MAX_REDIRECTS = 8;
|
|
||||||
|
|
||||||
private final OacWebRequestBroker broker;
|
|
||||||
private final Map<String, String> requestHeaders = new LinkedHashMap<>();
|
|
||||||
private final ByteArrayOutputStream requestBody = new ByteArrayOutputStream(1024);
|
|
||||||
private boolean connected;
|
|
||||||
private OacWebResponse response;
|
|
||||||
private boolean outputOpened;
|
|
||||||
private boolean outputClosed;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new OAC URLConnection.
|
|
||||||
*
|
|
||||||
* @param url the web:// URL
|
|
||||||
* @param broker request broker
|
|
||||||
*/
|
|
||||||
public OacWebURLConnection(URL url, OacWebRequestBroker broker) {
|
|
||||||
super(url);
|
|
||||||
this.broker = Objects.requireNonNull(broker, "broker");
|
|
||||||
}
|
|
||||||
|
|
||||||
private static String headerValue(Map<String, String> headers, String nameLower) {
|
|
||||||
if (headers == null || headers.isEmpty() || nameLower == null) return null;
|
|
||||||
for (Map.Entry<String, String> e : headers.entrySet()) {
|
|
||||||
if (e.getKey() == null) continue;
|
|
||||||
if (e.getKey().trim().toLowerCase(Locale.ROOT).equals(nameLower)) {
|
|
||||||
return e.getValue();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setRequestProperty(String key, String value) {
|
|
||||||
if (key == null) return;
|
|
||||||
if (value == null) requestHeaders.remove(key);
|
|
||||||
else requestHeaders.put(key, value);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getRequestProperty(String key) {
|
|
||||||
if (key == null) return null;
|
|
||||||
for (Map.Entry<String, String> e : requestHeaders.entrySet()) {
|
|
||||||
if (e.getKey() != null && e.getKey().equalsIgnoreCase(key)) return e.getValue();
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Map<String, List<String>> getRequestProperties() {
|
|
||||||
Map<String, List<String>> out = new LinkedHashMap<>();
|
|
||||||
for (Map.Entry<String, String> e : requestHeaders.entrySet()) {
|
|
||||||
out.put(e.getKey(), e.getValue() == null ? List.of() : List.of(e.getValue()));
|
|
||||||
}
|
|
||||||
return Collections.unmodifiableMap(out);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public OutputStream getOutputStream() {
|
|
||||||
setDoOutput(true);
|
|
||||||
outputOpened = true;
|
|
||||||
|
|
||||||
return new OutputStream() {
|
|
||||||
@Override
|
|
||||||
public void write(int b) {
|
|
||||||
requestBody.write(b);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void write(byte[] b, int off, int len) {
|
|
||||||
requestBody.write(b, off, len);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
outputClosed = true;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void connect() throws IOException {
|
|
||||||
if (connected) return;
|
|
||||||
|
|
||||||
URL cur = this.url;
|
|
||||||
|
|
||||||
// Decide method:
|
|
||||||
// - If doOutput is true OR content-type is present: treat as POST (even if body is empty).
|
|
||||||
// This fixes engines that perform POST with empty/unknown body.
|
|
||||||
boolean hasContentType = headerValue(requestHeaders, "content-type") != null;
|
|
||||||
boolean wantsPost = getDoOutput() || hasContentType;
|
|
||||||
|
|
||||||
WebRequestMethod method = wantsPost ? WebRequestMethod.POST : WebRequestMethod.GET;
|
|
||||||
|
|
||||||
byte[] body;
|
|
||||||
if (wantsPost) {
|
|
||||||
// Always send a body for POST; may be empty.
|
|
||||||
body = requestBody.toByteArray();
|
|
||||||
} else {
|
|
||||||
body = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure content-type exists for form posts if caller didn't set it.
|
|
||||||
if (method == WebRequestMethod.POST && !hasContentType) {
|
|
||||||
requestHeaders.put("content-type", "application/x-www-form-urlencoded; charset=utf-8");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Redirect loop.
|
|
||||||
OacWebResponse resp = null;
|
|
||||||
Map<String, String> carryHeaders = new LinkedHashMap<>(requestHeaders);
|
|
||||||
|
|
||||||
for (int i = 0; i <= MAX_REDIRECTS; i++) {
|
|
||||||
resp = broker.fetch(cur, method, carryHeaders, body);
|
|
||||||
|
|
||||||
int code = resp.statusCode();
|
|
||||||
if (code == 301 || code == 302 || code == 303 || code == 307 || code == 308) {
|
|
||||||
String loc = headerValue(resp.headers(), "location");
|
|
||||||
if (loc == null || loc.isBlank()) break;
|
|
||||||
|
|
||||||
// Propagate session header from redirect response to the next request.
|
|
||||||
String session = headerValue(resp.headers(), "session");
|
|
||||||
if (session != null && !session.isBlank()) {
|
|
||||||
carryHeaders.put("session", session);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Resolve redirect URL.
|
|
||||||
try {
|
|
||||||
cur = new URL(cur, loc);
|
|
||||||
} catch (Exception ex) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Redirect method handling:
|
|
||||||
// - 303: switch to GET
|
|
||||||
// - 301/302: commonly switch to GET for POST (browser-like)
|
|
||||||
// - 307/308: keep method and body
|
|
||||||
if (code == 303) {
|
|
||||||
method = WebRequestMethod.GET;
|
|
||||||
body = null;
|
|
||||||
} else if ((code == 301 || code == 302) && method == WebRequestMethod.POST) {
|
|
||||||
method = WebRequestMethod.GET;
|
|
||||||
body = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.response = resp;
|
|
||||||
this.connected = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public InputStream getInputStream() throws IOException {
|
|
||||||
connect();
|
|
||||||
return response.bodyStream();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getContentType() {
|
|
||||||
try {
|
|
||||||
connect();
|
|
||||||
} catch (IOException e) {
|
|
||||||
return "application/octet-stream";
|
|
||||||
}
|
|
||||||
String ct = response.contentType();
|
|
||||||
return (ct == null || ct.isBlank()) ? "application/octet-stream" : ct;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getContentLength() {
|
|
||||||
try {
|
|
||||||
connect();
|
|
||||||
} catch (IOException e) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
long len = response.contentLength();
|
|
||||||
return (len <= 0 || len > Integer.MAX_VALUE) ? -1 : (int) len;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getContentLengthLong() {
|
|
||||||
try {
|
|
||||||
connect();
|
|
||||||
} catch (IOException e) {
|
|
||||||
return -1L;
|
|
||||||
}
|
|
||||||
return response.contentLength();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Map<String, List<String>> getHeaderFields() {
|
|
||||||
try {
|
|
||||||
connect();
|
|
||||||
} catch (IOException e) {
|
|
||||||
return Map.of();
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<String, List<String>> out = new LinkedHashMap<>();
|
|
||||||
for (Map.Entry<String, String> e : response.headers().entrySet()) {
|
|
||||||
String k = e.getKey();
|
|
||||||
String v = e.getValue();
|
|
||||||
if (k == null) continue;
|
|
||||||
out.put(k, v == null ? List.of() : List.of(v));
|
|
||||||
}
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -16,16 +16,16 @@ public final class OacWebUrlInstaller {
|
|||||||
private OacWebUrlInstaller() {
|
private OacWebUrlInstaller() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void installOnce(EventManager eventManager, ProtocolClient client) {
|
public static void installOnce(EventManager eventManager, ProtocolClient client, LibClientImpl impl) {
|
||||||
Objects.requireNonNull(eventManager, "eventManager");
|
Objects.requireNonNull(eventManager, "eventManager");
|
||||||
Objects.requireNonNull(client, "client");
|
Objects.requireNonNull(client, "client");
|
||||||
|
Objects.requireNonNull(impl, "impl");
|
||||||
|
|
||||||
if (!INSTALLED.compareAndSet(false, true)) return;
|
if (!INSTALLED.compareAndSet(false, true)) return;
|
||||||
|
|
||||||
OacWebRequestBroker.get().attachClient(client);
|
OacWebRequestBroker.get().attachClient(client);
|
||||||
eventManager.registerListener(new OacWebPacketListener(OacWebRequestBroker.get(), client));
|
eventManager.registerListener(new OacWebPacketListener(OacWebRequestBroker.get(), client, impl));
|
||||||
|
|
||||||
// IMPORTANT: must match "org.openautonomousconnection.infonamelib.web.Handler"
|
|
||||||
ProtocolHandlerPackages.installPackage("org.openautonomousconnection.infonamelib");
|
ProtocolHandlerPackages.installPackage("org.openautonomousconnection.infonamelib");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1,28 +0,0 @@
|
|||||||
package org.openautonomousconnection.infonamelib;
|
|
||||||
|
|
||||||
import java.net.URL;
|
|
||||||
import java.net.URLConnection;
|
|
||||||
import java.net.URLStreamHandler;
|
|
||||||
import java.util.Objects;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* URLStreamHandler for the custom OAC scheme "web://".
|
|
||||||
*/
|
|
||||||
public final class OacWebUrlStreamHandler extends URLStreamHandler {
|
|
||||||
|
|
||||||
private final OacWebRequestBroker broker;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a handler.
|
|
||||||
*
|
|
||||||
* @param broker request broker
|
|
||||||
*/
|
|
||||||
public OacWebUrlStreamHandler(OacWebRequestBroker broker) {
|
|
||||||
this.broker = Objects.requireNonNull(broker, "broker");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected URLConnection openConnection(URL u) {
|
|
||||||
return new OacWebURLConnection(u, broker);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user