springboot连接canal处理数据

使用方式1

  • 初始化 Canal连接 canal 服务:CanalConnectors.newSingleConnector(hostname,port,destination…)
  • 定时获取canal的Message.getEntries,如果有数据 entries.size > 0,则为行数据变更操作;
  • 根据事件类型(INSERT、UPDATE…),获取变更记录数据,解析处理;

引入canal POM依赖:

...
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>

创建canal定时抓取canal事件类,注解@Component;

@Scheduled(fixedDelay = 100)
public void run() throws Exception {

    long batchId = -1;
    try {
        int batchSize = 1000;
        //依此从canal种取1000条数据
        Message message = canalConnector.getWithoutAck(batchSize);
        batchId = message.getId();
        List<CanalEntry.Entry> entries = message.getEntries();
        if (batchId != -1 && entries.size() > 0) {
            for (CanalEntry.Entry entry : entries) {
                //若是行记录更新
                if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                    //解析处理
                    handleCanalEvent(entry);
                }
            }
        }

        canalConnector.ack(batchId);
    } catch (Exception e) {
        // 解决断线重新问题;
        logger.error("CanalScheduling-run-001 发生异常: " + e.getMessage(), e);
        try {
            canalConnector = CanalConnectors.newSingleConnector(
                        new InetSocketAddress(canal.hostname, canal.port), canal.destination,
                        canal.userName, canal.passWord);

            canalConnector.connect();
            canalConnector.subscribe("dbname.tablename"); //订阅侦听的"数据库.表名"
            if (batchId != -1) {
                canalConnector.rollback(batchId);
            } else {
                canalConnector.rollback();
            }
        } catch (Exception ex) {
            batchId = -1;
        }
    }
}

以上源码说明:

方法是用@Scheduled注解的,定时100毫秒执行一次;一开始canalConnector是没有初始的,首次执行会先报错到外 catch 进行初始化连接;

有偿试过,如果将 canalConnector 做为一个 @Bean 初化化连接,然后将 canalConnector 注入使用,如果 canal 重启断开,这个 @Bean 的canalConnector 就没再使用;

因此需要将 canalConnector 初始连接入到 定时方法里 (@Scheduled),但又不能每次在开始就重复连接次,因此将canalConnector的连接操作放到 catch,如果有异常,就给他重新连接下;

开始获取 withoutAck 的 Message 信息,每次获取 1000 条数据,判断下 message.getEntries 是否内容(entries.size() > 0);获取每个 entry;

再判断每个entry 是否行变更的entryType,如果是就放到处理方法里处理;

private void handleCanalEvent(CanalEntry.Entry entry) throws Exception {
        //我们可以拿到binlog类型,是创建还是修改或者其它进而根据业务就行修改代码
        //CanalEntry.EventType eventType = entry.getHeader().getEventType();
        // ​
        //获取数据库相关信息
        String database = entry.getHeader().getSchemaName();
        String table = entry.getHeader().getTableName();
        database = database == null ? "": database.toLowerCase();
        table = table == null ? "": table.toLowerCase();

        logger.info("handleCanalEvent-001 当前数据库及表: database: " + database + "; table: " + table);

        //判断是否要处理的数据库与表
        if (!canal.database.equals(database) || !canal.table.equals(table)) {
            return;
        }
        
        CanalEntry.RowChange change = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        CanalEntry.EventType eventType = change.getEventType();
        
        for (CanalEntry.RowData rowData : change.getRowDatasList()) {
            //获取变更主键id
            List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
            String primaryKey = "id";
            CanalEntry.Column idColumn = columns.stream().filter(column -> column.getIsKey()
                    && primaryKey.equals(column.getName())).findFirst().orElse(null);
            //Map<String, Object> dataMap = parseColumnsToMap(columns);
            //根据数据库,表,id信息重新从数据库捞数据构建索引

            // add ==========================
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            if (eventType == CanalEntry.EventType.DELETE) {
                parseColumnsToMap(rowData.getBeforeColumnsList());
            } else if (eventType == CanalEntry.EventType.INSERT) {
                parseColumnsToMap(rowData.getAfterColumnsList());
            } else {
                parseColumnsToMap(rowData.getBeforeColumnsList());
                parseColumnsToMap(rowData.getAfterColumnsList());
            }
            // =========================
        }
    }

以上代码首先根据 CanalEntry.Entry 的入参,获取该entry的数据库及表名信息,判断是否要处理的数据库及表信息,当然这个可以上 canal 安装服务的 conf/example/instance.properties 上直接限制数据库及表名匹配;

再获取RowData,根据 eventType 获取相应的 rowData数据,如果Delete为getBeforeColumnList();

Map<String,Object> parseColumnsToMap(List<CanalEntry.Column> columns){
    Map<String,Object> jsonMap = new HashMap<>();
    columns.forEach(column -> {
        if(column == null){
            return;
        }
        jsonMap.put(column.getName(), column.getValue());
    });
    return jsonMap;
}

上面 List<CanalEntry.Column> columns 为某数据表行的所有列字段及值;

使用方式2

实现EntryHandler<数据表实体类>接口 来实现侦听数据行变更处理

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;

@CanalTable("tablename") // 要侦听的物理表名
@Component
public class CanalBizTableHandler implements EntryHandler<BizTable> {

    // 注入 BizTableDao 
    @Autowired
    private BizTableDao bizTableDao;

    @Override
    public void insert(BizTableVO bizTable) {
        bizTableDao.insert(bizTable);
        
    }
}

这种看起来比较简单,但前提是源数据要保证 bizTable 所有字段都要有值,或类型正确,不然会报错,而且无法在这里前置判断字段值修改;

BizTable实体类:

@Data
public class BizTable {
 
    @Id
    private Long id;
    @Column(name = "username")
    private String username;
}
欢迎您的到来,感谢您的支持!

为您推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注