Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multiplePeers tests #265

Merged
merged 28 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Blockchain/Sources/Blockchain/Blockchain.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ public final class Blockchain: ServiceBase, @unchecked Sendable {
try await dataProvider.blockImported(block: block, state: state)

publish(RuntimeEvents.BlockImported(block: block, state: state, parentState: parent))

logger.info("Block imported: #\(block.header.timeslot) \(block.hash)")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public enum BlockchainDataProviderError: Error, Equatable {
case uncanonical(hash: Data32)
}

public actor BlockchainDataProvider: Sendable {
public actor BlockchainDataProvider {
public private(set) var bestHead: HeadInfo
public private(set) var finalizedHead: HeadInfo
private let dataProvider: BlockchainDataProviderProtocol
Expand Down Expand Up @@ -53,7 +53,7 @@ public actor BlockchainDataProvider: Sendable {
bestHead = HeadInfo(hash: block.hash, timeslot: block.header.timeslot, number: number)
}

logger.debug("block imported: \(block.hash)")
logger.debug("Block imported: #\(bestHead.timeslot) \(block.hash)")
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Utils

public actor InMemoryDataProvider: Sendable {
public actor InMemoryDataProvider {
public private(set) var heads: Set<Data32>
public private(set) var finalizedHead: Data32

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ extension Accumulation {

let rightQueueItems = accumulationQueue.array[index...]
let leftQueueItems = accumulationQueue.array[0 ..< index]
var allQueueItems = rightQueueItems.flatMap { $0 } + leftQueueItems.flatMap { $0 } + newQueueItems
var allQueueItems = rightQueueItems.flatMap(\.self) + leftQueueItems.flatMap(\.self) + newQueueItems

editAccumulatedItems(items: &allQueueItems, accumulatedPackages: Set(zeroPrereqReports.map(\.packageSpecification.workPackageHash)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ extension Guaranteeing {
}

let recentWorkPackageHashes: Set<Data32> = Set(recentHistory.items.flatMap(\.lookup.keys))
let accumulateHistoryReports = Set(accumulationHistory.array.flatMap { $0 })
let accumulateQueueReports = Set(accumulationQueue.array.flatMap { $0 }
let accumulateHistoryReports = Set(accumulationHistory.array.flatMap(\.self))
let accumulateQueueReports = Set(accumulationQueue.array.flatMap(\.self)
.flatMap(\.workReport.refinementContext.prerequisiteWorkPackages))
let pendingWorkReportHashes = Set(reports.array.flatMap { $0?.workReport.refinementContext.prerequisiteWorkPackages ?? [] })
let pipelinedWorkReportHashes = recentWorkPackageHashes.union(accumulateHistoryReports).union(accumulateQueueReports)
Expand Down
2 changes: 1 addition & 1 deletion Blockchain/Sources/Blockchain/State/StateTrie.swift
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public enum StateTrieError: Error {
case invalidParent
}

public actor StateTrie: Sendable {
public actor StateTrie {
private let backend: StateBackendProtocol
public private(set) var rootHash: Data32
private var nodes: [Data: TrieNode] = [:]
Expand Down
12 changes: 12 additions & 0 deletions Blockchain/Sources/Blockchain/Types/EpochMarker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,15 @@ public struct EpochMarker: Sendable, Equatable, Codable {
self.validators = validators
}
}

extension EpochMarker: Dummy {
public typealias Config = ProtocolConfigRef

public static func dummy(config: Config) -> EpochMarker {
EpochMarker(
entropy: Data32(),
ticketsEntropy: Data32(),
validators: try! ConfigFixedSizeArray(config: config, defaultValue: Data32())
)
}
}
4 changes: 2 additions & 2 deletions Blockchain/Sources/Blockchain/Types/Header.swift
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,13 @@ extension HeaderRef: Codable {

extension Header.Unsigned: Dummy {
public typealias Config = ProtocolConfigRef
public static func dummy(config _: Config) -> Header.Unsigned {
public static func dummy(config: Config) -> Header.Unsigned {
Header.Unsigned(
parentHash: Data32(),
priorStateRoot: Data32(),
extrinsicsHash: Data32(),
timeslot: 0,
epoch: nil,
epoch: EpochMarker.dummy(config: config),
winningTickets: nil,
offendersMarkers: [],
authorIndex: 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@
self.context = context
}

public func _callImpl(config: ProtocolConfigRef, state: VMState) async throws {
public func _callImpl(config _: ProtocolConfigRef, state: VMState) async throws {

Check warning on line 912 in Blockchain/Sources/Blockchain/VMInvocations/HostCall/HostCalls.swift

View check run for this annotation

Codecov / codecov/patch

Blockchain/Sources/Blockchain/VMInvocations/HostCall/HostCalls.swift#L912

Added line #L912 was not covered by tests
let pvmIndex: UInt64 = state.readRegister(Registers.Index(raw: 7))
let startAddr: UInt32 = state.readRegister(Registers.Index(raw: 8))

Expand Down
2 changes: 1 addition & 1 deletion Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
await withSpan("BlockAuthor.newBlock", logger: logger) { _ in
// TODO: add timeout
let block = try await createNewBlock(timeslot: timeslot, claim: claim)
logger.info("New block created: #\(block.header.timeslot) \(block.hash) on parent #\(block.header.parentHash)")
logger.debug("New block created: #\(block.header.timeslot) \(block.hash) on parent #\(block.header.parentHash)")
publish(RuntimeEvents.BlockAuthored(block: block))
}
}
Expand Down
2 changes: 1 addition & 1 deletion Codec/Sources/Codec/JamDecoder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ private struct JamKeyedDecodingContainer<K: CodingKey>: KeyedDecodingContainerPr
throw DecodingError.dataCorrupted(
DecodingError.Context(
codingPath: decoder.codingPath,
debugDescription: "Invalid boolean value: \(byte)"
debugDescription: "Decode key \(key.stringValue) with invalid boolean value: \(byte)"
)
)
}
Expand Down
2 changes: 1 addition & 1 deletion Database/Sources/Database/RocksDBBackend.swift
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ extension RocksDBBackend: BlockchainDataProviderProtocol {
}

public func add(block: BlockRef) async throws {
logger.trace("add(block:) \(block.hash)")
logger.debug("add(block:) \(block.hash)")

// TODO: batch put

Expand Down
9 changes: 4 additions & 5 deletions Networking/Sources/MsQuicSwift/QuicStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,16 @@ public final class QuicStream: Sendable {
}

public func send(data: Data, start: Bool = false, finish: Bool = false) throws {
logger.trace("Sending \(data.count) bytes")

try storage.read { storage in
guard let storage, let api = storage.connection.api else {
throw QuicError.alreadyClosed
}

logger.debug("\(storage.connection.id) \(id) sending \(data.count) bytes data \(data.toHexString())")
let messageLength = data.count

if messageLength == 0 {
logger.trace("No data to send.")
logger.debug("No data to send.")
throw SendError.emptyData // Throw a specific error or return
}

Expand Down Expand Up @@ -173,7 +172,7 @@ private class StreamHandle {
fileprivate func callbackHandler(event: UnsafePointer<QUIC_STREAM_EVENT>) -> QuicStatus {
switch event.pointee.Type {
case QUIC_STREAM_EVENT_SEND_COMPLETE:
logger.trace("Stream send completed")
logger.debug("Stream send completed")
if let clientContext = event.pointee.SEND_COMPLETE.ClientContext {
clientContext.deallocate() // !! deallocate
}
Expand All @@ -188,7 +187,7 @@ private class StreamHandle {
totalSize += Int(buffer.Length)
}

logger.trace("Stream received \(totalSize) bytes")
logger.debug("Stream received \(totalSize) bytes")

var receivedData = Data(capacity: totalSize)

Expand Down
20 changes: 10 additions & 10 deletions Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@
let data = try request.encode()
let kind = request.kind
let stream = try createStream(kind: kind)
try stream.send(message: data)
try await stream.send(message: data)

return try await receiveData(stream: stream)
}
Expand Down Expand Up @@ -234,7 +234,7 @@
impl.addStream(stream)
Task {
guard let byte = await stream.receiveByte() else {
logger.debug("stream closed without receiving kind. status: \(stream.status)")
logger.warning("stream closed without receiving kind. status: \(stream.status)")

Check warning on line 237 in Networking/Sources/Networking/Connection.swift

View check run for this annotation

Codecov / codecov/patch

Networking/Sources/Networking/Connection.swift#L237

Added line #L237 was not covered by tests
return
}
if let upKind = Handler.PresistentHandler.StreamKind(rawValue: byte) {
Expand All @@ -246,14 +246,14 @@
if existingStream.stream.id < stream.stream.id {
// The new stream has a higher ID, so reset the existing one
existingStream.close(abort: false)
logger.debug(
logger.info(
"Reset older UP stream with lower ID",
metadata: ["existingStreamId": "\(existingStream.stream.id)", "newStreamId": "\(stream.stream.id)"]
)
} else {
// The existing stream has a higher ID or is equal, so reset the new one
stream.close(abort: false)
logger.debug(
logger.info(
"Duplicate UP stream detected, closing new stream with lower or equal ID",
metadata: ["existingStreamId": "\(existingStream.stream.id)", "newStreamId": "\(stream.stream.id)"]
)
Expand All @@ -278,9 +278,9 @@
let data = try await receiveData(stream: stream)
let request = try decoder.decode(data: data)
let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request)
try stream.send(message: resp, finish: true)
try await stream.send(message: resp, finish: true)
} catch {
logger.debug("Failed to handle request", metadata: ["error": "\(error)"])
logger.error("Failed to handle request", metadata: ["error": "\(error)"])
stream.close(abort: true)
}
}
Expand Down Expand Up @@ -318,7 +318,7 @@
// TODO: pick better value
guard length < 1024 * 1024 * 10 else {
stream.close(abort: true)
logger.debug("Invalid request length: \(length)")
logger.error("Invalid request length: \(length)")
// TODO: report bad peer
throw ConnectionError.invalidLength
}
Expand All @@ -336,9 +336,9 @@
do {
try await handler.streamOpened(connection: connection, stream: stream, kind: kind)
} catch {
logger.debug(
logger.error(

Check warning on line 339 in Networking/Sources/Networking/Connection.swift

View check run for this annotation

Codecov / codecov/patch

Networking/Sources/Networking/Connection.swift#L339

Added line #L339 was not covered by tests
"Failed to setup presistent stream",
metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)", "error": "\(error)"]
metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "error": "\(error)"]

Check warning on line 341 in Networking/Sources/Networking/Connection.swift

View check run for this annotation

Codecov / codecov/patch

Networking/Sources/Networking/Connection.swift#L341

Added line #L341 was not covered by tests
)
}
logger.debug(
Expand All @@ -352,7 +352,7 @@
try await handler.handle(connection: connection, message: msg)
}
} catch {
logger.debug("UP stream run loop failed: \(error)")
logger.error("UP stream run loop failed: \(error) \(connection.id) \(stream.id)")

Check warning on line 355 in Networking/Sources/Networking/Connection.swift

View check run for this annotation

Codecov / codecov/patch

Networking/Sources/Networking/Connection.swift#L355

Added line #L355 was not covered by tests
stream.close(abort: true)
}

Expand Down
35 changes: 20 additions & 15 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,24 @@ public final class Peer<Handler: StreamHandler>: Sendable {
}
for connection in connections {
if let stream = try? connection.createPreistentStream(kind: kind) {
let res = Result(catching: { try stream.send(message: messageData) })
switch res {
case .success:
break
case let .failure(error):
impl.logger.warning(
"Failed to send message",
metadata: [
"connectionId": "\(connection.id)",
"kind": "\(kind)",
"error": "\(error)",
]
)
Task {
let res = await Result {
try await stream.send(message: messageData)
}
switch res {
case .success:
break
case let .failure(error):
impl.logger.warning(
"Failed to send message",
metadata: [
"connectionId": "\(connection.id)",
"kind": "\(kind)",
"message": "\(messageData)",
"error": "\(error)",
]
)
}
}
}
}
Expand Down Expand Up @@ -298,7 +303,7 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
var state = reconnectStates.read { reconnectStates in
reconnectStates[address] ?? .init()
}

logger.debug("reconnecting to \(address) \(state.attempt) attempts")
guard state.attempt < maxRetryAttempts else {
logger.warning("reconnecting to \(address) exceeded max attempts")
return
Expand Down Expand Up @@ -338,6 +343,7 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
states[connection.id] ?? .init()
}

logger.debug("Reopen attempt for stream \(kind) on connection \(connection.id) \(state.attempt) attempts")
guard state.attempt < maxRetryAttempts else {
logger.warning("Reopen attempt for stream \(kind) on connection \(connection.id) exceeded max attempts")
return
Expand Down Expand Up @@ -391,7 +397,6 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
)
return .code(.alpnNegFailure)
}
logger.debug("new connection: \(addr) role: \(role)")
if impl.addConnection(connection, addr: addr, role: role) {
return .code(.success)
} else {
Expand Down
54 changes: 36 additions & 18 deletions Networking/Sources/Networking/Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,39 @@

var id: UniqueId { get }
var status: StreamStatus { get }
func send(message: Message) throws
func send(message: Message) async throws
func close(abort: Bool)
}

actor StreamSender {
private let stream: QuicStream
private var status: StreamStatus

init(stream: QuicStream, status: StreamStatus) {
self.stream = stream
self.status = status
}

func send(message: Data, finish: Bool = false) throws {
guard status == .open || status == .sendOnly else {
throw StreamError.notOpen

Check warning on line 45 in Networking/Sources/Networking/Stream.swift

View check run for this annotation

Codecov / codecov/patch

Networking/Sources/Networking/Stream.swift#L45

Added line #L45 was not covered by tests
}

let length = UInt32(message.count)
var lengthData = Data(repeating: 0, count: 4)
lengthData.withUnsafeMutableBytes { ptr in
ptr.storeBytes(of: UInt32(littleEndian: length), as: UInt32.self)
}

try stream.send(data: lengthData, finish: false)
try stream.send(data: message, finish: finish)

if finish {
status = .receiveOnly
}
}
}

final class Stream<Handler: StreamHandler>: Sendable, StreamProtocol {
typealias Message = Handler.PresistentHandler.Message

Expand All @@ -41,6 +70,7 @@
private let channel: Channel<Data> = .init(capacity: 100)
private let nextData: Mutex<Data?> = .init(nil)
private let _status: ThreadSafeContainer<StreamStatus> = .init(.open)
private let sender: StreamSender
let connectionId: UniqueId
let kind: Handler.PresistentHandler.StreamKind?

Expand All @@ -63,10 +93,11 @@
self.connectionId = connectionId
self.impl = impl
self.kind = kind
sender = StreamSender(stream: stream, status: .open)
}

public func send(message: Handler.PresistentHandler.Message) throws {
try send(message: message.encode(), finish: false)
public func send(message: Handler.PresistentHandler.Message) async throws {
try await send(message: message.encode(), finish: false)
}

/// send raw data
Expand All @@ -91,21 +122,8 @@
}

// send message with length prefix
func send(message: Data, finish: Bool = false) throws {
guard canSend else {
throw StreamError.notOpen
}

let length = UInt32(message.count)
var lengthData = Data(repeating: 0, count: 4)
lengthData.withUnsafeMutableBytes { ptr in
ptr.storeBytes(of: UInt32(littleEndian: length), as: UInt32.self)
}
try stream.send(data: lengthData, finish: false)
try stream.send(data: message, finish: finish)
if finish {
status = .receiveOnly
}
func send(message: Data, finish: Bool = false) async throws {
try await sender.send(message: message, finish: finish)
}

func received(data: Data?) {
Expand Down
Loading