Skip to content

Commit

Permalink
list type numShards and refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Sunjeet committed Jan 19, 2025
1 parent eb06338 commit 7b9dcb8
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import com.netflix.hollow.core.schema.HollowListSchema;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.logging.Logger;

public class HollowListTypeWriteState extends HollowTypeWriteState {
private static final Logger LOG = Logger.getLogger(HollowListTypeWriteState.class.getName());

/// statistics required for writing fixed length list data
private int bitsPerListPointer;
Expand Down Expand Up @@ -68,18 +70,10 @@ public void prepareForWrite() {
}

private void gatherStatistics() {
if(numShards == -1)
calculateNumShards();
revNumShards = numShards;
gatherShardingStats();

int maxOrdinal = ordinalMap.maxOrdinal();
int maxElementOrdinal = 0;

maxShardOrdinal = new int[numShards];
int minRecordLocationsPerShard = (maxOrdinal + 1) / numShards;
for(int i=0;i<numShards;i++)
maxShardOrdinal[i] = (i < ((maxOrdinal + 1) & (numShards - 1))) ? minRecordLocationsPerShard : minRecordLocationsPerShard - 1;

ByteData data = ordinalMap.getByteData().getUnderlyingArray();

totalOfListSizes = new long[numShards];
Expand Down Expand Up @@ -113,8 +107,8 @@ private void gatherStatistics() {
bitsPerListPointer = maxShardTotalOfListSizes == 0 ? 1 : 64 - Long.numberOfLeadingZeros(maxShardTotalOfListSizes);
}

private void calculateNumShards() {
int maxOrdinal = ordinalMap.maxOrdinal();
@Override
protected int typeStateNumShards(int maxOrdinal) {
ByteData data = ordinalMap.getByteData().getUnderlyingArray();

long maxElementOrdinal = 0;
Expand Down Expand Up @@ -144,9 +138,10 @@ private void calculateNumShards() {
long projectedSizeOfType = (bitsPerElement * totalOfListSizes) / 8;
projectedSizeOfType += (bitsPerListPointer * maxOrdinal + 1) / 8;

numShards = 1;
while(stateEngine.getTargetMaxTypeShardSize() * numShards < projectedSizeOfType)
numShards *= 2;
int targetNumShards = 1;
while(stateEngine.getTargetMaxTypeShardSize() * targetNumShards < projectedSizeOfType)
targetNumShards *= 2;
return targetNumShards;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,28 +77,7 @@ public void prepareForWrite() {
}

private void gatherStatistics() {
int maxOrdinal = ordinalMap.maxOrdinal();
if(numShards == -1) {
numShards = calculateNumShards(maxOrdinal);
revNumShards = numShards;
} else {
revNumShards = numShards;
if (allowTypeResharding()) {
numShards = calculateNumShards(maxOrdinal);
if (numShards != revNumShards) { // re-sharding
// limit numShards to 2x or .5x of prevShards per producer cycle
numShards = numShards > revNumShards ? revNumShards * 2 : revNumShards / 2;

LOG.info(String.format("Num shards for type %s changing from %s to %s", schema.getName(), revNumShards, numShards));
addReshardingHeader(revNumShards, numShards); // SNAP: TODO: Here,
}
}
}

maxShardOrdinal = calcMaxShardOrdinal(maxOrdinal, numShards);
if (revNumShards > 0) {
revMaxShardOrdinal = calcMaxShardOrdinal(maxOrdinal, revNumShards);
}
gatherShardingStats();

int maxKeyOrdinal = 0;
int maxValueOrdinal = 0;
Expand Down Expand Up @@ -154,8 +133,9 @@ private void gatherStatistics() {

bitsPerMapPointer = 64 - Long.numberOfLeadingZeros(maxShardTotalOfMapBuckets);
}

int calculateNumShards(int maxOrdinal) {

@Override
protected int typeStateNumShards(int maxOrdinal) {
int maxKeyOrdinal = 0;
int maxValueOrdinal = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,30 +83,11 @@ public void prepareForWrite() {

fieldStats.completeCalculations();

if (numShards == -1) {
numShards = targetNumShards(maxOrdinal);
revNumShards = numShards;
} else {
revNumShards = numShards;
if (allowTypeResharding()) { // SNAP: TODO: extend to other types
numShards = targetNumShards(maxOrdinal);
if (numShards != revNumShards) { // re-sharding
// limit numShards to 2x or .5x of prevShards per producer cycle
numShards = numShards > revNumShards ? revNumShards * 2 : revNumShards / 2;

LOG.info(String.format("Num shards for type %s changing from %s to %s", schema.getName(), revNumShards, numShards));
addReshardingHeader(revNumShards, numShards);
}
}
}

maxShardOrdinal = calcMaxShardOrdinal(maxOrdinal, numShards);
if (revNumShards > 0) {
revMaxShardOrdinal = calcMaxShardOrdinal(maxOrdinal, revNumShards);
}
gatherShardingStats();
}

private int targetNumShards(int maxOrdinal) {
@Override
protected int typeStateNumShards(int maxOrdinal) {
long projectedSizeOfType = ((long)fieldStats.getNumBitsPerRecord() * (maxOrdinal + 1)) / 8;
projectedSizeOfType += fieldStats.getTotalSizeOfAllVarLengthData();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,28 +75,7 @@ public void prepareForWrite() {
}

private void gatherStatistics() {
int maxOrdinal = ordinalMap.maxOrdinal();
if(numShards == -1) {
numShards = calculateNumShards(maxOrdinal);
revNumShards = numShards;
} else {
revNumShards = numShards;
if (allowTypeResharding()) {
numShards = calculateNumShards(maxOrdinal);
if (numShards != revNumShards) { // re-sharding
// limit numShards to 2x or .5x of prevShards per producer cycle
numShards = numShards > revNumShards ? revNumShards * 2 : revNumShards / 2;

LOG.info(String.format("Num shards for type %s changing from %s to %s", schema.getName(), revNumShards, numShards));
addReshardingHeader(revNumShards, numShards); // SNAP: TODO: Here,
}
}
}

maxShardOrdinal = calcMaxShardOrdinal(maxOrdinal, numShards);
if (revNumShards > 0) {
revMaxShardOrdinal = calcMaxShardOrdinal(maxOrdinal, revNumShards);
}
gatherShardingStats();

int maxElementOrdinal = 0;
int maxSetSize = 0;
Expand Down Expand Up @@ -141,8 +120,9 @@ private void gatherStatistics() {
bitsPerSetSizeValue = 64 - Long.numberOfLeadingZeros(maxSetSize);
bitsPerSetPointer = 64 - Long.numberOfLeadingZeros(maxShardTotalOfSetBuckets);
}

private int calculateNumShards(int maxOrdinal) {

@Override
protected int typeStateNumShards(int maxOrdinal) {
int maxSetSize = 0;
int maxElementOrdinal = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,33 @@ public boolean allowTypeResharding() {
return stateEngine.allowTypeResharding();
}

protected void gatherShardingStats() {
int maxOrdinal = ordinalMap.maxOrdinal();
if(numShards == -1) {
numShards = typeStateNumShards(maxOrdinal);
revNumShards = numShards;
} else {
revNumShards = numShards;
if (allowTypeResharding()) {
numShards = typeStateNumShards(maxOrdinal);
if (numShards != revNumShards) { // re-sharding
// limit numShards to 2x or .5x of prevShards per producer cycle
numShards = numShards > revNumShards ? revNumShards * 2 : revNumShards / 2;

LOG.info(String.format("Num shards for type %s changing from %s to %s", schema.getName(), revNumShards, numShards));
addReshardingHeader(revNumShards, numShards); // SNAP: TODO: Here,
}
}
}

maxShardOrdinal = calcMaxShardOrdinal(maxOrdinal, numShards);
if (revNumShards > 0) {
revMaxShardOrdinal = calcMaxShardOrdinal(maxOrdinal, revNumShards);
}
}

protected abstract int typeStateNumShards(int maxOrdinal);

/**
* A header tag indicating that num shards for a type has changed since the prior version. Its value encodes
* the type(s) that were re-sharded along with the before and after num shards in the fwd delta direction.
Expand Down

0 comments on commit 7b9dcb8

Please sign in to comment.