您现在的位置是:主页 > news > 个人可以做电视台网站吗/百度权重域名

个人可以做电视台网站吗/百度权重域名

admin2025/4/29 2:08:08news

简介个人可以做电视台网站吗,百度权重域名,由音乐学院做的网站,项目招商网站大全只有集群模式的mongo才能够使用changeStream的功能, 这个功能主要就是能够实时的监听到mongo数据库的变更, 并且将变更的信息获取到, changeStream的原理就是一直去监听mongo的 opLog 日志这个文件的变更, 去读取这个文件&#xf…

个人可以做电视台网站吗,百度权重域名,由音乐学院做的网站,项目招商网站大全只有集群模式的mongo才能够使用changeStream的功能, 这个功能主要就是能够实时的监听到mongo数据库的变更, 并且将变更的信息获取到, changeStream的原理就是一直去监听mongo的 opLog 日志这个文件的变更, 去读取这个文件&#xf…
只有集群模式的mongo才能够使用changeStream的功能,
这个功能主要就是能够实时的监听到mongo数据库的变更,
并且将变更的信息获取到,
changeStream的原理就是一直去监听mongo的 opLog 日志这个文件的变更,
去读取这个文件,众所周知,我们对于mongo数据库的操作都会在opLog
这个文件记录下来,我们只要获取到opLog变更的信息就可以
而不管是怎么封装,changeStream的原理都是一样的,就是一直不停的去
读取opLog这个文件,然后取得更新的消息,最后将消息返回,
当然,这样的操作也支持一些过滤的条件和一些参数

Mongo原生驱动使用changeStream功能

主要的核心就是几个依赖

<properties><driver.version>4.1.1</driver.version></properties>
<dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver-sync</artifactId><version>${driver.version}</version></dependency><dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver-core</artifactId><version>4.1.1</version></dependency><!-- reactive --><dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver-reactivestreams</artifactId><version>${driver.version}</version></dependency>

操作的方式也比较简洁

   public class MongoChangeStreamTest {private MongoClient mongoClient;private MongoDatabase database;@Beforepublic void init(){mongoClient = MongoClients.create("mongodb://127.0.0.1:37017,127.0.0.1:37018,127.0.0.1:37019");database = mongoClient.getDatabase("myrepo");}@Testpublic void testChange(){MongoCollection<Document> collection = database.getCollection("cs");Document doc = new Document("name", "MongoDB").append("type", "database").append("count", 1).append("age", 21).append("info", "xx");collection.insertOne(doc);}@Testpublic void testChangeStream(){MongoCollection<Document> collection = database.getCollection("cs");collection.find().forEach(doc -> System.out.println(doc.toJson()));ChangeStreamIterable<Document> watch = collection.watch();MongoCursor<ChangeStreamDocument<Document>> iterator = watch.iterator();while (iterator.hasNext()){System.out.println(iterator.next());}}@Testpublic void testWatch(){MongoCollection<Document> collection = database.getCollection("cs");collection.watch(asList(Aggregates.match(Filters.in("operationType", asList("insert", "update", "replace", "delete"))))).fullDocument(FullDocument.UPDATE_LOOKUP).forEach(System.out::println);}@Afterpublic void close(){mongoClient.close();}
}

Spring操作changeStream

以上是几种用法,这些都是mongo给我们提供的java驱动包自带的,当然,我们也可以使用spring帮助我们整合好的,
首先还是引入依赖

		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId></dependency>

Spring为我们提供了可以配置的使用方式,总的意思就是在Spring里面有一个监听器的容器,我可以往里面放入我自己定义的监听器,自己定义的监听器当然可以选择各种各样的操作类型,好比我想要过滤一些操作,监听一些集合的操作之类的,这些都可以通过配置的方式去进行实现,

@Configuration
public class MongoConfig {@BeanMessageListenerContainer messageListenerContainer(MongoTemplate template, DocumnetMessageListener documnetMessageListener) {Executor executor = Executors.newSingleThreadExecutor();MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(template, executor) {@Overridepublic boolean isAutoStartup() {return true;}};ChangeStreamRequest<Document> request = ChangeStreamRequest.builder(documnetMessageListener).collection("cs")  //需要监听的集合名,不指定默认监听数据库的.filter(newAggregation(match(where("operationType").in("insert", "update", "replace"))))  //过滤需要监听的操作类型,可以根据需求指定过滤条件.fullDocumentLookup(FullDocument.UPDATE_LOOKUP)  //不设置时,文档更新时,只会发送变更字段的信息,设置UPDATE_LOOKUP会返回文档的全部信息.build();messageListenerContainer.register(request, Document.class);return messageListenerContainer;}}@Component
@Slf4j
public class DocumnetMessageListener implements MessageListener<ChangeStreamDocument<Document>, Document> {@Overridepublic void onMessage(Message<ChangeStreamDocument<Document>, Document> message) {System.out.println("receive message: " + message);log.info("Received Message in collection: {},message raw: {}, message body:{}",message.getProperties().getCollectionName(), message.getRaw(), message.getBody());}
}

大概就是上面的两个配置类,一个配置类,指定将监听的容器注入到Spring容器里面,然后指定监听的规则,另外一个配置好的就是将自己定义的监听器注入到容器里面,然后实现监听到消息之后到规则。

使用起来也特别到简单,下面分享一下测试用例
简而言之,就是让我们这个容器启动起来产生作用,然后让当前的线程保持下去。

   @Testpublic void testWatch3() throws InterruptedException {System.out.println(mars.getDatabase().getName());messageListenerContainer.start();Thread.currentThread().join();}

重点来了

你以为这个就结束了吗?
可怜我接到的任务并不是了解他们怎么用,然后在项目当中去进行使用,我的任务是分析Spring是如何实现的,然后将这个实现写入到我们当前到项目中,让别人可以像使用Spring一样很轻松的实现这个功能。通俗点来讲,也就是我们要自己对这方面写一个框架,像Spring一样,我在网上翻阅了一下,github上面其实也有很多关于mongodb的框架,例如morphia,但是它好像并没有支持这个功能,最后俺还是选择了抄袭Spring。

首先去Spring的源代码找一下这方面的代码
在这里插入图片描述
大概就这样的结构,关于这方面的核心代码就是这些,当然也会用到其他方面的功能,我看到的好像有Aggregation、和Converter方面的功能,这方面感觉也是一个很大的模块,所以暂时实在是没有能力去搭理它,稍后我会说明一下他们各自的功能以及尽可能的替代方式。

在这里插入图片描述
可以看到这个整体的框架,起到启动作用的就是CursorReadingTask这个类,然后ChangeStreamTask是其中的一个实现
在这个类里面有一个比较重要的方法实现是初始化游标

@Overrideprotected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate template, RequestOptions options,Class<?> targetType) {List<Document> filter = Collections.emptyList();BsonDocument resumeToken = new BsonDocument();Collation collation = null;FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT: FullDocument.UPDATE_LOOKUP;BsonTimestamp startAt = null;boolean resumeAfter = true;if (options instanceof ChangeStreamRequest.ChangeStreamRequestOptions) {ChangeStreamOptions changeStreamOptions = ((ChangeStreamRequestOptions) options).getChangeStreamOptions();filter = prepareFilter(template, changeStreamOptions);if (changeStreamOptions.getFilter().isPresent()) {Object val = changeStreamOptions.getFilter().get();if (val instanceof Aggregation) {collation = ((Aggregation) val).getOptions().getCollation().map(org.springframework.data.mongodb.core.query.Collation::toMongoCollation).orElse(null);}}if (changeStreamOptions.getResumeToken().isPresent()) {resumeToken = changeStreamOptions.getResumeToken().get().asDocument();resumeAfter = changeStreamOptions.isResumeAfter();}fullDocument = changeStreamOptions.getFullDocumentLookup().orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT: FullDocument.UPDATE_LOOKUP);startAt = changeStreamOptions.getResumeBsonTimestamp().orElse(null);}MongoDatabase db = StringUtils.hasText(options.getDatabaseName())? template.getMongoDbFactory().getMongoDatabase(options.getDatabaseName()): template.getDb();ChangeStreamIterable<Document> iterable;if (StringUtils.hasText(options.getCollectionName())) {iterable = filter.isEmpty() ? db.getCollection(options.getCollectionName()).watch(Document.class): db.getCollection(options.getCollectionName()).watch(filter, Document.class);} else {iterable = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class);}if (!options.maxAwaitTime().isZero()) {iterable = iterable.maxAwaitTime(options.maxAwaitTime().toMillis(), TimeUnit.MILLISECONDS);}if (!resumeToken.isEmpty()) {if (resumeAfter) {iterable = iterable.resumeAfter(resumeToken);} else {iterable = iterable.startAfter(resumeToken);}}if (startAt != null) {iterable.startAtOperationTime(startAt);}if (collation != null) {iterable = iterable.collation(collation);}iterable = iterable.fullDocument(fullDocument);return iterable.iterator();}

这里面需要把Mongo的操作模版传入进去,还有操作选项,这个操作选项RequestOptions我们可以很明显的看出ChangeStreamRequest的内部类ChangeStreamRequestOptions对它有一个实现,还有准备过滤器这个方法,

List<Document> prepareFilter(MongoTemplate template, ChangeStreamOptions options) {if (!options.getFilter().isPresent()) {return Collections.emptyList();}Object filter = options.getFilter().orElse(null);if (filter instanceof Aggregation) {Aggregation agg = (Aggregation) filter;AggregationOperationContext context = agg instanceof TypedAggregation? new TypeBasedAggregationOperationContext(((TypedAggregation<?>) agg).getInputType(),template.getConverter().getMappingContext(), queryMapper): Aggregation.DEFAULT_CONTEXT;return agg.toPipeline(new PrefixingDelegatingAggregationOperationContext(context, "fullDocument", denylist));}if (filter instanceof List) {return (List<Document>) filter;}throw new IllegalArgumentException("ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents");}

可以看出我们使用到了Aggregation这个功能,这个功能可以看到主要实现到是过滤器类似到操作,又看到还要使用到操作模版里面到类型转换器,类型转换器这方面又是一个巨复杂到问题,记得好像是SpringMVC方面最先提出来的,甚至也可以像我们这个功能一下可以手动@Bean往容器里面注入一个,
由于我们并没有使用操作mongo的操作模版,而是使用了自己的操作模版,所以没有办法使用,转换器也没有那么的灵活,变成了静态的工具类(这个代码我觉得真是太没有灵魂了)过滤器暂时还不能理解,所以最终,我选择放弃,
所以最终它是这个样子的

List<Document> prepareFilter(MongoTemplate template, ChangeStreamOptions options) {if (!options.getFilter().isPresent()) {return Collections.emptyList();}Object filter = options.getFilter().orElse(null);if (filter instanceof List) {return (List<Document>) filter;}throw new IllegalArgumentException("ChangeStreamRequestOptions.filter mut be either an Aggregation or a plain list of Documents");
}

其他的地方大致就不需要改动什么了最后项目成功启动,不过有一个问题就是,目前并不支持各种操作的过滤,这个和Aggregation这个功能有关,以后会持续性的改进一下。

后面说的迷迷糊糊的,感觉也很难看得懂,如果有问题可以加我的WX交流一下 q1050564479 。