mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-27 09:21:35 +07:00
Fix BlueBubbles send confirmation reliability
This commit is contained in:
@@ -49,6 +49,9 @@ const bluebubblesAccountSchema = z
|
||||
mediaMaxMb: z.number().int().positive().optional(),
|
||||
mediaLocalRoots: z.array(z.string()).optional(),
|
||||
sendReadReceipts: z.boolean().optional(),
|
||||
sendConfirmationTimeoutMs: z.number().int().positive().optional(),
|
||||
sendRetryCount: z.number().int().min(0).max(3).optional(),
|
||||
sendRetryBaseDelayMs: z.number().int().positive().optional(),
|
||||
allowPrivateNetwork: z.boolean().optional(),
|
||||
blockStreaming: z.boolean().optional(),
|
||||
groups: z.object({}).catchall(bluebubblesGroupConfigSchema).optional(),
|
||||
|
||||
@@ -33,6 +33,7 @@ import type {
|
||||
BlueBubblesRuntimeEnv,
|
||||
WebhookTarget,
|
||||
} from "./monitor-shared.js";
|
||||
import { confirmBlueBubblesOutboundMessage } from "./outbound-confirmation.js";
|
||||
import { isBlueBubblesPrivateApiEnabled } from "./probe.js";
|
||||
import { normalizeBlueBubblesReactionInput, sendBlueBubblesReaction } from "./reactions.js";
|
||||
import type { OpenClawConfig } from "./runtime-api.js";
|
||||
@@ -521,6 +522,14 @@ export async function processMessage(
|
||||
if (message.fromMe) {
|
||||
// Cache from-me messages so reply context can resolve sender/body.
|
||||
cacheInboundMessage();
|
||||
confirmBlueBubblesOutboundMessage({
|
||||
accountId: account.accountId,
|
||||
chatGuid: message.chatGuid,
|
||||
chatIdentifier: message.chatIdentifier,
|
||||
chatId: message.chatId,
|
||||
messageId: cacheMessageId,
|
||||
body: rawBody,
|
||||
});
|
||||
const confirmedAssistantOutbound =
|
||||
confirmedOutboundCacheEntry?.senderLabel === "me" &&
|
||||
normalizeSnippet(confirmedOutboundCacheEntry.body ?? "") === normalizeSnippet(rawBody);
|
||||
|
||||
74
extensions/bluebubbles/src/outbound-confirmation.test.ts
Normal file
74
extensions/bluebubbles/src/outbound-confirmation.test.ts
Normal file
@@ -0,0 +1,74 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
clearBlueBubblesOutboundConfirmations,
|
||||
confirmBlueBubblesOutboundMessage,
|
||||
waitForBlueBubblesOutboundConfirmation,
|
||||
} from "./outbound-confirmation.js";
|
||||
|
||||
afterEach(() => {
|
||||
clearBlueBubblesOutboundConfirmations();
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
describe("outbound confirmation", () => {
|
||||
it("matches a pending confirmation by chat and message id", async () => {
|
||||
const pendingPromise = waitForBlueBubblesOutboundConfirmation({
|
||||
accountId: "default",
|
||||
chatGuid: "iMessage;-;+15551234567",
|
||||
messageId: "msg-123",
|
||||
body: "Hello there",
|
||||
timeoutMs: 1_000,
|
||||
});
|
||||
|
||||
expect(
|
||||
confirmBlueBubblesOutboundMessage({
|
||||
accountId: "default",
|
||||
chatGuid: "iMessage;-;+15551234567",
|
||||
messageId: "msg-123",
|
||||
body: "Hello there",
|
||||
}),
|
||||
).toBe(true);
|
||||
|
||||
await expect(pendingPromise).resolves.toEqual({
|
||||
messageId: "msg-123",
|
||||
source: "webhook",
|
||||
});
|
||||
});
|
||||
|
||||
it("falls back to normalized body matching", async () => {
|
||||
const pendingPromise = waitForBlueBubblesOutboundConfirmation({
|
||||
accountId: "default",
|
||||
chatGuid: "iMessage;-;+15551234567",
|
||||
body: "**Hello** there",
|
||||
timeoutMs: 1_000,
|
||||
});
|
||||
|
||||
expect(
|
||||
confirmBlueBubblesOutboundMessage({
|
||||
accountId: "default",
|
||||
chatGuid: "iMessage;-;+15551234567",
|
||||
body: "Hello there",
|
||||
}),
|
||||
).toBe(true);
|
||||
|
||||
await expect(pendingPromise).resolves.toEqual({
|
||||
messageId: undefined,
|
||||
source: "webhook",
|
||||
});
|
||||
});
|
||||
|
||||
it("returns null when confirmation times out", async () => {
|
||||
vi.useFakeTimers();
|
||||
|
||||
const pendingPromise = waitForBlueBubblesOutboundConfirmation({
|
||||
accountId: "default",
|
||||
chatGuid: "iMessage;-;+15551234567",
|
||||
body: "Hello there",
|
||||
timeoutMs: 250,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(250);
|
||||
|
||||
await expect(pendingPromise).resolves.toBeNull();
|
||||
});
|
||||
});
|
||||
165
extensions/bluebubbles/src/outbound-confirmation.ts
Normal file
165
extensions/bluebubbles/src/outbound-confirmation.ts
Normal file
@@ -0,0 +1,165 @@
|
||||
import { stripMarkdown } from "./runtime-api.js";
|
||||
|
||||
const PENDING_CONFIRMATION_TTL_MS = 2 * 60 * 1000;
|
||||
|
||||
export type BlueBubblesOutboundConfirmation = {
|
||||
messageId?: string;
|
||||
source: "webhook";
|
||||
};
|
||||
|
||||
type PendingBlueBubblesOutboundConfirmation = {
|
||||
id: number;
|
||||
accountId: string;
|
||||
chatGuid?: string;
|
||||
chatIdentifier?: string;
|
||||
chatId?: number;
|
||||
messageId?: string;
|
||||
bodyNorm: string;
|
||||
createdAt: number;
|
||||
resolve: (value: BlueBubblesOutboundConfirmation | null) => void;
|
||||
timeout: ReturnType<typeof setTimeout>;
|
||||
};
|
||||
|
||||
const pendingConfirmations: PendingBlueBubblesOutboundConfirmation[] = [];
|
||||
let nextPendingConfirmationId = 0;
|
||||
|
||||
function trimOrUndefined(value?: string | null): string | undefined {
|
||||
const trimmed = value?.trim();
|
||||
return trimmed ? trimmed : undefined;
|
||||
}
|
||||
|
||||
function normalizeBody(value: string): string {
|
||||
return stripMarkdown(value).replace(/\s+/g, " ").trim().toLowerCase();
|
||||
}
|
||||
|
||||
function prunePendingConfirmations(now = Date.now()): void {
|
||||
const cutoff = now - PENDING_CONFIRMATION_TTL_MS;
|
||||
for (let i = pendingConfirmations.length - 1; i >= 0; i--) {
|
||||
if (pendingConfirmations[i].createdAt < cutoff) {
|
||||
const [entry] = pendingConfirmations.splice(i, 1);
|
||||
clearTimeout(entry.timeout);
|
||||
entry.resolve(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function removePendingConfirmation(id: number): PendingBlueBubblesOutboundConfirmation | null {
|
||||
const index = pendingConfirmations.findIndex((entry) => entry.id === id);
|
||||
if (index < 0) {
|
||||
return null;
|
||||
}
|
||||
const [entry] = pendingConfirmations.splice(index, 1);
|
||||
clearTimeout(entry.timeout);
|
||||
return entry ?? null;
|
||||
}
|
||||
|
||||
function chatsMatch(
|
||||
pending: Pick<PendingBlueBubblesOutboundConfirmation, "chatGuid" | "chatIdentifier" | "chatId">,
|
||||
candidate: { chatGuid?: string; chatIdentifier?: string; chatId?: number },
|
||||
): boolean {
|
||||
const pendingGuid = trimOrUndefined(pending.chatGuid);
|
||||
const candidateGuid = trimOrUndefined(candidate.chatGuid);
|
||||
if (pendingGuid && candidateGuid) {
|
||||
return pendingGuid === candidateGuid;
|
||||
}
|
||||
|
||||
const pendingIdentifier = trimOrUndefined(pending.chatIdentifier);
|
||||
const candidateIdentifier = trimOrUndefined(candidate.chatIdentifier);
|
||||
if (pendingIdentifier && candidateIdentifier) {
|
||||
return pendingIdentifier === candidateIdentifier;
|
||||
}
|
||||
|
||||
const pendingChatId = typeof pending.chatId === "number" ? pending.chatId : undefined;
|
||||
const candidateChatId = typeof candidate.chatId === "number" ? candidate.chatId : undefined;
|
||||
if (pendingChatId !== undefined && candidateChatId !== undefined) {
|
||||
return pendingChatId === candidateChatId;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
export async function waitForBlueBubblesOutboundConfirmation(params: {
|
||||
accountId: string;
|
||||
chatGuid?: string;
|
||||
chatIdentifier?: string;
|
||||
chatId?: number;
|
||||
messageId?: string;
|
||||
body: string;
|
||||
timeoutMs: number;
|
||||
}): Promise<BlueBubblesOutboundConfirmation | null> {
|
||||
prunePendingConfirmations();
|
||||
const normalizedTimeoutMs = Math.max(0, Math.floor(params.timeoutMs));
|
||||
const normalizedBody = normalizeBody(params.body);
|
||||
if (!normalizedBody || normalizedTimeoutMs === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return await new Promise<BlueBubblesOutboundConfirmation | null>((resolve) => {
|
||||
nextPendingConfirmationId += 1;
|
||||
const pendingId = nextPendingConfirmationId;
|
||||
const timeout = setTimeout(() => {
|
||||
const entry = removePendingConfirmation(pendingId);
|
||||
entry?.resolve(null);
|
||||
}, normalizedTimeoutMs);
|
||||
|
||||
pendingConfirmations.push({
|
||||
id: pendingId,
|
||||
accountId: params.accountId,
|
||||
chatGuid: trimOrUndefined(params.chatGuid),
|
||||
chatIdentifier: trimOrUndefined(params.chatIdentifier),
|
||||
chatId: typeof params.chatId === "number" ? params.chatId : undefined,
|
||||
messageId: trimOrUndefined(params.messageId),
|
||||
bodyNorm: normalizedBody,
|
||||
createdAt: Date.now(),
|
||||
resolve,
|
||||
timeout,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export function confirmBlueBubblesOutboundMessage(params: {
|
||||
accountId: string;
|
||||
chatGuid?: string;
|
||||
chatIdentifier?: string;
|
||||
chatId?: number;
|
||||
messageId?: string;
|
||||
body: string;
|
||||
}): boolean {
|
||||
prunePendingConfirmations();
|
||||
const normalizedBody = normalizeBody(params.body);
|
||||
const normalizedMessageId = trimOrUndefined(params.messageId);
|
||||
for (let i = 0; i < pendingConfirmations.length; i++) {
|
||||
const pending = pendingConfirmations[i];
|
||||
if (pending.accountId !== params.accountId) {
|
||||
continue;
|
||||
}
|
||||
if (!chatsMatch(pending, params)) {
|
||||
continue;
|
||||
}
|
||||
if (normalizedMessageId && pending.messageId && normalizedMessageId === pending.messageId) {
|
||||
pendingConfirmations.splice(i, 1);
|
||||
clearTimeout(pending.timeout);
|
||||
pending.resolve({ messageId: normalizedMessageId, source: "webhook" });
|
||||
return true;
|
||||
}
|
||||
if (!normalizedBody || pending.bodyNorm !== normalizedBody) {
|
||||
continue;
|
||||
}
|
||||
pendingConfirmations.splice(i, 1);
|
||||
clearTimeout(pending.timeout);
|
||||
pending.resolve({ messageId: normalizedMessageId, source: "webhook" });
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
export function clearBlueBubblesOutboundConfirmations(): void {
|
||||
while (pendingConfirmations.length > 0) {
|
||||
const entry = pendingConfirmations.pop();
|
||||
if (!entry) {
|
||||
continue;
|
||||
}
|
||||
clearTimeout(entry.timeout);
|
||||
entry.resolve(null);
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,12 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import "./test-mocks.js";
|
||||
|
||||
const waitForBlueBubblesOutboundConfirmationMock = vi.hoisted(() => vi.fn());
|
||||
|
||||
vi.mock("./outbound-confirmation.js", () => ({
|
||||
waitForBlueBubblesOutboundConfirmation: waitForBlueBubblesOutboundConfirmationMock,
|
||||
}));
|
||||
|
||||
import { getCachedBlueBubblesPrivateApiStatus } from "./probe.js";
|
||||
import type { PluginRuntime } from "./runtime-api.js";
|
||||
import { clearBlueBubblesRuntime, setBlueBubblesRuntime } from "./runtime.js";
|
||||
@@ -19,6 +26,13 @@ installBlueBubblesFetchTestHooks({
|
||||
privateApiStatusMock,
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
waitForBlueBubblesOutboundConfirmationMock.mockReset();
|
||||
waitForBlueBubblesOutboundConfirmationMock.mockResolvedValue({
|
||||
source: "webhook",
|
||||
});
|
||||
});
|
||||
|
||||
function mockResolvedHandleTarget(
|
||||
guid: string = "iMessage;-;+15551234567",
|
||||
address: string = "+15551234567",
|
||||
@@ -780,6 +794,75 @@ describe("send", () => {
|
||||
expect(typeof body.tempGuid).toBe("string");
|
||||
expect(body.tempGuid.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
it("throws when BlueBubbles returns an error envelope with HTTP 200", async () => {
|
||||
mockResolvedHandleTarget();
|
||||
mockSendResponse({
|
||||
status: 500,
|
||||
message: "Message Send Error",
|
||||
error: { message: "Transaction timeout", type: "iMessage Error" },
|
||||
});
|
||||
|
||||
await expect(
|
||||
sendMessageBlueBubbles("+15551234567", "Hello", {
|
||||
serverUrl: "http://localhost:1234",
|
||||
password: "test",
|
||||
}),
|
||||
).rejects.toThrow("Transaction timeout");
|
||||
});
|
||||
|
||||
it("retries once when BlueBubbles accepts the send but no confirmation arrives", async () => {
|
||||
waitForBlueBubblesOutboundConfirmationMock
|
||||
.mockResolvedValueOnce(null)
|
||||
.mockResolvedValueOnce({ source: "webhook" });
|
||||
|
||||
mockResolvedHandleTarget();
|
||||
mockSendResponse({ data: { guid: "msg-first-attempt" } });
|
||||
mockFetch.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
json: () => Promise.resolve({ data: [] }),
|
||||
});
|
||||
mockSendResponse({ data: { guid: "msg-second-attempt" } });
|
||||
|
||||
const result = await sendMessageBlueBubbles("+15551234567", "Hello", {
|
||||
serverUrl: "http://localhost:1234",
|
||||
password: "test",
|
||||
deliveryRetryBaseDelayMs: 1,
|
||||
});
|
||||
|
||||
expect(result.messageId).toBe("msg-second-attempt");
|
||||
expect(waitForBlueBubblesOutboundConfirmationMock).toHaveBeenCalledTimes(2);
|
||||
expect(mockFetch).toHaveBeenCalledTimes(4);
|
||||
});
|
||||
|
||||
it("uses recent history as delivery confirmation before retrying", async () => {
|
||||
waitForBlueBubblesOutboundConfirmationMock.mockResolvedValueOnce(null);
|
||||
|
||||
mockResolvedHandleTarget();
|
||||
mockSendResponse({ data: { guid: "msg-api-response" } });
|
||||
mockFetch.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
json: () =>
|
||||
Promise.resolve({
|
||||
data: [
|
||||
{
|
||||
guid: "msg-history-confirmed",
|
||||
text: "Hello from history",
|
||||
is_from_me: true,
|
||||
},
|
||||
],
|
||||
}),
|
||||
});
|
||||
|
||||
const result = await sendMessageBlueBubbles("+15551234567", "Hello from history", {
|
||||
serverUrl: "http://localhost:1234",
|
||||
password: "test",
|
||||
});
|
||||
|
||||
expect(result.messageId).toBe("msg-history-confirmed");
|
||||
expect(waitForBlueBubblesOutboundConfirmationMock).toHaveBeenCalledTimes(1);
|
||||
expect(mockFetch).toHaveBeenCalledTimes(3);
|
||||
});
|
||||
});
|
||||
|
||||
describe("createChatForHandle", () => {
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import crypto from "node:crypto";
|
||||
import { resolveBlueBubblesAccount } from "./accounts.js";
|
||||
import { fetchBlueBubblesHistory } from "./history.js";
|
||||
import { waitForBlueBubblesOutboundConfirmation } from "./outbound-confirmation.js";
|
||||
import {
|
||||
getCachedBlueBubblesPrivateApiStatus,
|
||||
isBlueBubblesPrivateApiStatusEnabled,
|
||||
@@ -28,6 +30,12 @@ export type BlueBubblesSendOpts = {
|
||||
replyToPartIndex?: number;
|
||||
/** Effect ID or short name for message effects (e.g., "slam", "balloons") */
|
||||
effectId?: string;
|
||||
/** Wait for webhook or history confirmation before treating the send as delivered. */
|
||||
deliveryConfirmationTimeoutMs?: number;
|
||||
/** Retries to attempt when BlueBubbles accepts a send but no delivery confirmation arrives. */
|
||||
deliveryRetryCount?: number;
|
||||
/** Base delay between retry attempts; exponential backoff is applied from this value. */
|
||||
deliveryRetryBaseDelayMs?: number;
|
||||
};
|
||||
|
||||
export type BlueBubblesSendResult = {
|
||||
@@ -108,16 +116,233 @@ function resolvePrivateApiDecision(params: {
|
||||
};
|
||||
}
|
||||
|
||||
async function parseBlueBubblesMessageResponse(res: Response): Promise<BlueBubblesSendResult> {
|
||||
const DEFAULT_DELIVERY_CONFIRMATION_TIMEOUT_MS = 8_000;
|
||||
const DEFAULT_DELIVERY_RETRY_COUNT = 1;
|
||||
const DEFAULT_DELIVERY_RETRY_BASE_DELAY_MS = 1_500;
|
||||
const DELIVERY_HISTORY_CHECK_LIMIT = 8;
|
||||
|
||||
type BlueBubblesDeliveryConfig = {
|
||||
timeoutMs: number;
|
||||
retryCount: number;
|
||||
retryBaseDelayMs: number;
|
||||
};
|
||||
|
||||
type ParsedBlueBubblesMessageResponse = BlueBubblesSendResult & {
|
||||
chatGuid?: string | null;
|
||||
};
|
||||
|
||||
type BlueBubblesApiRecord = Record<string, unknown>;
|
||||
|
||||
function asBlueBubblesRecord(value: unknown): BlueBubblesApiRecord | null {
|
||||
return value && typeof value === "object" && !Array.isArray(value)
|
||||
? (value as BlueBubblesApiRecord)
|
||||
: null;
|
||||
}
|
||||
|
||||
function extractBlueBubblesResponseError(
|
||||
payload: unknown,
|
||||
): { status?: number; message: string } | null {
|
||||
const record = asBlueBubblesRecord(payload);
|
||||
if (!record) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const status =
|
||||
typeof record.status === "number" && Number.isFinite(record.status) ? record.status : undefined;
|
||||
const errorRecord = asBlueBubblesRecord(record.error);
|
||||
const explicitMessage =
|
||||
(typeof errorRecord?.message === "string" && errorRecord.message) ||
|
||||
(typeof errorRecord?.error === "string" && errorRecord.error) ||
|
||||
(typeof record.message === "string" &&
|
||||
status !== undefined &&
|
||||
status >= 400 &&
|
||||
record.message) ||
|
||||
"";
|
||||
if (status !== undefined && status >= 400) {
|
||||
return { status, message: explicitMessage || `status ${status}` };
|
||||
}
|
||||
if (errorRecord) {
|
||||
return {
|
||||
status,
|
||||
message:
|
||||
explicitMessage ||
|
||||
(typeof errorRecord.type === "string" && errorRecord.type) ||
|
||||
"unknown error",
|
||||
};
|
||||
}
|
||||
if (record.success === false) {
|
||||
return {
|
||||
status,
|
||||
message: explicitMessage || "request was not accepted by BlueBubbles",
|
||||
};
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function extractBlueBubblesResponseChatGuid(payload: unknown): string | null {
|
||||
const record = asBlueBubblesRecord(payload);
|
||||
if (!record) {
|
||||
return null;
|
||||
}
|
||||
const data = asBlueBubblesRecord(record.data);
|
||||
const result = asBlueBubblesRecord(record.result);
|
||||
const payloadRecord = asBlueBubblesRecord(record.payload);
|
||||
const roots = [record, data, result, payloadRecord];
|
||||
for (const root of roots) {
|
||||
if (!root) {
|
||||
continue;
|
||||
}
|
||||
const candidates = [root.chatGuid, root.chat_guid, root.guid, root.chatIdentifier];
|
||||
for (const candidate of candidates) {
|
||||
if (typeof candidate === "string" && candidate.trim()) {
|
||||
return candidate.trim();
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function clampPositiveNumber(value: number | undefined, fallback: number, max: number): number {
|
||||
if (typeof value !== "number" || !Number.isFinite(value)) {
|
||||
return fallback;
|
||||
}
|
||||
const normalized = Math.floor(value);
|
||||
if (normalized <= 0) {
|
||||
return fallback;
|
||||
}
|
||||
return Math.min(normalized, max);
|
||||
}
|
||||
|
||||
function resolveDeliveryConfig(
|
||||
opts: BlueBubblesSendOpts,
|
||||
account: ReturnType<typeof resolveBlueBubblesAccount>,
|
||||
): BlueBubblesDeliveryConfig {
|
||||
return {
|
||||
timeoutMs: clampPositiveNumber(
|
||||
opts.deliveryConfirmationTimeoutMs ?? account.config.sendConfirmationTimeoutMs,
|
||||
DEFAULT_DELIVERY_CONFIRMATION_TIMEOUT_MS,
|
||||
30_000,
|
||||
),
|
||||
retryCount: Math.min(
|
||||
clampPositiveNumber(
|
||||
opts.deliveryRetryCount ?? account.config.sendRetryCount,
|
||||
DEFAULT_DELIVERY_RETRY_COUNT,
|
||||
3,
|
||||
),
|
||||
3,
|
||||
),
|
||||
retryBaseDelayMs: clampPositiveNumber(
|
||||
opts.deliveryRetryBaseDelayMs ?? account.config.sendRetryBaseDelayMs,
|
||||
DEFAULT_DELIVERY_RETRY_BASE_DELAY_MS,
|
||||
30_000,
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
function sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
function normalizeConfirmationBody(text: string): string {
|
||||
return stripMarkdown(text).replace(/\s+/g, " ").trim().toLowerCase();
|
||||
}
|
||||
|
||||
async function isMessageVisibleInRecentHistory(params: {
|
||||
baseUrl: string;
|
||||
password: string;
|
||||
accountId: string;
|
||||
chatGuid: string;
|
||||
body: string;
|
||||
timeoutMs?: number;
|
||||
}): Promise<string | null> {
|
||||
const bodyNorm = normalizeConfirmationBody(params.body);
|
||||
if (!bodyNorm) {
|
||||
return null;
|
||||
}
|
||||
const history = await fetchBlueBubblesHistory(params.chatGuid, DELIVERY_HISTORY_CHECK_LIMIT, {
|
||||
serverUrl: params.baseUrl,
|
||||
password: params.password,
|
||||
accountId: params.accountId,
|
||||
timeoutMs: params.timeoutMs,
|
||||
});
|
||||
for (let i = history.entries.length - 1; i >= 0; i--) {
|
||||
const entry = history.entries[i];
|
||||
if (entry.sender !== "me") {
|
||||
continue;
|
||||
}
|
||||
if (normalizeConfirmationBody(entry.body) !== bodyNorm) {
|
||||
continue;
|
||||
}
|
||||
const messageId = entry.messageId?.trim();
|
||||
return messageId || "ok";
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
async function waitForDeliveryConfirmation(params: {
|
||||
baseUrl: string;
|
||||
password: string;
|
||||
accountId: string;
|
||||
chatGuid: string;
|
||||
messageId: string;
|
||||
body: string;
|
||||
confirmationTimeoutMs: number;
|
||||
timeoutMs?: number;
|
||||
}): Promise<{ confirmed: boolean; messageId: string }> {
|
||||
const webhookConfirmation = await waitForBlueBubblesOutboundConfirmation({
|
||||
accountId: params.accountId,
|
||||
chatGuid: params.chatGuid,
|
||||
messageId: params.messageId,
|
||||
body: params.body,
|
||||
timeoutMs: params.confirmationTimeoutMs,
|
||||
});
|
||||
if (webhookConfirmation) {
|
||||
return {
|
||||
confirmed: true,
|
||||
messageId: webhookConfirmation.messageId?.trim() || params.messageId,
|
||||
};
|
||||
}
|
||||
|
||||
const historyMessageId = await isMessageVisibleInRecentHistory({
|
||||
baseUrl: params.baseUrl,
|
||||
password: params.password,
|
||||
accountId: params.accountId,
|
||||
chatGuid: params.chatGuid,
|
||||
body: params.body,
|
||||
timeoutMs: params.timeoutMs,
|
||||
});
|
||||
if (historyMessageId) {
|
||||
return { confirmed: true, messageId: historyMessageId };
|
||||
}
|
||||
|
||||
return { confirmed: false, messageId: params.messageId };
|
||||
}
|
||||
|
||||
async function parseBlueBubblesMessageResponse(
|
||||
res: Response,
|
||||
): Promise<ParsedBlueBubblesMessageResponse> {
|
||||
const body = await res.text();
|
||||
if (!body) {
|
||||
return { messageId: "ok" };
|
||||
return { messageId: "ok", chatGuid: null };
|
||||
}
|
||||
try {
|
||||
const parsed = JSON.parse(body) as unknown;
|
||||
return { messageId: extractBlueBubblesMessageId(parsed) };
|
||||
} catch {
|
||||
return { messageId: "ok" };
|
||||
const responseError = extractBlueBubblesResponseError(parsed);
|
||||
if (responseError) {
|
||||
const statusSuffix = responseError.status ? ` (${responseError.status})` : "";
|
||||
throw new Error(
|
||||
`BlueBubbles send failed${statusSuffix}: ${responseError.message || "unknown"}`,
|
||||
);
|
||||
}
|
||||
return {
|
||||
messageId: extractBlueBubblesMessageId(parsed),
|
||||
chatGuid: extractBlueBubblesResponseChatGuid(parsed),
|
||||
};
|
||||
} catch (error) {
|
||||
if (error instanceof Error && error.message.startsWith("BlueBubbles send failed")) {
|
||||
throw error;
|
||||
}
|
||||
return { messageId: "ok", chatGuid: null };
|
||||
}
|
||||
}
|
||||
|
||||
@@ -397,25 +622,135 @@ export async function createChatForHandle(params: {
|
||||
return { chatGuid, messageId };
|
||||
}
|
||||
|
||||
async function sendTextMessageOnce(params: {
|
||||
baseUrl: string;
|
||||
password: string;
|
||||
payload: Record<string, unknown>;
|
||||
timeoutMs?: number;
|
||||
}): Promise<ParsedBlueBubblesMessageResponse> {
|
||||
const url = buildBlueBubblesApiUrl({
|
||||
baseUrl: params.baseUrl,
|
||||
path: "/api/v1/message/text",
|
||||
password: params.password,
|
||||
});
|
||||
const res = await blueBubblesFetchWithTimeout(
|
||||
url,
|
||||
{
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify(params.payload),
|
||||
},
|
||||
params.timeoutMs,
|
||||
);
|
||||
if (!res.ok) {
|
||||
const errorText = await res.text();
|
||||
throw new Error(`BlueBubbles send failed (${res.status}): ${errorText || "unknown"}`);
|
||||
}
|
||||
return parseBlueBubblesMessageResponse(res);
|
||||
}
|
||||
|
||||
async function sendMessageWithDeliveryRetries(params: {
|
||||
accountId: string;
|
||||
baseUrl: string;
|
||||
password: string;
|
||||
message: string;
|
||||
confirmationChatGuid?: string | null;
|
||||
resolveConfirmationChatGuid?: () => Promise<string | null>;
|
||||
timeoutMs?: number;
|
||||
delivery: BlueBubblesDeliveryConfig;
|
||||
send: () => Promise<ParsedBlueBubblesMessageResponse>;
|
||||
}): Promise<BlueBubblesSendResult> {
|
||||
const maxAttempts = Math.max(1, params.delivery.retryCount + 1);
|
||||
let resolvedChatGuid = params.confirmationChatGuid?.trim() || null;
|
||||
let lastMessageId = "ok";
|
||||
|
||||
for (let attemptIndex = 0; attemptIndex < maxAttempts; attemptIndex++) {
|
||||
const sendResult = await params.send();
|
||||
lastMessageId = sendResult.messageId;
|
||||
resolvedChatGuid = sendResult.chatGuid?.trim() || resolvedChatGuid;
|
||||
if (!resolvedChatGuid && params.resolveConfirmationChatGuid) {
|
||||
resolvedChatGuid = await params.resolveConfirmationChatGuid();
|
||||
}
|
||||
if (!resolvedChatGuid) {
|
||||
warnBlueBubbles(
|
||||
"send accepted by BlueBubbles, but no chatGuid was available to verify delivery. Returning the API result without confirmation.",
|
||||
);
|
||||
return { messageId: lastMessageId };
|
||||
}
|
||||
|
||||
const confirmation = await waitForDeliveryConfirmation({
|
||||
baseUrl: params.baseUrl,
|
||||
password: params.password,
|
||||
accountId: params.accountId,
|
||||
chatGuid: resolvedChatGuid,
|
||||
messageId: sendResult.messageId,
|
||||
body: params.message,
|
||||
confirmationTimeoutMs: params.delivery.timeoutMs,
|
||||
timeoutMs: params.timeoutMs,
|
||||
});
|
||||
if (confirmation.confirmed) {
|
||||
return { messageId: confirmation.messageId || sendResult.messageId };
|
||||
}
|
||||
|
||||
const attemptsLeft = maxAttempts - attemptIndex - 1;
|
||||
const messageIdHint = sendResult.messageId?.trim();
|
||||
const attemptLabel = `attempt ${attemptIndex + 1}/${maxAttempts}`;
|
||||
if (attemptsLeft <= 0) {
|
||||
throw new Error(
|
||||
`BlueBubbles send could not be confirmed after ${maxAttempts} attempts. The API accepted the request${messageIdHint && messageIdHint !== "ok" && messageIdHint !== "unknown" ? ` (messageId=${messageIdHint})` : ""}, but no webhook or history confirmation arrived within ${params.delivery.timeoutMs}ms.`,
|
||||
);
|
||||
}
|
||||
|
||||
warnBlueBubbles(
|
||||
`send delivery not confirmed for ${attemptLabel}; retrying in ${params.delivery.retryBaseDelayMs * 2 ** attemptIndex}ms.`,
|
||||
);
|
||||
await sleep(params.delivery.retryBaseDelayMs * 2 ** attemptIndex);
|
||||
}
|
||||
|
||||
return { messageId: lastMessageId };
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new chat (DM) and sends an initial message.
|
||||
* Requires Private API to be enabled in BlueBubbles.
|
||||
*/
|
||||
async function createNewChatWithMessage(params: {
|
||||
accountId: string;
|
||||
baseUrl: string;
|
||||
password: string;
|
||||
address: string;
|
||||
message: string;
|
||||
timeoutMs?: number;
|
||||
delivery: BlueBubblesDeliveryConfig;
|
||||
}): Promise<BlueBubblesSendResult> {
|
||||
const result = await createChatForHandle({
|
||||
return await sendMessageWithDeliveryRetries({
|
||||
accountId: params.accountId,
|
||||
baseUrl: params.baseUrl,
|
||||
password: params.password,
|
||||
address: params.address,
|
||||
message: params.message,
|
||||
timeoutMs: params.timeoutMs,
|
||||
delivery: params.delivery,
|
||||
resolveConfirmationChatGuid: async () =>
|
||||
await resolveChatGuidForTarget({
|
||||
baseUrl: params.baseUrl,
|
||||
password: params.password,
|
||||
timeoutMs: params.timeoutMs,
|
||||
target: {
|
||||
kind: "handle",
|
||||
address: params.address,
|
||||
},
|
||||
}),
|
||||
send: async () => {
|
||||
const result = await createChatForHandle({
|
||||
baseUrl: params.baseUrl,
|
||||
password: params.password,
|
||||
address: params.address,
|
||||
message: params.message,
|
||||
timeoutMs: params.timeoutMs,
|
||||
});
|
||||
return { messageId: result.messageId, chatGuid: result.chatGuid };
|
||||
},
|
||||
});
|
||||
return { messageId: result.messageId };
|
||||
}
|
||||
|
||||
export async function sendMessageBlueBubbles(
|
||||
@@ -450,6 +785,7 @@ export async function sendMessageBlueBubbles(
|
||||
throw new Error("BlueBubbles password is required");
|
||||
}
|
||||
const privateApiStatus = getCachedBlueBubblesPrivateApiStatus(account.accountId);
|
||||
const delivery = resolveDeliveryConfig(opts, account);
|
||||
|
||||
const target = resolveBlueBubblesSendTarget(to);
|
||||
const chatGuid = await resolveChatGuidForTarget({
|
||||
@@ -463,11 +799,13 @@ export async function sendMessageBlueBubbles(
|
||||
// auto-create a new DM chat using the /api/v1/chat/new endpoint
|
||||
if (target.kind === "handle") {
|
||||
return createNewChatWithMessage({
|
||||
accountId: account.accountId,
|
||||
baseUrl,
|
||||
password,
|
||||
address: target.address,
|
||||
message: strippedText,
|
||||
timeoutMs: opts.timeoutMs,
|
||||
delivery,
|
||||
});
|
||||
}
|
||||
throw new Error(
|
||||
@@ -490,43 +828,40 @@ export async function sendMessageBlueBubbles(
|
||||
if (privateApiDecision.warningMessage) {
|
||||
warnBlueBubbles(privateApiDecision.warningMessage);
|
||||
}
|
||||
const payload: Record<string, unknown> = {
|
||||
chatGuid,
|
||||
tempGuid: crypto.randomUUID(),
|
||||
message: strippedText,
|
||||
};
|
||||
if (privateApiDecision.canUsePrivateApi) {
|
||||
payload.method = "private-api";
|
||||
}
|
||||
|
||||
// Add reply threading support
|
||||
if (wantsReplyThread && privateApiDecision.canUsePrivateApi) {
|
||||
payload.selectedMessageGuid = opts.replyToMessageGuid;
|
||||
payload.partIndex = typeof opts.replyToPartIndex === "number" ? opts.replyToPartIndex : 0;
|
||||
}
|
||||
|
||||
// Add message effects support
|
||||
if (effectId && privateApiDecision.canUsePrivateApi) {
|
||||
payload.effectId = effectId;
|
||||
}
|
||||
|
||||
const url = buildBlueBubblesApiUrl({
|
||||
return await sendMessageWithDeliveryRetries({
|
||||
accountId: account.accountId,
|
||||
baseUrl,
|
||||
path: "/api/v1/message/text",
|
||||
password,
|
||||
});
|
||||
const res = await blueBubblesFetchWithTimeout(
|
||||
url,
|
||||
{
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify(payload),
|
||||
message: strippedText,
|
||||
confirmationChatGuid: chatGuid,
|
||||
timeoutMs: opts.timeoutMs,
|
||||
delivery,
|
||||
send: async () => {
|
||||
const payload: Record<string, unknown> = {
|
||||
chatGuid,
|
||||
tempGuid: crypto.randomUUID(),
|
||||
message: strippedText,
|
||||
};
|
||||
if (privateApiDecision.canUsePrivateApi) {
|
||||
payload.method = "private-api";
|
||||
}
|
||||
|
||||
if (wantsReplyThread && privateApiDecision.canUsePrivateApi) {
|
||||
payload.selectedMessageGuid = opts.replyToMessageGuid;
|
||||
payload.partIndex = typeof opts.replyToPartIndex === "number" ? opts.replyToPartIndex : 0;
|
||||
}
|
||||
|
||||
if (effectId && privateApiDecision.canUsePrivateApi) {
|
||||
payload.effectId = effectId;
|
||||
}
|
||||
|
||||
return await sendTextMessageOnce({
|
||||
baseUrl,
|
||||
password,
|
||||
payload,
|
||||
timeoutMs: opts.timeoutMs,
|
||||
});
|
||||
},
|
||||
opts.timeoutMs,
|
||||
);
|
||||
if (!res.ok) {
|
||||
const errorText = await res.text();
|
||||
throw new Error(`BlueBubbles send failed (${res.status}): ${errorText || "unknown"}`);
|
||||
}
|
||||
return parseBlueBubblesMessageResponse(res);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -53,6 +53,12 @@ export type BlueBubblesAccountConfig = {
|
||||
mediaLocalRoots?: string[];
|
||||
/** Send read receipts for incoming messages (default: true). */
|
||||
sendReadReceipts?: boolean;
|
||||
/** Wait this long for webhook or history confirmation before treating an outbound send as unconfirmed. */
|
||||
sendConfirmationTimeoutMs?: number;
|
||||
/** Retry count when BlueBubbles accepts a send but no confirmation arrives. */
|
||||
sendRetryCount?: number;
|
||||
/** Base delay between send retries; exponential backoff is applied from this value. */
|
||||
sendRetryBaseDelayMs?: number;
|
||||
/** Allow fetching from private/internal IP addresses (e.g. localhost). Required for same-host BlueBubbles setups. */
|
||||
allowPrivateNetwork?: boolean;
|
||||
/** Per-group configuration keyed by chat GUID or identifier. */
|
||||
|
||||
Reference in New Issue
Block a user