ExtendedCatalogFacade improvements for handling generic CatalogInfos

* Add methods to ExtendedCatalogFacade to handle add/remove operations
for CatalogInfos in a generic way.

* Improve BusAmqpIntegrationTests to check incoming remote objects are fully resolved
* Refactor remote events configuration separating autoconfiguration from configuration
This commit is contained in:
Gabriel Roldan 2024-02-11 19:26:34 -03:00
parent e64e75eae3
commit 63296ac6f5
41 changed files with 1187 additions and 479 deletions

2
.gitignore vendored
View File

@ -41,3 +41,5 @@ docker-compose_datadir*
# Yourkit Java Profiler docker compose env file with private token
.env.yjp
hs_err_pid*.log

View File

@ -21,6 +21,8 @@ import org.geoserver.cloud.event.catalog.CatalogInfoRemoved;
import org.geoserver.cloud.event.info.ConfigInfoType;
import org.geoserver.cloud.event.info.InfoEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import java.util.Optional;
@ -53,6 +55,7 @@ public class RemoteEventResourcePoolProcessor {
}
@EventListener(CatalogInfoRemoved.class)
@Order(Ordered.HIGHEST_PRECEDENCE)
public void onCatalogRemoteRemoveEvent(CatalogInfoRemoved event) {
event.remote()
.ifPresentOrElse(
@ -61,6 +64,7 @@ public class RemoteEventResourcePoolProcessor {
}
@EventListener(CatalogInfoModified.class)
@Order(Ordered.HIGHEST_PRECEDENCE)
public void onCatalogRemoteModifyEvent(CatalogInfoModified event) {
event.remote()
.ifPresentOrElse(
@ -86,11 +90,11 @@ public class RemoteEventResourcePoolProcessor {
info.ifPresentOrElse(
object -> {
log.info(
log.debug(
"Evicting ResourcePool cache entry for {}({}) upon {}",
infoType,
id,
event);
event.toShortString());
ResourcePool resourcePool = rawCatalog.getResourcePool();
CacheClearingListener cleaner = new CacheClearingListener(resourcePool);
object.accept(cleaner);

View File

@ -43,5 +43,17 @@
<artifactId>spring-boot-autoconfigure-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.geoserver</groupId>
<artifactId>gs-main</artifactId>
<version>${gs.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -4,7 +4,7 @@
*/
package org.geoserver.cloud.autoconfigure.catalog.backend.datadir;
import org.geoserver.catalog.plugin.ExtendedCatalogFacade;
import org.geoserver.catalog.plugin.CatalogPlugin;
import org.geoserver.cloud.autoconfigure.catalog.event.ConditionalOnCatalogEvents;
import org.geoserver.cloud.event.remote.datadir.RemoteEventDataDirectoryProcessor;
import org.geoserver.config.plugin.RepositoryGeoServerFacade;
@ -20,7 +20,7 @@ public class RemoteEventDataDirectoryAutoConfiguration {
@Bean
RemoteEventDataDirectoryProcessor dataDirectoryRemoteEventProcessor(
@Qualifier("geoserverFacade") RepositoryGeoServerFacade configFacade,
@Qualifier("catalogFacade") ExtendedCatalogFacade catalogFacade) {
return new RemoteEventDataDirectoryProcessor(configFacade, catalogFacade);
@Qualifier("rawCatalog") CatalogPlugin rawCatalog) {
return new RemoteEventDataDirectoryProcessor(configFacade, rawCatalog);
}
}

View File

@ -11,14 +11,11 @@ import lombok.extern.slf4j.Slf4j;
import org.geoserver.catalog.CatalogInfo;
import org.geoserver.catalog.DataStoreInfo;
import org.geoserver.catalog.Info;
import org.geoserver.catalog.LayerGroupInfo;
import org.geoserver.catalog.LayerInfo;
import org.geoserver.catalog.NamespaceInfo;
import org.geoserver.catalog.ResourceInfo;
import org.geoserver.catalog.StoreInfo;
import org.geoserver.catalog.StyleInfo;
import org.geoserver.catalog.WorkspaceInfo;
import org.geoserver.catalog.impl.ModificationProxy;
import org.geoserver.catalog.impl.ResolvingProxy;
import org.geoserver.catalog.plugin.CatalogPlugin;
import org.geoserver.catalog.plugin.ExtendedCatalogFacade;
import org.geoserver.catalog.plugin.Patch;
import org.geoserver.cloud.event.UpdateSequenceEvent;
@ -30,6 +27,7 @@ import org.geoserver.cloud.event.info.InfoAdded;
import org.geoserver.cloud.event.info.InfoModified;
import org.geoserver.cloud.event.info.InfoRemoved;
import org.geoserver.config.GeoServerInfo;
import org.geoserver.config.LoggingInfo;
import org.geoserver.config.ServiceInfo;
import org.geoserver.config.SettingsInfo;
import org.geoserver.config.plugin.RepositoryGeoServerFacade;
@ -37,25 +35,44 @@ import org.springframework.context.event.EventListener;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
/** Listens to {@link RemoteCatalogEvent}s and updates the local catalog */
/**
* Listens to {@link RemoteCatalogEvent}s and updates the local catalog
*
* @since 1.0
*/
@Slf4j(topic = "org.geoserver.cloud.event.remote.datadir")
@RequiredArgsConstructor
public class RemoteEventDataDirectoryProcessor {
private final @NonNull RepositoryGeoServerFacade configFacade;
private final @NonNull ExtendedCatalogFacade catalogFacade;
private final @NonNull CatalogPlugin rawCatalog;
@EventListener(classes = {UpdateSequenceEvent.class})
public void onUpdateSequenceEvent(UpdateSequenceEvent updateSequenceEvent) {
if (updateSequenceEvent.isRemote()) {
final Long updateSequence = updateSequenceEvent.getUpdateSequence();
log.debug("processing update sequence {} from {}", updateSequence, updateSequenceEvent);
GeoServerInfo info = ModificationProxy.unwrap(configFacade.getGlobal());
long current = info.getUpdateSequence();
ExtendedCatalogFacade catalogFacade() {
return rawCatalog.getFacade();
}
@EventListener(UpdateSequenceEvent.class)
void onRemoteUpdateSequenceEvent(UpdateSequenceEvent event) {
if (event.isLocal()) {
return;
}
final long updateSequence = event.getUpdateSequence();
GeoServerInfo info = ModificationProxy.unwrap(configFacade.getGlobal());
final long current = info.getUpdateSequence();
if (updateSequence > current) {
info.setUpdateSequence(updateSequence);
log.info("replaced update sequence {} by {}", current, updateSequence);
log.debug(
"replaced update sequence {} by {} due to reomote event {}",
current,
updateSequence,
event.toShortString());
} else {
log.debug(
"remote event has update sequence {} lower than locally seen {}, leaving it untouched. {}",
updateSequence,
current,
event.toShortString());
}
}
@ -66,46 +83,37 @@ public class RemoteEventDataDirectoryProcessor {
}
final ConfigInfoType type = event.getObjectType();
final String objectId = event.getObjectId();
switch (type) {
case NAMESPACE:
remove(objectId, catalogFacade::getNamespace, catalogFacade::remove);
break;
case WORKSPACE:
remove(objectId, catalogFacade::getWorkspace, catalogFacade::remove);
break;
case COVERAGE, FEATURETYPE, WMSLAYER, WMTSLAYER:
remove(
objectId,
id -> catalogFacade.getResource(id, ResourceInfo.class),
catalogFacade::remove);
break;
case COVERAGESTORE, DATASTORE, WMSSTORE, WMTSSTORE:
remove(
objectId,
id -> catalogFacade.getStore(id, StoreInfo.class),
catalogFacade::remove);
break;
case LAYERGROUP:
remove(objectId, catalogFacade::getLayerGroup, catalogFacade::remove);
break;
case LAYER:
remove(objectId, catalogFacade::getLayer, catalogFacade::remove);
break;
case STYLE:
remove(objectId, catalogFacade::getStyle, catalogFacade::remove);
break;
case SERVICE:
remove(
objectId,
id -> configFacade.getService(id, ServiceInfo.class),
configFacade::remove);
break;
case SETTINGS:
remove(objectId, configFacade::getSettings, configFacade::remove);
break;
default:
log.warn("Don't know how to handle remote remove envent for {}", event);
break;
final ExtendedCatalogFacade facade = catalogFacade();
if (type.isA(CatalogInfo.class)) {
Class<? extends CatalogInfo> cinfotype = type.type();
facade.get(objectId, cinfotype)
.ifPresentOrElse(
info -> {
facade.remove(info);
log.debug("Removed {}({}), from local catalog", type, objectId);
},
() ->
log.warn(
"Can't remove {}({}), not present in local catalog",
type,
objectId));
} else {
boolean removed =
switch (type) {
case SERVICE -> remove(
objectId,
id -> configFacade.getService(id, ServiceInfo.class),
configFacade::remove);
case SETTINGS -> remove(
objectId, configFacade::getSettings, configFacade::remove);
default -> false;
};
if (removed) {
log.debug("Removed {}({}), from local config", type, objectId);
} else {
log.warn("Can't remove {}({}), not present in local config", type, objectId);
}
}
}
@ -116,76 +124,19 @@ public class RemoteEventDataDirectoryProcessor {
}
final String objectId = event.getObjectId();
final ConfigInfoType type = event.getObjectType();
log.debug("Handling remote add event {}({})", type, objectId);
if (log.isDebugEnabled()) log.debug("Adding object from event {}", event.toShortString());
final Info object = event.getObject();
if (object == null) {
log.error("Remote add event didn't send the object payload for {}({})", type, objectId);
return;
}
switch (type) {
case NAMESPACE:
catalogFacade.add((NamespaceInfo) object);
break;
case WORKSPACE:
catalogFacade.add((WorkspaceInfo) object);
break;
case COVERAGE, FEATURETYPE, WMSLAYER, WMTSLAYER:
catalogFacade.add((ResourceInfo) object);
break;
case COVERAGESTORE, DATASTORE, WMSSTORE, WMTSSTORE:
catalogFacade.add((StoreInfo) object);
break;
case LAYERGROUP:
catalogFacade.add((LayerGroupInfo) object);
break;
case LAYER:
catalogFacade.add((LayerInfo) object);
break;
case STYLE:
catalogFacade.add((StyleInfo) object);
break;
case SERVICE:
configFacade.add((ServiceInfo) object);
break;
case SETTINGS:
configFacade.add((SettingsInfo) object);
break;
case LOGGING:
// ignore
break;
default:
log.warn("Don't know how to handle remote envent {})", event);
break;
}
}
@EventListener(DefaultWorkspaceSet.class)
public void onRemoteDefaultWorkspaceEvent(DefaultWorkspaceSet event) {
if (event.isRemote()) {
String newId = event.getNewWorkspaceId();
WorkspaceInfo newDefault = newId == null ? null : catalogFacade.getWorkspace(newId);
catalogFacade.setDefaultWorkspace(newDefault);
}
}
@EventListener(DefaultNamespaceSet.class)
public void onRemoteDefaultNamespaceEvent(DefaultNamespaceSet event) {
if (event.isRemote()) {
String newId = event.getNewNamespaceId();
NamespaceInfo namespace = newId == null ? null : catalogFacade.getNamespace(newId);
catalogFacade.setDefaultNamespace(namespace);
}
}
@EventListener(DefaultDataStoreSet.class)
public void onRemoteDefaultDataStoreEvent(DefaultDataStoreSet event) {
if (event.isRemote()) {
String workspaceId = event.getWorkspaceId();
WorkspaceInfo workspace = catalogFacade.getWorkspace(workspaceId);
String storeId = event.getDefaultDataStoreId();
DataStoreInfo store =
storeId == null ? null : catalogFacade.getStore(storeId, DataStoreInfo.class);
catalogFacade.setDefaultDataStore(workspace, store);
ExtendedCatalogFacade facade = catalogFacade();
switch (object) {
case CatalogInfo info -> facade.add(info);
case ServiceInfo config -> configFacade.add(config);
case SettingsInfo config -> configFacade.add(config);
case LoggingInfo config -> log.debug("ignoring unused LoggingInfo");
default -> log.warn("Don't know how to handle remote envent {})", event);
}
}
@ -194,84 +145,116 @@ public class RemoteEventDataDirectoryProcessor {
if (event.isLocal()) {
return;
}
final String objectId = event.getObjectId();
final ConfigInfoType type = event.getObjectType();
if (type == ConfigInfoType.CATALOG) {
log.trace(
"remote catalog modify events handled by RemoteDefaultWorkspace/Namespace/Store event handlers");
if (event instanceof DefaultWorkspaceSet
|| event instanceof DefaultNamespaceSet
|| event instanceof DefaultDataStoreSet) {
// these are InfoModified events but have their own listeners
return;
}
log.debug("Handling remote modify event {}", event);
final ConfigInfoType type = event.getObjectType();
final Patch patch = event.getPatch();
if (patch == null) {
log.error("Remote event didn't send the patch payload {}", event);
return;
}
Info info = null;
switch (type) {
case NAMESPACE:
info = catalogFacade.getNamespace(objectId);
break;
case WORKSPACE:
info = catalogFacade.getWorkspace(objectId);
break;
case COVERAGE, FEATURETYPE, WMSLAYER, WMTSLAYER:
info = catalogFacade.getResource(objectId, ResourceInfo.class);
break;
case COVERAGESTORE, DATASTORE, WMSSTORE, WMTSSTORE:
info = catalogFacade.getStore(objectId, StoreInfo.class);
break;
case LAYERGROUP:
info = catalogFacade.getLayerGroup(objectId);
break;
case LAYER:
info = catalogFacade.getLayer(objectId);
break;
case STYLE:
info = catalogFacade.getStyle(objectId);
break;
case GEOSERVER:
info = ModificationProxy.unwrap(configFacade.getGlobal());
break;
case SERVICE:
info =
ModificationProxy.unwrap(
configFacade.getService(objectId, ServiceInfo.class));
break;
case SETTINGS:
info = ModificationProxy.unwrap(configFacade.getSettings(objectId));
break;
case LOGGING:
info = ModificationProxy.unwrap(configFacade.getLogging());
break;
default:
log.warn("Don't know how to handle remote modify envent {}", event);
return;
}
log.debug("Handling remote modify event {}", event);
Info info = loadInfo(event);
if (info == null) {
log.warn("Object not found on local Catalog, can't update upon {}", event);
return;
}
if (info instanceof CatalogInfo catalogInfo) {
// going directly through the CatalogFacade does not produce any further event
this.catalogFacade().update(catalogInfo, patch);
} else {
// config info. GeoServerFacade doesn't have an update(info, patch) method, apply
// the patch to the live object
info = ModificationProxy.unwrap(info);
patch.applyTo(info);
if (info instanceof CatalogInfo catalogInfo) {
// going directly through the CatalogFacade does not produce any further event
this.catalogFacade.update(catalogInfo, patch);
}
}
if (log.isDebugEnabled())
log.debug(
"Object updated: {}({}). Properties: {}",
type,
objectId,
patch.getPropertyNames().stream().collect(Collectors.joining(",")));
}
event.getObjectId(),
patch.getPropertyNames());
}
private <T extends Info> void remove(
private Info loadInfo(InfoModified event) {
final ConfigInfoType type = event.getObjectType();
final String objectId = event.getObjectId();
if (type.isA(CatalogInfo.class)) {
@SuppressWarnings("unchecked")
Class<? extends CatalogInfo> ctype = (Class<? extends CatalogInfo>) type.getType();
return catalogFacade().get(objectId, ctype).orElse(null);
}
Info configInfo =
switch (type) {
case GEOSERVER -> configFacade.getGlobal();
case SERVICE -> configFacade.getService(objectId, ServiceInfo.class);
case SETTINGS -> configFacade.getSettings(objectId);
case LOGGING -> configFacade.getLogging();
default -> {
log.warn("Don't know how to handle remote modify envent {}", event);
yield null;
}
};
return configInfo;
}
@EventListener(DefaultWorkspaceSet.class)
public void onRemoteDefaultWorkspaceEvent(DefaultWorkspaceSet event) {
if (event.isLocal()) {
return;
}
WorkspaceInfo newDefault = null;
if (null != event.getNewWorkspaceId()) {
// let the facade handle the resolving and eventual consistency
newDefault = ResolvingProxy.create(event.getNewWorkspaceId(), WorkspaceInfo.class);
}
catalogFacade().setDefaultWorkspace(newDefault);
}
@EventListener(DefaultNamespaceSet.class)
public void onRemoteDefaultNamespaceEvent(DefaultNamespaceSet event) {
if (event.isLocal()) {
return;
}
NamespaceInfo namespace = null;
if (null != event.getNewNamespaceId()) {
// let the facade handle the resolving and eventual consistency
namespace = ResolvingProxy.create(event.getNewNamespaceId(), NamespaceInfo.class);
}
catalogFacade().setDefaultNamespace(namespace);
}
@EventListener(DefaultDataStoreSet.class)
public void onRemoteDefaultDataStoreEvent(DefaultDataStoreSet event) {
if (event.isLocal()) {
return;
}
ExtendedCatalogFacade facade = catalogFacade();
WorkspaceInfo workspace =
ResolvingProxy.create(event.getWorkspaceId(), WorkspaceInfo.class);
DataStoreInfo store = null;
if (null != event.getDefaultDataStoreId()) {
store = ResolvingProxy.create(event.getDefaultDataStoreId(), DataStoreInfo.class);
}
facade.setDefaultDataStore(workspace, store);
}
private <T extends Info> boolean remove(
String id, Function<String, T> supplier, Consumer<? super T> remover) {
T info = supplier.apply(id);
if (info == null) {
log.warn("Can't remove {}, not present in local catalog", id);
} else {
remover.accept(info);
log.debug("Removed {}", id);
return false;
}
remover.accept(info);
return true;
}
}

View File

@ -0,0 +1,310 @@
/*
* (c) 2023 Open Source Geospatial Foundation - all rights reserved This code is licensed under the
* GPL 2.0 license, available at the root application directory.
*/
package org.geoserver.cloud.event.remote.datadir;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import org.geoserver.catalog.CatalogInfo;
import org.geoserver.catalog.WorkspaceInfo;
import org.geoserver.catalog.impl.ModificationProxy;
import org.geoserver.catalog.impl.NamespaceInfoImpl;
import org.geoserver.catalog.impl.ResolvingProxy;
import org.geoserver.catalog.impl.WorkspaceInfoImpl;
import org.geoserver.catalog.plugin.CatalogPlugin;
import org.geoserver.catalog.plugin.ExtendedCatalogFacade;
import org.geoserver.catalog.plugin.Patch;
import org.geoserver.catalog.plugin.PropertyDiff;
import org.geoserver.cloud.event.UpdateSequenceEvent;
import org.geoserver.cloud.event.catalog.CatalogInfoAdded;
import org.geoserver.cloud.event.catalog.CatalogInfoModified;
import org.geoserver.cloud.event.catalog.CatalogInfoRemoved;
import org.geoserver.cloud.event.catalog.DefaultDataStoreSet;
import org.geoserver.cloud.event.catalog.DefaultNamespaceSet;
import org.geoserver.cloud.event.catalog.DefaultWorkspaceSet;
import org.geoserver.cloud.event.config.ConfigInfoModified;
import org.geoserver.cloud.event.config.GeoServerInfoModified;
import org.geoserver.cloud.event.config.ServiceAdded;
import org.geoserver.cloud.event.config.ServiceRemoved;
import org.geoserver.cloud.event.config.SettingsAdded;
import org.geoserver.cloud.event.config.SettingsRemoved;
import org.geoserver.cloud.event.info.InfoModified;
import org.geoserver.config.GeoServerInfo;
import org.geoserver.config.ServiceInfo;
import org.geoserver.config.SettingsInfo;
import org.geoserver.config.impl.GeoServerInfoImpl;
import org.geoserver.config.impl.ServiceInfoImpl;
import org.geoserver.config.impl.SettingsInfoImpl;
import org.geoserver.config.plugin.RepositoryGeoServerFacade;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Optional;
class RemoteEventDataDirectoryProcessorTest {
ExtendedCatalogFacade mockFacade;
RepositoryGeoServerFacade mockGeoServerFacade;
RemoteEventDataDirectoryProcessor processor;
GeoServerInfo global;
@BeforeEach
void setUp() throws Exception {
mockFacade = mock(ExtendedCatalogFacade.class);
var catalog = mock(CatalogPlugin.class);
when(catalog.getFacade()).thenReturn(mockFacade);
mockGeoServerFacade = mock(RepositoryGeoServerFacade.class);
global = new GeoServerInfoImpl();
when(mockGeoServerFacade.getGlobal()).thenReturn(global);
processor = new RemoteEventDataDirectoryProcessor(mockGeoServerFacade, catalog);
}
@Test
void testRemoteEventDataDirectoryProcessor() {
RepositoryGeoServerFacade configFacade = mock(RepositoryGeoServerFacade.class);
CatalogPlugin rawCatalog = mock(CatalogPlugin.class);
assertThrows(
NullPointerException.class,
() -> new RemoteEventDataDirectoryProcessor(null, rawCatalog));
assertThrows(
NullPointerException.class,
() -> new RemoteEventDataDirectoryProcessor(configFacade, null));
var catalogFacade = mock(ExtendedCatalogFacade.class);
when(rawCatalog.getFacade()).thenReturn(catalogFacade);
var processor = new RemoteEventDataDirectoryProcessor(configFacade, rawCatalog);
assertSame(catalogFacade, processor.catalogFacade());
}
@Test
void testOnRemoteUpdateSequenceEvent() {
global.setUpdateSequence(1);
UpdateSequenceEvent event = UpdateSequenceEvent.createLocal(10);
event.setRemote(false);
processor.onRemoteUpdateSequenceEvent(event);
verify(mockGeoServerFacade, never()).getGlobal();
event.setRemote(true);
processor.onRemoteUpdateSequenceEvent(event);
assertThat(global.getUpdateSequence()).isEqualTo(10);
clearInvocations(mockGeoServerFacade);
event = UpdateSequenceEvent.createLocal(global.getUpdateSequence() - 1);
event.setRemote(true);
processor.onRemoteUpdateSequenceEvent(event);
verify(mockGeoServerFacade, times(1)).getGlobal();
assertThat(global.getUpdateSequence()).isEqualTo(10);
}
@Test
void testOnRemoteRemoveEvent_CatalogInfo() {
CatalogInfoRemoved event =
CatalogInfoRemoved.createLocal(
101, ResolvingProxy.create("ws1", WorkspaceInfo.class));
processor.onRemoteRemoveEvent(event);
verifyNoMoreInteractions(mockFacade);
WorkspaceInfo info = new WorkspaceInfoImpl();
((WorkspaceInfoImpl) info).setId("ws1");
((WorkspaceInfoImpl) info).setName("workspace1");
Optional<WorkspaceInfo> found = Optional.of(info);
when(mockFacade.get("ws1", WorkspaceInfo.class)).thenReturn(found);
clearInvocations(mockFacade);
event.setRemote(true);
processor.onRemoteRemoveEvent(event);
// cast cause it'll call the generic remove method
verify(mockFacade).remove((CatalogInfo) info);
clearInvocations(mockFacade);
when(mockFacade.get("ws1", WorkspaceInfo.class)).thenReturn(Optional.empty());
processor.onRemoteRemoveEvent(event);
verify(mockFacade, times(1)).get("ws1", WorkspaceInfo.class);
verifyNoMoreInteractions(mockFacade);
}
@Test
void testOnRemoteRemoveEvent_ConfigInfo() {
ServiceInfoImpl service = new ServiceInfoImpl();
service.setId("wms1");
SettingsInfoImpl settings = new SettingsInfoImpl();
settings.setId("settings1");
settings.setWorkspace(new WorkspaceInfoImpl());
((WorkspaceInfoImpl) settings.getWorkspace()).setId("ws-id");
when(mockGeoServerFacade.getService(service.getId(), ServiceInfo.class))
.thenReturn(service);
when(mockGeoServerFacade.getSettings(settings.getId())).thenReturn(settings);
ServiceRemoved serviceEvent = ServiceRemoved.createLocal(101, service);
SettingsRemoved settingsEvent = SettingsRemoved.createLocal(102, settings);
processor.onRemoteRemoveEvent(serviceEvent);
verifyNoMoreInteractions(mockGeoServerFacade);
clearInvocations(mockGeoServerFacade);
processor.onRemoteRemoveEvent(settingsEvent);
verifyNoMoreInteractions(mockGeoServerFacade);
clearInvocations(mockGeoServerFacade);
serviceEvent.setRemote(true);
settingsEvent.setRemote(true);
clearInvocations(mockGeoServerFacade);
processor.onRemoteRemoveEvent(serviceEvent);
verify(mockGeoServerFacade, times(1)).getService(service.getId(), ServiceInfo.class);
verify(mockGeoServerFacade, times(1)).remove(service);
verifyNoMoreInteractions(mockGeoServerFacade);
clearInvocations(mockGeoServerFacade);
processor.onRemoteRemoveEvent(settingsEvent);
verify(mockGeoServerFacade, times(1)).getSettings(settings.getId());
verify(mockGeoServerFacade, times(1)).remove(settings);
verifyNoMoreInteractions(mockGeoServerFacade);
}
@Test
void testOnRemoteAddEvent_CatalogInfo() {
NamespaceInfoImpl info = new NamespaceInfoImpl();
info.setId("ns1");
info.setPrefix("ns");
info.setURI("ns");
CatalogInfoAdded event = CatalogInfoAdded.createLocal(10, info);
event.setRemote(false);
processor.onRemoteAddEvent(event);
verifyNoMoreInteractions(mockFacade);
event.setRemote(true);
processor.onRemoteAddEvent(event);
verify(mockFacade, times(1)).add((CatalogInfo) info);
}
@Test
void testOnRemoteAddEvent_ServiceInfo() {
ServiceInfoImpl service = new ServiceInfoImpl();
service.setId("s1");
ServiceAdded event = ServiceAdded.createLocal(0, service);
event.setRemote(true);
processor.onRemoteAddEvent(event);
verify(mockGeoServerFacade, times(1)).add(service);
}
@Test
void testOnRemoteAddEvent_SettingsInfo() {
SettingsInfoImpl settings = new SettingsInfoImpl();
settings.setId("settings1");
SettingsAdded event = SettingsAdded.createLocal(0, settings);
event.setRemote(true);
processor.onRemoteAddEvent(event);
verify(mockGeoServerFacade, times(1)).add(settings);
}
@Test
void testOnRemoteModifyEvent_CatalogInfo() {
WorkspaceInfo info = new WorkspaceInfoImpl();
((WorkspaceInfoImpl) info).setId("id");
((WorkspaceInfoImpl) info).setName("oldName");
Patch patch = simplePatch("name", "oldName", "newName");
InfoModified event = CatalogInfoModified.createLocal(1, info, patch);
event.setRemote(false);
processor.onRemoteModifyEvent(event);
verifyNoMoreInteractions(mockFacade);
when(mockFacade.get(info.getId(), WorkspaceInfo.class)).thenReturn(Optional.of(info));
event.setRemote(true);
processor.onRemoteModifyEvent(event);
verify(mockFacade, times(1)).update(info, patch);
}
private Patch simplePatch(String property, Object oldValue, Object newValue) {
return PropertyDiff.valueOf(List.of(property), List.of(oldValue), List.of(newValue))
.toPatch();
}
@Test
void testOnRemoteModifyEvent_IgnoresSetDefaultEvents() {
processor.onRemoteModifyEvent(mock(DefaultWorkspaceSet.class));
processor.onRemoteModifyEvent(mock(DefaultNamespaceSet.class));
processor.onRemoteModifyEvent(mock(DefaultDataStoreSet.class));
verifyNoMoreInteractions(mockFacade);
verifyNoMoreInteractions(mockGeoServerFacade);
}
@Test
void testOnRemoteModifyEvent_GeoServerInfo() {
GeoServerInfo global = this.global;
when(mockGeoServerFacade.getGlobal())
.thenReturn(ModificationProxy.create(global, GeoServerInfo.class));
global.setFeatureTypeCacheSize(1);
var proxied = mockGeoServerFacade.getGlobal();
proxied.setFeatureTypeCacheSize(1000);
Patch patch = PropertyDiff.valueOf(ModificationProxy.handler(proxied)).toPatch();
ConfigInfoModified event = ConfigInfoModified.createLocal(1, global, patch);
event.setRemote(true);
processor.onRemoteModifyEvent(event);
assertThat(global.getFeatureTypeCacheSize()).isEqualTo(1_000);
}
@Test
void testOnRemoteModifyEvent_SettingsInfo() {
SettingsInfoImpl settings = new SettingsInfoImpl();
settings.setId("set1");
settings.setWorkspace(new WorkspaceInfoImpl());
((WorkspaceInfoImpl) settings.getWorkspace()).setId("ws1");
when(mockGeoServerFacade.getSettings(settings.getId()))
.thenReturn(ModificationProxy.create(settings, SettingsInfo.class));
settings.setCharset("ISO-8859-1");
var proxied = mockGeoServerFacade.getSettings(settings.getId());
proxied.setCharset("UTF-8");
Patch patch = PropertyDiff.valueOf(ModificationProxy.handler(proxied)).toPatch();
ConfigInfoModified event = GeoServerInfoModified.createLocal(1, settings, patch);
event.setRemote(true);
processor.onRemoteModifyEvent(event);
assertThat(settings.getCharset()).isEqualTo("UTF-8");
}
@Test
void testOnRemoteModifyEvent_ServiceInfo() {
ServiceInfoImpl service = new ServiceInfoImpl();
service.setId("serv1");
when(mockGeoServerFacade.getService(service.getId(), ServiceInfo.class))
.thenReturn(ModificationProxy.create(service, ServiceInfo.class));
service.setTitle("old title");
var proxied = mockGeoServerFacade.getService(service.getId(), ServiceInfo.class);
proxied.setTitle("new title");
Patch patch = PropertyDiff.valueOf(ModificationProxy.handler(proxied)).toPatch();
InfoModified event = GeoServerInfoModified.createLocal(1, service, patch);
event.setRemote(true);
processor.onRemoteModifyEvent(event);
assertThat(service.getTitle()).isEqualTo("new title");
}
}

View File

@ -74,6 +74,18 @@ class CachingCatalogFacadeImpl extends ForwardingExtendedCatalogFacade
return idCache.evictIfPresent(key);
}
@Override
@CachePut(key = "new org.geoserver.cloud.catalog.cache.CatalogInfoKey(#p0)")
public <T extends CatalogInfo> T add(@NonNull T info) {
return super.add(info);
}
@Override
@CacheEvict(key = "new org.geoserver.cloud.catalog.cache.CatalogInfoKey(#p0)")
public void remove(@NonNull CatalogInfo info) {
super.remove(info);
}
@CachePut(key = "new org.geoserver.cloud.catalog.cache.CatalogInfoKey(#p0)")
@Override
public StoreInfo add(StoreInfo store) {

View File

@ -63,6 +63,11 @@
<artifactId>rabbitmq</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>

View File

@ -6,10 +6,11 @@ package org.geoserver.cloud.autoconfigure.event.bus;
import lombok.extern.slf4j.Slf4j;
import org.geoserver.cloud.autoconfigure.catalog.event.ConditionalOnCatalogEvents;
import org.geoserver.cloud.event.bus.RemoteGeoServerEventsConfiguration;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.cloud.bus.BusAutoConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import javax.annotation.PostConstruct;
@ -23,16 +24,18 @@ import javax.annotation.PostConstruct;
@Slf4j(topic = "org.geoserver.cloud.autoconfigure.bus")
public class GeoServerBusIntegrationAutoConfiguration {
@Configuration(proxyBeanMethods = false)
@ConditionalOnGeoServerRemoteEventsEnabled
@AutoConfiguration
@AutoConfigureAfter(BusAutoConfiguration.class)
@ConditionalOnCatalogEvents
@ConditionalOnGeoServerRemoteEventsEnabled
@Import(RemoteGeoServerEventsConfiguration.class)
static class Enabled {
public @PostConstruct void logBusDisabled() {
log.info("GeoServer event-bus integration is enabled");
}
}
@Configuration(proxyBeanMethods = false)
@AutoConfiguration
@ConditionalOnGeoServerRemoteEventsDisabled
static class Disabled {
public @PostConstruct void logBusDisabled() {

View File

@ -5,6 +5,7 @@
package org.geoserver.cloud.event.bus;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.geoserver.catalog.Catalog;
import org.geoserver.catalog.CatalogInfo;
@ -21,17 +22,16 @@ import org.geoserver.cloud.event.info.InfoModified;
import org.geoserver.config.GeoServer;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.UnaryOperator;
/**
* Highest priority listener for incoming {@link RemoteGeoServerEvent} events to resolve the payload
* {@link CatalogInfo} properties, as they may come either as {@link ResolvingProxy} proxies, or
* {@code null} in case of collection properties.
*
* <p>This listener ensures the payload object properties are resolved before being catch up by
* Ensures the {@link Info} and {@link Patch} object payloads on {@link InfoAdded} and {@link
* InfoModified} events from a remote source get their properties resolved before being catch up by
* other listeners.
*/
public class InfoEventResolver {
@Slf4j(topic = "org.geoserver.cloud.event.bus.resolve")
class InfoEventResolver {
private UnaryOperator<Info> configInfoResolver;
private UnaryOperator<CatalogInfo> catalogInfoResolver;
@ -42,42 +42,48 @@ public class InfoEventResolver {
proxyUtils = new ProxyUtils(() -> rawCatalog, Optional.of(geoserverConfig));
BiConsumer<CatalogInfo, ResolvingProxy> onNotFound =
(info, proxy) ->
log.debug(
"Event object contains a reference to a non existing object ResolvingProxy(ref={})",
proxy.getRef());
configInfoResolver =
CollectionPropertiesInitializer.<Info>instance()
.andThen(ResolvingProxyResolver.<Info>of(rawCatalog))
.andThen(
ResolvingProxyResolver.<Info>of(rawCatalog)
.onNotFound(onNotFound))
::apply;
var catalogResolver = CatalogPropertyResolver.<CatalogInfo>of(rawCatalog);
var resolvingProxyResolver =
ResolvingProxyResolver.<CatalogInfo>of(rawCatalog).onNotFound(onNotFound);
var collectionsInitializer = CollectionPropertiesInitializer.<CatalogInfo>instance();
catalogInfoResolver =
CollectionPropertiesInitializer.<CatalogInfo>instance()
.andThen(CatalogPropertyResolver.of(rawCatalog))
.andThen(ResolvingProxyResolver.of(rawCatalog))
catalogResolver.andThen(collectionsInitializer).andThen(resolvingProxyResolver)
::apply;
}
@SuppressWarnings("unchecked")
public InfoEvent resolve(InfoEvent event) {
if (event instanceof InfoAdded addEvent) {
Info object = addEvent.getObject();
addEvent.setObject(resolveInfo(object));
addEvent.setObject(resolveInfo(addEvent.getObject()));
} else if (event instanceof InfoModified modifyEvent) {
modifyEvent.setPatch(resolvePatch(modifyEvent.getPatch()));
}
return event;
}
@SuppressWarnings("unchecked")
private <I extends Info> I resolveInfo(I object) {
if (object == null) return null;
private Info resolveInfo(Info object) {
if (object instanceof CatalogInfo i) {
return (I) resolveCatalogInfo(i);
return resolveCatalogInfo(i);
}
return (I) configInfoResolver.apply(object);
return object == null ? null : configInfoResolver.apply(object);
}
@SuppressWarnings("unchecked")
private <C extends CatalogInfo> C resolveCatalogInfo(C object) {
if (object == null) return null;
return (C) catalogInfoResolver.apply(object);
private CatalogInfo resolveCatalogInfo(CatalogInfo info) {
return info == null ? null : catalogInfoResolver.apply(info);
}
private Patch resolvePatch(Patch patch) {

View File

@ -19,6 +19,7 @@ public class RemoteGeoServerEvent extends RemoteApplicationEvent {
@Getter @NonNull private GeoServerEvent event;
/** Deserialization-time constructor, {@link #getSource()} will be {@code null} */
@SuppressWarnings("java:S2637") // final fields initialized by deserialization
protected RemoteGeoServerEvent() {
// default constructor, needed for deserialization
}
@ -43,4 +44,8 @@ public class RemoteGeoServerEvent extends RemoteApplicationEvent {
getDestinationService(),
getEvent());
}
public String toShortString() {
return getEvent().toShortString();
}
}

View File

@ -7,130 +7,173 @@ package org.geoserver.cloud.event.bus;
import com.google.common.annotations.VisibleForTesting;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.geoserver.catalog.CatalogException;
import org.geoserver.catalog.plugin.Patch;
import org.geoserver.catalog.CatalogInfo;
import org.geoserver.catalog.impl.ResolvingProxy;
import org.geoserver.cloud.event.GeoServerEvent;
import org.geoserver.cloud.event.info.InfoEvent;
import org.geoserver.cloud.event.info.InfoModified;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.geoserver.platform.config.UpdateSequence;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* Listens to local catalog and configuration change {@link InfoEvent}s produced by this service
* instance and broadcasts them to the cluster as {@link RemoteGeoServerEvent}
* instance and broadcasts them to the cluster as {@link RemoteGeoServerEvent}, and conversely,
* listens to incoming {@link RemoteGeoServerEvent}s and publishes their {@link
* RemoteGeoServerEvent#getEvent() event} payload as local events
*
* @see #publishRemoteEvent(GeoServerEvent)
* @see #publishLocalEvent(RemoteGeoServerEvent)
* @see RemoteGeoServerEventProcessor
*/
public class RemoteGeoServerEventBridge {
@Slf4j(topic = "org.geoserver.cloud.event.bus.bridge")
public class RemoteGeoServerEventBridge implements DisposableBean {
private final Outgoing outgoing;
private final Incoming incoming;
/**
* Provided event publisher for incoming remote events converted to local events (e.g. {@link
* ApplicationEventPublisher#publishEvent})
*
* @see #publishRemoteEvent(GeoServerEvent)
*/
private final Consumer<GeoServerEvent> inboundEventPublisher;
private boolean enabled = true;
/**
* Provided event publisher for outgoing remote events converted from local events (e.g. {@link
* ApplicationEventPublisher#publishEvent})
*
* @see #publishLocalEvent(RemoteGeoServerEvent)
*/
private final Consumer<RemoteGeoServerEvent> outboundEventPublisher;
private final RemoteGeoServerEventMapper mapper;
private final AtomicBoolean enabled = new AtomicBoolean(false);
public RemoteGeoServerEventBridge( //
@NonNull Consumer<GeoServerEvent> localRemoteEventPublisher, //
@NonNull Consumer<RemoteApplicationEvent> remoteEventPublisher, //
@NonNull RemoteGeoServerEventMapper mapper, //
@NonNull Supplier<String> localBusId) {
@NonNull Consumer<RemoteGeoServerEvent> remoteEventPublisher, //
@NonNull RemoteGeoServerEventMapper mapper,
@NonNull UpdateSequence updateSequence) {
this.outgoing = new Outgoing(remoteEventPublisher, mapper, localBusId);
this.incoming = new Incoming(localRemoteEventPublisher, mapper, localBusId);
this.mapper = mapper;
this.outboundEventPublisher = remoteEventPublisher;
this.inboundEventPublisher = localRemoteEventPublisher;
enable();
}
public @VisibleForTesting void enabled(boolean enabled) {
this.enabled = enabled;
}
@EventListener(GeoServerEvent.class)
public void handleLocalEvent(GeoServerEvent event) {
if (enabled) {
outgoing.broadCastIfLocal(event);
@VisibleForTesting
void enable() {
if (enabled.compareAndSet(false, true)) {
log.debug("RemoteGeoServerEventBridge enabled");
}
}
@VisibleForTesting
void disable() {
if (enabled.compareAndSet(true, false)) {
log.debug("RemoteGeoServerEventBridge disabled");
}
}
@Override
public void destroy() {
log.info(
"RemoteGeoServerEventBridge received destroy signal, stopping remote event processing");
disable();
}
private boolean enabled() {
return enabled.get();
}
/**
* Highest priority listener for incoming {@link RemoteGeoServerEvent} events to resolve the
* payload {@link CatalogInfo} properties, as they may come either as {@link ResolvingProxy}
* proxies, or {@code null} in case of collection properties.
*/
@EventListener(RemoteGeoServerEvent.class)
public void handleRemoteEvent(RemoteGeoServerEvent busEvent) throws CatalogException {
if (enabled) {
incoming.handleRemoteEvent(busEvent);
}
@Order(Ordered.HIGHEST_PRECEDENCE)
public void publishLocalEvent(RemoteGeoServerEvent busEvent) {
mapper.ifRemote(busEvent)
.ifPresentOrElse(
incoming -> {
logReceived(incoming);
dispatchAccepted(incoming);
},
() -> logIgnoreLocalRemote(busEvent));
}
@RequiredArgsConstructor
@Slf4j(topic = "org.geoserver.cloud.event.bus.outgoing")
private static class Outgoing {
private final @NonNull Consumer<RemoteApplicationEvent> remoteEventPublisher;
private final @NonNull RemoteGeoServerEventMapper mapper;
private @NonNull Supplier<String> localBusId;
/**
* Lowest priority listener on a local {@link GeoServerEvent}, publishes a matching {@link
* RemoteGeoServerEvent} to the event bus
*/
@EventListener(GeoServerEvent.class)
@Order(Ordered.LOWEST_PRECEDENCE)
public void publishRemoteEvent(GeoServerEvent event) {
mapper.mapIfLocal(event)
.ifPresentOrElse(this::dispatchAccepted, () -> logIgnoreRemoteLocal(event));
}
public void broadCastIfLocal(GeoServerEvent event) throws CatalogException {
if (event.isLocal()) {
RemoteGeoServerEvent remote = mapper.toRemote(event);
publishRemoteEvent(remote);
private void dispatchAccepted(RemoteGeoServerEvent event) {
if (enabled()) {
if (event.getEvent().isLocal()) {
doSend(event);
} else {
log.trace("{}: not re-publishing {}", localBusId.get(), event);
doReceive(event);
}
}
private void publishRemoteEvent(RemoteGeoServerEvent remoteEvent) {
logOutgoing(remoteEvent);
try {
remoteEventPublisher.accept(remoteEvent);
} catch (RuntimeException e) {
log.error("{}: error broadcasting {}", localBusId.get(), remoteEvent, e);
throw e;
}
}
protected void logOutgoing(RemoteGeoServerEvent remoteEvent) {
@NonNull GeoServerEvent event = remoteEvent.getEvent();
String logMsg = "{}: broadcasting {}";
if (event instanceof InfoModified modEvent) {
Patch patch = modEvent.getPatch();
if (patch.isEmpty()) {
logMsg = "{}: broadcasting no-change event {}";
}
}
final String busId = localBusId.get();
log.debug(logMsg, busId, remoteEvent);
}
}
@RequiredArgsConstructor
@Slf4j(topic = "org.geoserver.cloud.event.bus.incoming")
private static class Incoming {
private final @NonNull Consumer<GeoServerEvent> localRemoteEventPublisher;
private final @NonNull RemoteGeoServerEventMapper mapper;
private @NonNull Supplier<String> localBusId;
public void handleRemoteEvent(RemoteGeoServerEvent incoming) throws CatalogException {
mapper.ifRemote(incoming) //
.ifPresentOrElse( //
this::publishLocalEvent, //
() ->
log.trace(
"{}: not broadcasting local-remote event {}",
localBusId.get(),
incoming));
private void doSend(RemoteGeoServerEvent outgoing) {
try {
outboundEventPublisher.accept(outgoing);
logOutgoing(outgoing);
} catch (RuntimeException e) {
log.error("error broadcasting {}", outgoing, e);
throw e;
}
}
private void publishLocalEvent(RemoteGeoServerEvent incoming) {
log.trace("Received remote event {}", incoming);
private void doReceive(RemoteGeoServerEvent incoming) {
try {
GeoServerEvent localRemoteEvent = mapper.toLocalRemote(incoming);
log.debug("{}: publishing as local event {}", localBusId.get(), incoming);
try {
localRemoteEventPublisher.accept(localRemoteEvent);
} catch (RuntimeException e) {
log.error("{}: error accepting remote {}", localBusId.get(), localRemoteEvent, e);
throw e;
}
if (log.isDebugEnabled())
log.debug("publishing as local event {}", incoming.toShortString());
inboundEventPublisher.accept(localRemoteEvent);
} catch (RuntimeException e) {
log.error("{}: error accepting remote {}", mapper.localBusServiceId(), incoming, e);
throw e;
}
}
private void logIgnoreLocalRemote(RemoteGeoServerEvent incoming) {
if (log.isTraceEnabled())
log.trace(
"{}: not broadcasting local-remote event {}",
mapper.localBusServiceId(),
incoming.toShortString());
}
private void logIgnoreRemoteLocal(GeoServerEvent event) {
log.trace("{}: not re-publishing {}", mapper.localBusServiceId(), event);
}
private void logReceived(RemoteGeoServerEvent incoming) {
if (log.isDebugEnabled()) {
log.debug("received remote event {}", incoming.toShortString());
}
}
protected void logOutgoing(RemoteGeoServerEvent remoteEvent) {
if (log.isDebugEnabled()) {
log.debug("sent remote event {}", remoteEvent.toShortString());
}
}
}

View File

@ -14,9 +14,15 @@ import org.springframework.cloud.bus.event.Destination;
import java.util.Optional;
/** */
/**
* Aids {@link RemoteGeoServerEventBridge} in mapping {@link RemoteGeoServerEvent} to local {@link
* GeoServerEvent} and vice-versa.
*
* @see InfoEventResolver
* @see RemoteGeoServerEventBridge
*/
@RequiredArgsConstructor
public class RemoteGeoServerEventMapper {
class RemoteGeoServerEventMapper {
/** Constant indicating a remote event is destined to all services */
private static final String DESTINATION_ALL_SERVICES = "**";
@ -29,12 +35,12 @@ public class RemoteGeoServerEventMapper {
return destinationFactory.getDestination(DESTINATION_ALL_SERVICES);
}
private @NonNull String originService() {
public @NonNull String localBusServiceId() {
return serviceMatcher.getBusId();
}
public RemoteGeoServerEvent toRemote(GeoServerEvent anyLocalCatalogOrConfigEvent) {
String origin = originService();
String origin = localBusServiceId();
Destination destination = destinationService();
RemoteGeoServerEvent remote =
new RemoteGeoServerEvent(this, anyLocalCatalogOrConfigEvent, origin, destination);
@ -47,7 +53,19 @@ public class RemoteGeoServerEventMapper {
final boolean fromSelf = serviceMatcher.isFromSelf(busEvent);
final boolean forSelf = serviceMatcher.isForSelf(busEvent);
final boolean republishAsLocal = !fromSelf && forSelf;
return Optional.ofNullable(republishAsLocal ? busEvent : null);
if (republishAsLocal) {
GeoServerEvent event = busEvent.getEvent();
event.setRemote(true);
event.setOrigin(busEvent.getOriginService());
return Optional.of(busEvent);
}
return Optional.empty();
}
public Optional<RemoteGeoServerEvent> mapIfLocal(GeoServerEvent event) {
return Optional.of(event).filter(GeoServerEvent::isLocal).map(this::toRemote);
}
public GeoServerEvent toLocalRemote(@NonNull RemoteGeoServerEvent incoming) {

View File

@ -2,7 +2,7 @@
* (c) 2020 Open Source Geospatial Foundation - all rights reserved This code is licensed under the
* GPL 2.0 license, available at the root application directory.
*/
package org.geoserver.cloud.autoconfigure.event.bus;
package org.geoserver.cloud.event.bus;
import lombok.extern.slf4j.Slf4j;
@ -10,54 +10,39 @@ import org.geoserver.catalog.Catalog;
import org.geoserver.catalog.CatalogInfo;
import org.geoserver.catalog.Info;
import org.geoserver.catalog.impl.ResolvingProxy;
import org.geoserver.cloud.autoconfigure.catalog.event.ConditionalOnCatalogEvents;
import org.geoserver.cloud.event.GeoServerEvent;
import org.geoserver.cloud.event.bus.InfoEventResolver;
import org.geoserver.cloud.event.bus.RemoteGeoServerEvent;
import org.geoserver.cloud.event.bus.RemoteGeoServerEventBridge;
import org.geoserver.cloud.event.bus.RemoteGeoServerEventMapper;
import org.geoserver.config.GeoServer;
import org.geoserver.jackson.databind.catalog.GeoServerCatalogModule;
import org.geoserver.jackson.databind.config.GeoServerConfigModule;
import org.geoserver.platform.config.UpdateSequence;
import org.geotools.api.filter.Filter;
import org.geotools.api.filter.expression.Expression;
import org.geotools.api.filter.expression.Literal;
import org.geotools.jackson.databind.filter.GeoToolsFilterModule;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.bus.BusAutoConfiguration;
import org.springframework.cloud.bus.ServiceMatcher;
import org.springframework.cloud.bus.event.Destination;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* {@link EnableAutoConfiguration auto-configuration} catalog and config events integration with
* spring cloud bus
*/
/** Catalog and config events integration with spring cloud bus */
@Configuration(proxyBeanMethods = false)
@ConditionalOnCatalogEvents
@ConditionalOnGeoServerRemoteEventsEnabled
@AutoConfigureAfter(BusAutoConfiguration.class)
@RemoteApplicationEventScan(basePackageClasses = {RemoteGeoServerEvent.class})
@Slf4j(topic = "org.geoserver.cloud.autoconfigure.bus.catalog")
public class RemoteGeoServerEventsAutoConfiguration {
@Slf4j(topic = "org.geoserver.cloud.event.bus")
public class RemoteGeoServerEventsConfiguration {
/**
* Add a {@link GeoToolsFilterModule} to the default jackson spring codecs if not already
* present, so {@link Expression} and {@link Filter} objects can be used as {@link
* RemoteGeoServerEvent} payload, especially {@link Literal} expressions.
*/
@ConditionalOnMissingBean(GeoToolsFilterModule.class)
@Bean
@ConditionalOnMissingBean(GeoToolsFilterModule.class)
GeoToolsFilterModule geoToolsFilterModule() {
return new GeoToolsFilterModule();
}
@ -66,8 +51,8 @@ public class RemoteGeoServerEventsAutoConfiguration {
* Add a {@link GeoServerCatalogModule} to the default jackson spring codecs if not already
* present, so {@link CatalogInfo} objects can be used as {@link RemoteGeoServerEvent} payload
*/
@ConditionalOnMissingBean(GeoServerCatalogModule.class)
@Bean
@ConditionalOnMissingBean(GeoServerCatalogModule.class)
GeoServerCatalogModule geoServerCatalogJacksonModule() {
return new GeoServerCatalogModule();
}
@ -77,8 +62,8 @@ public class RemoteGeoServerEventsAutoConfiguration {
* present, so configuration {@link Info} objects can be used as {@link RemoteGeoServerEvent}
* payload
*/
@ConditionalOnMissingBean(GeoServerConfigModule.class)
@Bean
@ConditionalOnMissingBean(GeoServerConfigModule.class)
GeoServerConfigModule geoServerConfigJacksonModule() {
return new GeoServerConfigModule();
}
@ -111,14 +96,13 @@ public class RemoteGeoServerEventsAutoConfiguration {
RemoteGeoServerEventBridge remoteEventBroadcaster(
ApplicationEventPublisher eventPublisher,
RemoteGeoServerEventMapper eventMapper,
ServiceMatcher serviceMatcher) {
UpdateSequence updateSequence) {
log.info("Configuring GeoServer Catalog distributed events.");
Consumer<GeoServerEvent> localEventPublisher = eventPublisher::publishEvent;
Consumer<RemoteApplicationEvent> remoteEventPublisher = eventPublisher::publishEvent;
Supplier<String> busId = serviceMatcher::getBusId;
Consumer<RemoteGeoServerEvent> remoteEventPublisher = eventPublisher::publishEvent;
return new RemoteGeoServerEventBridge(
localEventPublisher, remoteEventPublisher, eventMapper, busId);
localEventPublisher, remoteEventPublisher, eventMapper, updateSequence);
}
}

View File

@ -1,3 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.geoserver.cloud.autoconfigure.event.bus.GeoServerBusIntegrationAutoConfiguration,\
org.geoserver.cloud.autoconfigure.event.bus.RemoteGeoServerEventsAutoConfiguration
org.geoserver.cloud.autoconfigure.event.bus.GeoServerBusIntegrationAutoConfiguration

View File

@ -8,12 +8,11 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import org.geoserver.catalog.Catalog;
import org.geoserver.cloud.event.bus.InfoEventResolver;
import org.geoserver.cloud.event.bus.RemoteGeoServerEventBridge;
import org.geoserver.cloud.event.bus.RemoteGeoServerEventMapper;
import org.geoserver.config.GeoServer;
import org.geoserver.jackson.databind.catalog.GeoServerCatalogModule;
import org.geoserver.jackson.databind.config.GeoServerConfigModule;
import org.geoserver.platform.config.UpdateSequence;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
@ -24,23 +23,25 @@ import org.springframework.cloud.bus.ServiceMatcher;
/**
* @since 1.0
*/
class RemoteGeoServerEventsAutoConfigurationTest {
class GeoServerBusIntegrationAutoConfigurationTest {
private final ServiceMatcher mockServiceMatcher = mock(ServiceMatcher.class);
private final BusBridge mockBusBridge = mock(BusBridge.class);
private final Catalog mockCatalog = mock(Catalog.class);
private final GeoServer mockGeoserver = mock(GeoServer.class);
private final UpdateSequence updateSequence = mock(UpdateSequence.class);
private ApplicationContextRunner runner =
new ApplicationContextRunner()
.withBean(ServiceMatcher.class, () -> mockServiceMatcher)
.withBean(BusBridge.class, () -> mockBusBridge)
.withBean(UpdateSequence.class, () -> updateSequence)
.withBean("rawCatalog", Catalog.class, () -> mockCatalog)
.withBean("Geoserver", GeoServer.class, () -> mockGeoserver)
.withConfiguration(
AutoConfigurations.of(
BusAutoConfiguration.class,
RemoteGeoServerEventsAutoConfiguration.class));
GeoServerBusIntegrationAutoConfiguration.class));
@Test
void enabledByDefault() {
@ -66,11 +67,11 @@ class RemoteGeoServerEventsAutoConfigurationTest {
runner.run(
context -> {
assertThat(context).hasSingleBean(GeoServerCatalogModule.class);
assertThat(context).hasSingleBean(GeoServerConfigModule.class);
assertThat(context).hasSingleBean(InfoEventResolver.class);
assertThat(context).hasSingleBean(RemoteGeoServerEventMapper.class);
assertThat(context).hasSingleBean(RemoteGeoServerEventBridge.class);
assertThat(context)
.hasNotFailed()
.hasSingleBean(GeoServerCatalogModule.class)
.hasSingleBean(GeoServerConfigModule.class)
.hasSingleBean(RemoteGeoServerEventBridge.class);
});
}
@ -78,11 +79,11 @@ class RemoteGeoServerEventsAutoConfigurationTest {
runner.run(
context -> {
assertThat(context).doesNotHaveBean(GeoServerCatalogModule.class);
assertThat(context).doesNotHaveBean(GeoServerConfigModule.class);
assertThat(context).doesNotHaveBean(InfoEventResolver.class);
assertThat(context).doesNotHaveBean(RemoteGeoServerEventMapper.class);
assertThat(context).doesNotHaveBean(RemoteGeoServerEventBridge.class);
assertThat(context)
.hasNotFailed()
.doesNotHaveBean(GeoServerCatalogModule.class)
.doesNotHaveBean(GeoServerConfigModule.class)
.doesNotHaveBean(RemoteGeoServerEventBridge.class);
});
}
}

View File

@ -26,12 +26,24 @@ import lombok.NonNull;
import lombok.experimental.Accessors;
import org.geoserver.catalog.Catalog;
import org.geoserver.catalog.CatalogInfo;
import org.geoserver.catalog.CatalogTestData;
import org.geoserver.catalog.Info;
import org.geoserver.catalog.LayerGroupInfo;
import org.geoserver.catalog.LayerInfo;
import org.geoserver.catalog.NamespaceInfo;
import org.geoserver.catalog.PublishedInfo;
import org.geoserver.catalog.ResourceInfo;
import org.geoserver.catalog.StoreInfo;
import org.geoserver.catalog.StyleInfo;
import org.geoserver.catalog.WorkspaceInfo;
import org.geoserver.catalog.impl.ClassMappings;
import org.geoserver.catalog.impl.LayerGroupStyle;
import org.geoserver.catalog.impl.ModificationProxy;
import org.geoserver.catalog.plugin.CatalogPlugin;
import org.geoserver.catalog.plugin.Patch;
import org.geoserver.catalog.plugin.PropertyDiff;
import org.geoserver.catalog.plugin.resolving.ProxyUtils;
import org.geoserver.cloud.event.GeoServerEvent;
import org.geoserver.cloud.event.catalog.CatalogInfoAdded;
import org.geoserver.cloud.event.catalog.CatalogInfoModified;
@ -44,6 +56,7 @@ import org.geoserver.cloud.event.info.InfoRemoved;
import org.geoserver.config.GeoServer;
import org.geoserver.config.GeoServerInfo;
import org.geoserver.config.LoggingInfo;
import org.geoserver.config.ServiceInfo;
import org.geoserver.config.SettingsInfo;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@ -62,6 +75,7 @@ import org.testcontainers.junit.jupiter.Testcontainers;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
@SpringBootTest(
webEnvironment = RANDOM_PORT,
@ -86,7 +100,7 @@ public abstract class BusAmqpIntegrationTests {
private @Autowired ConfigurableApplicationContext localAppContext;
protected @Autowired GeoServer geoserver;
protected @Autowired Catalog catalog;
protected @Autowired CatalogPlugin catalog;
protected CatalogTestData testData;
@ -273,6 +287,11 @@ public abstract class BusAmqpIntegrationTests {
return type;
}
protected <T extends Info> RemoteGeoServerEvent testRemoteCatalogInfoAddEvent(
CatalogInfo info) {
return testRemoteAddEvent(info, catalog::add, CatalogInfoAdded.class);
}
protected <T extends Info> RemoteGeoServerEvent testRemoteCatalogInfoAddEvent(
T info, Consumer<T> addOp) {
return testRemoteAddEvent(info, addOp, CatalogInfoAdded.class);
@ -333,11 +352,12 @@ public abstract class BusAmqpIntegrationTests {
assertThat(infoType.isInstance(info)).isTrue();
if (event instanceof InfoAdded e) {
assertThat(e.getObject()).isNotNull();
assertThat(infoType.isInstance(e.getObject())).isTrue();
assertThat(e.getObject().getId()).isEqualTo(info.getId());
Info object = e.getObject();
assertThat(object).isNotNull();
assertThat(infoType.isInstance(object)).isTrue();
assertThat(object.getId()).isEqualTo(info.getId());
assertResolved(object, Info.class);
info = ModificationProxy.unwrap(info);
object = ModificationProxy.unwrap(object);
// testData.assertEqualsLenientConnectionParameters(info, object);
@ -348,6 +368,65 @@ public abstract class BusAmqpIntegrationTests {
}
}
protected void assertResolved(Info info, Class<? extends Info> expected) {
if (null == info) return;
if (ProxyUtils.isResolvingProxy(info)) return;
switch (info) {
case StoreInfo store:
assertCatalogSet(store, store::getCatalog);
assertResolved(store.getWorkspace(), WorkspaceInfo.class);
break;
case ResourceInfo resource:
assertCatalogSet(resource, resource::getCatalog);
assertResolved(resource.getNamespace(), NamespaceInfo.class);
assertResolved(resource.getStore(), StoreInfo.class);
break;
case StyleInfo style:
assertResolved(style.getWorkspace(), WorkspaceInfo.class);
break;
case LayerInfo layer:
assertResolved(layer.getDefaultStyle(), StyleInfo.class);
assertResolved(layer.getResource(), ResourceInfo.class);
break;
case LayerGroupInfo lg:
assertResolved(lg.getWorkspace(), WorkspaceInfo.class);
assertResolved(lg.getRootLayer(), LayerInfo.class);
assertResolved(lg.getRootLayerStyle(), StyleInfo.class);
lg.getStyles().forEach(s -> assertResolved(s, StyleInfo.class));
lg.getLayers().forEach(l -> assertResolved(l, PublishedInfo.class));
lg.getLayerGroupStyles().forEach(lgs -> assertResolved(lgs, LayerGroupStyle.class));
break;
case LayerGroupStyle lgs:
lgs.getLayers().forEach(l -> assertResolved(l, PublishedInfo.class));
lgs.getStyles().forEach(s -> assertResolved(s, StyleInfo.class));
break;
case NamespaceInfo ns:
break;
case WorkspaceInfo ws:
break;
case ServiceInfo service:
break;
case SettingsInfo settings:
break;
default:
throw new IllegalArgumentException(String.valueOf(info));
}
}
private void assertCatalogSet(Info info, Supplier<Catalog> accessor) {
assertThat(accessor.get())
.as("%s does not have its catalog property set".formatted(info))
.isNotNull();
}
protected void assertNotAProxy(Info info, Class<? extends Info> expected) {
assertThat(ProxyUtils.isResolvingProxy(info))
.as(
"Expected %s, got ResolvingProxy %s"
.formatted(expected.getSimpleName(), info.getId()))
.isFalse();
}
@Accessors(fluent = true)
@AllArgsConstructor
protected static class EventsCaptor {

View File

@ -80,7 +80,7 @@ public class BusEventCollector {
Class<T> payloadType, Predicate<T> filter) {
List<RemoteGeoServerEvent> matches =
await().atMost(Duration.ofSeconds(5)) //
await().atMost(Duration.ofSeconds(500)) //
.until(() -> allOf(payloadType, filter), not(List::isEmpty));
Supplier<String> message =
@ -126,12 +126,12 @@ public class BusEventCollector {
public void stop() {
log.debug("bus id {}: stopped", busId);
capturing = false;
bridge.enabled(false);
bridge.disable();
}
public void start() {
log.debug("bus id {}: ready to capture {} events", busId, eventType.getSimpleName());
capturing = true;
bridge.enabled(true);
bridge.enable();
}
}

View File

@ -198,7 +198,7 @@ class CatalogRemoteApplicationEventsIT extends BusAmqpIntegrationTests {
catalog.add(testData.workspaceA);
catalog.add(testData.namespaceA);
catalog.add(testData.dataStoreA);
testRemoteCatalogInfoAddEvent(testData.featureTypeA, catalog::add);
testRemoteCatalogInfoAddEvent(testData.featureTypeA);
}
@Test
@ -208,7 +208,7 @@ class CatalogRemoteApplicationEventsIT extends BusAmqpIntegrationTests {
catalog.add(testData.dataStoreA);
catalog.add(testData.featureTypeA);
catalog.add(testData.style1);
testRemoteCatalogInfoAddEvent(testData.layerFeatureTypeA, catalog::add);
testRemoteCatalogInfoAddEvent(testData.layerFeatureTypeA);
}
@Test

View File

@ -0,0 +1,56 @@
/*
* (c) 2022 Open Source Geospatial Foundation - all rights reserved This code is licensed under the
* GPL 2.0 license, available at the root application directory.
*/
package org.geoserver.cloud.event.bus;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import org.geoserver.catalog.Catalog;
import org.geoserver.config.GeoServer;
import org.geoserver.jackson.databind.catalog.GeoServerCatalogModule;
import org.geoserver.jackson.databind.config.GeoServerConfigModule;
import org.geoserver.platform.config.UpdateSequence;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.context.annotation.UserConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.bus.BusAutoConfiguration;
import org.springframework.cloud.bus.BusBridge;
import org.springframework.cloud.bus.ServiceMatcher;
/**
* @since 1.0
*/
class RemoteGeoServerEventsConfigurationTest {
private final ServiceMatcher mockServiceMatcher = mock(ServiceMatcher.class);
private final BusBridge mockBusBridge = mock(BusBridge.class);
private final Catalog mockCatalog = mock(Catalog.class);
private final GeoServer mockGeoserver = mock(GeoServer.class);
private final UpdateSequence updateSequence = mock(UpdateSequence.class);
private ApplicationContextRunner runner =
new ApplicationContextRunner()
.withBean(ServiceMatcher.class, () -> mockServiceMatcher)
.withBean(BusBridge.class, () -> mockBusBridge)
.withBean(UpdateSequence.class, () -> updateSequence)
.withBean("rawCatalog", Catalog.class, () -> mockCatalog)
.withBean("Geoserver", GeoServer.class, () -> mockGeoserver)
.withConfiguration(AutoConfigurations.of(BusAutoConfiguration.class))
.withConfiguration(
UserConfigurations.of(RemoteGeoServerEventsConfiguration.class));
@Test
void enabledByDefault() {
runner.run(
context -> {
assertThat(context).hasSingleBean(GeoServerCatalogModule.class);
assertThat(context).hasSingleBean(GeoServerConfigModule.class);
assertThat(context).hasSingleBean(InfoEventResolver.class);
assertThat(context).hasSingleBean(RemoteGeoServerEventMapper.class);
assertThat(context).hasSingleBean(RemoteGeoServerEventBridge.class);
});
}
}

View File

@ -14,5 +14,5 @@ logging:
root: WARN
org.springframework: error
org.geoserver.platform: error
org.geoserver.cloud: info
org.geoserver.cloud.bus: info
org.geoserver.cloud: debug
org.geoserver.cloud.bus: debug

View File

@ -11,4 +11,7 @@
<logger name="org.springframework" level="WARN"/>
<logger name="org.testcontainers" level="INFO"/>
<logger name="org.geoserver.cloud.event" level="DEBUG"/>
<logger name="org.geoserver.cloud.event.bus" level="DEBUG"/>
</configuration>

View File

@ -68,6 +68,8 @@ public abstract class GeoServerEvent implements Serializable {
return toStringBuilder().toString();
}
public abstract String toShortString();
protected ToStringCreator toStringBuilder() {
return new ToStringCreator(this)
.append("id", getId())

View File

@ -24,7 +24,7 @@ import org.springframework.security.core.context.SecurityContextHolder;
})
@JsonTypeName("UpdateSequence")
@SuppressWarnings("serial")
public class UpdateSequenceEvent extends GeoServerEvent {
public class UpdateSequenceEvent extends GeoServerEvent implements Comparable<UpdateSequenceEvent> {
/**
* The provided {@link GeoServerInfo}'s {@link GeoServerInfo#getUpdateSequence() update
* sequence}. Being the most frequently updated property, it's readily available for remote
@ -51,4 +51,17 @@ public class UpdateSequenceEvent extends GeoServerEvent {
public static UpdateSequenceEvent createLocal(long value) {
return new UpdateSequenceEvent(value);
}
@Override
public int compareTo(UpdateSequenceEvent o) {
return Long.compare(getUpdateSequence(), o.getUpdateSequence());
}
@Override
public String toShortString() {
String originService = getOrigin();
String type = getClass().getSimpleName();
return "%s[origin: %s, updateSequence: %s]"
.formatted(type, originService, getUpdateSequence());
}
}

View File

@ -34,6 +34,10 @@ public class CatalogInfoAdded extends InfoAdded<CatalogInfo> {
public static CatalogInfoAdded createLocal(
long updateSequence, @NonNull CatalogAddEvent event) {
return new CatalogInfoAdded(updateSequence, event.getSource());
return createLocal(updateSequence, event.getSource());
}
public static CatalogInfoAdded createLocal(long updateSequence, @NonNull CatalogInfo info) {
return new CatalogInfoAdded(updateSequence, info);
}
}

View File

@ -58,6 +58,15 @@ public enum ConfigInfoType {
return object != null && getType().isInstance(object);
}
public static boolean isPersistable(@NonNull Info info) {
for (ConfigInfoType enumVal : ConfigInfoType.values()) {
if (enumVal.isInstance(info)) {
return true;
}
}
return false;
}
public static ConfigInfoType valueOf(@NonNull Info object) {
for (ConfigInfoType enumVal : ConfigInfoType.values()) {
if (enumVal.isInstance(object)) {
@ -70,4 +79,9 @@ public enum ConfigInfoType {
public boolean isA(Class<? extends Info> type) {
return type.isAssignableFrom(getType());
}
@SuppressWarnings("unchecked")
public <T> Class<T> type() {
return (Class<T>) type;
}
}

View File

@ -61,6 +61,15 @@ public abstract class InfoEvent extends UpdateSequenceEvent {
*/
private static final String LOGGING_ID = "logging";
@Override
public String toShortString() {
String originService = getOrigin();
String type = getClass().getSimpleName();
return "%s[origin: %s, updateSequence: %s, object: %s(%s)]"
.formatted(
type, originService, getUpdateSequence(), getObjectType(), getObjectId());
}
public static String resolveId(Info object) {
if (null == object) return null;
String id = object.getId();

View File

@ -39,4 +39,12 @@ public class SecurityConfigChanged extends UpdateSequenceEvent {
public static SecurityConfigChanged createLocal(long updateSequence, @NonNull String reason) {
return new SecurityConfigChanged(updateSequence, reason);
}
@Override
public String toShortString() {
String originService = getOrigin();
String type = getClass().getSimpleName();
return "%s[origin: %s, updateSequence: %s, reason: %s]"
.formatted(type, originService, getUpdateSequence(), getReason());
}
}

View File

@ -116,6 +116,10 @@
<groupId>org.geotools</groupId>
<artifactId>gt-process-feature</artifactId>
</dependency>
<dependency>
<groupId>com.github.f4b6a3</groupId>
<artifactId>ulid-creator</artifactId>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -6,6 +6,9 @@ package org.geoserver.catalog.plugin;
import static java.util.Collections.unmodifiableList;
import com.github.f4b6a3.ulid.Ulid;
import com.github.f4b6a3.ulid.UlidCreator;
import lombok.NonNull;
import org.geoserver.catalog.Catalog;
@ -16,16 +19,11 @@ import org.geoserver.catalog.LayerGroupInfo;
import org.geoserver.catalog.LayerInfo;
import org.geoserver.catalog.MapInfo;
import org.geoserver.catalog.NamespaceInfo;
import org.geoserver.catalog.PublishedInfo;
import org.geoserver.catalog.ResourceInfo;
import org.geoserver.catalog.ResourcePool;
import org.geoserver.catalog.StoreInfo;
import org.geoserver.catalog.StyleInfo;
import org.geoserver.catalog.ValidationResult;
import org.geoserver.catalog.WMSLayerInfo;
import org.geoserver.catalog.WMSStoreInfo;
import org.geoserver.catalog.WMTSLayerInfo;
import org.geoserver.catalog.WMTSStoreInfo;
import org.geoserver.catalog.WorkspaceInfo;
import org.geoserver.catalog.event.CatalogListener;
import org.geoserver.catalog.impl.CatalogImpl;
@ -45,14 +43,12 @@ import org.geoserver.config.plugin.GeoServerImpl;
import org.geoserver.config.util.XStreamPersister;
import org.geoserver.ows.util.OwsUtils;
import org.geotools.api.filter.Filter;
import org.geotools.api.filter.FilterFactory;
import org.geotools.api.filter.Id;
import org.geotools.api.filter.PropertyIsEqualTo;
import org.geotools.api.filter.expression.Literal;
import org.geotools.api.filter.expression.PropertyName;
import org.geotools.api.filter.identity.Identifier;
import org.geotools.api.filter.sort.SortBy;
import org.geotools.factory.CommonFactoryFinder;
import org.geotools.util.Converters;
import org.geotools.util.logging.Logging;
@ -61,8 +57,6 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -217,6 +211,40 @@ public class CatalogPlugin extends CatalogImpl implements Catalog {
super.facade.setCatalog(this);
}
public void add(@NonNull CatalogInfo info) {
switch (info) {
case WorkspaceInfo ws -> add(ws);
case NamespaceInfo ns -> add(ns);
case StoreInfo st -> add(st);
case ResourceInfo r -> add(r);
case LayerInfo l -> add(l);
case LayerGroupInfo lg -> add(lg);
case StyleInfo s -> add(s);
case MapInfo m -> add(m);
default -> throw new IllegalArgumentException(
"Unexpected value: %s".formatted(ModificationProxy.unwrap(info).getClass()));
}
}
public void save(@NonNull CatalogInfo info) {
doSave(info);
}
public void remove(@NonNull CatalogInfo info) {
switch (info) {
case WorkspaceInfo ws -> remove(ws);
case NamespaceInfo ns -> remove(ns);
case StoreInfo st -> remove(st);
case ResourceInfo r -> remove(r);
case LayerInfo l -> remove(l);
case LayerGroupInfo lg -> remove(lg);
case StyleInfo s -> remove(s);
case MapInfo m -> remove(m);
default -> throw new IllegalArgumentException(
"Unexpected value: %s".formatted(ModificationProxy.unwrap(info).getClass()));
}
}
// Store methods
@Override
public void add(StoreInfo store) {
@ -240,7 +268,7 @@ public class CatalogPlugin extends CatalogImpl implements Catalog {
@Override
public void remove(StoreInfo store) {
doRemove(store, facade::remove);
doRemove(store, StoreInfo.class);
}
/**
@ -386,7 +414,7 @@ public class CatalogPlugin extends CatalogImpl implements Catalog {
@Override
public void remove(ResourceInfo resource) {
doRemove(resource, facade::remove);
doRemove(resource, ResourceInfo.class);
}
@Override
@ -417,7 +445,7 @@ public class CatalogPlugin extends CatalogImpl implements Catalog {
@Override
public void remove(LayerInfo layer) {
doRemove(layer, facade::remove);
doRemove(layer, LayerInfo.class);
}
@Override
@ -465,7 +493,7 @@ public class CatalogPlugin extends CatalogImpl implements Catalog {
@Override
public void remove(LayerGroupInfo layerGroup) {
doRemove(layerGroup, facade::remove);
doRemove(layerGroup, LayerGroupInfo.class);
}
@Override
@ -490,7 +518,7 @@ public class CatalogPlugin extends CatalogImpl implements Catalog {
@Override
public void remove(MapInfo map) {
doRemove(map, facade::remove);
doRemove(map, MapInfo.class);
}
@Override
@ -515,7 +543,7 @@ public class CatalogPlugin extends CatalogImpl implements Catalog {
@Override
public void remove(NamespaceInfo namespace) {
doRemove(namespace, facade::remove);
doRemove(namespace, NamespaceInfo.class);
}
@Override
@ -562,7 +590,7 @@ public class CatalogPlugin extends CatalogImpl implements Catalog {
@Override
public void remove(WorkspaceInfo workspace) {
doRemove(workspace, facade::remove);
doRemove(workspace, WorkspaceInfo.class);
}
@Override
@ -622,7 +650,7 @@ public class CatalogPlugin extends CatalogImpl implements Catalog {
@Override
public void remove(StyleInfo style) {
doRemove(style, facade::remove);
doRemove(style, StyleInfo.class);
}
@Override
@ -671,22 +699,7 @@ public class CatalogPlugin extends CatalogImpl implements Catalog {
}
public Optional<CatalogInfo> findById(@NonNull String id) {
FilterFactory ff = CommonFactoryFinder.getFilterFactory();
final Filter filter = ff.id(ff.featureId(id));
return Stream.of(
WorkspaceInfo.class,
NamespaceInfo.class,
StoreInfo.class,
ResourceInfo.class,
LayerInfo.class,
LayerGroupInfo.class,
StyleInfo.class,
MapInfo.class)
.map(type -> get(type, filter))
.filter(Objects::nonNull)
.map(CatalogInfo.class::cast)
.findFirst();
return getFacade().get(id);
}
@Override
@ -695,7 +708,7 @@ public class CatalogPlugin extends CatalogImpl implements Catalog {
// try optimizing by querying by id first, defer to regular filter query if
// filter is not and Id filter
return getIdIfIdFilter(filter) //
.map(id -> findOneById(id, type)) //
.flatMap(id -> findById(id, type)) //
.or(() -> findOne(type, filter)) //
.orElse(null);
}
@ -713,29 +726,8 @@ public class CatalogPlugin extends CatalogImpl implements Catalog {
}
}
private <T extends CatalogInfo> T findOneById(String id, Class<T> type) {
final ClassMappings cm = classMapping(type).orElseThrow();
return switch (cm) {
case NAMESPACE -> type.cast(getNamespace(id));
case WORKSPACE -> type.cast(getWorkspace(id));
case STORE -> type.cast(getStore(id, StoreInfo.class));
case COVERAGESTORE -> type.cast(getCoverageStore(id));
case DATASTORE -> type.cast(getDataStore(id));
case WMSSTORE -> type.cast(getStore(id, WMSStoreInfo.class));
case WMTSSTORE -> type.cast(getStore(id, WMTSStoreInfo.class));
case RESOURCE -> type.cast(getResource(id, ResourceInfo.class));
case COVERAGE -> type.cast(getCoverage(id));
case FEATURETYPE -> type.cast(getFeatureType(id));
case WMSLAYER -> type.cast(getResource(id, WMSLayerInfo.class));
case WMTSLAYER -> type.cast(getResource(id, WMTSLayerInfo.class));
case LAYER -> type.cast(getLayer(id));
case LAYERGROUP -> type.cast(getLayerGroup(id));
case PUBLISHED -> type.cast(
Optional.<PublishedInfo>ofNullable(getLayer(id))
.orElseGet(() -> getLayerGroup(id)));
case STYLE -> type.cast(getStyle(id));
default -> throw new IllegalArgumentException("Unexpected value: %s".formatted(cm));
};
public <T extends CatalogInfo> Optional<T> findById(String id, Class<T> type) {
return getFacade().get(id, type);
}
private Optional<String> getIdIfIdFilter(Filter filter) {
@ -847,18 +839,23 @@ public class CatalogPlugin extends CatalogImpl implements Catalog {
}
}
protected <T extends CatalogInfo> void doRemove(T object, Consumer<T> remover) {
validationSupport.beforeRemove(object);
CatalogOpContext<T> context = new CatalogOpContext<>(this, object);
protected <T extends CatalogInfo> void doRemove(T object, Class<T> type) {
// proceed only if found, avoid no-op events
getFacade().get(object.getId(), type).ifPresent(this::doRemove);
}
private <T extends CatalogInfo> void doRemove(T found) {
validationSupport.beforeRemove(found);
CatalogOpContext<T> context = new CatalogOpContext<>(this, found);
businessRules.onBeforeRemove(context);
try {
remover.accept(object);
// fire the event before the post-rules are processed, since they may result in
// other objects removed/modified, and hence avoid a secondary event to be
// notified before the primary one. For example, a post-rule may result in a call to
getFacade().remove(ModificationProxy.unwrap(found));
// fire the event before the post-rules are processed, since they may result in other
// objects removed/modified, and hence avoid a secondary event to be notified before the
// primary one. For example, a post-rule may result in a call to
// setDefaultWorspace/Namespace/DataStore
fireRemoved(object);
fireRemoved(found);
businessRules.onRemoved(context);
} catch (RuntimeException error) {
businessRules.onRemoved(context.setError(error));
@ -866,13 +863,17 @@ public class CatalogPlugin extends CatalogImpl implements Catalog {
}
}
protected void setId(CatalogInfo o) {
protected void setId(@NonNull CatalogInfo o) {
if (null == o.getId()) {
String uid = UUID.randomUUID().toString();
String id = "%s-%s".formatted(o.getClass().getSimpleName(), uid);
String type =
ClassMappings.fromImpl(ModificationProxy.unwrap(o).getClass())
.getInterface()
.getSimpleName();
Ulid ulid = UlidCreator.getMonotonicUlid();
String id = "%s-%s".formatted(type, ulid);
OwsUtils.set(o, "id", id);
} else if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Using user provided id %s".formatted(o.getId()));
} else if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("Using provided id %s".formatted(o.getId()));
}
}
@ -880,23 +881,4 @@ public class CatalogPlugin extends CatalogImpl implements Catalog {
private <T> List<T> asList(@Nullable T value) {
return Collections.singletonList(value);
}
public void save(CatalogInfo info) {
doSave(info);
}
public void remove(@NonNull CatalogInfo info) {
switch (info) {
case WorkspaceInfo ws -> remove(ws);
case NamespaceInfo ns -> remove(ns);
case StoreInfo st -> remove(st);
case ResourceInfo r -> remove(r);
case LayerInfo l -> remove(l);
case LayerGroupInfo lg -> remove(lg);
case StyleInfo s -> remove(s);
case MapInfo m -> remove(m);
default -> throw new IllegalArgumentException(
"Unexpected value: %s".formatted(ModificationProxy.unwrap(info).getClass()));
}
}
}

View File

@ -4,23 +4,30 @@
*/
package org.geoserver.catalog.plugin;
import lombok.NonNull;
import org.geoserver.catalog.CatalogFacade;
import org.geoserver.catalog.CatalogInfo;
import org.geoserver.catalog.LayerGroupInfo;
import org.geoserver.catalog.LayerInfo;
import org.geoserver.catalog.MapInfo;
import org.geoserver.catalog.NamespaceInfo;
import org.geoserver.catalog.PublishedInfo;
import org.geoserver.catalog.ResourceInfo;
import org.geoserver.catalog.StoreInfo;
import org.geoserver.catalog.StyleInfo;
import org.geoserver.catalog.WorkspaceInfo;
import org.geoserver.catalog.impl.ClassMappings;
import org.geoserver.catalog.util.CloseableIterator;
import org.geoserver.catalog.util.CloseableIteratorAdapter;
import org.geotools.api.filter.Filter;
import org.geotools.api.filter.sort.SortBy;
import java.io.Closeable;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.annotation.Nullable;
@ -33,6 +40,105 @@ import javax.annotation.Nullable;
*/
public interface ExtendedCatalogFacade extends CatalogFacade {
default <T extends CatalogInfo> void forEach(Consumer<? super CatalogInfo> consumer) {
List<Class<? extends CatalogInfo>> types =
List.of(
WorkspaceInfo.class,
NamespaceInfo.class,
StoreInfo.class,
ResourceInfo.class,
StyleInfo.class,
LayerInfo.class,
LayerGroupInfo.class);
for (var type : types) {
try (var stream = query(Query.valueOf(type, Filter.INCLUDE))) {
stream.forEach(consumer);
}
}
}
default Optional<CatalogInfo> get(@NonNull String id) {
CatalogInfo found = getWorkspace(id);
if (null == found) found = getNamespace(id);
if (null == found) found = getStore(id, StoreInfo.class);
if (null == found) found = getResource(id, ResourceInfo.class);
if (null == found) found = getPublished(id);
if (null == found) found = getStyle(id);
if (null == found) found = getMap(id);
return Optional.ofNullable(found);
}
default <T extends CatalogInfo> Optional<T> get(@NonNull String id, @NonNull Class<T> type) {
if (!type.isInterface()) {
throw new IllegalArgumentException("Expected an interface type, got " + type);
}
if (CatalogInfo.class.equals(type)) {
return get(id).map(type::cast);
}
ClassMappings cm = ClassMappings.fromInterface(type);
CatalogInfo found =
switch (cm) {
case WORKSPACE -> getWorkspace(id);
case NAMESPACE -> getNamespace(id);
case DATASTORE, COVERAGESTORE, WMSSTORE, WMTSSTORE, STORE -> getStore(
id, StoreInfo.class);
case FEATURETYPE, COVERAGE, WMSLAYER, WMTSLAYER, RESOURCE -> getResource(
id, ResourceInfo.class);
case LAYER -> getLayer(id);
case LAYERGROUP -> getLayerGroup(id);
case PUBLISHED -> getPublished(id);
case STYLE -> getStyle(id);
case MAP -> getMap(id);
default -> throw new IllegalArgumentException(
"Unknown CatalogInfo type " + type);
};
return Optional.ofNullable(found).filter(type::isInstance).map(type::cast);
}
default PublishedInfo getPublished(@NonNull String id) {
return get(id, LayerInfo.class)
.map(PublishedInfo.class::cast)
.or(() -> get(id, LayerGroupInfo.class))
.orElse(null);
}
@SuppressWarnings("unchecked")
default <T extends CatalogInfo> T add(@NonNull T info) {
return switch (info) {
case WorkspaceInfo ws -> (T) add(ws);
case NamespaceInfo ns -> (T) add(ns);
case StoreInfo st -> (T) add(st);
case ResourceInfo r -> (T) add(r);
case LayerInfo l -> (T) add(l);
case LayerGroupInfo lg -> (T) add(lg);
case StyleInfo s -> (T) add(s);
case MapInfo m -> (T) add(m);
default -> throw new IllegalArgumentException("Unexpected value: %s".formatted(info));
};
}
default void remove(@NonNull CatalogInfo info) {
switch (info) {
case WorkspaceInfo ws -> remove(ws);
case NamespaceInfo ns -> remove(ns);
case StoreInfo st -> remove(st);
case ResourceInfo r -> remove(r);
case LayerInfo l -> remove(l);
case LayerGroupInfo lg -> remove(lg);
case StyleInfo s -> remove(s);
case MapInfo m -> remove(m);
default -> throw new IllegalArgumentException("Unexpected value: %s".formatted(info));
}
;
}
<I extends CatalogInfo> I update(I info, Patch patch);
/**

View File

@ -179,6 +179,7 @@ public class RepositoryCatalogFacadeImpl
@Override
public <T extends ResourceInfo> T getResourceByName(
NamespaceInfo namespace, String name, Class<T> clazz) {
if (namespace == null) return null;
Optional<T> result;
if (namespace == ANY_NAMESPACE) {
result = getResourceRepository().findFirstByName(name, clazz);

View File

@ -4,8 +4,6 @@
*/
package org.geoserver.catalog.plugin.forwarding;
import lombok.Getter;
import org.geoserver.catalog.Catalog;
import org.geoserver.catalog.CatalogCapabilities;
import org.geoserver.catalog.CatalogFacade;
@ -37,7 +35,7 @@ import javax.annotation.Nullable;
public class ForwardingCatalogFacade implements CatalogFacade {
// wrapped catalog facade
@Getter protected final CatalogFacade facade;
protected final CatalogFacade facade;
public ForwardingCatalogFacade(CatalogFacade facade) {
this.facade = facade;

View File

@ -4,6 +4,8 @@
*/
package org.geoserver.catalog.plugin.forwarding;
import lombok.NonNull;
import org.geoserver.catalog.CatalogFacade;
import org.geoserver.catalog.CatalogInfo;
import org.geoserver.catalog.plugin.ExtendedCatalogFacade;
@ -33,4 +35,14 @@ public class ForwardingExtendedCatalogFacade extends ForwardingCatalogFacade
protected ExtendedCatalogFacade asExtendedFacade() {
return (ExtendedCatalogFacade) super.facade;
}
@Override
public <T extends CatalogInfo> T add(@NonNull T info) {
return asExtendedFacade().add(info);
}
@Override
public void remove(@NonNull CatalogInfo info) {
asExtendedFacade().remove(info);
}
}

View File

@ -31,6 +31,7 @@ import org.geotools.api.filter.Filter;
import org.geotools.api.filter.sort.SortBy;
import java.util.List;
import java.util.Objects;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
@ -326,6 +327,11 @@ public class ResolvingCatalogFacadeDecorator extends ForwardingExtendedCatalogFa
super.setDefaultWorkspace(resolveInbound(workspace));
}
@Override
public void setDefaultDataStore(WorkspaceInfo workspace, DataStoreInfo store) {
super.setDefaultDataStore(resolveInbound(workspace), resolveInbound(store));
}
@Override
public WorkspaceInfo getWorkspace(String id) {
return resolveOutbound(super.getWorkspace(id));
@ -506,6 +512,6 @@ public class ResolvingCatalogFacadeDecorator extends ForwardingExtendedCatalogFa
@Override
public <T extends CatalogInfo> Stream<T> query(Query<T> query) {
return super.query(query).map(this::resolveOutbound).filter(i -> i != null);
return super.query(query).map(this::resolveOutbound).filter(Objects::nonNull);
}
}

View File

@ -21,7 +21,6 @@ import org.geoserver.catalog.StoreInfo;
import org.geoserver.catalog.WorkspaceInfo;
import org.geoserver.catalog.plugin.CatalogPlugin;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
/**
@ -120,9 +119,9 @@ public class LockingCatalog extends CatalogPlugin {
}
/** {@inheritDoc} */
protected @Override <T extends CatalogInfo> void doRemove(T info, Consumer<T> remover) {
protected @Override <T extends CatalogInfo> void doRemove(T info, Class<T> type) {
locking.runInWriteLock(
() -> super.doRemove(info, remover),
() -> super.doRemove(info, type),
format("remove(%s[%s])", typeOf(info), nameOf(info)));
}

View File

@ -48,7 +48,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
/** */
@Slf4j
@Slf4j(topic = "org.geoserver.catalog.plugin.resolving")
@RequiredArgsConstructor
public class ProxyUtils {
/**

View File

@ -19,6 +19,7 @@ import org.geoserver.catalog.StoreInfo;
import org.geoserver.catalog.StyleInfo;
import org.geoserver.catalog.WorkspaceInfo;
import org.geoserver.catalog.impl.ResolvingProxy;
import org.geoserver.catalog.plugin.Patch;
import org.geoserver.catalog.plugin.forwarding.ResolvingCatalogFacadeDecorator;
import org.geoserver.config.ServiceInfo;
import org.geoserver.config.SettingsInfo;
@ -48,14 +49,15 @@ import java.util.stream.Collectors;
*
* @see ResolvingProxy
*/
@Slf4j
public class ResolvingProxyResolver<T extends Info> implements UnaryOperator<T> {
@Slf4j(topic = "org.geoserver.catalog.plugin.resolving")
public class ResolvingProxyResolver<T> implements UnaryOperator<T> {
private final Supplier<Catalog> catalog;
private final BiConsumer<CatalogInfo, ResolvingProxy> onNotFound;
private final ProxyUtils proxyUtils;
public ResolvingProxyResolver(@NonNull Catalog catalog) {
private BiConsumer<CatalogInfo, ResolvingProxy> onNotFound;
private ResolvingProxyResolver(@NonNull Catalog catalog) {
this(
catalog,
(info, proxy) ->
@ -64,12 +66,12 @@ public class ResolvingProxyResolver<T extends Info> implements UnaryOperator<T>
.formatted(info.getId())));
}
public ResolvingProxyResolver(
private ResolvingProxyResolver(
@NonNull Catalog catalog, @NonNull BiConsumer<CatalogInfo, ResolvingProxy> onNotFound) {
this(() -> catalog, onNotFound);
}
public ResolvingProxyResolver(
private ResolvingProxyResolver(
@NonNull Supplier<Catalog> catalog,
@NonNull BiConsumer<CatalogInfo, ResolvingProxy> onNotFound) {
this.catalog = catalog;
@ -108,29 +110,40 @@ public class ResolvingProxyResolver<T extends Info> implements UnaryOperator<T>
return (ResolvingProxyResolver<I>) new MemoizingProxyResolver(catalog, onNotFound);
}
public ResolvingProxyResolver<T> onNotFound(
BiConsumer<CatalogInfo, ResolvingProxy> onNotFound) {
this.onNotFound = onNotFound;
return this;
}
@Override
public T apply(T info) {
return resolve(info);
}
@SuppressWarnings("unchecked")
public <I extends Info> I resolve(final I orig) {
public <I> I resolve(final I orig) {
if (orig == null) {
return null;
}
final ResolvingProxy resolvingProxy = getResolvingProxy(orig);
final boolean isResolvingProxy = null != resolvingProxy;
if (isResolvingProxy) {
// may the object itself be a resolving proxy
I resolved = doResolveProxy(orig);
if (resolved == null && orig instanceof CatalogInfo cinfo) {
log.info("Proxy object {} not found, calling on-not-found consumer", orig.getId());
onNotFound.accept(cinfo, resolvingProxy);
// return the proxied value if the consumer didn't throw an exception
return orig;
if (orig instanceof Info info) {
final ResolvingProxy resolvingProxy = getResolvingProxy(info);
final boolean isResolvingProxy = null != resolvingProxy;
if (isResolvingProxy) {
// may the object itself be a resolving proxy
Info resolved = doResolveProxy(info);
if (resolved == null && info instanceof CatalogInfo cinfo) {
log.debug(
"Proxy object {} not found, calling on-not-found consumer",
info.getId());
onNotFound.accept(cinfo, resolvingProxy);
// if onNotFound didn't throw an exception, return the proxied value if the
// consumer didn't throw an exception
return orig;
}
return (I) resolved;
}
return resolved;
}
if (orig instanceof StyleInfo style) return (I) resolveInternal(style);
@ -145,9 +158,15 @@ public class ResolvingProxyResolver<T extends Info> implements UnaryOperator<T>
if (orig instanceof ServiceInfo service) return (I) resolveInternal(service);
if (orig instanceof Patch patch) return (I) resolveInternal(patch);
return orig;
}
private Patch resolveInternal(Patch patch) {
return proxyUtils.resolve(patch);
}
protected <I extends Info> I doResolveProxy(final I orig) {
return proxyUtils.resolve(orig);
}

View File

@ -594,7 +594,7 @@ public class CatalogFaker {
return s;
}
public AttributionInfo attributionInfo() throws Exception {
public AttributionInfo attributionInfo() {
AttributionInfoImpl attinfo = new AttributionInfoImpl();
attinfo.setId(faker.idNumber().valid());
attinfo.setHref(faker.company().url());

View File

@ -1145,7 +1145,8 @@ public abstract class CatalogConformanceTest {
catalog.remove(data.dataStoreA);
assertEquals(1, l.removed.size());
assertEquals(data.dataStoreA, l.removed.get(0).getSource());
// comparing id, DataStoreInfoImpl.equals() is screwed up
assertEquals(data.dataStoreA.getId(), l.removed.get(0).getSource().getId());
}
@Test

View File

@ -112,6 +112,11 @@
<version>1.0.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.f4b6a3</groupId>
<artifactId>ulid-creator</artifactId>
<version>5.2.3</version>
</dependency>
<dependency>
<groupId>org.geoserver</groupId>
<artifactId>gs-platform</artifactId>