headers, long totalLength) {
ResponseState st = responseState;
if (st == null) return;
@@ -209,12 +239,26 @@ public final class OacWebRequestBroker {
st.statusCode = statusCode;
st.contentType = safeContentType(contentType);
st.headers = safeHeaders(headers);
- st.totalLength = Math.max(0, totalLength);
+ st.totalLength = totalLength;
st.streamStarted = true;
+ st.maxSeqSeen = -1;
+ st.endReceived = false;
+ st.endReceivedAtMillis = 0L;
+
+ // Streaming body will be assembled on end (best-effort UDP)
+ st.body.reset();
}
}
+ /**
+ * Handles a streamed chunk.
+ *
+ * UDP best-effort: store by seq and assemble later; accept gaps.
+ *
+ * @param seq chunk sequence number
+ * @param data chunk bytes
+ */
public void onStreamChunk(int seq, byte[] data) {
ResponseState st = responseState;
if (st == null) return;
@@ -228,11 +272,20 @@ public final class OacWebRequestBroker {
return;
}
+ if (seq < 0) return;
+
st.chunkBuffer.put(seq, Arrays.copyOf(data, data.length));
- flushChunksLocked(st);
+ if (seq > st.maxSeqSeen) st.maxSeqSeen = seq;
}
}
+ /**
+ * Handles stream end.
+ *
+ * UDP best-effort: do not complete immediately; allow late UDP packets and assemble after grace.
+ *
+ * @param ok end status
+ */
public void onStreamEnd(boolean ok) {
ResponseState st = responseState;
if (st == null) return;
@@ -244,17 +297,20 @@ public final class OacWebRequestBroker {
st.streamStarted = true;
}
- flushChunksLocked(st);
-
- st.completed = true;
st.success = ok;
- if (!ok) {
- st.errorMessage = "Streaming failed";
- }
- st.done.countDown();
+ st.endReceived = true;
+ st.endReceivedAtMillis = System.currentTimeMillis();
+ // completion + assembly happens in awaitResponse() after grace window
}
}
+ /**
+ * Waits for the current response (single-flight).
+ *
+ * @param timeout timeout
+ * @param unit unit
+ * @return response
+ */
public Response awaitResponse(long timeout, TimeUnit unit) {
Objects.requireNonNull(unit, "unit");
@@ -263,14 +319,43 @@ public final class OacWebRequestBroker {
throw new IllegalStateException("No in-flight request");
}
- try {
- boolean ok = st.done.await(timeout, unit);
- if (!ok) {
+ long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
+
+ for (;;) {
+ synchronized (responseLock) {
+ if (st.completed) {
+ break;
+ }
+
+ // Non-stream response already completed by onWebResponse()
+ if (!st.streamStarted && st.done.getCount() == 0) {
+ st.completed = true;
+ break;
+ }
+
+ if (st.endReceived) {
+ long now = System.currentTimeMillis();
+ if (now - st.endReceivedAtMillis >= UDP_END_GRACE_MILLIS) {
+ // Assemble best-effort body from received chunks
+ assembleBestEffortLocked(st);
+
+ st.completed = true;
+
+ if (!st.success) {
+ st.errorMessage = (st.errorMessage == null) ? "Streaming failed" : st.errorMessage;
+ }
+
+ st.done.countDown();
+ break;
+ }
+ }
+ }
+
+ if (System.nanoTime() >= deadlineNanos) {
throw new IllegalStateException("Timeout while waiting for Web response");
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException("Interrupted while waiting for Web response", e);
+
+ sleepSilently(10);
}
synchronized (responseLock) {
@@ -368,13 +453,22 @@ public final class OacWebRequestBroker {
}
}
- private void flushChunksLocked(ResponseState st) {
- for (; ; ) {
- byte[] next = st.chunkBuffer.remove(st.nextExpectedSeq);
- if (next == null) break;
+ /**
+ * Assembles the response body from received chunks in ascending seq order.
+ *
+ * Best-effort UDP behavior: gaps are ignored.
+ */
+ private static void assembleBestEffortLocked(ResponseState st) {
+ st.body.reset();
- st.body.write(next, 0, next.length);
- st.nextExpectedSeq++;
+ if (st.chunkBuffer.isEmpty() || st.maxSeqSeen < 0) {
+ return;
+ }
+
+ for (int seq = 0; seq <= st.maxSeqSeen; seq++) {
+ byte[] chunk = st.chunkBuffer.get(seq);
+ if (chunk == null) continue; // gap accepted
+ st.body.write(chunk, 0, chunk.length);
}
}
@@ -385,11 +479,31 @@ public final class OacWebRequestBroker {
st.done.countDown();
}
+ private static void sleepSilently(long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Response DTO.
+ *
+ * @param statusCode status code
+ * @param contentType content type
+ * @param headers headers
+ * @param body body bytes
+ */
public record Response(int statusCode, String contentType, Map headers, byte[] body) {
}
+ /**
+ * In-flight state for a single request (single-flight).
+ */
private static final class ResponseState {
private final CountDownLatch done = new CountDownLatch(1);
+
private final ByteArrayOutputStream body = new ByteArrayOutputStream(64 * 1024);
private final Map chunkBuffer = new HashMap<>();
@@ -399,7 +513,11 @@ public final class OacWebRequestBroker {
private boolean streamStarted;
private long totalLength;
- private int nextExpectedSeq = 0;
+
+ private int maxSeqSeen = -1;
+
+ private boolean endReceived;
+ private long endReceivedAtMillis;
private boolean completed;
private boolean success;
diff --git a/src/main/java/org/openautonomousconnection/infonamelib/OacWebURLConnection.java b/src/main/java/org/openautonomousconnection/infonamelib/OacWebURLConnection.java
deleted file mode 100644
index cf30478..0000000
--- a/src/main/java/org/openautonomousconnection/infonamelib/OacWebURLConnection.java
+++ /dev/null
@@ -1,237 +0,0 @@
-package org.openautonomousconnection.infonamelib;
-
-import org.openautonomousconnection.protocol.versions.v1_0_0.beta.WebRequestMethod;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URL;
-import java.net.URLConnection;
-import java.util.*;
-
-/**
- * URLConnection implementation that maps "web://" URLs to OAC WebRequestPacket/WebResponsePacket.
- *
- * This implementation supports:
- *
- * - GET requests via {@link #getInputStream()}
- * - POST requests via {@link #getOutputStream()} (e.g. HTML form submits)
- * - Request headers via {@link #setRequestProperty(String, String)}
- * - Redirect following (301/302/303/307/308) with session propagation
- *
- *
- * Important: the underlying protocol has no request IDs, so the broker enforces single-flight.
- */
-public final class OacWebURLConnection extends URLConnection {
-
- private static final int MAX_REDIRECTS = 8;
-
- private final OacWebRequestBroker broker;
- private final Map requestHeaders = new LinkedHashMap<>();
- private final ByteArrayOutputStream requestBody = new ByteArrayOutputStream(1024);
- private boolean connected;
- private OacWebResponse response;
- private boolean outputOpened;
- private boolean outputClosed;
-
- /**
- * Creates a new OAC URLConnection.
- *
- * @param url the web:// URL
- * @param broker request broker
- */
- public OacWebURLConnection(URL url, OacWebRequestBroker broker) {
- super(url);
- this.broker = Objects.requireNonNull(broker, "broker");
- }
-
- private static String headerValue(Map headers, String nameLower) {
- if (headers == null || headers.isEmpty() || nameLower == null) return null;
- for (Map.Entry e : headers.entrySet()) {
- if (e.getKey() == null) continue;
- if (e.getKey().trim().toLowerCase(Locale.ROOT).equals(nameLower)) {
- return e.getValue();
- }
- }
- return null;
- }
-
- @Override
- public void setRequestProperty(String key, String value) {
- if (key == null) return;
- if (value == null) requestHeaders.remove(key);
- else requestHeaders.put(key, value);
- }
-
- @Override
- public String getRequestProperty(String key) {
- if (key == null) return null;
- for (Map.Entry e : requestHeaders.entrySet()) {
- if (e.getKey() != null && e.getKey().equalsIgnoreCase(key)) return e.getValue();
- }
- return null;
- }
-
- @Override
- public Map> getRequestProperties() {
- Map> out = new LinkedHashMap<>();
- for (Map.Entry e : requestHeaders.entrySet()) {
- out.put(e.getKey(), e.getValue() == null ? List.of() : List.of(e.getValue()));
- }
- return Collections.unmodifiableMap(out);
- }
-
- @Override
- public OutputStream getOutputStream() {
- setDoOutput(true);
- outputOpened = true;
-
- return new OutputStream() {
- @Override
- public void write(int b) {
- requestBody.write(b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) {
- requestBody.write(b, off, len);
- }
-
- @Override
- public void close() {
- outputClosed = true;
- }
- };
- }
-
- @Override
- public void connect() throws IOException {
- if (connected) return;
-
- URL cur = this.url;
-
- // Decide method:
- // - If doOutput is true OR content-type is present: treat as POST (even if body is empty).
- // This fixes engines that perform POST with empty/unknown body.
- boolean hasContentType = headerValue(requestHeaders, "content-type") != null;
- boolean wantsPost = getDoOutput() || hasContentType;
-
- WebRequestMethod method = wantsPost ? WebRequestMethod.POST : WebRequestMethod.GET;
-
- byte[] body;
- if (wantsPost) {
- // Always send a body for POST; may be empty.
- body = requestBody.toByteArray();
- } else {
- body = null;
- }
-
- // Ensure content-type exists for form posts if caller didn't set it.
- if (method == WebRequestMethod.POST && !hasContentType) {
- requestHeaders.put("content-type", "application/x-www-form-urlencoded; charset=utf-8");
- }
-
- // Redirect loop.
- OacWebResponse resp = null;
- Map carryHeaders = new LinkedHashMap<>(requestHeaders);
-
- for (int i = 0; i <= MAX_REDIRECTS; i++) {
- resp = broker.fetch(cur, method, carryHeaders, body);
-
- int code = resp.statusCode();
- if (code == 301 || code == 302 || code == 303 || code == 307 || code == 308) {
- String loc = headerValue(resp.headers(), "location");
- if (loc == null || loc.isBlank()) break;
-
- // Propagate session header from redirect response to the next request.
- String session = headerValue(resp.headers(), "session");
- if (session != null && !session.isBlank()) {
- carryHeaders.put("session", session);
- }
-
- // Resolve redirect URL.
- try {
- cur = new URL(cur, loc);
- } catch (Exception ex) {
- break;
- }
-
- // Redirect method handling:
- // - 303: switch to GET
- // - 301/302: commonly switch to GET for POST (browser-like)
- // - 307/308: keep method and body
- if (code == 303) {
- method = WebRequestMethod.GET;
- body = null;
- } else if ((code == 301 || code == 302) && method == WebRequestMethod.POST) {
- method = WebRequestMethod.GET;
- body = null;
- }
-
- continue;
- }
-
- break;
- }
-
- this.response = resp;
- this.connected = true;
- }
-
- @Override
- public InputStream getInputStream() throws IOException {
- connect();
- return response.bodyStream();
- }
-
- @Override
- public String getContentType() {
- try {
- connect();
- } catch (IOException e) {
- return "application/octet-stream";
- }
- String ct = response.contentType();
- return (ct == null || ct.isBlank()) ? "application/octet-stream" : ct;
- }
-
- @Override
- public int getContentLength() {
- try {
- connect();
- } catch (IOException e) {
- return -1;
- }
- long len = response.contentLength();
- return (len <= 0 || len > Integer.MAX_VALUE) ? -1 : (int) len;
- }
-
- @Override
- public long getContentLengthLong() {
- try {
- connect();
- } catch (IOException e) {
- return -1L;
- }
- return response.contentLength();
- }
-
- @Override
- public Map> getHeaderFields() {
- try {
- connect();
- } catch (IOException e) {
- return Map.of();
- }
-
- Map> out = new LinkedHashMap<>();
- for (Map.Entry e : response.headers().entrySet()) {
- String k = e.getKey();
- String v = e.getValue();
- if (k == null) continue;
- out.put(k, v == null ? List.of() : List.of(v));
- }
- return out;
- }
-}
\ No newline at end of file
diff --git a/src/main/java/org/openautonomousconnection/infonamelib/OacWebUrlInstaller.java b/src/main/java/org/openautonomousconnection/infonamelib/OacWebUrlInstaller.java
index 0309b1d..c8369ad 100644
--- a/src/main/java/org/openautonomousconnection/infonamelib/OacWebUrlInstaller.java
+++ b/src/main/java/org/openautonomousconnection/infonamelib/OacWebUrlInstaller.java
@@ -16,16 +16,16 @@ public final class OacWebUrlInstaller {
private OacWebUrlInstaller() {
}
- public static void installOnce(EventManager eventManager, ProtocolClient client) {
+ public static void installOnce(EventManager eventManager, ProtocolClient client, LibClientImpl impl) {
Objects.requireNonNull(eventManager, "eventManager");
Objects.requireNonNull(client, "client");
+ Objects.requireNonNull(impl, "impl");
if (!INSTALLED.compareAndSet(false, true)) return;
OacWebRequestBroker.get().attachClient(client);
- eventManager.registerListener(new OacWebPacketListener(OacWebRequestBroker.get(), client));
+ eventManager.registerListener(new OacWebPacketListener(OacWebRequestBroker.get(), client, impl));
- // IMPORTANT: must match "org.openautonomousconnection.infonamelib.web.Handler"
ProtocolHandlerPackages.installPackage("org.openautonomousconnection.infonamelib");
}
}
\ No newline at end of file
diff --git a/src/main/java/org/openautonomousconnection/infonamelib/OacWebUrlStreamHandler.java b/src/main/java/org/openautonomousconnection/infonamelib/OacWebUrlStreamHandler.java
deleted file mode 100644
index 22d311e..0000000
--- a/src/main/java/org/openautonomousconnection/infonamelib/OacWebUrlStreamHandler.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.openautonomousconnection.infonamelib;
-
-import java.net.URL;
-import java.net.URLConnection;
-import java.net.URLStreamHandler;
-import java.util.Objects;
-
-/**
- * URLStreamHandler for the custom OAC scheme "web://".
- */
-public final class OacWebUrlStreamHandler extends URLStreamHandler {
-
- private final OacWebRequestBroker broker;
-
- /**
- * Creates a handler.
- *
- * @param broker request broker
- */
- public OacWebUrlStreamHandler(OacWebRequestBroker broker) {
- this.broker = Objects.requireNonNull(broker, "broker");
- }
-
- @Override
- protected URLConnection openConnection(URL u) {
- return new OacWebURLConnection(u, broker);
- }
-}
\ No newline at end of file