From 76cb1abaa8edd2279de95ce792d697753937debb Mon Sep 17 00:00:00 2001 From: wizhuo <598381702@qq.com> Date: Tue, 17 Jan 2023 16:05:44 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=9C=A8=E4=BD=8D=E7=82=B9=E5=87=BA?= =?UTF-8?q?=E7=8E=B0=E9=97=AE=E9=A2=98=E7=9A=84=E6=97=B6=E5=80=99=EF=BC=8C?= =?UTF-8?q?=E6=B8=85=E7=A9=BA=E4=BD=8D=E7=82=B9=E7=BC=93=E5=AD=98=EF=BC=8C?= =?UTF-8?q?=E8=BF=99=E6=A0=B7=E5=8F=AA=E8=A6=81=E4=BF=AE=E6=AD=A3=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E6=96=87=E4=BB=B6=E5=B0=B1=E5=8F=AF=E4=BB=A5=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E6=9C=80=E6=96=B0=EF=BC=8C=E8=80=8C=E4=B8=8D=E9=9C=80?= =?UTF-8?q?=E8=A6=81=E9=87=8D=E5=90=AFcanal?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../alibaba/otter/canal/meta/CanalMetaManager.java | 7 +++++++ .../alibaba/otter/canal/meta/MemoryMetaManager.java | 5 +++++ .../canal/parse/inbound/AbstractEventParser.java | 11 +++++++++++ .../canal/parse/inbound/mysql/MysqlEventParser.java | 8 ++++++++ .../canal/parse/index/CanalLogPositionManager.java | 5 +++++ .../parse/index/FailbackLogPositionManager.java | 6 ++++++ .../parse/index/FileMixedLogPositionManager.java | 5 +++++ .../canal/parse/index/MemoryLogPositionManager.java | 4 ++++ .../canal/parse/index/MetaLogPositionManager.java | 13 +++++++++++++ .../canal/parse/index/MixedLogPositionManager.java | 5 +++++ .../parse/index/PeriodMixedLogPositionManager.java | 5 +++++ 11 files changed, 74 insertions(+) diff --git a/meta/src/main/java/com/alibaba/otter/canal/meta/CanalMetaManager.java b/meta/src/main/java/com/alibaba/otter/canal/meta/CanalMetaManager.java index 17faf39560..681ed74b32 100644 --- a/meta/src/main/java/com/alibaba/otter/canal/meta/CanalMetaManager.java +++ b/meta/src/main/java/com/alibaba/otter/canal/meta/CanalMetaManager.java @@ -90,4 +90,11 @@ void addBatch(ClientIdentity clientIdentity, PositionRange positionRange, Long b */ void clearAllBatchs(ClientIdentity clientIdentity) throws CanalMetaManagerException; + /** + * 清楚游标緩存 + * @param clientIdentity + * @throws CanalMetaManagerException + */ + default void removeCursorsCache(ClientIdentity clientIdentity) throws CanalMetaManagerException {}; + } diff --git a/meta/src/main/java/com/alibaba/otter/canal/meta/MemoryMetaManager.java b/meta/src/main/java/com/alibaba/otter/canal/meta/MemoryMetaManager.java index 0883e44743..e5a3a478a4 100644 --- a/meta/src/main/java/com/alibaba/otter/canal/meta/MemoryMetaManager.java +++ b/meta/src/main/java/com/alibaba/otter/canal/meta/MemoryMetaManager.java @@ -113,6 +113,11 @@ public void clearAllBatchs(ClientIdentity clientIdentity) throws CanalMetaManage batches.get(clientIdentity).clearPositionRanges(); } + @Override + public void removeCursorsCache(ClientIdentity clientIdentity) throws CanalMetaManagerException { + cursors.remove(clientIdentity); + } + // ============================ public static class MemoryClientIdentityBatch { diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java index c535da4cdf..897f2ec708 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java @@ -327,6 +327,7 @@ public boolean sink(EVENT event) { eventSink.interrupt(); transactionBuffer.reset();// 重置一下缓冲队列,重新记录数据 binlogParser.reset();// 重新置位 + removeLogPositionCacheWhenError(destination); if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) { // 处理 RejectedExecutionException try { @@ -355,6 +356,8 @@ public boolean sink(EVENT event) { parseThread.start(); } + + public void stop() { super.stop(); @@ -471,6 +474,14 @@ protected void processDumpError(Throwable e) { // do nothing } + /** + * 如果是缓存,则清理缓存,允许修改底层的持久化文件的时候无需重启canal + * @param destination + */ + protected void removeLogPositionCacheWhenError(String destination) { + logPositionManager.removeLogPositionCache(destination); + } + protected void startHeartBeat(ErosaConnection connection) { lastEntryTime = 0L; // 初始化 if (timer == null) {// lazy初始化一下 diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java index 4e404e59fd..146e9d68d1 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java @@ -859,6 +859,14 @@ protected void processDumpError(Throwable e) { super.processDumpError(e); } + @Override + protected void removeLogPositionCacheWhenError(String destination) { + if (dumpErrorCount > dumpErrorCountThreshold + 1) { + // 这里+1 是允许主从切换的时候重试一下再清缓存 + logPositionManager.removeLogPositionCache(destination); + } + } + public void setSupportBinlogFormats(String formatStrs) { String[] formats = StringUtils.split(formatStrs, ','); if (formats != null) { diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/index/CanalLogPositionManager.java b/parse/src/main/java/com/alibaba/otter/canal/parse/index/CanalLogPositionManager.java index 2d7cd49f1d..3e09c49b19 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/index/CanalLogPositionManager.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/index/CanalLogPositionManager.java @@ -13,4 +13,9 @@ public interface CanalLogPositionManager extends CanalLifeCycle { void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException; + /** + * 一般使用在异常场景清理缓存,这样持久化的存储更新后能够得到最新的数据,不需要重启机器 + */ + default void removeLogPositionCache(String destination){} + } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/index/FailbackLogPositionManager.java b/parse/src/main/java/com/alibaba/otter/canal/parse/index/FailbackLogPositionManager.java index 8d6fea98ff..ed52df9902 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/index/FailbackLogPositionManager.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/index/FailbackLogPositionManager.java @@ -80,4 +80,10 @@ public void persistLogPosition(String destination, LogPosition logPosition) thro secondary.persistLogPosition(destination, logPosition); } } + + @Override + public void removeLogPositionCache(String destination) { + primary.removeLogPositionCache(destination); + secondary.removeLogPositionCache(destination); + } } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java b/parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java index 4a0ab4a8b3..04720727c2 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManager.java @@ -188,4 +188,9 @@ private LogPosition loadDataFromFile(File dataFile) { throw new CanalMetaManagerException(e); } } + + @Override + public void removeLogPositionCache(String destination) { + memoryLogPositionManager.removeLogPositionCache(destination); + } } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/index/MemoryLogPositionManager.java b/parse/src/main/java/com/alibaba/otter/canal/parse/index/MemoryLogPositionManager.java index 1658dd4321..d2a676878e 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/index/MemoryLogPositionManager.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/index/MemoryLogPositionManager.java @@ -40,4 +40,8 @@ public Set destinations() { return positions.keySet(); } + @Override + public void removeLogPositionCache(String destination) { + positions.remove(destination); + } } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/index/MetaLogPositionManager.java b/parse/src/main/java/com/alibaba/otter/canal/parse/index/MetaLogPositionManager.java index 35b26460ee..44d2e0414f 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/index/MetaLogPositionManager.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/index/MetaLogPositionManager.java @@ -75,4 +75,17 @@ public void persistLogPosition(String destination, LogPosition logPosition) thro // do nothing logger.info("destination [{}] persist LogPosition:{}", destination, logPosition); } + + @Override + public void removeLogPositionCache(String destination) { + List clientIdentities = metaManager.listAllSubscribeInfo(destination); + LogPosition result = null; + if (CollectionUtils.isEmpty(clientIdentities)) { + return; + } + for (ClientIdentity clientIdentity : clientIdentities) { + metaManager.removeCursorsCache(clientIdentity); + } + + } } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/index/MixedLogPositionManager.java b/parse/src/main/java/com/alibaba/otter/canal/parse/index/MixedLogPositionManager.java index 44f9d53405..2b2d2c16d0 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/index/MixedLogPositionManager.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/index/MixedLogPositionManager.java @@ -81,4 +81,9 @@ public void persistLogPosition(final String destination, final LogPosition logPo } }); } + + @Override + public void removeLogPositionCache(String destination) { + memoryLogPositionManager.removeLogPositionCache(destination); + } } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java b/parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java index 47d919a664..38f451b317 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/index/PeriodMixedLogPositionManager.java @@ -112,4 +112,9 @@ public void persistLogPosition(String destination, LogPosition logPosition) thro persistTasks.add(destination); memoryLogPositionManager.persistLogPosition(destination, logPosition); } + + @Override + public void removeLogPositionCache(String destination) { + memoryLogPositionManager.removeLogPositionCache(destination); + } } From ec4cc105420d52e84a352ed30a1b9ced19c0d545 Mon Sep 17 00:00:00 2001 From: wizhuo <598381702@qq.com> Date: Thu, 19 Jan 2023 09:41:01 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E7=9A=84=E6=97=B6?= =?UTF-8?q?=E5=80=99=E9=BB=98=E8=AE=A4=E4=B8=8D=E6=B8=85=E4=BD=8D=E7=82=B9?= =?UTF-8?q?=E7=BC=93=E5=AD=98=EF=BC=8C=E5=8F=AA=E6=9C=89mysql=20=E7=A1=AE?= =?UTF-8?q?=E8=AE=A4=E6=98=AF=E4=BD=8D=E7=82=B9=E6=B6=88=E8=B4=B9=E9=94=99?= =?UTF-8?q?=E8=AF=AF=E7=9A=84=E6=97=B6=E5=80=99=E6=89=8D=E6=B8=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../alibaba/otter/canal/parse/inbound/AbstractEventParser.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java index 897f2ec708..acc81a987b 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java @@ -479,7 +479,7 @@ protected void processDumpError(Throwable e) { * @param destination */ protected void removeLogPositionCacheWhenError(String destination) { - logPositionManager.removeLogPositionCache(destination); + // do nothing } protected void startHeartBeat(ErosaConnection connection) {