Change delta format (#19175)
Change the delta file format to diff only against the previous
generation, as opposed to all existing generations. This maintains
backwards compatibility for now, while adding to the DB only the
minimum number of hashes we need in order to sync. Once all clients
update to this new delta format, storing all the hashes in the DB will
no longer be needed.
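As a rough illustration (not code from this commit), the change in which nodes a delta file contains can be sketched like this: the old format wrote a node only if its hash had never appeared in any earlier generation of the store, while the new format writes a node whenever its hash is absent from the immediately previous generation. The `hashes_by_generation` mapping and both helper names below are hypothetical.

    # Hypothetical sketch of the delta-membership rule; hashes_by_generation[g]
    # is assumed to hold the set of node hashes present in the tree at generation g.

    def old_delta_members(hashes_by_generation: dict[int, set[bytes]], generation: int) -> set[bytes]:
        # Old format: include a node only if its hash was never seen in any
        # earlier generation of this store.
        seen_before: set[bytes] = set()
        for g in range(generation):
            seen_before |= hashes_by_generation[g]
        return hashes_by_generation[generation] - seen_before

    def new_delta_members(hashes_by_generation: dict[int, set[bytes]], generation: int) -> set[bytes]:
        # New format: diff only against the previous generation, so a hash that
        # dropped out and later reappears is written again.
        previous = hashes_by_generation[generation - 1] if generation > 0 else set()
        return hashes_by_generation[generation] - previous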
altendky authored Jan 31, 2025
2 parents 4363032 + 5547cee commit 63d35aa
Showing 3 changed files with 187 additions and 71 deletions.
123 changes: 91 additions & 32 deletions chia/_tests/core/data_layer/test_data_store.py
@@ -11,7 +11,7 @@
from dataclasses import dataclass
from pathlib import Path
from random import Random
from typing import Any, Callable, Optional
from typing import Any, BinaryIO, Callable, Optional

import aiohttp
import pytest
@@ -27,6 +27,7 @@
ProofOfInclusion,
ProofOfInclusionLayer,
Root,
SerializedNode,
ServerInfo,
Side,
Status,
@@ -40,7 +41,7 @@
from chia.data_layer.data_store import DataStore
from chia.data_layer.download_data import insert_from_delta_file, write_files_for_root
from chia.data_layer.util.benchmark import generate_datastore
from chia.data_layer.util.merkle_blob import RawLeafMerkleNode
from chia.data_layer.util.merkle_blob import MerkleBlob, RawInternalMerkleNode, RawLeafMerkleNode, TreeIndex
from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.byte_types import hexstr_to_bytes
@@ -1279,16 +1280,55 @@ async def mock_http_download(
assert sinfo.ignore_till == start_timestamp # we don't increase on second failure


@pytest.mark.parametrize(
"test_delta",
[True, False],
)
async def write_tree_to_file_old_format(
data_store: DataStore,
root: Root,
node_hash: bytes32,
store_id: bytes32,
writer: BinaryIO,
merkle_blob: Optional[MerkleBlob] = None,
hash_to_index: Optional[dict[bytes32, TreeIndex]] = None,
) -> None:
if node_hash == bytes32.zeros:
return

if merkle_blob is None:
merkle_blob = await data_store.get_merkle_blob(root.node_hash)
if hash_to_index is None:
hash_to_index = merkle_blob.get_hashes_indexes()

generation = await data_store.get_first_generation(node_hash, store_id)
# Root's generation is not the first time we see this hash, so it's not a new delta.
if root.generation != generation:
return

raw_index = hash_to_index[node_hash]
raw_node = merkle_blob.get_raw_node(raw_index)

to_write = b""
if isinstance(raw_node, RawInternalMerkleNode):
left_hash = merkle_blob.get_hash_at_index(raw_node.left)
right_hash = merkle_blob.get_hash_at_index(raw_node.right)
await write_tree_to_file_old_format(data_store, root, left_hash, store_id, writer, merkle_blob, hash_to_index)
await write_tree_to_file_old_format(data_store, root, right_hash, store_id, writer, merkle_blob, hash_to_index)
to_write = bytes(SerializedNode(False, bytes(left_hash), bytes(right_hash)))
elif isinstance(raw_node, RawLeafMerkleNode):
node = await data_store.get_terminal_node(raw_node.key, raw_node.value, store_id)
to_write = bytes(SerializedNode(True, node.key, node.value))
else:
raise Exception(f"Node is neither InternalNode nor TerminalNode: {raw_node}")

writer.write(len(to_write).to_bytes(4, byteorder="big"))
writer.write(to_write)


@pytest.mark.parametrize(argnames="test_delta", argvalues=["full", "delta", "old"])
@boolean_datacases(name="group_files_by_store", false="group by singleton", true="don't group by singleton")
@pytest.mark.anyio
async def test_data_server_files(
data_store: DataStore,
store_id: bytes32,
test_delta: bool,
test_delta: str,
group_files_by_store: bool,
tmp_path: Path,
) -> None:
@@ -1304,37 +1344,56 @@ async def test_data_server_files(

keys: list[bytes] = []
counter = 0

for batch in range(num_batches):
changelist: list[dict[str, Any]] = []
for operation in range(num_ops_per_batch):
if random.randint(0, 4) > 0 or len(keys) == 0:
key = counter.to_bytes(4, byteorder="big")
value = (2 * counter).to_bytes(4, byteorder="big")
keys.append(key)
changelist.append({"action": "insert", "key": key, "value": value})
num_repeats = 2

# Repeat twice to guarantee there will be hashes from the old file format
for _ in range(num_repeats):
for batch in range(num_batches):
changelist: list[dict[str, Any]] = []
if batch == num_batches - 1:
for key in keys:
changelist.append({"action": "delete", "key": key})
keys = []
counter = 0
else:
key = random.choice(keys)
keys.remove(key)
changelist.append({"action": "delete", "key": key})
counter += 1
await data_store_server.insert_batch(store_id, changelist, status=Status.COMMITTED)
root = await data_store_server.get_tree_root(store_id)
await data_store_server.add_node_hashes(store_id)
await write_files_for_root(
data_store_server, store_id, root, tmp_path, 0, group_by_store=group_files_by_store
)
roots.append(root)
for operation in range(num_ops_per_batch):
if random.randint(0, 4) > 0 or len(keys) == 0:
key = counter.to_bytes(4, byteorder="big")
value = (2 * counter).to_bytes(4, byteorder="big")
keys.append(key)
changelist.append({"action": "insert", "key": key, "value": value})
else:
key = random.choice(keys)
keys.remove(key)
changelist.append({"action": "delete", "key": key})
counter += 1

await data_store_server.insert_batch(store_id, changelist, status=Status.COMMITTED)
root = await data_store_server.get_tree_root(store_id)
await data_store_server.add_node_hashes(store_id)
if test_delta == "old":
node_hash = root.node_hash if root.node_hash is not None else bytes32.zeros
filename = get_delta_filename_path(
tmp_path, store_id, node_hash, root.generation, group_files_by_store
)
filename.parent.mkdir(parents=True, exist_ok=True)
with open(filename, "xb") as writer:
await write_tree_to_file_old_format(data_store_server, root, node_hash, store_id, writer)
else:
await write_files_for_root(
data_store_server, store_id, root, tmp_path, 0, group_by_store=group_files_by_store
)
roots.append(root)

generation = 1
assert len(roots) == num_batches
assert len(roots) == num_batches * num_repeats
for root in roots:
assert root.node_hash is not None
if not test_delta:
filename = get_full_tree_filename_path(tmp_path, store_id, root.node_hash, generation, group_files_by_store)
node_hash = root.node_hash if root.node_hash is not None else bytes32.zeros
if test_delta == "full":
filename = get_full_tree_filename_path(tmp_path, store_id, node_hash, generation, group_files_by_store)
assert filename.exists()
else:
filename = get_delta_filename_path(tmp_path, store_id, root.node_hash, generation, group_files_by_store)
filename = get_delta_filename_path(tmp_path, store_id, node_hash, generation, group_files_by_store)
assert filename.exists()
await data_store.insert_into_data_store_from_file(store_id, root.node_hash, tmp_path.joinpath(filename))
current_root = await data_store.get_tree_root(store_id=store_id)
1 change: 1 addition & 0 deletions chia/data_layer/data_layer_util.py
@@ -49,6 +49,7 @@ def key_hash(key: bytes) -> bytes32:
return bytes32(sha256(b"\1" + key).digest())


# TODO: allow Optional[bytes32] for `node_hash` and resolve the filenames here
def get_full_tree_filename(store_id: bytes32, node_hash: bytes32, generation: int, group_by_store: bool = False) -> str:
if group_by_store:
return f"{store_id}/{node_hash}-full-{generation}-v1.0.dat"
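For reference, the grouped layout above produces paths like the following; the delta-file counterpart is assumed here (it is not part of this hunk) to follow the same scheme with "delta" in place of "full".

    store_id_hex = "ab" * 32   # hex of a 32-byte store id (illustrative value)
    root_hash_hex = "cd" * 32  # hex of a 32-byte root hash (illustrative value)
    generation = 7

    # Grouped by store, as in get_full_tree_filename above:
    full_filename = f"{store_id_hex}/{root_hash_hex}-full-{generation}-v1.0.dat"

    # Assumed delta counterpart (same pattern, "delta" instead of "full"):
    delta_filename = f"{store_id_hex}/{root_hash_hex}-delta-{generation}-v1.0.dat"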
134 changes: 95 additions & 39 deletions chia/data_layer/data_store.py
@@ -155,9 +155,7 @@ async def managed(
kid INTEGER,
vid INTEGER,
store_id BLOB NOT NULL CHECK(length(store_id) == 32),
PRIMARY KEY(store_id, hash),
FOREIGN KEY (kid) REFERENCES ids(kv_id),
FOREIGN KEY (vid) REFERENCES ids(kv_id)
PRIMARY KEY(store_id, hash)
)
"""
)
@@ -178,6 +176,11 @@
CREATE INDEX IF NOT EXISTS ids_blob_index ON ids(blob, store_id)
"""
)
await writer.execute(
"""
CREATE INDEX IF NOT EXISTS nodes_generation_index ON nodes(generation)
"""
)

yield self

@@ -236,32 +239,69 @@ async def insert_into_data_store_from_file(
if node_hash not in internal_nodes and node_hash not in terminal_nodes:
missing_hashes.append(node_hash)

# TODO: consider adding transactions around this code
root = await self.get_tree_root(store_id=store_id)
latest_blob = await self.get_merkle_blob(root.node_hash, read_only=True)
known_hashes: dict[bytes32, TreeIndex] = {}
if not latest_blob.empty():
nodes = latest_blob.get_nodes_with_indexes()
known_hashes = {node.hash: index for index, node in nodes}

new_missing_hashes: list[bytes32] = []
for hash in missing_hashes:
if hash in known_hashes:
assert root.node_hash is not None, "if root.node_hash were None then known_hashes would be empty"
merkle_blob_queries[root.node_hash].append(known_hashes[hash])
else:
new_missing_hashes.append(hash)

missing_hashes = new_missing_hashes
if missing_hashes:
async with self.db_wrapper.reader() as reader:
cursor = await reader.execute(
"SELECT MAX(generation) FROM nodes WHERE store_id = ?",
(store_id,),
)
row = await cursor.fetchone()
if row is None or row[0] is None:
current_generation = 0
else:
current_generation = row[0]

batch_size = min(500, SQLITE_MAX_VARIABLE_NUMBER - 10)
found_hashes: set[bytes32] = set()

async with self.db_wrapper.reader() as reader:
for batch in to_batches(missing_hashes, batch_size):
placeholders = ",".join(["?"] * len(batch.entries))
query = f"""
SELECT hash, root_hash, idx
FROM nodes
WHERE store_id = ? AND hash IN ({placeholders})
LIMIT {len(placeholders)}
"""
while missing_hashes:
found_hashes: set[bytes32] = set()
async with self.db_wrapper.reader() as reader:
for batch in to_batches(missing_hashes, batch_size):
placeholders = ",".join(["?"] * len(batch.entries))
query = f"""
SELECT hash, root_hash, idx
FROM nodes
WHERE store_id = ? AND hash IN ({placeholders})
LIMIT {len(placeholders)}
"""

async with reader.execute(query, (store_id, *batch.entries)) as cursor:
rows = await cursor.fetchall()
for row in rows:
node_hash = bytes32(row["hash"])
root_hash_blob = bytes32(row["root_hash"])
index = row["idx"]
if node_hash in found_hashes:
raise Exception("Internal error: duplicate node_hash found in nodes table")
merkle_blob_queries[root_hash_blob].append(index)
found_hashes.add(node_hash)

if found_hashes != set(missing_hashes):
raise Exception("Invalid delta file, cannot find all the required hashes")
async with reader.execute(query, (store_id, *batch.entries)) as cursor:
rows = await cursor.fetchall()
for row in rows:
node_hash = bytes32(row["hash"])
root_hash_blob = bytes32(row["root_hash"])
index = row["idx"]
if node_hash in found_hashes:
raise Exception("Internal error: duplicate node_hash found in nodes table")
merkle_blob_queries[root_hash_blob].append(index)
found_hashes.add(node_hash)

missing_hashes = [hash for hash in missing_hashes if hash not in found_hashes]
if missing_hashes:
if current_generation < root.generation:
current_generation += 1
else:
raise Exception("Invalid delta file, cannot find all the required hashes")

await self.add_node_hashes(store_id, current_generation)
log.info(f"Missing hashes: added old hashes from generation {current_generation}")

for root_hash_blob, indexes in merkle_blob_queries.items():
merkle_blob = await self.get_merkle_blob(root_hash_blob, read_only=True)
@@ -276,7 +316,6 @@

merkle_blob = MerkleBlob.from_node_list(internal_nodes, terminal_nodes, root_hash)
await self.insert_root_from_merkle_blob(merkle_blob, store_id, Status.COMMITTED)
await self.add_node_hashes(store_id)

async def migrate_db(self, server_files_location: Path) -> None:
async with self.db_wrapper.reader() as reader:
@@ -359,7 +398,12 @@ async def migrate_db(self, server_files_location: Path) -> None:
log.error(f"Cannot recover data from {filename}: {e}")
break

async def get_merkle_blob(self, root_hash: Optional[bytes32], read_only: bool = False) -> MerkleBlob:
async def get_merkle_blob(
self,
root_hash: Optional[bytes32],
read_only: bool = False,
update_cache: bool = True,
) -> MerkleBlob:
if root_hash is None:
return MerkleBlob(blob=bytearray())

@@ -381,7 +425,10 @@ async def get_merkle_blob(self, root_hash: Optional[bytes32], read_only: bool =
raise MerkleBlobNotFoundError(root_hash=root_hash)

merkle_blob = MerkleBlob(blob=bytearray(row["blob"]))
self.recent_merkle_blobs.put(root_hash, copy.deepcopy(merkle_blob))

if update_cache:
self.recent_merkle_blobs.put(root_hash, copy.deepcopy(merkle_blob))

return merkle_blob

async def insert_root_from_merkle_blob(
@@ -553,12 +600,12 @@ async def add_node_hash(
(store_id, hash, root_hash, generation, index),
)

async def add_node_hashes(self, store_id: bytes32) -> None:
root = await self.get_tree_root(store_id=store_id)
async def add_node_hashes(self, store_id: bytes32, generation: Optional[int] = None) -> None:
root = await self.get_tree_root(store_id=store_id, generation=generation)
if root.node_hash is None:
return

merkle_blob = await self.get_merkle_blob(root_hash=root.node_hash)
merkle_blob = await self.get_merkle_blob(root_hash=root.node_hash, read_only=True, update_cache=False)
hash_to_index = merkle_blob.get_hashes_indexes()

existing_hashes = await self.get_existing_hashes(list(hash_to_index.keys()), store_id)
@@ -661,8 +708,6 @@ async def change_root_status(self, root: Root, status: Status = Status.PENDING)
root.generation,
),
)
if root.node_hash is not None and status == Status.COMMITTED:
await self.add_node_hashes(root.store_id)

async def check(self) -> None:
for check in self._checks:
@@ -1407,6 +1452,7 @@ async def write_tree_to_file(
writer: BinaryIO,
merkle_blob: Optional[MerkleBlob] = None,
hash_to_index: Optional[dict[bytes32, TreeIndex]] = None,
existing_hashes: Optional[set[bytes32]] = None,
) -> None:
if node_hash == bytes32.zeros:
return
@@ -1415,11 +1461,17 @@
merkle_blob = await self.get_merkle_blob(root.node_hash)
if hash_to_index is None:
hash_to_index = merkle_blob.get_hashes_indexes()
if existing_hashes is None:
if root.generation == 0:
existing_hashes = set()
else:
previous_root = await self.get_tree_root(store_id=store_id, generation=root.generation - 1)
previous_merkle_blob = await self.get_merkle_blob(previous_root.node_hash)
previous_hashes_indexes = previous_merkle_blob.get_hashes_indexes()
existing_hashes = {hash for hash in previous_hashes_indexes.keys()}

if deltas_only:
generation = await self.get_first_generation(node_hash, store_id)
# Root's generation is not the first time we see this hash, so it's not a new delta.
if root.generation != generation:
if node_hash in existing_hashes:
return

raw_index = hash_to_index[node_hash]
@@ -1429,8 +1481,12 @@
if isinstance(raw_node, RawInternalMerkleNode):
left_hash = merkle_blob.get_hash_at_index(raw_node.left)
right_hash = merkle_blob.get_hash_at_index(raw_node.right)
await self.write_tree_to_file(root, left_hash, store_id, deltas_only, writer, merkle_blob, hash_to_index)
await self.write_tree_to_file(root, right_hash, store_id, deltas_only, writer, merkle_blob, hash_to_index)
await self.write_tree_to_file(
root, left_hash, store_id, deltas_only, writer, merkle_blob, hash_to_index, existing_hashes
)
await self.write_tree_to_file(
root, right_hash, store_id, deltas_only, writer, merkle_blob, hash_to_index, existing_hashes
)
to_write = bytes(SerializedNode(False, bytes(left_hash), bytes(right_hash)))
elif isinstance(raw_node, RawLeafMerkleNode):
node = await self.get_terminal_node(raw_node.key, raw_node.value, store_id)
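A simplified sketch of the backwards-compatibility loop added to insert_into_data_store_from_file above: when a delta file references hashes that are not yet recorded in the nodes table, node hashes are backfilled one generation at a time, up to the root's generation, before the import is rejected. The two callables below stand in for the data store's table lookup and add_node_hashes; their names and signatures are hypothetical.

    from collections.abc import Awaitable, Callable

    async def resolve_missing_hashes(
        missing_hashes: list[bytes],
        current_generation: int,
        root_generation: int,
        lookup_known: Callable[[list[bytes]], Awaitable[set[bytes]]],
        backfill_generation: Callable[[int], Awaitable[None]],
    ) -> None:
        # Keep querying until every referenced hash resolves, backfilling the
        # hashes of older generations as needed.
        while missing_hashes:
            found = await lookup_known(missing_hashes)
            missing_hashes = [h for h in missing_hashes if h not in found]
            if not missing_hashes:
                break
            if current_generation >= root_generation:
                raise Exception("Invalid delta file, cannot find all the required hashes")
            current_generation += 1
            await backfill_generation(current_generation)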
