Skip to content

Commit

Permalink
messaging: fix the hangs caused by the 66e error
Browse files Browse the repository at this point in the history
  • Loading branch information
purpshell committed Jan 30, 2025
1 parent 40ebf66 commit 3fca643
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 15 deletions.
110 changes: 95 additions & 15 deletions src/Socket/messages-recv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import {
getHistoryMsg,
getNextPreKeys,
getStatusFromReceiptType, hkdf,
MISSING_KEYS_ERROR_TEXT,
NACK_REASONS,
NO_MESSAGE_FOUND_ERROR_TEXT,
unixTimestampSeconds,
xmppPreKey,
Expand Down Expand Up @@ -89,16 +91,20 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {

let sendActiveReceipts = false

const sendMessageAck = async({ tag, attrs, content }: BinaryNode) => {
const sendMessageAck = async({ tag, attrs, content }: BinaryNode, errorCode?: number) => {
const stanza: BinaryNode = {
tag: 'ack',
attrs: {
id: attrs.id,
to: attrs.from,
class: tag,
class: tag
}
}

if(!!errorCode) {
stanza.attrs.error = errorCode.toString()
}

if(!!attrs.participant) {
stanza.attrs.participant = attrs.participant
}
Expand All @@ -107,7 +113,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
stanza.attrs.recipient = attrs.recipient
}

if(!!attrs.type && (tag !== 'message' || getBinaryNodeChild({ tag, attrs, content }, 'unavailable'))) {
if(!!attrs.type && (tag !== 'message' || getBinaryNodeChild({ tag, attrs, content }, 'unavailable') || errorCode !== 0)) {
stanza.attrs.type = attrs.type
}

Expand Down Expand Up @@ -595,7 +601,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
}
}

const handleReceipt = async(node: BinaryNode) => {
const handleReceipt = async(node: BinaryNode, offline: boolean) => {
const { attrs, content } = node
const isLid = attrs.from.includes('lid')
const isNodeFromMe = areJidsSameUser(attrs.participant || attrs.from, isLid ? authState.creds.me?.lid : authState.creds.me?.id)
Expand Down Expand Up @@ -687,7 +693,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
}
}

const handleNotification = async(node: BinaryNode) => {
const handleNotification = async(node: BinaryNode, offline: boolean) => {
const remoteJid = node.attrs.from
if(shouldIgnoreJid(remoteJid) && remoteJid !== '@s.whatsapp.net') {
logger.debug({ remoteJid, id: node.attrs.id }, 'ignored notification')
Expand Down Expand Up @@ -723,7 +729,12 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
}
}

const handleMessage = async(node: BinaryNode) => {
const handleMessage = async(node: BinaryNode, offline: boolean) => {
if(offline) {
await sendMessageAck(node)
return
}

if(shouldIgnoreJid(node.attrs.from) && node.attrs.from !== '@s.whatsapp.net') {
logger.debug({ key: node.attrs.key }, 'ignored message')
await sendMessageAck(node)
Expand Down Expand Up @@ -771,6 +782,10 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
await decrypt()
// message failed to decrypt
if(msg.messageStubType === proto.WebMessageInfo.StubType.CIPHERTEXT) {
if(msg?.messageStubParameters?.[0] === MISSING_KEYS_ERROR_TEXT) {
return sendMessageAck(node, NACK_REASONS.ParsingError)
}

retryMutex.mutex(
async() => {
if(ws.isOpen) {
Expand Down Expand Up @@ -816,12 +831,14 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {

cleanMessage(msg, authState.creds.me!.id)

await sendMessageAck(node)

await upsertMessage(msg, node.attrs.offline ? 'append' : 'notify')
}
)
])
} finally {
await sendMessageAck(node)
} catch(error) {
logger.error({ error, node }, 'error in handling message')
}
}

Expand Down Expand Up @@ -965,35 +982,98 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => {
const processNodeWithBuffer = async<T>(
node: BinaryNode,
identifier: string,
exec: (node: BinaryNode) => Promise<T>
exec: (node: BinaryNode, offline: boolean) => Promise<T>
) => {
ev.buffer()
await execTask()
ev.flush()

function execTask() {
return exec(node)
return exec(node, false)
.catch(err => onUnexpectedError(err, identifier))
}
}

type MessageType = 'message' | 'call' | 'receipt' | 'notification'

type OfflineNode = {
type: MessageType
node: BinaryNode
}

const makeOfflineNodeProcessor = () => {
const nodeProcessorMap: Map<MessageType, (node: BinaryNode, offline: boolean) => Promise<void>> = new Map([
['message', handleMessage],
['call', handleCall],
['receipt', handleReceipt],
['notification', handleNotification]
])
const nodes: OfflineNode[] = []
let isProcessing = false

const enqueue = (type: MessageType, node: BinaryNode) => {
nodes.push({ type, node })

if(isProcessing) {
return
}

isProcessing = true

const promise = async() => {
while(nodes.length && ws.isOpen) {
const { type, node } = nodes.shift()!

const nodeProcessor = nodeProcessorMap.get(type)

if(!nodeProcessor) {
onUnexpectedError(
new Error(`unknown offline node type: ${type}`),
'processing offline node'
)
continue
}

await nodeProcessor(node, true)
}

isProcessing = false
}

promise().catch(error => onUnexpectedError(error, 'processing offline nodes'))
}

return { enqueue }
}

const offlineNodeProcessor = makeOfflineNodeProcessor()

const processNode = (type: MessageType, node: BinaryNode, identifier: string, exec: (node: BinaryNode, offline: boolean) => Promise<void>) => {
const isOffline = !!node.attrs.offline

if(isOffline) {
offlineNodeProcessor.enqueue(type, node)
} else {
processNodeWithBuffer(node, identifier, exec)
}
}

// recv a message
ws.on('CB:message', (node: BinaryNode) => {
processNodeWithBuffer(node, 'processing message', handleMessage)
processNode('message', node, 'processing message', handleMessage)
})

ws.on('CB:call', async(node: BinaryNode) => {
processNodeWithBuffer(node, 'handling call', handleCall)
processNode('call', node, 'handling call', handleCall)
})

ws.on('CB:receipt', node => {
processNodeWithBuffer(node, 'handling receipt', handleReceipt)
processNode('receipt', node, 'handling receipt', handleReceipt)
})

ws.on('CB:notification', async(node: BinaryNode) => {
processNodeWithBuffer(node, 'handling notification', handleNotification)
processNode('notification', node, 'handling notification', handleNotification)
})

ws.on('CB:ack,class:message', (node: BinaryNode) => {
handleBadAck(node)
.catch(error => onUnexpectedError(error, 'handling bad ack'))
Expand Down
9 changes: 9 additions & 0 deletions src/Socket/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,15 @@ export const makeSocket = (config: SocketConfig) => {
end(new Boom('Multi-device beta not joined', { statusCode: DisconnectReason.multideviceMismatch }))
})

ws.on('CB:ib,,offline_preview', (node: BinaryNode) => {
logger.info('offline preview received', node)
sendNode({
tag: 'ib',
attrs: {},
content: [{ tag: 'offline_batch', attrs: { count: '100' } }]
})
})

ws.on('CB:ib,,edge_routing', (node: BinaryNode) => {
const edgeRoutingNode = getBinaryNodeChild(node, 'edge_routing')
const routingInfo = getBinaryNodeChild(edgeRoutingNode, 'routing_info')
Expand Down
17 changes: 17 additions & 0 deletions src/Utils/decode-wa-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@ import { areJidsSameUser, BinaryNode, isJidBroadcast, isJidGroup, isJidNewslette
import { unpadRandomMax16 } from './generics'

export const NO_MESSAGE_FOUND_ERROR_TEXT = 'Message absent from node'
export const MISSING_KEYS_ERROR_TEXT = 'Key used already or never filled'

export const NACK_REASONS = {
ParsingError: 487,
UnrecognizedStanza: 488,
UnrecognizedStanzaClass: 489,
UnrecognizedStanzaType: 490,
InvalidProtobuf: 491,
InvalidHostedCompanionStanza: 493,
MissingMessageSecret: 495,
SignalErrorOldCounter: 496,
MessageDeletedOnPeer: 499,
UnhandledError: 500,
UnsupportedAdminRevoke: 550,
UnsupportedLIDGroup: 551,
DBOperationFailed: 552
}

type MessageType = 'chat' | 'peer_broadcast' | 'other_broadcast' | 'group' | 'direct_peer_status' | 'other_status' | 'newsletter'

Expand Down

0 comments on commit 3fca643

Please sign in to comment.