From 613f4424d0c6b26fe8ed10630fadb8e0a75f336a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 27 Apr 2026 13:24:52 +0200 Subject: [PATCH 01/17] feat: strong consistency variant for informer list and byIndex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../api/reconciler/IndexedResourceCache.java | 4 + .../source/informer/InformerManager.java | 9 ++- .../source/informer/InformerWrapper.java | 5 ++ .../informer/ManagedInformerEventSource.java | 72 ++++++++++++++--- .../informer/TemporaryResourceCache.java | 4 + .../informer/InformerEventSourceTest.java | 78 ++++++++++++++++++- 6 files changed, 157 insertions(+), 15 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java index 5ccc5894a1..b8f8f3381a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java @@ -16,9 +16,13 @@ package io.javaoperatorsdk.operator.api.reconciler; import java.util.List; +import java.util.stream.Stream; import io.fabric8.kubernetes.api.model.HasMetadata; public interface IndexedResourceCache extends ResourceCache { + List byIndex(String indexName, String indexKey); + + Stream byIndexStream(String indexName, String indexKey); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 6632ce631e..a3ef72a2a3 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -256,12 +256,13 @@ public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); } + public Stream byIndexStream(String indexName, String indexKey) { + return sources.values().stream().map(s -> s.byIndex(indexName, indexKey)).flatMap(List::stream); + } + @Override public List byIndex(String indexName, String indexKey) { - return sources.values().stream() - .map(s -> s.byIndex(indexName, indexKey)) - .flatMap(List::stream) - .collect(Collectors.toList()); + return byIndexStream(indexName, indexKey).collect(Collectors.toList()); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index 2f57f879b8..9ce4b52875 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -195,6 +195,11 @@ public List byIndex(String indexName, String indexKey) { return informer.getIndexer().byIndex(indexName, indexKey); } + @Override + public Stream byIndexStream(String indexName, String indexKey) { + return byIndex(indexName, indexKey).stream(); + } + @Override public String toString() { return informerInfo() + " (" + informer + ')'; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 26543e8322..dbb38d5b40 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -222,11 +222,6 @@ public Optional getCachedValue(ResourceID resourceID) { return get(resourceID); } - @Override - public Stream list(String namespace, Predicate predicate) { - return manager().list(namespace, predicate); - } - void setTemporalResourceCache(TemporaryResourceCache temporaryResourceCache) { this.temporaryResourceCache = temporaryResourceCache; } @@ -239,19 +234,76 @@ public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); } + @Override + public Stream list(String namespace, Predicate predicate) { + return manager().list(namespace, predicate); + } + + @Override + public Stream list(Predicate predicate) { + return cache.list(predicate); + } + @Override public List byIndex(String indexName, String indexKey) { return manager().byIndex(indexName, indexKey); } - @Override - public Stream keys() { - return cache.keys(); + public Stream byIndexStream(String indexName, String indexKey) { + return manager().byIndexStream(indexName, indexKey); + } + + /** + * Like {@link #list(String, Predicate)} but replaces resources with their newer version from the + * {@link TemporaryResourceCache} when available, to provide stronger consistency. This is needed + * when resources are updated using {@link + * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}, which caches the updated + * resource in the {@link TemporaryResourceCache} until the informer catches up. + */ + public Stream listWithStrongConsistency(String namespace, Predicate predicate) { + return replaceWithTempCacheVersions(manager().list(namespace, predicate)); + } + + /** + * Like {@link #list(Predicate)} but replaces resources with their newer version from the {@link + * TemporaryResourceCache} when available, to provide stronger consistency. This is needed when + * resources are updated using {@link + * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}, which caches the updated + * resource in the {@link TemporaryResourceCache} until the informer catches up. + */ + public Stream listWithStrongConsistency(Predicate predicate) { + return replaceWithTempCacheVersions(cache.list(predicate)); + } + + /** + * Like {@link #byIndexStream(String, String)} but replaces resources with their newer version + * from the {@link TemporaryResourceCache} when available, to provide stronger consistency. This + * is needed when resources are updated using {@link + * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}, which caches the updated + * resource in the {@link TemporaryResourceCache} until the informer catches up. + */ + public Stream byIndexStreamWithStrongConsistency(String indexName, String indexKey) { + return replaceWithTempCacheVersions(manager().byIndexStream(indexName, indexKey)); + } + + private Stream replaceWithTempCacheVersions(Stream stream) { + if (!comparableResourceVersions) { + return stream; + } + var tempResources = temporaryResourceCache.getResources(); + if (tempResources.isEmpty()) { + return stream; + } + return stream.map( + r -> { + var tempResource = tempResources.get(ResourceID.fromResource(r)); + return tempResource != null ? tempResource : r; + }); } @Override - public Stream list(Predicate predicate) { - return cache.list(predicate); + public Stream keys() { + return cache.keys(); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 5a4486f756..62ac8046c3 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -275,4 +275,8 @@ private void checkGhostResources() { public synchronized Optional getResourceFromCache(ResourceID resourceID) { return Optional.ofNullable(cache.get(resourceID)); } + + synchronized Map getResources() { + return Map.copyOf(cache); + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index e60ac02280..0610734335 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -16,11 +16,13 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.time.Duration; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -57,6 +59,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -87,10 +90,12 @@ void setup() { when(secondaryToPrimaryMapper.toPrimaryResourceIDs(any())) .thenReturn(Set.of(ResourceID.fromResource(testDeployment()))); when(informerEventSourceConfiguration.getInformerConfig()).thenReturn(informerConfig); - when(informerConfig.getEffectiveNamespaces(any())).thenReturn(DEFAULT_NAMESPACES_SET); + when(informerEventSourceConfiguration.getResourceClass()).thenReturn(Deployment.class); when(informerConfig.getGhostResourceCacheCheckInterval()) .thenReturn(Constants.DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL); + when(informerConfig.isComparableResourceVersions()).thenReturn(true); + when(informerConfig.getEffectiveNamespaces(any())).thenReturn(DEFAULT_NAMESPACES_SET); informerEventSource = spy( new InformerEventSource<>(informerEventSourceConfiguration, clientMock) { @@ -533,6 +538,77 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() { verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception)); } + @Test + void listWithStrongConsistencyReplacesResourceFromTempCache() { + var original = testDeployment(); + var newer = testDeployment(); + newer.getMetadata().setResourceVersion("5"); + + when(temporaryResourceCache.getResources()) + .thenReturn(Map.of(ResourceID.fromResource(original), newer)); + + var mim = mock(InformerManager.class); + when(mim.list(any(String.class), any())).thenReturn(Stream.of(original)); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.listWithStrongConsistency("default", r -> true).toList(); + + assertThat(result).containsExactly(newer); + } + + @Test + void listWithStrongConsistencyKeepsResourceWhenNotInTempCache() { + var original = testDeployment(); + + when(temporaryResourceCache.getResources()).thenReturn(Map.of()); + + var mim = mock(InformerManager.class); + when(mim.list(any(String.class), any())).thenReturn(Stream.of(original)); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.listWithStrongConsistency("default", r -> true).toList(); + + assertThat(result).containsExactly(original); + } + + @Test + void listWithStrongConsistencyReplacesOnlyMatchingResources() { + var dep1 = testDeployment(); + var dep2 = testDeployment(); + dep2.getMetadata().setName("other"); + var newerDep1 = testDeployment(); + newerDep1.getMetadata().setResourceVersion("5"); + + when(temporaryResourceCache.getResources()) + .thenReturn(Map.of(ResourceID.fromResource(dep1), newerDep1)); + + var mim = mock(InformerManager.class); + when(mim.list(any(String.class), any())).thenReturn(Stream.of(dep1, dep2)); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.listWithStrongConsistency("default", r -> true).toList(); + + assertThat(result).containsExactly(newerDep1, dep2); + } + + @Test + void byIndexStreamWithStrongConsistencyReplacesFromTempCache() { + var original = testDeployment(); + var newer = testDeployment(); + newer.getMetadata().setResourceVersion("5"); + + when(temporaryResourceCache.getResources()) + .thenReturn(Map.of(ResourceID.fromResource(original), newer)); + + var mim = mock(InformerManager.class); + when(mim.byIndexStream(any(), any())).thenReturn(Stream.of(original)); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList(); + + assertThat(result).containsExactly(newer); + } + Deployment testDeployment() { Deployment deployment = new Deployment(); deployment.setMetadata(new ObjectMeta()); From d6486541c595eeeed9989a5960f250d1484a1608 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 28 Apr 2026 16:05:27 +0200 Subject: [PATCH 02/17] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/ManagedInformerEventSource.java | 74 ++++++++++++++++--- .../informer/InformerEventSourceTest.java | 51 +++++++++++-- 2 files changed, 108 insertions(+), 17 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index dbb38d5b40..5c12eee19c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -23,6 +23,7 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.function.UnaryOperator; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.slf4j.Logger; @@ -111,7 +112,6 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< res.ifPresentOrElse( r -> { R latestResource = (R) r.getResource().orElseThrow(); - // as previous resource version we use the one from successful update, since // we process new event here only if that is more recent then the event from our update. // Note that this is equivalent with the scenario when an informer watch connection @@ -261,7 +261,7 @@ public Stream byIndexStream(String indexName, String indexKey) { * resource in the {@link TemporaryResourceCache} until the informer catches up. */ public Stream listWithStrongConsistency(String namespace, Predicate predicate) { - return replaceWithTempCacheVersions(manager().list(namespace, predicate)); + return replaceWithTempCacheVersions(manager().list(namespace, predicate), namespace, predicate); } /** @@ -272,7 +272,7 @@ public Stream listWithStrongConsistency(String namespace, Predicate predic * resource in the {@link TemporaryResourceCache} until the informer catches up. */ public Stream listWithStrongConsistency(Predicate predicate) { - return replaceWithTempCacheVersions(cache.list(predicate)); + return replaceWithTempCacheVersions(cache.list(predicate), null, (Predicate) null); } /** @@ -283,22 +283,74 @@ public Stream listWithStrongConsistency(Predicate predicate) { * resource in the {@link TemporaryResourceCache} until the informer catches up. */ public Stream byIndexStreamWithStrongConsistency(String indexName, String indexKey) { - return replaceWithTempCacheVersions(manager().byIndexStream(indexName, indexKey)); + return replaceWithTempCacheVersions( + manager().byIndexStream(indexName, indexKey), indexName, indexKey); + } + + private Stream replaceWithTempCacheVersions( + Stream stream, String indexName, String indexKey) { + return replaceWithTempCacheVersions(stream, null, null, indexName, indexKey); + } + + private Stream replaceWithTempCacheVersions( + Stream stream, String namespace, Predicate predicate) { + return replaceWithTempCacheVersions(stream, namespace, predicate, null, null); } - private Stream replaceWithTempCacheVersions(Stream stream) { + private Stream replaceWithTempCacheVersions( + Stream stream, + String namespace, + Predicate predicate, + String indexName, + String indexKey) { if (!comparableResourceVersions) { return stream; } - var tempResources = temporaryResourceCache.getResources(); + + var tempResources = + temporaryResourceCache.getResources().entrySet().stream() + .filter( + e -> { + if (namespace != null) { + var res = + e.getKey().getNamespace().map(ns -> ns.equals(namespace)).orElse(false); + if (!res) return false; + } + if (predicate != null) { + return predicate.test(e.getValue()); + } + return true; + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + if (tempResources.isEmpty()) { return stream; } - return stream.map( - r -> { - var tempResource = tempResources.get(ResourceID.fromResource(r)); - return tempResource != null ? tempResource : r; - }); + + var upToDateSteam = + stream.map( + r -> { + var resourceID = ResourceID.fromResource(r); + var tempResource = tempResources.get(resourceID); + tempResources.remove(resourceID); + if (tempResource != null + && ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) { + return tempResource; + } + return r; + }); + Stream tempResourceStream; + if (indexName != null && indexKey != null) { + var indexer = indexers.get(indexName); + if (indexer == null) { + throw new IllegalArgumentException("Indexer not found for: " + indexName); + } + tempResourceStream = + tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey)); + } else { + tempResourceStream = tempResources.values().stream(); + } + return Stream.concat(tempResourceStream, upToDateSteam); } @Override diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 0610734335..6ace46a7b3 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -58,6 +58,7 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -551,7 +552,7 @@ void listWithStrongConsistencyReplacesResourceFromTempCache() { when(mim.list(any(String.class), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency("default", r -> true).toList(); + var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); assertThat(result).containsExactly(newer); } @@ -582,13 +583,13 @@ void listWithStrongConsistencyReplacesOnlyMatchingResources() { when(temporaryResourceCache.getResources()) .thenReturn(Map.of(ResourceID.fromResource(dep1), newerDep1)); - var mim = mock(InformerManager.class); - when(mim.list(any(String.class), any())).thenReturn(Stream.of(dep1, dep2)); - doReturn(mim).when(informerEventSource).manager(); + var informerManager = mock(InformerManager.class); + when(informerManager.list(nullable(String.class), any())).thenReturn(Stream.of(dep1, dep2)); + doReturn(informerManager).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency("default", r -> true).toList(); + var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); - assertThat(result).containsExactly(newerDep1, dep2); + assertThat(result).containsExactlyInAnyOrder(newerDep1, dep2); } @Test @@ -609,6 +610,44 @@ void byIndexStreamWithStrongConsistencyReplacesFromTempCache() { assertThat(result).containsExactly(newer); } + @Test + void listWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() { + var original = testDeployment(); + original.getMetadata().setResourceVersion("5"); + var olderTemp = testDeployment(); + olderTemp.getMetadata().setResourceVersion("3"); + + when(temporaryResourceCache.getResources()) + .thenReturn(Map.of(ResourceID.fromResource(original), olderTemp)); + + var mim = mock(InformerManager.class); + when(mim.list(any(String.class), any())).thenReturn(Stream.of(original)); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + + assertThat(result).containsExactly(original); + } + + @Test + void byIndexStreamWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() { + var original = testDeployment(); + original.getMetadata().setResourceVersion("5"); + var olderTemp = testDeployment(); + olderTemp.getMetadata().setResourceVersion("3"); + + when(temporaryResourceCache.getResources()) + .thenReturn(Map.of(ResourceID.fromResource(original), olderTemp)); + + var mim = mock(InformerManager.class); + when(mim.byIndexStream(any(), any())).thenReturn(Stream.of(original)); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList(); + + assertThat(result).containsExactly(original); + } + Deployment testDeployment() { Deployment deployment = new Deployment(); deployment.setMetadata(new ObjectMeta()); From e52c0869bf359fd57c9d160d7b93ff5175101d55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 28 Apr 2026 17:04:14 +0200 Subject: [PATCH 03/17] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/ManagedInformerEventSource.java | 33 +++++++++++-------- .../informer/TemporaryResourceCache.java | 2 +- .../informer/InformerEventSourceTest.java | 11 ++++--- 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 5c12eee19c..debd63b173 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -307,6 +307,11 @@ private Stream replaceWithTempCacheVersions( return stream; } + var streamList = stream.toList(); + log.debug("Stream content before temp cache replacement: {}", streamList); + log.debug("Temporary resource cache content: {}", temporaryResourceCache.getResources()); + stream = streamList.stream(); + var tempResources = temporaryResourceCache.getResources().entrySet().stream() .filter( @@ -327,18 +332,19 @@ private Stream replaceWithTempCacheVersions( return stream; } - var upToDateSteam = - stream.map( - r -> { - var resourceID = ResourceID.fromResource(r); - var tempResource = tempResources.get(resourceID); - tempResources.remove(resourceID); - if (tempResource != null - && ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) { - return tempResource; - } - return r; - }); + var upToDateList = + stream + .map( + r -> { + var resourceID = ResourceID.fromResource(r); + var tempResource = tempResources.remove(resourceID); + if (tempResource != null + && ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) { + return tempResource; + } + return r; + }) + .toList(); Stream tempResourceStream; if (indexName != null && indexKey != null) { var indexer = indexers.get(indexName); @@ -350,7 +356,8 @@ private Stream replaceWithTempCacheVersions( } else { tempResourceStream = tempResources.values().stream(); } - return Stream.concat(tempResourceStream, upToDateSteam); + + return Stream.concat(tempResourceStream, upToDateList.stream()); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 62ac8046c3..f969a4b398 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -277,6 +277,6 @@ public synchronized Optional getResourceFromCache(ResourceID resourceID) { } synchronized Map getResources() { - return Map.copyOf(cache); + return new HashMap<>(cache); } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 6ace46a7b3..3bdd80cbe2 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -16,6 +16,7 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.time.Duration; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -601,9 +602,10 @@ void byIndexStreamWithStrongConsistencyReplacesFromTempCache() { when(temporaryResourceCache.getResources()) .thenReturn(Map.of(ResourceID.fromResource(original), newer)); - var mim = mock(InformerManager.class); - when(mim.byIndexStream(any(), any())).thenReturn(Stream.of(original)); - doReturn(mim).when(informerEventSource).manager(); + var informerManager = mock(InformerManager.class); + when(informerManager.byIndexStream(any(), any())).thenReturn(Stream.of(original)); + doReturn(informerManager).when(informerEventSource).manager(); + informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList(); @@ -621,7 +623,7 @@ void listWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() { .thenReturn(Map.of(ResourceID.fromResource(original), olderTemp)); var mim = mock(InformerManager.class); - when(mim.list(any(String.class), any())).thenReturn(Stream.of(original)); + when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); @@ -642,6 +644,7 @@ void byIndexStreamWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion var mim = mock(InformerManager.class); when(mim.byIndexStream(any(), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); + informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList(); From 593bdd93a678c38b03785dffd3cd8c81c160dc49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 28 Apr 2026 17:31:53 +0200 Subject: [PATCH 04/17] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../event/source/informer/ManagedInformerEventSource.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index debd63b173..4413f51c49 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -307,11 +307,6 @@ private Stream replaceWithTempCacheVersions( return stream; } - var streamList = stream.toList(); - log.debug("Stream content before temp cache replacement: {}", streamList); - log.debug("Temporary resource cache content: {}", temporaryResourceCache.getResources()); - stream = streamList.stream(); - var tempResources = temporaryResourceCache.getResources().entrySet().stream() .filter( From 1bf38ec24910033af12d9ce76ec68e5758a56204 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 29 Apr 2026 14:21:03 +0200 Subject: [PATCH 05/17] unit test for ghost resource handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/InformerEventSourceTest.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 3bdd80cbe2..16a3a9bcd4 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -651,6 +651,24 @@ void byIndexStreamWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion assertThat(result).containsExactly(original); } + @Test + void listWithStrongConsistencyKeepsAddsGhostResources() { + var resource = testDeployment(); + var ghostResource = testDeployment(); + ghostResource.getMetadata().setName("ghost"); + + when(temporaryResourceCache.getResources()) + .thenReturn(Map.of(ResourceID.fromResource(ghostResource), ghostResource)); + + var mim = mock(InformerManager.class); + when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(resource)); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + + assertThat(result).containsExactlyInAnyOrder(resource, ghostResource); + } + Deployment testDeployment() { Deployment deployment = new Deployment(); deployment.setMetadata(new ObjectMeta()); From 1d243ea5076a64c4f077f690c6e62be9bd7f6352 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 29 Apr 2026 15:29:29 +0200 Subject: [PATCH 06/17] Update operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Attila Mészáros --- .../event/source/informer/ManagedInformerEventSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 4413f51c49..1fbf4ecf54 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -272,7 +272,7 @@ public Stream listWithStrongConsistency(String namespace, Predicate predic * resource in the {@link TemporaryResourceCache} until the informer catches up. */ public Stream listWithStrongConsistency(Predicate predicate) { - return replaceWithTempCacheVersions(cache.list(predicate), null, (Predicate) null); + return replaceWithTempCacheVersions(cache.list(predicate), null, predicate); } /** From d22960724522a46e8023bfcbcb2b7c81ed3a9810 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 29 Apr 2026 16:31:13 +0200 Subject: [PATCH 07/17] Update operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Attila Mészáros --- .../event/source/informer/InformerEventSourceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 16a3a9bcd4..95c38356d3 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -652,7 +652,7 @@ void byIndexStreamWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion } @Test - void listWithStrongConsistencyKeepsAddsGhostResources() { + void listWithStrongConsistencyAddsGhostResources() { var resource = testDeployment(); var ghostResource = testDeployment(); ghostResource.getMetadata().setName("ghost"); From 9b23fde0da29189edbf8a994599867c790df7e1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 30 Apr 2026 09:22:46 +0200 Subject: [PATCH 08/17] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../event/source/informer/InformerEventSourceTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 95c38356d3..7313cc3a48 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -550,7 +550,7 @@ void listWithStrongConsistencyReplacesResourceFromTempCache() { .thenReturn(Map.of(ResourceID.fromResource(original), newer)); var mim = mock(InformerManager.class); - when(mim.list(any(String.class), any())).thenReturn(Stream.of(original)); + when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); @@ -565,7 +565,7 @@ void listWithStrongConsistencyKeepsResourceWhenNotInTempCache() { when(temporaryResourceCache.getResources()).thenReturn(Map.of()); var mim = mock(InformerManager.class); - when(mim.list(any(String.class), any())).thenReturn(Stream.of(original)); + when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); var result = informerEventSource.listWithStrongConsistency("default", r -> true).toList(); From 820caadc00961483b62a88dab7333c7bf463afd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 30 Apr 2026 12:42:04 +0200 Subject: [PATCH 09/17] javadoc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/ManagedInformerEventSource.java | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 1fbf4ecf54..96d93ab194 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -254,33 +254,27 @@ public Stream byIndexStream(String indexName, String indexKey) { } /** - * Like {@link #list(String, Predicate)} but replaces resources with their newer version from the - * {@link TemporaryResourceCache} when available, to provide stronger consistency. This is needed - * when resources are updated using {@link - * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}, which caches the updated - * resource in the {@link TemporaryResourceCache} until the informer catches up. + * Like {@link #list(String, Predicate)} but to provides stronger consistency. This is needed when + * resources are updated using {@link + * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. */ public Stream listWithStrongConsistency(String namespace, Predicate predicate) { return replaceWithTempCacheVersions(manager().list(namespace, predicate), namespace, predicate); } /** - * Like {@link #list(Predicate)} but replaces resources with their newer version from the {@link - * TemporaryResourceCache} when available, to provide stronger consistency. This is needed when + * Like {@link #list(Predicate)} but to provides stronger consistency. This is needed when * resources are updated using {@link - * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}, which caches the updated - * resource in the {@link TemporaryResourceCache} until the informer catches up. + * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. */ public Stream listWithStrongConsistency(Predicate predicate) { return replaceWithTempCacheVersions(cache.list(predicate), null, predicate); } /** - * Like {@link #byIndexStream(String, String)} but replaces resources with their newer version - * from the {@link TemporaryResourceCache} when available, to provide stronger consistency. This - * is needed when resources are updated using {@link - * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}, which caches the updated - * resource in the {@link TemporaryResourceCache} until the informer catches up. + * Like {@link #byIndexStream(String, String)} but to provides stronger consistency. This is + * needed when resources are updated using {@link + * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. */ public Stream byIndexStreamWithStrongConsistency(String indexName, String indexKey) { return replaceWithTempCacheVersions( From c2c178209df9fed2d67f1ece3bdd0715e50a49cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 30 Apr 2026 13:13:17 +0200 Subject: [PATCH 10/17] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/ManagedInformerEventSource.java | 38 +++++++++++-------- .../informer/TemporaryResourceCache.java | 4 ++ 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 96d93ab194..a079013702 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -297,25 +297,31 @@ private Stream replaceWithTempCacheVersions( Predicate predicate, String indexName, String indexKey) { - if (!comparableResourceVersions) { + if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { return stream; } - var tempResources = - temporaryResourceCache.getResources().entrySet().stream() - .filter( - e -> { - if (namespace != null) { - var res = - e.getKey().getNamespace().map(ns -> ns.equals(namespace)).orElse(false); - if (!res) return false; - } - if (predicate != null) { - return predicate.test(e.getValue()); - } - return true; - }) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + var allTempResources = temporaryResourceCache.getResources(); + Map tempResources; + if (namespace == null && predicate == null) { + tempResources = new HashMap<>(allTempResources); + } else { + tempResources = + allTempResources.entrySet().stream() + .filter( + e -> { + if (namespace != null) { + var res = + e.getKey().getNamespace().map(ns -> ns.equals(namespace)).orElse(false); + if (!res) return false; + } + if (predicate != null) { + return predicate.test(e.getValue()); + } + return true; + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } if (tempResources.isEmpty()) { return stream; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index f969a4b398..39543843b8 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -276,6 +276,10 @@ public synchronized Optional getResourceFromCache(ResourceID resourceID) { return Optional.ofNullable(cache.get(resourceID)); } + synchronized boolean isEmpty() { + return cache.isEmpty(); + } + synchronized Map getResources() { return new HashMap<>(cache); } From 0a1988b76d723e16417c952265ea1891ef5d29b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 30 Apr 2026 13:28:33 +0200 Subject: [PATCH 11/17] nameing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/ManagedInformerEventSource.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index a079013702..ecc6b3b76b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -259,7 +259,8 @@ public Stream byIndexStream(String indexName, String indexKey) { * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. */ public Stream listWithStrongConsistency(String namespace, Predicate predicate) { - return replaceWithTempCacheVersions(manager().list(namespace, predicate), namespace, predicate); + return mergeWithWithTempCacheResources( + manager().list(namespace, predicate), namespace, predicate); } /** @@ -268,7 +269,7 @@ public Stream listWithStrongConsistency(String namespace, Predicate predic * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. */ public Stream listWithStrongConsistency(Predicate predicate) { - return replaceWithTempCacheVersions(cache.list(predicate), null, predicate); + return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate); } /** @@ -277,21 +278,21 @@ public Stream listWithStrongConsistency(Predicate predicate) { * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. */ public Stream byIndexStreamWithStrongConsistency(String indexName, String indexKey) { - return replaceWithTempCacheVersions( + return mergeWithWithTempCacheResources( manager().byIndexStream(indexName, indexKey), indexName, indexKey); } - private Stream replaceWithTempCacheVersions( + private Stream mergeWithWithTempCacheResources( Stream stream, String indexName, String indexKey) { - return replaceWithTempCacheVersions(stream, null, null, indexName, indexKey); + return mergeWithWithTempCacheResources(stream, null, null, indexName, indexKey); } - private Stream replaceWithTempCacheVersions( + private Stream mergeWithWithTempCacheResources( Stream stream, String namespace, Predicate predicate) { - return replaceWithTempCacheVersions(stream, namespace, predicate, null, null); + return mergeWithWithTempCacheResources(stream, namespace, predicate, null, null); } - private Stream replaceWithTempCacheVersions( + private Stream mergeWithWithTempCacheResources( Stream stream, String namespace, Predicate predicate, From 2d0af4871cdaf8e36a2250a436b29a9dc14b4c21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 30 Apr 2026 14:48:59 +0200 Subject: [PATCH 12/17] Update operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Chris Laprun Signed-off-by: Attila Mészáros --- .../event/source/informer/ManagedInformerEventSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index ecc6b3b76b..cc5fd9dfc5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -264,7 +264,7 @@ public Stream listWithStrongConsistency(String namespace, Predicate predic } /** - * Like {@link #list(Predicate)} but to provides stronger consistency. This is needed when + * Like {@link #list(Predicate)} but providing stronger consistency. This is needed when * resources are updated using {@link * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. */ From 17c0fa1bba5266fecc698d46c1ece7ce8cbd5fb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 30 Apr 2026 14:49:21 +0200 Subject: [PATCH 13/17] Update operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Chris Laprun Signed-off-by: Attila Mészáros --- .../event/source/informer/ManagedInformerEventSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index cc5fd9dfc5..6af4217ecd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -254,7 +254,7 @@ public Stream byIndexStream(String indexName, String indexKey) { } /** - * Like {@link #list(String, Predicate)} but to provides stronger consistency. This is needed when + * Like {@link #list(String, Predicate)} but providing stronger consistency. This is needed when * resources are updated using {@link * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. */ From 9b2e403d7fa8d3b47b6abcc36e13bca5de98c55b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 30 Apr 2026 14:50:11 +0200 Subject: [PATCH 14/17] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Chris Laprun Signed-off-by: Attila Mészáros --- .../event/source/informer/ManagedInformerEventSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 6af4217ecd..c46323c15c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -273,7 +273,7 @@ public Stream listWithStrongConsistency(Predicate predicate) { } /** - * Like {@link #byIndexStream(String, String)} but to provides stronger consistency. This is + * Like {@link #byIndexStream(String, String)} but providing stronger consistency. This is * needed when resources are updated using {@link * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. */ From ad1941b13b1d6486697fc2240074fc603bbeab65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 30 Apr 2026 14:54:40 +0200 Subject: [PATCH 15/17] format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../source/informer/ManagedInformerEventSource.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index c46323c15c..307b425f4e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -264,17 +264,16 @@ public Stream listWithStrongConsistency(String namespace, Predicate predic } /** - * Like {@link #list(Predicate)} but providing stronger consistency. This is needed when - * resources are updated using {@link - * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. + * Like {@link #list(Predicate)} but providing stronger consistency. This is needed when resources + * are updated using {@link io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. */ public Stream listWithStrongConsistency(Predicate predicate) { return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate); } /** - * Like {@link #byIndexStream(String, String)} but providing stronger consistency. This is - * needed when resources are updated using {@link + * Like {@link #byIndexStream(String, String)} but providing stronger consistency. This is needed + * when resources are updated using {@link * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. */ public Stream byIndexStreamWithStrongConsistency(String indexName, String indexKey) { From 322c815f1356ae4958a4a9a04098be191f892a53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 30 Apr 2026 14:59:04 +0200 Subject: [PATCH 16/17] comments for implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../source/informer/ManagedInformerEventSource.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 307b425f4e..f54f1a08a3 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -300,12 +300,12 @@ private Stream mergeWithWithTempCacheResources( if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { return stream; } - var allTempResources = temporaryResourceCache.getResources(); Map tempResources; if (namespace == null && predicate == null) { tempResources = new HashMap<>(allTempResources); } else { + // filtering the temp cache according the user input (predicate, namespace) tempResources = allTempResources.entrySet().stream() .filter( @@ -322,17 +322,19 @@ private Stream mergeWithWithTempCacheResources( }) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } - if (tempResources.isEmpty()) { return stream; } - var upToDateList = stream .map( r -> { var resourceID = ResourceID.fromResource(r); + // removing the id from the related temp resources + // this is important so we can detect ghost resources: + // all that remains is ghost resource var tempResource = tempResources.remove(resourceID); + // using the latest version if (tempResource != null && ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) { return tempResource; @@ -341,17 +343,18 @@ private Stream mergeWithWithTempCacheResources( }) .toList(); Stream tempResourceStream; + // ghost resource handling if (indexName != null && indexKey != null) { var indexer = indexers.get(indexName); if (indexer == null) { throw new IllegalArgumentException("Indexer not found for: " + indexName); } + // we check if the ghost resource is part of the index tempResourceStream = tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey)); } else { tempResourceStream = tempResources.values().stream(); } - return Stream.concat(tempResourceStream, upToDateList.stream()); } From a7c61aa504a8c1f7350bbef29ba289f99bc228c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 30 Apr 2026 15:02:34 +0200 Subject: [PATCH 17/17] javadoc facelift MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../operator/api/reconciler/ResourceOperations.java | 3 ++- .../source/informer/ManagedInformerEventSource.java | 13 +++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java index de4d00d717..6f9e73d8ed 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java @@ -37,7 +37,8 @@ /** * Provides useful operations to manipulate resources (server-side apply, patch, etc.) in an * idiomatic way, in particular to make sure that the latest version of the resource is present in - * the caches for the next reconciliation. + * the caches for the next reconciliation. In other words provides read-cache-after-write + * consistency. * * @param

the resource type on which this object operates */ diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index f54f1a08a3..69a5f36bf4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -254,8 +254,8 @@ public Stream byIndexStream(String indexName, String indexKey) { } /** - * Like {@link #list(String, Predicate)} but providing stronger consistency. This is needed when - * resources are updated using {@link + * Like {@link #list(String, Predicate)} but for read-cache-after-write consistency. This is + * useful when resources are updated using {@link * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. */ public Stream listWithStrongConsistency(String namespace, Predicate predicate) { @@ -264,16 +264,17 @@ public Stream listWithStrongConsistency(String namespace, Predicate predic } /** - * Like {@link #list(Predicate)} but providing stronger consistency. This is needed when resources - * are updated using {@link io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. + * Like {@link #list(Predicate)} but for read-cache-after-write consistency. This is useful when + * resources are updated using {@link + * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. */ public Stream listWithStrongConsistency(Predicate predicate) { return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate); } /** - * Like {@link #byIndexStream(String, String)} but providing stronger consistency. This is needed - * when resources are updated using {@link + * Like {@link #byIndexStream(String, String)} but for read-cache-after-write consistency. This is + * useful when resources are updated using {@link * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. */ public Stream byIndexStreamWithStrongConsistency(String indexName, String indexKey) {