Skip to content

Commit

Permalink
more p2p test (#198)
Browse files Browse the repository at this point in the history
* Merge branch 'dev-p2p-test' of github.com:AcalaNetwork/boka into dev-p2p-test

* update more tests

* update more peer test

* update peer test

* update random
  • Loading branch information
MacOMNI authored Oct 28, 2024
1 parent 96aa905 commit b0d251f
Showing 1 changed file with 221 additions and 6 deletions.
227 changes: 221 additions & 6 deletions Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,22 @@ struct PeerTests {
}
}

actor DataStorage {
private(set) var lastReceivedData: Data?

func updateData(_ data: Data?) {
lastReceivedData = data
}
}

struct MockEphemeralStreamHandler: EphemeralStreamHandler {
typealias StreamKind = EphemeralStreamKind
typealias Request = MockRequest<EphemeralStreamKind>
private let dataStorage = DataStorage()

var lastReceivedData: Data? {
get async { await dataStorage.lastReceivedData }
}

func createDecoder(kind: StreamKind) -> any MessageDecoder<Request> {
MockEphemeralMessageDecoder(kind: kind)
Expand All @@ -99,12 +112,18 @@ struct PeerTests {
littleEndian: lengthData.withUnsafeBytes { $0.loadUnaligned(as: UInt32.self) }
)
let actualData = data.dropFirst(4).prefix(Int(length))

await dataStorage.updateData(actualData)
return actualData
}
}

struct MockPresentStreamHandler: PresistentStreamHandler {
private let dataStorage = DataStorage()

var lastReceivedData: Data? {
get async { await dataStorage.lastReceivedData }
}

func streamOpened(
connection _: any Networking.ConnectionInfoProtocol,
stream _: any Networking.StreamProtocol<PeerTests.MockRequest<PeerTests.UniquePresistentStreamKind>>,
Expand All @@ -113,8 +132,11 @@ struct PeerTests {

func handle(
connection _: any Networking.ConnectionInfoProtocol,
message _: PeerTests.MockRequest<PeerTests.UniquePresistentStreamKind>
) async throws {}
message: PeerTests.MockRequest<PeerTests.UniquePresistentStreamKind>
) async throws {
let data = message.data
await dataStorage.updateData(data)
}

typealias StreamKind = UniquePresistentStreamKind
typealias Request = MockRequest<UniquePresistentStreamKind>
Expand All @@ -132,13 +154,16 @@ struct PeerTests {

@Test
func peerBroadcast() async throws {
let handler1 = MockPresentStreamHandler()
let handler2 = MockPresentStreamHandler()

let peer1 = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8081)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32()),
presistentStreamHandler: MockPresentStreamHandler(),
presistentStreamHandler: handler1,
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
Expand All @@ -150,7 +175,7 @@ struct PeerTests {
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 8082)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32()),
presistentStreamHandler: MockPresentStreamHandler(),
presistentStreamHandler: handler2,
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
Expand All @@ -171,7 +196,10 @@ struct PeerTests {
peer2.broadcast(
kind: .uniqueB, message: .init(kind: .uniqueB, data: Data("I am jam".utf8))
)
try? await Task.sleep(for: .milliseconds(500))
// Verify last received data
try? await Task.sleep(for: .milliseconds(1000))
await #expect(handler2.lastReceivedData == Data("hello world".utf8))
await #expect(handler1.lastReceivedData == Data("I am jam".utf8))
}

@Test
Expand Down Expand Up @@ -222,4 +250,191 @@ struct PeerTests {
)
#expect(dataList2 == Data("I am jam".utf8))
}

@Test
func multiplePeerBroadcast() async throws {
var peers: [Peer<MockStreamHandler>] = []
// Create 100 peer nodes
for i in 0 ..< 100 {
let peer = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .builder,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: UInt16(7081 + i))!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
peers.append(peer)
}

try? await Task.sleep(for: .milliseconds(100))

// Connect each peer to the next one in a circular network
for i in 0 ..< peers.count {
let nextPeerIndex = (i + 1) % peers.count
_ = try peers[i].connect(
to: NetAddr(ipAddress: "127.0.0.1", port: UInt16(7081 + nextPeerIndex))!,
role: .validator
)
}

try? await Task.sleep(for: .milliseconds(100))

// Broadcast a message from each peer
for (i, peer) in peers.enumerated() {
let message = MockRequest(
kind: i % 2 == 0 ? UniquePresistentStreamKind.uniqueA : UniquePresistentStreamKind.uniqueB,
data: Data("Message from peer \(i)".utf8)
)
peer.broadcast(kind: message.kind, message: message)
}

// Wait for message propagation
try? await Task.sleep(for: .milliseconds(200))
}

@Test
func multiplePeerRequest() async throws {
var peers: [Peer<MockStreamHandler>] = []

// Create 100 peer nodes
for i in 0 ..< 100 {
let peer = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .builder,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: UInt16(6091 + i))!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
peers.append(peer)
}

// Wait for peers to initialize
try? await Task.sleep(for: .milliseconds(100))

// Test request-response by having each peer request from the next peer
for i in 0 ..< 100 {
let messageData = Data("Request from peer \(i)".utf8)
let port = UInt16(6091 + (i + 1) % 100)
let type = (i + 1) % 2 == 0 ? EphemeralStreamKind.typeA : EphemeralStreamKind.typeB
let response = try await peers[i].connect(
to: NetAddr(ipAddress: "127.0.0.1", port: port)!,
role: .validator
).request(MockRequest(kind: type, data: messageData))
#expect(response == messageData, "Peer \(i) should receive correct response")
}
}

@Test
func highConcurrentRequest() async throws {
var peers: [Peer<MockStreamHandler>] = []

// Create 100 peers
for i in 0 ..< 100 {
let peer = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: UInt16(8300 + i))!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
peers.append(peer)
}

for i in 0 ..< peers.count - 1 {
_ = try peers[i].connect(
to: NetAddr(ipAddress: "127.0.0.1", port: UInt16(8300 + i + 1))!,
role: .validator
)
}

// Allow connections to establish
try? await Task.sleep(for: .milliseconds(100))

// Send multiple requests from each peer
for peer in peers {
let tasks = (1 ... 88).map { _ in
Task {
let net = try peer.listenAddress()
let random = Int.random(in: 0 ..< 100)
let type = random % 2 == 0 ? EphemeralStreamKind.typeA : EphemeralStreamKind.typeB
let messageData = Data("Concurrent request \(net.description) + \(random)".utf8)
let response = try await peer.connect(
to: net,
role: .validator
).request(MockRequest(kind: type, data: messageData))
#expect(response == messageData, "Peer should receive correct response")
}
}
// Wait for all tasks to complete
for task in tasks {
try await task.value
}
}
}

@Test
func broadcastSynchronization() async throws {
var peers: [Peer<MockStreamHandler>] = []
var handles: [MockPresentStreamHandler] = []

// Create 50 peers with unique addresses
for i in 0 ..< 50 {
let handle = MockPresentStreamHandler()
let peer = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: UInt16(100 + i))!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32()),
presistentStreamHandler: handle,
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
handles.append(handle)
peers.append(peer)
}

// Connect each peer to form a fully connected network
for i in 0 ..< peers.count {
for j in 0 ..< peers.count where i != j {
_ = try peers[i].connect(
to: NetAddr(ipAddress: "127.0.0.1", port: UInt16(100 + j))!,
role: .validator
)
}
}

// Wait for all connections to establish
try? await Task.sleep(for: .seconds(10))

let centralPeer = peers[0]
let messagedata = Data("Sync message".utf8)
centralPeer.broadcast(kind: .uniqueA, message: MockRequest(kind: .uniqueA, data: messagedata))

// Wait for message to propagate
try? await Task.sleep(for: .seconds(60))

// Check that each peer received the broadcast
for i in 1 ..< handles.count {
let receivedData = await handles[i].lastReceivedData
#expect(receivedData == messagedata, "Handle should have received the broadcast message")
}
}
}

0 comments on commit b0d251f

Please sign in to comment.