使用条件限制
- 只用于 replica sets 和 sharded clusters ,单节点因为没有oplog故不支持。
- 复制协议必须是 pv1,存储引擎必须是 WiredTiger
- MongoDB 3.6 版本只实现了集合粒度,4.0版本支持集群及库级别的ChangeStream
- ChangeStream 只发布持久化到大多数(majority-committed)节点的数据变化通知
- 要想在集合上创建ChangeStream游标用户必须对集合具有读权限
- 对于分片集合,带有 multi:true 的更新操作可能会导致为孤立文档(orphaned documents)发布变更消息;operationType=invalidate 并不表示无效或非法操作,而是表示变更流已失效(例如被监听的集合被删除或重命名),此后游标会被关闭
使用方式
原生
public static void main(String[] args) {String databaseName = "demo";String collectionName = "demo";MongoClient mongoClient = MongoClients.create("mongodb://intbee:mongoPass@127.0.0.1:37017,127.0.0.1:37018,127.0.0.1:37019/?replicaSet=mongos&authSource=admin&authMechanism=SCRAM-SHA-1");MongoDatabase db = mongoClient.getDatabase(databaseName);MongoCollection<Document> coll = db.getCollection(collectionName);// insert事件,update事件,delete事件,replace事件,invalidate事件List<Bson> pipeline = java.util.Collections.singletonList(Aggregates.match(Filters.or(Document.parse("{}"),Filters.in("operationType", Arrays.asList("insert", "update","replace", "delete")))));MongoCursor<ChangeStreamDocument<Document>> cursor = coll.watch(pipeline)//更新返回全部数据.fullDocument(FullDocument.UPDATE_LOOKUP).iterator();while (cursor.hasNext()) {ChangeStreamDocument<Document> next = cursor.next();String Operation = next.getOperationType().getValue();System.out.println(next.getDocumentKey().getObjectId("_id").getValue().toHexString());// insert,replace、deleteSystem.out.println("Operation : "+Operation);System.out.println("DocumentKey : "+next.getDocumentKey());System.out.println("FullDocument : "+next.getFullDocument());// delete or update=nullSystem.out.println("UpdateDescription : "+next.getUpdateDescription());// update: removedFields=[], updatedFields={"f":31.0}}}
Springboot
```yaml
spring:
  data:
    mongodb:
      uri: mongodb://intbee:mongoPass@127.0.0.1:37017,127.0.0.1:37018,127.0.0.1:37019/demo?replicaSet=mongos&authSource=admin&authMechanism=SCRAM-SHA-1
```

```java
/**
 *
 */
package com.qizai.modules.mongodb.config;
import java.util.concurrent.Executor; import java.util.concurrent.Executors;
import org.bson.Document; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.mongo.MongoProperties; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.EventListener; import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest; import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer; import org.springframework.data.mongodb.core.messaging.Message; import org.springframework.data.mongodb.core.messaging.MessageListener; import org.springframework.data.mongodb.core.messaging.MessageListenerContainer; import org.springframework.data.mongodb.core.query.Criteria;
import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.FullDocument;
import lombok.Data; import lombok.extern.slf4j.Slf4j;
@Slf4j @Configuration public class MongoListenerConfig {
@Autowiredprivate MongoProperties mongoProperties;@BeanMongoClient mongoClient() {//解决spring boot 连接replicaSet问题return MongoClients.create(mongoProperties.getUri());}@Bean // 启动开始监听MessageListenerContainer messageListenerContainer(MongoTemplate template) {Executor executor = Executors.newSingleThreadExecutor();return new DefaultMessageListenerContainer(template, executor) {@Overridepublic boolean isAutoStartup() {return true;}};}@Autowiredprivate MessageListenerContainer messageListenerContainer;@EventListener(ApplicationStartedEvent.class) // 应用启动完后开始注册监听请求public void subscript() {DocMessageListener docMessageListener = new DocMessageListener();ChangeStreamRequest<Document> request = ChangeStreamRequest.builder(docMessageListener)// 指定表.collection("demo").filter(Aggregation.newAggregation(Aggregation.match(Criteria.where("operationType").in("insert", "update", "replace", "delete"))))// 更新返回全部数据.fullDocumentLookup(FullDocument.UPDATE_LOOKUP).build();messageListenerContainer.register(request, Document.class);}// 集合监听处理class DocMessageListener implements MessageListener<ChangeStreamDocument<Document>, Document> {@Overridepublic void onMessage(Message<ChangeStreamDocument<Document>, Document> message) {Document obj = message.getBody();// MongoDataAutoConfiguration// TODOlog.info("MessageListener : {}", obj);}}
}
```
