Skip to content

Commit

Permalink
multiplePeers tests (#265)
Browse files Browse the repository at this point in the history
* multiplePeers tests

* update swiftlint

* update more tests

* update

* update more test

* update swiftlint

* update

* update

* update

* update swiftlint

* update  tests

* update node test

* update logger

* update test

* update node

* update tests

* update nodetest

* update test

* update tests

* update node test

* update swiftlint

* update swiftlint

* add some logger

* update some logger

* update test

* update swiftlint

* update node test
  • Loading branch information
MacOMNI authored Jan 16, 2025
1 parent e9f07cf commit 0e78055
Show file tree
Hide file tree
Showing 26 changed files with 283 additions and 111 deletions.
2 changes: 0 additions & 2 deletions Blockchain/Sources/Blockchain/Blockchain.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ public final class Blockchain: ServiceBase, @unchecked Sendable {
try await dataProvider.blockImported(block: block, state: state)

publish(RuntimeEvents.BlockImported(block: block, state: state, parentState: parent))

logger.info("Block imported: #\(block.header.timeslot) \(block.hash)")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public enum BlockchainDataProviderError: Error, Equatable {
case uncanonical(hash: Data32)
}

public actor BlockchainDataProvider: Sendable {
public actor BlockchainDataProvider {
public private(set) var bestHead: HeadInfo
public private(set) var finalizedHead: HeadInfo
private let dataProvider: BlockchainDataProviderProtocol
Expand Down Expand Up @@ -53,7 +53,7 @@ public actor BlockchainDataProvider: Sendable {
bestHead = HeadInfo(hash: block.hash, timeslot: block.header.timeslot, number: number)
}

logger.debug("block imported: \(block.hash)")
logger.debug("Block imported: #\(bestHead.timeslot) \(block.hash)")
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Utils

public actor InMemoryDataProvider: Sendable {
public actor InMemoryDataProvider {
public private(set) var heads: Set<Data32>
public private(set) var finalizedHead: Data32

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ extension Accumulation {

let rightQueueItems = accumulationQueue.array[index...]
let leftQueueItems = accumulationQueue.array[0 ..< index]
var allQueueItems = rightQueueItems.flatMap { $0 } + leftQueueItems.flatMap { $0 } + newQueueItems
var allQueueItems = rightQueueItems.flatMap(\.self) + leftQueueItems.flatMap(\.self) + newQueueItems

editAccumulatedItems(items: &allQueueItems, accumulatedPackages: Set(zeroPrereqReports.map(\.packageSpecification.workPackageHash)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ extension Guaranteeing {
}

let recentWorkPackageHashes: Set<Data32> = Set(recentHistory.items.flatMap(\.lookup.keys))
let accumulateHistoryReports = Set(accumulationHistory.array.flatMap { $0 })
let accumulateQueueReports = Set(accumulationQueue.array.flatMap { $0 }
let accumulateHistoryReports = Set(accumulationHistory.array.flatMap(\.self))
let accumulateQueueReports = Set(accumulationQueue.array.flatMap(\.self)
.flatMap(\.workReport.refinementContext.prerequisiteWorkPackages))
let pendingWorkReportHashes = Set(reports.array.flatMap { $0?.workReport.refinementContext.prerequisiteWorkPackages ?? [] })
let pipelinedWorkReportHashes = recentWorkPackageHashes.union(accumulateHistoryReports).union(accumulateQueueReports)
Expand Down
2 changes: 1 addition & 1 deletion Blockchain/Sources/Blockchain/State/StateTrie.swift
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public enum StateTrieError: Error {
case invalidParent
}

public actor StateTrie: Sendable {
public actor StateTrie {
private let backend: StateBackendProtocol
public private(set) var rootHash: Data32
private var nodes: [Data: TrieNode] = [:]
Expand Down
12 changes: 12 additions & 0 deletions Blockchain/Sources/Blockchain/Types/EpochMarker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,15 @@ public struct EpochMarker: Sendable, Equatable, Codable {
self.validators = validators
}
}

extension EpochMarker: Dummy {
public typealias Config = ProtocolConfigRef

public static func dummy(config: Config) -> EpochMarker {
EpochMarker(
entropy: Data32(),
ticketsEntropy: Data32(),
validators: try! ConfigFixedSizeArray(config: config, defaultValue: Data32())
)
}
}
4 changes: 2 additions & 2 deletions Blockchain/Sources/Blockchain/Types/Header.swift
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,13 @@ extension HeaderRef: Codable {

extension Header.Unsigned: Dummy {
public typealias Config = ProtocolConfigRef
public static func dummy(config _: Config) -> Header.Unsigned {
public static func dummy(config: Config) -> Header.Unsigned {
Header.Unsigned(
parentHash: Data32(),
priorStateRoot: Data32(),
extrinsicsHash: Data32(),
timeslot: 0,
epoch: nil,
epoch: EpochMarker.dummy(config: config),
winningTickets: nil,
offendersMarkers: [],
authorIndex: 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ public class Invoke: HostCall {
self.context = context
}

public func _callImpl(config: ProtocolConfigRef, state: VMState) async throws {
public func _callImpl(config _: ProtocolConfigRef, state: VMState) async throws {
let pvmIndex: UInt64 = state.readRegister(Registers.Index(raw: 7))
let startAddr: UInt32 = state.readRegister(Registers.Index(raw: 8))

Expand Down
2 changes: 1 addition & 1 deletion Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
await withSpan("BlockAuthor.newBlock", logger: logger) { _ in
// TODO: add timeout
let block = try await createNewBlock(timeslot: timeslot, claim: claim)
logger.info("New block created: #\(block.header.timeslot) \(block.hash) on parent #\(block.header.parentHash)")
logger.debug("New block created: #\(block.header.timeslot) \(block.hash) on parent #\(block.header.parentHash)")
publish(RuntimeEvents.BlockAuthored(block: block))
}
}
Expand Down
2 changes: 1 addition & 1 deletion Codec/Sources/Codec/JamDecoder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ private struct JamKeyedDecodingContainer<K: CodingKey>: KeyedDecodingContainerPr
throw DecodingError.dataCorrupted(
DecodingError.Context(
codingPath: decoder.codingPath,
debugDescription: "Invalid boolean value: \(byte)"
debugDescription: "Decode key \(key.stringValue) with invalid boolean value: \(byte)"
)
)
}
Expand Down
2 changes: 1 addition & 1 deletion Database/Sources/Database/RocksDBBackend.swift
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ extension RocksDBBackend: BlockchainDataProviderProtocol {
}

public func add(block: BlockRef) async throws {
logger.trace("add(block:) \(block.hash)")
logger.debug("add(block:) \(block.hash)")

// TODO: batch put

Expand Down
9 changes: 4 additions & 5 deletions Networking/Sources/MsQuicSwift/QuicStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,16 @@ public final class QuicStream: Sendable {
}

public func send(data: Data, start: Bool = false, finish: Bool = false) throws {
logger.trace("Sending \(data.count) bytes")

try storage.read { storage in
guard let storage, let api = storage.connection.api else {
throw QuicError.alreadyClosed
}

logger.debug("\(storage.connection.id) \(id) sending \(data.count) bytes data \(data.toHexString())")
let messageLength = data.count

if messageLength == 0 {
logger.trace("No data to send.")
logger.debug("No data to send.")
throw SendError.emptyData // Throw a specific error or return
}

Expand Down Expand Up @@ -173,7 +172,7 @@ private class StreamHandle {
fileprivate func callbackHandler(event: UnsafePointer<QUIC_STREAM_EVENT>) -> QuicStatus {
switch event.pointee.Type {
case QUIC_STREAM_EVENT_SEND_COMPLETE:
logger.trace("Stream send completed")
logger.debug("Stream send completed")
if let clientContext = event.pointee.SEND_COMPLETE.ClientContext {
clientContext.deallocate() // !! deallocate
}
Expand All @@ -188,7 +187,7 @@ private class StreamHandle {
totalSize += Int(buffer.Length)
}

logger.trace("Stream received \(totalSize) bytes")
logger.debug("Stream received \(totalSize) bytes")

var receivedData = Data(capacity: totalSize)

Expand Down
20 changes: 10 additions & 10 deletions Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
let data = try request.encode()
let kind = request.kind
let stream = try createStream(kind: kind)
try stream.send(message: data)
try await stream.send(message: data)

return try await receiveData(stream: stream)
}
Expand Down Expand Up @@ -234,7 +234,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
impl.addStream(stream)
Task {
guard let byte = await stream.receiveByte() else {
logger.debug("stream closed without receiving kind. status: \(stream.status)")
logger.warning("stream closed without receiving kind. status: \(stream.status)")
return
}
if let upKind = Handler.PresistentHandler.StreamKind(rawValue: byte) {
Expand All @@ -246,14 +246,14 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
if existingStream.stream.id < stream.stream.id {
// The new stream has a higher ID, so reset the existing one
existingStream.close(abort: false)
logger.debug(
logger.info(
"Reset older UP stream with lower ID",
metadata: ["existingStreamId": "\(existingStream.stream.id)", "newStreamId": "\(stream.stream.id)"]
)
} else {
// The existing stream has a higher ID or is equal, so reset the new one
stream.close(abort: false)
logger.debug(
logger.info(
"Duplicate UP stream detected, closing new stream with lower or equal ID",
metadata: ["existingStreamId": "\(existingStream.stream.id)", "newStreamId": "\(stream.stream.id)"]
)
Expand All @@ -278,9 +278,9 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
let data = try await receiveData(stream: stream)
let request = try decoder.decode(data: data)
let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request)
try stream.send(message: resp, finish: true)
try await stream.send(message: resp, finish: true)
} catch {
logger.debug("Failed to handle request", metadata: ["error": "\(error)"])
logger.error("Failed to handle request", metadata: ["error": "\(error)"])
stream.close(abort: true)
}
}
Expand Down Expand Up @@ -318,7 +318,7 @@ private func receiveMaybeData(stream: Stream<some StreamHandler>) async throws -
// TODO: pick better value
guard length < 1024 * 1024 * 10 else {
stream.close(abort: true)
logger.debug("Invalid request length: \(length)")
logger.error("Invalid request length: \(length)")
// TODO: report bad peer
throw ConnectionError.invalidLength
}
Expand All @@ -336,9 +336,9 @@ func presistentStreamRunLoop<Handler: StreamHandler>(
do {
try await handler.streamOpened(connection: connection, stream: stream, kind: kind)
} catch {
logger.debug(
logger.error(
"Failed to setup presistent stream",
metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)", "error": "\(error)"]
metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "error": "\(error)"]
)
}
logger.debug(
Expand All @@ -352,7 +352,7 @@ func presistentStreamRunLoop<Handler: StreamHandler>(
try await handler.handle(connection: connection, message: msg)
}
} catch {
logger.debug("UP stream run loop failed: \(error)")
logger.error("UP stream run loop failed: \(error) \(connection.id) \(stream.id)")
stream.close(abort: true)
}

Expand Down
35 changes: 20 additions & 15 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,24 @@ public final class Peer<Handler: StreamHandler>: Sendable {
}
for connection in connections {
if let stream = try? connection.createPreistentStream(kind: kind) {
let res = Result(catching: { try stream.send(message: messageData) })
switch res {
case .success:
break
case let .failure(error):
impl.logger.warning(
"Failed to send message",
metadata: [
"connectionId": "\(connection.id)",
"kind": "\(kind)",
"error": "\(error)",
]
)
Task {
let res = await Result {
try await stream.send(message: messageData)
}
switch res {
case .success:
break
case let .failure(error):
impl.logger.warning(
"Failed to send message",
metadata: [
"connectionId": "\(connection.id)",
"kind": "\(kind)",
"message": "\(messageData)",
"error": "\(error)",
]
)
}
}
}
}
Expand Down Expand Up @@ -298,7 +303,7 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
var state = reconnectStates.read { reconnectStates in
reconnectStates[address] ?? .init()
}

logger.debug("reconnecting to \(address) \(state.attempt) attempts")
guard state.attempt < maxRetryAttempts else {
logger.warning("reconnecting to \(address) exceeded max attempts")
return
Expand Down Expand Up @@ -338,6 +343,7 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
states[connection.id] ?? .init()
}

logger.debug("Reopen attempt for stream \(kind) on connection \(connection.id) \(state.attempt) attempts")
guard state.attempt < maxRetryAttempts else {
logger.warning("Reopen attempt for stream \(kind) on connection \(connection.id) exceeded max attempts")
return
Expand Down Expand Up @@ -391,7 +397,6 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
)
return .code(.alpnNegFailure)
}
logger.debug("new connection: \(addr) role: \(role)")
if impl.addConnection(connection, addr: addr, role: role) {
return .code(.success)
} else {
Expand Down
54 changes: 36 additions & 18 deletions Networking/Sources/Networking/Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,39 @@ public protocol StreamProtocol<Message> {

var id: UniqueId { get }
var status: StreamStatus { get }
func send(message: Message) throws
func send(message: Message) async throws
func close(abort: Bool)
}

actor StreamSender {
private let stream: QuicStream
private var status: StreamStatus

init(stream: QuicStream, status: StreamStatus) {
self.stream = stream
self.status = status
}

func send(message: Data, finish: Bool = false) throws {
guard status == .open || status == .sendOnly else {
throw StreamError.notOpen
}

let length = UInt32(message.count)
var lengthData = Data(repeating: 0, count: 4)
lengthData.withUnsafeMutableBytes { ptr in
ptr.storeBytes(of: UInt32(littleEndian: length), as: UInt32.self)
}

try stream.send(data: lengthData, finish: false)
try stream.send(data: message, finish: finish)

if finish {
status = .receiveOnly
}
}
}

final class Stream<Handler: StreamHandler>: Sendable, StreamProtocol {
typealias Message = Handler.PresistentHandler.Message

Expand All @@ -41,6 +70,7 @@ final class Stream<Handler: StreamHandler>: Sendable, StreamProtocol {
private let channel: Channel<Data> = .init(capacity: 100)
private let nextData: Mutex<Data?> = .init(nil)
private let _status: ThreadSafeContainer<StreamStatus> = .init(.open)
private let sender: StreamSender
let connectionId: UniqueId
let kind: Handler.PresistentHandler.StreamKind?

Expand All @@ -63,10 +93,11 @@ final class Stream<Handler: StreamHandler>: Sendable, StreamProtocol {
self.connectionId = connectionId
self.impl = impl
self.kind = kind
sender = StreamSender(stream: stream, status: .open)
}

public func send(message: Handler.PresistentHandler.Message) throws {
try send(message: message.encode(), finish: false)
public func send(message: Handler.PresistentHandler.Message) async throws {
try await send(message: message.encode(), finish: false)
}

/// send raw data
Expand All @@ -91,21 +122,8 @@ final class Stream<Handler: StreamHandler>: Sendable, StreamProtocol {
}

// send message with length prefix
func send(message: Data, finish: Bool = false) throws {
guard canSend else {
throw StreamError.notOpen
}

let length = UInt32(message.count)
var lengthData = Data(repeating: 0, count: 4)
lengthData.withUnsafeMutableBytes { ptr in
ptr.storeBytes(of: UInt32(littleEndian: length), as: UInt32.self)
}
try stream.send(data: lengthData, finish: false)
try stream.send(data: message, finish: finish)
if finish {
status = .receiveOnly
}
func send(message: Data, finish: Bool = false) async throws {
try await sender.send(message: message, finish: finish)
}

func received(data: Data?) {
Expand Down
Loading

0 comments on commit 0e78055

Please sign in to comment.