Line data Source code
1 : /*
2 : * Famedly Matrix SDK
3 : * Copyright (C) 2019, 2020, 2021 Famedly GmbH
4 : *
5 : * This program is free software: you can redistribute it and/or modify
6 : * it under the terms of the GNU Affero General Public License as
7 : * published by the Free Software Foundation, either version 3 of the
8 : * License, or (at your option) any later version.
9 : *
10 : * This program is distributed in the hope that it will be useful,
11 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : * GNU Affero General Public License for more details.
14 : *
15 : * You should have received a copy of the GNU Affero General Public License
16 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 : */
18 :
19 : import 'dart:async';
20 : import 'dart:convert';
21 :
22 : import 'package:collection/collection.dart';
23 :
24 : import 'package:matrix/matrix.dart';
25 : import 'package:matrix/src/models/timeline_chunk.dart';
26 :
27 : /// Represents the timeline of a room. The callback [onUpdate] will be triggered
28 : /// automatically. The initial
29 : /// event list will be retreived when created by the `room.getTimeline()` method.
30 :
31 : class Timeline {
32 : final Room room;
33 39 : List<Event> get events => chunk.events;
34 :
35 : /// Map of event ID to map of type to set of aggregated events
36 : final Map<String, Map<String, Set<Event>>> aggregatedEvents = {};
37 :
38 : final void Function()? onUpdate;
39 : final void Function(int index)? onChange;
40 : final void Function(int index)? onInsert;
41 : final void Function(int index)? onRemove;
42 : final void Function()? onNewEvent;
43 :
44 : StreamSubscription<Event>? timelineSub;
45 : StreamSubscription<Event>? historySub;
46 : StreamSubscription<SyncUpdate>? roomSub;
47 : StreamSubscription<String>? sessionIdReceivedSub;
48 : StreamSubscription<String>? cancelSendEventSub;
49 : bool isRequestingHistory = false;
50 : bool isRequestingFuture = false;
51 :
52 : bool allowNewEvent = true;
53 : bool isFragmentedTimeline = false;
54 :
55 : final Map<String, Event> _eventCache = {};
56 :
57 : TimelineChunk chunk;
58 :
59 : /// Searches for the event in this timeline. If not
60 : /// found, requests from the server. Requested events
61 : /// are cached.
62 4 : Future<Event?> getEventById(String id) async {
63 8 : for (final event in events) {
64 8 : if (event.eventId == id) return event;
65 : }
66 4 : if (_eventCache.containsKey(id)) return _eventCache[id];
67 4 : final requestedEvent = await room.getEventById(id);
68 : if (requestedEvent == null) return null;
69 4 : _eventCache[id] = requestedEvent;
70 4 : return _eventCache[id];
71 : }
72 :
73 : // When fetching history, we will collect them into the `_historyUpdates` set
74 : // first, and then only process all events at once, once we have the full history.
75 : // This ensures that the entire history fetching only triggers `onUpdate` only *once*,
76 : // even if /sync's complete while history is being proccessed.
77 : bool _collectHistoryUpdates = false;
78 :
79 : // We confirmed, that there are no more events to load from the database.
80 : bool _fetchedAllDatabaseEvents = false;
81 :
82 2 : bool get canRequestHistory {
83 10 : if (!{Membership.join, Membership.leave}.contains(room.membership)) {
84 : return false;
85 : }
86 4 : if (events.isEmpty) return true;
87 1 : return !_fetchedAllDatabaseEvents ||
88 6 : (room.prev_batch != null && events.last.type != EventTypes.RoomCreate);
89 : }
90 :
91 : /// Request more previous events from the server. [historyCount] defines how many events should
92 : /// be received maximum. [filter] allows you to specify a [StateFilter] object to filter the
93 : /// events, which can include various criteria such as event types (e.g., [EventTypes.Message])
94 : /// and other state-related filters. The [StateFilter] object will have [lazyLoadMembers] set to
95 : /// true by default, but this can be overridden.
96 : /// This method does not return a value.
97 4 : Future<void> requestHistory({
98 : int historyCount = Room.defaultHistoryCount,
99 : StateFilter? filter,
100 : }) async {
101 4 : if (isRequestingHistory) {
102 : return;
103 : }
104 :
105 4 : isRequestingHistory = true;
106 4 : await _requestEvents(
107 : direction: Direction.b,
108 : historyCount: historyCount,
109 : filter: filter,
110 : );
111 4 : isRequestingHistory = false;
112 : }
113 :
114 0 : bool get canRequestFuture => !allowNewEvent;
115 :
116 : /// Request more future events from the server. [historyCount] defines how many events should
117 : /// be received maximum. [filter] allows you to specify a [StateFilter] object to filter the
118 : /// events, which can include various criteria such as event types (e.g., [EventTypes.Message])
119 : /// and other state-related filters. The [StateFilter] object will have [lazyLoadMembers] set to
120 : /// true by default, but this can be overridden.
121 : /// This method does not return a value.
122 1 : Future<void> requestFuture({
123 : int historyCount = Room.defaultHistoryCount,
124 : StateFilter? filter,
125 : }) async {
126 1 : if (allowNewEvent) {
127 : return; // we shouldn't force to add new events if they will autatically be added
128 : }
129 :
130 1 : if (isRequestingFuture) return;
131 1 : isRequestingFuture = true;
132 1 : await _requestEvents(
133 : direction: Direction.f,
134 : historyCount: historyCount,
135 : filter: filter,
136 : );
137 1 : isRequestingFuture = false;
138 : }
139 :
140 5 : Future<void> _requestEvents({
141 : int historyCount = Room.defaultHistoryCount,
142 : required Direction direction,
143 : StateFilter? filter,
144 : }) async {
145 6 : onUpdate?.call();
146 :
147 : try {
148 : // Look up for events in the database first. With fragmented view, we should delete the database cache
149 5 : final eventsFromStore = isFragmentedTimeline
150 : ? null
151 16 : : await room.client.database.getEventList(
152 4 : room,
153 8 : start: events.length,
154 : limit: historyCount,
155 : );
156 :
157 4 : if (eventsFromStore != null && eventsFromStore.isNotEmpty) {
158 2 : for (final e in eventsFromStore) {
159 1 : addAggregatedEvent(e);
160 : }
161 : // Fetch all users from database we have got here.
162 2 : for (final event in events) {
163 3 : if (room.getState(EventTypes.RoomMember, event.senderId) != null) {
164 : continue;
165 : }
166 : final dbUser =
167 6 : await room.client.database.getUser(event.senderId, room);
168 0 : if (dbUser != null) room.setState(dbUser);
169 : }
170 :
171 1 : if (direction == Direction.b) {
172 2 : events.addAll(eventsFromStore);
173 4 : final startIndex = events.length - eventsFromStore.length;
174 2 : final endIndex = events.length;
175 2 : for (var i = startIndex; i < endIndex; i++) {
176 1 : onInsert?.call(i);
177 : }
178 : } else {
179 0 : events.insertAll(0, eventsFromStore);
180 0 : final startIndex = eventsFromStore.length;
181 : final endIndex = 0;
182 0 : for (var i = startIndex; i > endIndex; i--) {
183 0 : onInsert?.call(i);
184 : }
185 : }
186 : } else {
187 5 : _fetchedAllDatabaseEvents = true;
188 10 : Logs().i('No more events found in the store. Request from server...');
189 :
190 5 : if (isFragmentedTimeline) {
191 1 : await getRoomEvents(
192 : historyCount: historyCount,
193 : direction: direction,
194 : filter: filter,
195 : );
196 : } else {
197 8 : if (room.prev_batch == null) {
198 0 : Logs().i('No more events to request from server...');
199 : } else {
200 8 : await room.requestHistory(
201 : historyCount: historyCount,
202 : direction: direction,
203 4 : onHistoryReceived: () {
204 4 : _collectHistoryUpdates = true;
205 : },
206 : filter: filter,
207 : );
208 : }
209 : }
210 : }
211 : } finally {
212 5 : _collectHistoryUpdates = false;
213 5 : isRequestingHistory = false;
214 6 : onUpdate?.call();
215 : }
216 : }
217 :
218 : /// Request more previous events from the server. [historyCount] defines how much events should
219 : /// be received maximum. When the request is answered, [onHistoryReceived] will be triggered **before**
220 : /// the historical events will be published in the onEvent stream. [filter] allows you to specify a
221 : /// [StateFilter] object to filter the events, which can include various criteria such as
222 : /// event types (e.g., [EventTypes.Message]) and other state-related filters.
223 : /// The [StateFilter] object will have [lazyLoadMembers] set to true by default, but this can be overridden.
224 : /// Returns the actual count of received timeline events.
225 1 : Future<int> getRoomEvents({
226 : int historyCount = Room.defaultHistoryCount,
227 : direction = Direction.b,
228 : StateFilter? filter,
229 : }) async {
230 : // Ensure stateFilter is not null and set lazyLoadMembers to true if not already set
231 1 : filter ??= StateFilter(lazyLoadMembers: true);
232 1 : filter.lazyLoadMembers ??= true;
233 :
234 3 : final resp = await room.client.getRoomEvents(
235 2 : room.id,
236 : direction,
237 3 : from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch,
238 : limit: historyCount,
239 2 : filter: jsonEncode(filter.toJson()),
240 : );
241 :
242 1 : if (resp.end == null) {
243 2 : Logs().w('We reached the end of the timeline');
244 : }
245 :
246 2 : final newNextBatch = direction == Direction.b ? resp.start : resp.end;
247 2 : final newPrevBatch = direction == Direction.b ? resp.end : resp.start;
248 :
249 1 : final type = direction == Direction.b
250 : ? EventUpdateType.history
251 : : EventUpdateType.timeline;
252 :
253 3 : if ((resp.state?.length ?? 0) == 0 &&
254 3 : resp.start != resp.end &&
255 : newPrevBatch != null &&
256 : newNextBatch != null) {
257 1 : if (type == EventUpdateType.history) {
258 0 : Logs().w(
259 0 : '[nav] we can still request history prevBatch: $type $newPrevBatch',
260 : );
261 : } else {
262 2 : Logs().w(
263 1 : '[nav] we can still request timeline nextBatch: $type $newNextBatch',
264 : );
265 : }
266 : }
267 :
268 : final newEvents =
269 6 : resp.chunk.map((e) => Event.fromMatrixEvent(e, room)).toList();
270 :
271 1 : if (!allowNewEvent) {
272 3 : if (resp.start == resp.end ||
273 2 : (resp.end == null && direction == Direction.f)) {
274 1 : allowNewEvent = true;
275 : }
276 :
277 1 : if (allowNewEvent) {
278 2 : Logs().d('We now allow sync update into the timeline.');
279 1 : newEvents.addAll(
280 5 : await room.client.database.getEventList(room, onlySending: true),
281 : );
282 : }
283 : }
284 :
285 : // Try to decrypt encrypted events but don't update the database.
286 2 : if (room.encrypted && room.client.encryptionEnabled) {
287 0 : for (var i = 0; i < newEvents.length; i++) {
288 0 : if (newEvents[i].type == EventTypes.Encrypted) {
289 0 : newEvents[i] = await room.client.encryption!.decryptRoomEvent(
290 0 : newEvents[i],
291 : );
292 : }
293 : }
294 : }
295 :
296 : // update chunk anchors
297 1 : if (type == EventUpdateType.history) {
298 0 : chunk.prevBatch = newPrevBatch ?? '';
299 :
300 0 : final offset = chunk.events.length;
301 :
302 0 : chunk.events.addAll(newEvents);
303 :
304 0 : for (var i = 0; i < newEvents.length; i++) {
305 0 : onInsert?.call(i + offset);
306 : }
307 : } else {
308 2 : chunk.nextBatch = newNextBatch ?? '';
309 4 : chunk.events.insertAll(0, newEvents.reversed);
310 :
311 3 : for (var i = 0; i < newEvents.length; i++) {
312 2 : onInsert?.call(i);
313 : }
314 : }
315 :
316 1 : if (onUpdate != null) {
317 2 : onUpdate!();
318 : }
319 2 : return resp.chunk.length;
320 : }
321 :
322 13 : Timeline({
323 : required this.room,
324 : this.onUpdate,
325 : this.onChange,
326 : this.onInsert,
327 : this.onRemove,
328 : this.onNewEvent,
329 : required this.chunk,
330 : }) {
331 78 : timelineSub = room.client.onTimelineEvent.stream.listen(
332 18 : (event) => _handleEventUpdate(
333 : event,
334 : EventUpdateType.timeline,
335 : ),
336 : );
337 78 : historySub = room.client.onHistoryEvent.stream.listen(
338 8 : (event) => _handleEventUpdate(
339 : event,
340 : EventUpdateType.history,
341 : ),
342 : );
343 :
344 : // If the timeline is limited we want to clear our events cache
345 65 : roomSub = room.client.onSync.stream
346 84 : .where((sync) => sync.rooms?.join?[room.id]?.timeline?.limited == true)
347 26 : .listen(_removeEventsNotInThisSync);
348 :
349 13 : sessionIdReceivedSub =
350 65 : room.onSessionKeyReceived.stream.listen(_sessionKeyReceived);
351 13 : cancelSendEventSub =
352 78 : room.client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent);
353 :
354 : // we want to populate our aggregated events
355 25 : for (final e in events) {
356 12 : addAggregatedEvent(e);
357 : }
358 :
359 : // we are using a fragmented timeline
360 39 : if (chunk.nextBatch != '') {
361 1 : allowNewEvent = false;
362 1 : isFragmentedTimeline = true;
363 : // fragmented timelines never read from the database.
364 1 : _fetchedAllDatabaseEvents = true;
365 : }
366 : }
367 :
368 4 : void _cleanUpCancelledEvent(String eventId) {
369 4 : final i = _findEvent(event_id: eventId);
370 12 : if (i < events.length) {
371 12 : removeAggregatedEvent(events[i]);
372 8 : events.removeAt(i);
373 6 : onRemove?.call(i);
374 6 : onUpdate?.call();
375 : }
376 : }
377 :
378 : /// Removes all entries from [events] which are not in this SyncUpdate.
379 4 : void _removeEventsNotInThisSync(SyncUpdate sync) {
380 29 : final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? [];
381 8 : final keepEventIds = newSyncEvents.map((e) => e.eventId);
382 20 : events.removeWhere((e) => !keepEventIds.contains(e.eventId));
383 : }
384 :
385 : /// Don't forget to call this before you dismiss this object!
386 0 : void cancelSubscriptions() {
387 : // ignore: discarded_futures
388 0 : timelineSub?.cancel();
389 : // ignore: discarded_futures
390 0 : historySub?.cancel();
391 : // ignore: discarded_futures
392 0 : roomSub?.cancel();
393 : // ignore: discarded_futures
394 0 : sessionIdReceivedSub?.cancel();
395 : // ignore: discarded_futures
396 0 : cancelSendEventSub?.cancel();
397 : }
398 :
399 2 : void _sessionKeyReceived(String sessionId) async {
400 : var decryptAtLeastOneEvent = false;
401 2 : Future<void> decryptFn() async {
402 6 : final encryption = room.client.encryption;
403 6 : if (!room.client.encryptionEnabled || encryption == null) {
404 : return;
405 : }
406 7 : for (var i = 0; i < events.length; i++) {
407 4 : if (events[i].type == EventTypes.Encrypted &&
408 0 : events[i].messageType == MessageTypes.BadEncrypted &&
409 0 : events[i].content['session_id'] == sessionId) {
410 0 : events[i] = await encryption.decryptRoomEvent(
411 0 : events[i],
412 : store: true,
413 : updateType: EventUpdateType.history,
414 : );
415 0 : addAggregatedEvent(events[i]);
416 0 : onChange?.call(i);
417 0 : if (events[i].type != EventTypes.Encrypted) {
418 : decryptAtLeastOneEvent = true;
419 : }
420 : }
421 : }
422 : }
423 :
424 8 : await room.client.database.transaction(decryptFn);
425 0 : if (decryptAtLeastOneEvent) onUpdate?.call();
426 : }
427 :
428 : /// Request the keys for undecryptable events of this timeline
429 0 : void requestKeys({
430 : bool tryOnlineBackup = true,
431 : bool onlineKeyBackupOnly = true,
432 : }) {
433 0 : for (final event in events) {
434 0 : if (event.type == EventTypes.Encrypted &&
435 0 : event.messageType == MessageTypes.BadEncrypted &&
436 0 : event.content['can_request_session'] == true) {
437 0 : final sessionId = event.content.tryGet<String>('session_id');
438 0 : final senderKey = event.content.tryGet<String>('sender_key');
439 : if (sessionId != null && senderKey != null) {
440 0 : room.client.encryption?.keyManager.maybeAutoRequest(
441 0 : room.id,
442 : sessionId,
443 : senderKey,
444 : tryOnlineBackup: tryOnlineBackup,
445 : onlineKeyBackupOnly: onlineKeyBackupOnly,
446 : );
447 : }
448 : }
449 : }
450 : }
451 :
452 : /// Set the read marker to the last synced event in this timeline.
453 2 : Future<void> setReadMarker({String? eventId, bool? public}) async {
454 : eventId ??=
455 12 : events.firstWhereOrNull((event) => event.status.isSynced)?.eventId;
456 : if (eventId == null) return;
457 4 : return room.setReadMarker(eventId, mRead: eventId, public: public);
458 : }
459 :
460 9 : int _findEvent({String? event_id, String? unsigned_txid}) {
461 : // we want to find any existing event where either the passed event_id or the passed unsigned_txid
462 : // matches either the event_id or transaction_id of the existing event.
463 : // For that we create two sets, searchNeedle, what we search, and searchHaystack, where we check if there is a match.
464 : // Now, after having these two sets, if the intersect between them is non-empty, we know that we have at least one match in one pair,
465 : // thus meaning we found our element.
466 : final searchNeedle = <String>{};
467 : if (event_id != null) {
468 9 : searchNeedle.add(event_id);
469 : }
470 : if (unsigned_txid != null) {
471 6 : searchNeedle.add(unsigned_txid);
472 : }
473 : int i;
474 34 : for (i = 0; i < events.length; i++) {
475 27 : final searchHaystack = <String>{events[i].eventId};
476 :
477 27 : final txnid = events[i].transactionId;
478 : if (txnid != null) {
479 6 : searchHaystack.add(txnid);
480 : }
481 18 : if (searchNeedle.intersection(searchHaystack).isNotEmpty) {
482 : break;
483 : }
484 : }
485 : return i;
486 : }
487 :
488 5 : void _removeEventFromSet(Set<Event> eventSet, Event event) {
489 5 : eventSet.removeWhere(
490 4 : (e) =>
491 8 : e.matchesEventOrTransactionId(event.eventId) ||
492 4 : event.unsigned != null &&
493 8 : e.matchesEventOrTransactionId(event.transactionId),
494 : );
495 : }
496 :
497 13 : void addAggregatedEvent(Event event) {
498 : // we want to add an event to the aggregation tree
499 13 : final relationshipType = event.relationshipType;
500 13 : final relationshipEventId = event.relationshipEventId;
501 : if (relationshipType == null || relationshipEventId == null) {
502 : return; // nothing to do
503 : }
504 10 : final e = (aggregatedEvents[relationshipEventId] ??=
505 10 : <String, Set<Event>>{})[relationshipType] ??= <Event>{};
506 : // remove a potential old event
507 5 : _removeEventFromSet(e, event);
508 : // add the new one
509 5 : e.add(event);
510 5 : if (onChange != null) {
511 1 : final index = _findEvent(event_id: relationshipEventId);
512 2 : onChange?.call(index);
513 : }
514 : }
515 :
516 6 : void removeAggregatedEvent(Event event) {
517 18 : aggregatedEvents.remove(event.eventId);
518 6 : if (event.transactionId != null) {
519 6 : aggregatedEvents.remove(event.transactionId);
520 : }
521 16 : for (final types in aggregatedEvents.values) {
522 8 : for (final e in types.values) {
523 4 : _removeEventFromSet(e, event);
524 : }
525 : }
526 : }
527 :
528 9 : void _handleEventUpdate(
529 : Event event,
530 : EventUpdateType type, {
531 : bool update = true,
532 : }) {
533 : try {
534 36 : if (event.roomId != room.id) return;
535 :
536 13 : if (type != EventUpdateType.timeline && type != EventUpdateType.history) {
537 : return;
538 : }
539 :
540 9 : if (type == EventUpdateType.timeline) {
541 9 : onNewEvent?.call();
542 : }
543 :
544 9 : if (!allowNewEvent) return;
545 :
546 9 : final status = event.status;
547 :
548 9 : final i = _findEvent(
549 9 : event_id: event.eventId,
550 9 : unsigned_txid: event.transactionId,
551 : );
552 :
553 27 : if (i < events.length) {
554 : // if the old status is larger than the new one, we also want to preserve the old status
555 27 : final oldStatus = events[i].status;
556 18 : events[i] = event;
557 : // do we preserve the status? we should allow 0 -> -1 updates and status increases
558 18 : if ((latestEventStatus(status, oldStatus) == oldStatus) &&
559 11 : !(status.isError && oldStatus.isSending)) {
560 21 : events[i].status = oldStatus;
561 : }
562 27 : addAggregatedEvent(events[i]);
563 11 : onChange?.call(i);
564 : } else {
565 7 : if (type == EventUpdateType.history &&
566 8 : events.indexWhere(
567 16 : (e) => e.eventId == event.eventId,
568 4 : ) !=
569 4 : -1) {
570 : return;
571 : }
572 14 : var index = events.length;
573 7 : if (type == EventUpdateType.history) {
574 8 : events.add(event);
575 : } else {
576 10 : index = events.firstIndexWhereNotError;
577 10 : events.insert(index, event);
578 : }
579 11 : onInsert?.call(index);
580 :
581 7 : addAggregatedEvent(event);
582 : }
583 :
584 : // Handle redaction events
585 18 : if (event.type == EventTypes.Redaction) {
586 6 : final index = _findEvent(event_id: event.redacts);
587 9 : if (index < events.length) {
588 9 : removeAggregatedEvent(events[index]);
589 :
590 : // Is the redacted event a reaction? Then update the event this
591 : // belongs to:
592 3 : if (onChange != null) {
593 3 : final relationshipEventId = events[index].relationshipEventId;
594 : if (relationshipEventId != null) {
595 0 : onChange?.call(_findEvent(event_id: relationshipEventId));
596 : return;
597 : }
598 : }
599 :
600 9 : events[index].setRedactionEvent(event);
601 4 : onChange?.call(index);
602 : }
603 : }
604 :
605 9 : if (update && !_collectHistoryUpdates) {
606 11 : onUpdate?.call();
607 : }
608 : } catch (e, s) {
609 0 : Logs().w('Handle event update failed', e, s);
610 : }
611 : }
612 :
613 0 : @Deprecated('Use [startSearch] instead.')
614 : Stream<List<Event>> searchEvent({
615 : String? searchTerm,
616 : int requestHistoryCount = 100,
617 : int maxHistoryRequests = 10,
618 : String? sinceEventId,
619 : int? limit,
620 : bool Function(Event)? searchFunc,
621 : }) =>
622 0 : startSearch(
623 : searchTerm: searchTerm,
624 : requestHistoryCount: requestHistoryCount,
625 : maxHistoryRequests: maxHistoryRequests,
626 : // ignore: deprecated_member_use_from_same_package
627 : sinceEventId: sinceEventId,
628 : limit: limit,
629 : searchFunc: searchFunc,
630 0 : ).map((result) => result.$1);
631 :
632 : /// Searches [searchTerm] in this timeline. It first searches in the
633 : /// cache, then in the database and then on the server. The search can
634 : /// take a while, which is why this returns a stream so the already found
635 : /// events can already be displayed.
636 : /// Override the [searchFunc] if you need another search. This will then
637 : /// ignore [searchTerm].
638 : /// Returns the List of Events and the next prevBatch at the end of the
639 : /// search.
640 0 : Stream<(List<Event>, String?)> startSearch({
641 : String? searchTerm,
642 : int requestHistoryCount = 100,
643 : int maxHistoryRequests = 10,
644 : String? prevBatch,
645 : @Deprecated('Use [prevBatch] instead.') String? sinceEventId,
646 : int? limit,
647 : bool Function(Event)? searchFunc,
648 : }) async* {
649 0 : assert(searchTerm != null || searchFunc != null);
650 0 : searchFunc ??= (event) =>
651 0 : event.body.toLowerCase().contains(searchTerm?.toLowerCase() ?? '');
652 0 : final found = <Event>[];
653 :
654 : if (sinceEventId == null) {
655 : // Search locally
656 0 : for (final event in events) {
657 0 : if (searchFunc(event)) {
658 0 : yield (found..add(event), null);
659 : }
660 : }
661 :
662 : // Search in database
663 0 : var start = events.length;
664 : while (true) {
665 0 : final eventsFromStore = await room.client.database.getEventList(
666 0 : room,
667 : start: start,
668 : limit: requestHistoryCount,
669 : );
670 0 : if (eventsFromStore.isEmpty) break;
671 0 : start += eventsFromStore.length;
672 0 : for (final event in eventsFromStore) {
673 0 : if (searchFunc(event)) {
674 0 : yield (found..add(event), null);
675 : }
676 : }
677 : }
678 : }
679 :
680 : // Search on the server
681 0 : prevBatch ??= room.prev_batch;
682 : if (sinceEventId != null) {
683 : prevBatch =
684 0 : (await room.client.getEventContext(room.id, sinceEventId)).end;
685 : }
686 0 : final encryption = room.client.encryption;
687 0 : for (var i = 0; i < maxHistoryRequests; i++) {
688 : if (prevBatch == null) break;
689 0 : if (limit != null && found.length >= limit) break;
690 : try {
691 0 : final resp = await room.client.getRoomEvents(
692 0 : room.id,
693 : Direction.b,
694 : from: prevBatch,
695 : limit: requestHistoryCount,
696 0 : filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
697 : );
698 0 : for (final matrixEvent in resp.chunk) {
699 0 : var event = Event.fromMatrixEvent(matrixEvent, room);
700 0 : if (event.type == EventTypes.Encrypted && encryption != null) {
701 0 : event = await encryption.decryptRoomEvent(event);
702 0 : if (event.type == EventTypes.Encrypted &&
703 0 : event.messageType == MessageTypes.BadEncrypted &&
704 0 : event.content['can_request_session'] == true) {
705 : // Await requestKey() here to ensure decrypted message bodies
706 0 : await event.requestKey();
707 : }
708 : }
709 0 : if (searchFunc(event)) {
710 0 : yield (found..add(event), resp.end);
711 0 : if (limit != null && found.length >= limit) break;
712 : }
713 : }
714 0 : prevBatch = resp.end;
715 : // We are at the beginning of the room
716 0 : if (resp.chunk.length < requestHistoryCount) break;
717 0 : } on MatrixException catch (e) {
718 : // We have no permission anymore to request the history
719 0 : if (e.error == MatrixError.M_FORBIDDEN) {
720 : break;
721 : }
722 : rethrow;
723 : }
724 : }
725 : return;
726 : }
727 : }
728 :
729 : extension on List<Event> {
730 5 : int get firstIndexWhereNotError {
731 5 : if (isEmpty) return 0;
732 20 : final index = indexWhere((event) => !event.status.isError);
733 11 : if (index == -1) return length;
734 : return index;
735 : }
736 : }
|