Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

在位点出现问题的时候,清空位点缓存,这样只要修正配置文件就可以获取最新,而不需要重启canal #4577

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,11 @@ void addBatch(ClientIdentity clientIdentity, PositionRange positionRange, Long b
*/
void clearAllBatchs(ClientIdentity clientIdentity) throws CanalMetaManagerException;

/**
* 清楚游标緩存
* @param clientIdentity
* @throws CanalMetaManagerException
*/
default void removeCursorsCache(ClientIdentity clientIdentity) throws CanalMetaManagerException {};

}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public void clearAllBatchs(ClientIdentity clientIdentity) throws CanalMetaManage
batches.get(clientIdentity).clearPositionRanges();
}

@Override
public void removeCursorsCache(ClientIdentity clientIdentity) throws CanalMetaManagerException {
cursors.remove(clientIdentity);
}

// ============================

public static class MemoryClientIdentityBatch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ public boolean sink(EVENT event) {
eventSink.interrupt();
transactionBuffer.reset();// 重置一下缓冲队列,重新记录数据
binlogParser.reset();// 重新置位
removeLogPositionCacheWhenError(destination);
if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) {
// 处理 RejectedExecutionException
try {
Expand Down Expand Up @@ -355,6 +356,8 @@ public boolean sink(EVENT event) {
parseThread.start();
}



public void stop() {
super.stop();

Expand Down Expand Up @@ -471,6 +474,14 @@ protected void processDumpError(Throwable e) {
// do nothing
}

/**
* 如果是缓存,则清理缓存,允许修改底层的持久化文件的时候无需重启canal
* @param destination
*/
protected void removeLogPositionCacheWhenError(String destination) {
// do nothing
}

protected void startHeartBeat(ErosaConnection connection) {
lastEntryTime = 0L; // 初始化
if (timer == null) {// lazy初始化一下
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,14 @@ protected void processDumpError(Throwable e) {
super.processDumpError(e);
}

@Override
protected void removeLogPositionCacheWhenError(String destination) {
if (dumpErrorCount > dumpErrorCountThreshold + 1) {
// 这里+1 是允许主从切换的时候重试一下再清缓存
logPositionManager.removeLogPositionCache(destination);
}
}

public void setSupportBinlogFormats(String formatStrs) {
String[] formats = StringUtils.split(formatStrs, ',');
if (formats != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ public interface CanalLogPositionManager extends CanalLifeCycle {

void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException;

/**
* 一般使用在异常场景清理缓存,这样持久化的存储更新后能够得到最新的数据,不需要重启机器
*/
default void removeLogPositionCache(String destination){}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,10 @@ public void persistLogPosition(String destination, LogPosition logPosition) thro
secondary.persistLogPosition(destination, logPosition);
}
}

@Override
public void removeLogPositionCache(String destination) {
primary.removeLogPositionCache(destination);
secondary.removeLogPositionCache(destination);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,9 @@ private LogPosition loadDataFromFile(File dataFile) {
throw new CanalMetaManagerException(e);
}
}

@Override
public void removeLogPositionCache(String destination) {
memoryLogPositionManager.removeLogPositionCache(destination);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,8 @@ public Set<String> destinations() {
return positions.keySet();
}

@Override
public void removeLogPositionCache(String destination) {
positions.remove(destination);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,17 @@ public void persistLogPosition(String destination, LogPosition logPosition) thro
// do nothing
logger.info("destination [{}] persist LogPosition:{}", destination, logPosition);
}

@Override
public void removeLogPositionCache(String destination) {
List<ClientIdentity> clientIdentities = metaManager.listAllSubscribeInfo(destination);
LogPosition result = null;
if (CollectionUtils.isEmpty(clientIdentities)) {
return;
}
for (ClientIdentity clientIdentity : clientIdentities) {
metaManager.removeCursorsCache(clientIdentity);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,9 @@ public void persistLogPosition(final String destination, final LogPosition logPo
}
});
}

@Override
public void removeLogPositionCache(String destination) {
memoryLogPositionManager.removeLogPositionCache(destination);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,9 @@ public void persistLogPosition(String destination, LogPosition logPosition) thro
persistTasks.add(destination);
memoryLogPositionManager.persistLogPosition(destination, logPosition);
}

@Override
public void removeLogPositionCache(String destination) {
memoryLogPositionManager.removeLogPositionCache(destination);
}
}