diff --git a/meta/src/main/java/com/alibaba/otter/canal/meta/CanalMetaManager.java b/meta/src/main/java/com/alibaba/otter/canal/meta/CanalMetaManager.java index 17faf39560..681ed74b32 100644 --- a/meta/src/main/java/com/alibaba/otter/canal/meta/CanalMetaManager.java +++ b/meta/src/main/java/com/alibaba/otter/canal/meta/CanalMetaManager.java @@ -90,4 +90,11 @@ void addBatch(ClientIdentity clientIdentity, PositionRange positionRange, Long b */ void clearAllBatchs(ClientIdentity clientIdentity) throws CanalMetaManagerException; + /** + * 清楚游标緩存 + * @param clientIdentity + * @throws CanalMetaManagerException + */ + default void removeCursorsCache(ClientIdentity clientIdentity) throws CanalMetaManagerException {}; + } diff --git a/meta/src/main/java/com/alibaba/otter/canal/meta/MemoryMetaManager.java b/meta/src/main/java/com/alibaba/otter/canal/meta/MemoryMetaManager.java index 0883e44743..e5a3a478a4 100644 --- a/meta/src/main/java/com/alibaba/otter/canal/meta/MemoryMetaManager.java +++ b/meta/src/main/java/com/alibaba/otter/canal/meta/MemoryMetaManager.java @@ -113,6 +113,11 @@ public void clearAllBatchs(ClientIdentity clientIdentity) throws CanalMetaManage batches.get(clientIdentity).clearPositionRanges(); } + @Override + public void removeCursorsCache(ClientIdentity clientIdentity) throws CanalMetaManagerException { + cursors.remove(clientIdentity); + } + // ============================ public static class MemoryClientIdentityBatch { diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java index c535da4cdf..acc81a987b 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java @@ -327,6 +327,7 @@ public boolean sink(EVENT event) { eventSink.interrupt(); transactionBuffer.reset();// 重置一下缓冲队列,重新记录数据 binlogParser.reset();// 重新置位 + removeLogPositionCacheWhenError(destination); if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) { // 处理 RejectedExecutionException try { @@ -355,6 +356,8 @@ public boolean sink(EVENT event) { parseThread.start(); } + + public void stop() { super.stop(); @@ -471,6 +474,14 @@ protected void processDumpError(Throwable e) { // do nothing } + /** + * 如果是缓存,则清理缓存,允许修改底层的持久化文件的时候无需重启canal + * @param destination + */ + protected void removeLogPositionCacheWhenError(String destination) { + // do nothing + } + protected void startHeartBeat(ErosaConnection connection) { lastEntryTime = 0L; // 初始化 if (timer == null) {// lazy初始化一下 diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java index 4e404e59fd..146e9d68d1 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java @@ -859,6 +859,14 @@ protected void processDumpError(Throwable e) { super.processDumpError(e); } + @Override + protected void removeLogPositionCacheWhenError(String destination) { + if (dumpErrorCount > dumpErrorCountThreshold + 1) { + // 这里+1 是允许主从切换的时候重试一下再清缓存 + logPositionManager.removeLogPositionCache(destination); + } + } + public void setSupportBinlogFormats(String formatStrs) { String[] formats = StringUtils.split(formatStrs, ','); if (formats != null) { diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/index/CanalLogPositionManager.java b/parse/src/main/java/com/alibaba/otter/canal/parse/index/CanalLogPositionManager.java index 2d7cd49f1d..3e09c49b19 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/index/CanalLogPositionManager.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/index/CanalLogPositionManager.java @@ -13,4 +13,9 @@ public interface CanalLogPositionManager extends CanalLifeCycle { void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException; + /** + * 一般使用在异常场景清理缓存,这样持久化的存储更新后能够得到最新的数据,不需要重启机器 + */ + default void removeLogPositionCache(String destination){} + } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/index/FailbackLogPositionManager.java b/parse/src/main/java/com/alibaba/otter/canal/parse/index/FailbackLogPositionManager.java index 8d6fea98ff..ed52df9902 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/index/FailbackLogPositionManager.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/index/FailbackLogPositionManager.java @@ -80,4 +80,10 @@ public void persistLogPosition(String destination, LogPosition logPosition) thro secondary.persistLogPosition(destination, logPosition); } } + + @Override + public void removeLogPositionCache(String destination) { + primary.removeLogPositionCache(destination); + secondary.removeLogPositionCache(destination); + } } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java b/parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java index 4a0ab4a8b3..04720727c2 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java @@ -188,4 +188,9 @@ private LogPosition loadDataFromFile(File dataFile) { throw new CanalMetaManagerException(e); } } + + @Override + public void removeLogPositionCache(String destination) { + memoryLogPositionManager.removeLogPositionCache(destination); + } } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/index/MemoryLogPositionManager.java b/parse/src/main/java/com/alibaba/otter/canal/parse/index/MemoryLogPositionManager.java index 1658dd4321..d2a676878e 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/index/MemoryLogPositionManager.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/index/MemoryLogPositionManager.java @@ -40,4 +40,8 @@ public Set destinations() { return positions.keySet(); } + @Override + public void removeLogPositionCache(String destination) { + positions.remove(destination); + } } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/index/MetaLogPositionManager.java b/parse/src/main/java/com/alibaba/otter/canal/parse/index/MetaLogPositionManager.java index 35b26460ee..44d2e0414f 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/index/MetaLogPositionManager.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/index/MetaLogPositionManager.java @@ -75,4 +75,17 @@ public void persistLogPosition(String destination, LogPosition logPosition) thro // do nothing logger.info("destination [{}] persist LogPosition:{}", destination, logPosition); } + + @Override + public void removeLogPositionCache(String destination) { + List clientIdentities = metaManager.listAllSubscribeInfo(destination); + LogPosition result = null; + if (CollectionUtils.isEmpty(clientIdentities)) { + return; + } + for (ClientIdentity clientIdentity : clientIdentities) { + metaManager.removeCursorsCache(clientIdentity); + } + + } } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/index/MixedLogPositionManager.java b/parse/src/main/java/com/alibaba/otter/canal/parse/index/MixedLogPositionManager.java index 44f9d53405..2b2d2c16d0 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/index/MixedLogPositionManager.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/index/MixedLogPositionManager.java @@ -81,4 +81,9 @@ public void persistLogPosition(final String destination, final LogPosition logPo } }); } + + @Override + public void removeLogPositionCache(String destination) { + memoryLogPositionManager.removeLogPositionCache(destination); + } } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java b/parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java index 47d919a664..38f451b317 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java @@ -112,4 +112,9 @@ public void persistLogPosition(String destination, LogPosition logPosition) thro persistTasks.add(destination); memoryLogPositionManager.persistLogPosition(destination, logPosition); } + + @Override + public void removeLogPositionCache(String destination) { + memoryLogPositionManager.removeLogPositionCache(destination); + } }