1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
/**
 * Spring component that pulls row-change batches from a Canal server and
 * forwards every changed row to {@link CanalService}.
 *
 * <p>{@link #start()} opens the connection after construction (only when a
 * Canal IP is configured), {@link #handle()} processes one batch per call, and
 * {@link #stop()} disconnects before the bean is destroyed.
 *
 * @author tangfan 2018/12/17 14:22
 */
@Component
public class CanalClient {
    private static final Logger logger = LoggerFactory.getLogger(CanalClient.class);

    private final CanalService canalService;
    private final DatabaseProperties.CanalConfig canalConfig;
    private CanalConnector canalConnector;
    private boolean started;

    /**
     * @param canalService       receiver of the insert/update/delete callbacks
     * @param databaseProperties source of the Canal configuration (may yield {@code null})
     */
    @Autowired
    public CanalClient(CanalService canalService, DatabaseProperties databaseProperties) {
        this.canalService = canalService;
        this.canalConfig = databaseProperties.getCanal();
    }

    /**
     * Creates the connector, connects, subscribes and rolls back to the last
     * acknowledged position. Does nothing when no Canal IP is configured or
     * when the client is already started.
     */
    @PostConstruct
    public void start() {
        if (canalConfig == null || StringUtils.isEmpty(canalConfig.getIp())) {
            return;
        }
        // Check before creating a connector: a repeated call must not replace
        // (and leak) the connector that is already connected.
        if (this.started) {
            return;
        }
        // Create the connection.
        this.canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalConfig.getIp(), canalConfig.getPort()),
                canalConfig.getDestination(),
                canalConfig.getUsername(),
                canalConfig.getPassword());
        // NOTE(review): this logs the whole config object; confirm its toString()
        // does not expose the password.
        logger.info("同步任务初始化完成。Canal 配置:{}", canalConfig);
        canalConnector.connect();
        canalConnector.subscribe(this.canalConfig.getSubscribe());
        canalConnector.rollback();
        this.started = true;
    }

    /**
     * Disconnects from the Canal server if the client was started. A failure
     * to disconnect is logged and the client is still marked as stopped.
     */
    @PreDestroy
    public void stop() {
        if (!this.started) {
            return;
        }
        try {
            canalConnector.disconnect();
        } catch (CanalClientException exp) {
            logger.warn("canal 客户端停止异常", exp);
        }
        this.started = false;
    }

    /**
     * Runs one synchronization pass: fetches a single batch without
     * acknowledging it, processes it, then acknowledges it on success or rolls
     * it back on failure.
     *
     * <p>Returns immediately when the client has not been started (for example
     * because Canal is not configured), instead of failing on the missing
     * configuration or connector.
     *
     * @throws CleanServiceException if the configured batch size or interval is out of range
     */
    public void handle() {
        if (!this.started || this.canalConnector == null) {
            logger.debug("canal 客户端未启动,跳过本次同步");
            return;
        }
        int batchSize = canalConfig.getBatchSize();
        if (batchSize < 0 || batchSize >= 9999) {
            throw new CleanServiceException(ExceptionCode.ARGUMENT_ERROR, "同步获取批次参数设置超过界限");
        }
        if (this.canalConfig.getInterval() == null || this.canalConfig.getInterval().toMillis() <= 0) {
            throw new CleanServiceException(ExceptionCode.ARGUMENT_ERROR, "同步获取批次参数设置时间间隔超过界限");
        }

        Message message;
        try {
            message = canalConnector.getWithoutAck(batchSize);
        } catch (Exception exp) {
            // A failed fetch is logged and this pass is skipped.
            logger.error("获取canal数据失败", exp);
            return;
        }
        if (message == null) {
            return;
        }

        long batchId = message.getId();
        int size = message.getEntries().size();
        if (batchId == -1 || size == 0) {
            // Nothing to process in this pass.
            return;
        }
        try {
            saveEntry(batchId, message.getEntries());
            // Acknowledge the batch.
            canalConnector.ack(batchId);
        } catch (Exception exp) {
            logger.error("canal service 处理失败,batchId:{}", batchId, exp);
            // Processing failed: roll the batch back.
            if (batchId > 0) {
                canalConnector.rollback(batchId);
                logger.info("回滚批次:{},共计:{}项。", batchId, size);
            }
        }
    }

    /**
     * Applies every row change contained in the given entries through
     * {@link CanalService}. Transaction begin/end entries are skipped.
     *
     * @param batchId id of the batch being processed (used for logging only)
     * @param entrys  entries of the batch; {@code null} is treated as empty
     * @throws RuntimeException if an entry's stored value cannot be parsed as a row change
     */
    private void saveEntry(long batchId, List<CanalEntry.Entry> entrys) {
        if (entrys == null) {
            return;
        }
        logger.info("批次:{}, 确认数量:{}项", batchId, entrys.size());
        for (CanalEntry.Entry entry : entrys) {
            CanalEntry.EntryType entryType = entry.getEntryType();
            if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            if (rowChange == null || rowChange.getRowDatasList() == null) {
                continue;
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            // Schema and table are per entry, so read them once rather than per row.
            String schemaName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                switch (eventType) {
                    case INSERT:
                        canalService.insert(rowData.getAfterColumnsList(), schemaName, tableName);
                        break;
                    case UPDATE:
                        canalService.update(rowData.getAfterColumnsList(), schemaName, tableName);
                        break;
                    case DELETE:
                        // A deleted row has no after-image; its columns are in the
                        // before-image, so that is what the service must receive.
                        canalService.delete(rowData.getBeforeColumnsList(), schemaName, tableName);
                        break;
                    default:
                        logger.info("未知操作类型:{}", eventType);
                        break;
                }
            }
        }
    }
}
|