Skip to content

Commit

Permalink
update p2p test (#210)
Browse files Browse the repository at this point in the history
* update peer

* update peer test

* update more test
  • Loading branch information
MacOMNI authored Oct 29, 2024
1 parent 83bfd09 commit 23a8d3c
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 5 deletions.
2 changes: 1 addition & 1 deletion Networking/Sources/MsQuicSwift/QuicConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ private class ConnectionHandle {
fileprivate func callbackHandler(event: UnsafePointer<QUIC_CONNECTION_EVENT>) -> QuicStatus {
switch event.pointee.Type {
case QUIC_CONNECTION_EVENT_PEER_CERTIFICATE_RECEIVED:
logger.trace("Peer certificate received")
logger.debug("Peer certificate received")
if let connection {
let evtData = event.pointee.PEER_CERTIFICATE_RECEIVED
let data: Data?
Expand Down
9 changes: 5 additions & 4 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -376,15 +376,16 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}

func shutdownComplete(_ connection: QuicConnection) {
logger.trace("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"])
logger.debug("connection shutdown complete", metadata: ["connectionId": "\(connection.id)"])
impl.connections.write { connections in
if let conn = connections.byId[connection.id] {
conn.closed()
connections.byId.removeValue(forKey: connection.id)
connections.byAddr.removeValue(forKey: conn.remoteAddress)
// remove publickey first,func closed will change state to closed
if let publicKey = conn.publicKey {
connections.byPublicKey.removeValue(forKey: publicKey)
}
conn.closed()
connections.byId.removeValue(forKey: connection.id)
connections.byAddr.removeValue(forKey: conn.remoteAddress)
}
}
}
Expand Down
157 changes: 157 additions & 0 deletions Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,163 @@ struct PeerTests {
typealias EphemeralHandler = MockEphemeralStreamHandler
}

@Test
func largeDataRequest() async throws {
let handler1 = MockPresentStreamHandler()
let handler2 = MockPresentStreamHandler()
// Define the data size, 5MB
let dataSize = 5 * 1024 * 1024
var largeData = Data(capacity: dataSize)

// Generate random data
for _ in 0 ..< dataSize {
largeData.append(UInt8.random(in: 0 ... 255))
}

let peer1 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 9085)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler1,
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)

let peer2 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 9086)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler2,
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)

try? await Task.sleep(for: .milliseconds(50))

let connection1 = try peer1.connect(
to: NetAddr(ipAddress: "127.0.0.1", port: 9086)!, role: .validator
)
try? await Task.sleep(for: .milliseconds(50))

let receivedData1 = try await connection1.request(
MockRequest(kind: .typeA, data: largeData)
)

// Verify that the received data matches the original large data
#expect(receivedData1 == largeData + Data(" response".utf8))

peer1.broadcast(
kind: .uniqueA, message: .init(kind: .uniqueA, data: largeData)
)
try? await Task.sleep(for: .milliseconds(50))

peer2.broadcast(
kind: .uniqueB, message: .init(kind: .uniqueB, data: largeData)
)
// Verify last received data
try? await Task.sleep(for: .milliseconds(1000))
await #expect(handler2.lastReceivedData == largeData)
await #expect(handler1.lastReceivedData == largeData)
}

@Test
func peerFailureRecovery() async throws {
let handler2 = MockPresentStreamHandler()
let messageData = Data("Post-recovery message".utf8)

let peer1 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 185)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)

let peer2 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 186)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler2,
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)

let peer3 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 187)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler2,
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)

try? await Task.sleep(for: .milliseconds(50))

let connection = try peer1.connect(
to: NetAddr(ipAddress: "127.0.0.1", port: 186)!, role: .validator
)
try? await Task.sleep(for: .milliseconds(50))

let receivedData = try await connection.request(
MockRequest(kind: .typeA, data: messageData)
)

#expect(receivedData == messageData + Data(" response".utf8))
try? await Task.sleep(for: .milliseconds(50))
// Simulate a peer failure by disconnecting one peer
connection.close(abort: true)
// Wait to simulate downtime
try? await Task.sleep(for: .milliseconds(200))
// check the peer is usable & connect to another peer
let connection2 = try peer1.connect(
to: peer3.listenAddress(),
role: .validator
)
try? await Task.sleep(for: .milliseconds(50))
let receivedData2 = try await connection2.request(
MockRequest(kind: .typeA, data: messageData)
)
try? await Task.sleep(for: .milliseconds(50))
#expect(receivedData2 == messageData + Data(" response".utf8))
// Reconnect the failing peer
let reconnection = try peer1.connect(
to: peer2.listenAddress(),
role: .validator
)
try? await Task.sleep(for: .milliseconds(50))
let recoverData = try await reconnection.request(
MockRequest(kind: .typeA, data: messageData)
)
try? await Task.sleep(for: .milliseconds(50))
#expect(recoverData == messageData + Data(" response".utf8))
peer1.broadcast(
kind: .uniqueC, message: .init(kind: .uniqueC, data: messageData)
)
try? await Task.sleep(for: .milliseconds(50))
await #expect(handler2.lastReceivedData == messageData)
}

@Test
func peerBroadcast() async throws {
let handler1 = MockPresentStreamHandler()
Expand Down

0 comments on commit 23a8d3c

Please sign in to comment.