-
Notifications
You must be signed in to change notification settings - Fork 237
feat: strong consistency variant for informer list and byIndex #3316
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
613f442
d648654
e52c086
593bdd9
1bf38ec
1d243ea
d229607
9b23fde
820caad
c2c1782
0a1988b
2d0af48
17c0fa1
9b2e403
ad1941b
322c815
a7c61aa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ | |
| import java.util.function.Function; | ||
| import java.util.function.Predicate; | ||
| import java.util.function.UnaryOperator; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.Stream; | ||
|
|
||
| import org.slf4j.Logger; | ||
|
|
@@ -111,7 +112,6 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< | |
| res.ifPresentOrElse( | ||
| r -> { | ||
| R latestResource = (R) r.getResource().orElseThrow(); | ||
|
|
||
| // as previous resource version we use the one from successful update, since | ||
| // we process new event here only if that is more recent then the event from our update. | ||
| // Note that this is equivalent with the scenario when an informer watch connection | ||
|
|
@@ -222,11 +222,6 @@ public Optional<R> getCachedValue(ResourceID resourceID) { | |
| return get(resourceID); | ||
| } | ||
|
|
||
| @Override | ||
| public Stream<R> list(String namespace, Predicate<R> predicate) { | ||
| return manager().list(namespace, predicate); | ||
| } | ||
|
|
||
| void setTemporalResourceCache(TemporaryResourceCache<R> temporaryResourceCache) { | ||
| this.temporaryResourceCache = temporaryResourceCache; | ||
| } | ||
|
|
@@ -239,19 +234,134 @@ public void addIndexers(Map<String, Function<R, List<String>>> indexers) { | |
| this.indexers.putAll(indexers); | ||
| } | ||
|
|
||
| @Override | ||
| public Stream<R> list(String namespace, Predicate<R> predicate) { | ||
| return manager().list(namespace, predicate); | ||
| } | ||
|
|
||
| @Override | ||
| public Stream<R> list(Predicate<R> predicate) { | ||
| return cache.list(predicate); | ||
| } | ||
|
|
||
| @Override | ||
| public List<R> byIndex(String indexName, String indexKey) { | ||
| return manager().byIndex(indexName, indexKey); | ||
| } | ||
|
|
||
| @Override | ||
| public Stream<ResourceID> keys() { | ||
| return cache.keys(); | ||
| public Stream<R> byIndexStream(String indexName, String indexKey) { | ||
| return manager().byIndexStream(indexName, indexKey); | ||
| } | ||
|
|
||
| /** | ||
| * Like {@link #list(String, Predicate)} but for read-cache-after-write consistency. This is | ||
| * useful when resources are updated using {@link | ||
| * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. | ||
| */ | ||
| public Stream<R> listWithStrongConsistency(String namespace, Predicate<R> predicate) { | ||
| return mergeWithWithTempCacheResources( | ||
| manager().list(namespace, predicate), namespace, predicate); | ||
| } | ||
|
|
||
| /** | ||
| * Like {@link #list(Predicate)} but for read-cache-after-write consistency. This is useful when | ||
| * resources are updated using {@link | ||
| * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. | ||
| */ | ||
| public Stream<R> listWithStrongConsistency(Predicate<R> predicate) { | ||
| return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate); | ||
| } | ||
|
|
||
| /** | ||
| * Like {@link #byIndexStream(String, String)} but for read-cache-after-write consistency. This is | ||
| * useful when resources are updated using {@link | ||
| * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. | ||
| */ | ||
| public Stream<R> byIndexStreamWithStrongConsistency(String indexName, String indexKey) { | ||
| return mergeWithWithTempCacheResources( | ||
| manager().byIndexStream(indexName, indexKey), indexName, indexKey); | ||
| } | ||
|
|
||
| private Stream<R> mergeWithWithTempCacheResources( | ||
| Stream<R> stream, String indexName, String indexKey) { | ||
| return mergeWithWithTempCacheResources(stream, null, null, indexName, indexKey); | ||
| } | ||
|
|
||
| private Stream<R> mergeWithWithTempCacheResources( | ||
| Stream<R> stream, String namespace, Predicate<R> predicate) { | ||
| return mergeWithWithTempCacheResources(stream, namespace, predicate, null, null); | ||
| } | ||
|
|
||
| private Stream<R> mergeWithWithTempCacheResources( | ||
| Stream<R> stream, | ||
| String namespace, | ||
| Predicate<R> predicate, | ||
| String indexName, | ||
| String indexKey) { | ||
| if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { | ||
| return stream; | ||
| } | ||
| var allTempResources = temporaryResourceCache.getResources(); | ||
| Map<ResourceID, R> tempResources; | ||
| if (namespace == null && predicate == null) { | ||
| tempResources = new HashMap<>(allTempResources); | ||
| } else { | ||
| // filtering the temp cache according the user input (predicate, namespace) | ||
| tempResources = | ||
| allTempResources.entrySet().stream() | ||
| .filter( | ||
| e -> { | ||
| if (namespace != null) { | ||
| var res = | ||
| e.getKey().getNamespace().map(ns -> ns.equals(namespace)).orElse(false); | ||
| if (!res) return false; | ||
| } | ||
| if (predicate != null) { | ||
| return predicate.test(e.getValue()); | ||
| } | ||
| return true; | ||
| }) | ||
| .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); | ||
| } | ||
| if (tempResources.isEmpty()) { | ||
| return stream; | ||
| } | ||
| var upToDateList = | ||
| stream | ||
| .map( | ||
| r -> { | ||
| var resourceID = ResourceID.fromResource(r); | ||
| // removing the id from the related temp resources | ||
| // this is important so we can detect ghost resources: | ||
| // all that remains is ghost resource | ||
| var tempResource = tempResources.remove(resourceID); | ||
| // using the latest version | ||
| if (tempResource != null | ||
| && ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) { | ||
| return tempResource; | ||
| } | ||
| return r; | ||
| }) | ||
| .toList(); | ||
| Stream<R> tempResourceStream; | ||
| // ghost resource handling | ||
| if (indexName != null && indexKey != null) { | ||
| var indexer = indexers.get(indexName); | ||
| if (indexer == null) { | ||
| throw new IllegalArgumentException("Indexer not found for: " + indexName); | ||
| } | ||
| // we check if the ghost resource is part of the index | ||
| tempResourceStream = | ||
|
csviri marked this conversation as resolved.
|
||
| tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey)); | ||
| } else { | ||
| tempResourceStream = tempResources.values().stream(); | ||
| } | ||
| return Stream.concat(tempResourceStream, upToDateList.stream()); | ||
| } | ||
|
|
||
|
Comment on lines
+339
to
361
|
||
| @Override | ||
| public Stream<R> list(Predicate<R> predicate) { | ||
| return cache.list(predicate); | ||
| public Stream<ResourceID> keys() { | ||
| return cache.keys(); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.