Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4931]Add Registry Module for Discovery AdminServer #4932

Merged
merged 1 commit into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.apache.eventmesh.registry;

import lombok.Getter;

import java.util.List;

public class NotifyEvent {

public NotifyEvent(){

}

public NotifyEvent(List<RegisterServerInfo> instances) {
this(instances, false);
}

public NotifyEvent(List<RegisterServerInfo> instances, boolean isIncrement) {
this.isIncrement = isIncrement;
this.instances = instances;
}



// means whether it is an increment data
@Getter
private boolean isIncrement;

@Getter
private List<RegisterServerInfo> instances;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,17 @@ public void setMetadata(Map<String, String> metadata) {
public void addMetadata(String key, String value) {
this.metadata.put(key, value);
}

public void setExtFields(Map<String, Object> extFields) {
if (extFields == null) {
this.extFields.clear();
return;
}

this.extFields = extFields;
}

public void addExtFields(String key, Object value) {
this.extFields.put(key, value);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.apache.eventmesh.registry;

import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;

import java.util.HashMap;
import java.util.Map;

@Slf4j
public class RegistryFactory {
private static final Map<String, RegistryService> META_CACHE = new HashMap<>(16);

public static RegistryService getInstance(String registryPluginType) {
return META_CACHE.computeIfAbsent(registryPluginType, RegistryFactory::registryBuilder);
}

private static RegistryService registryBuilder(String registryPluginType) {
RegistryService registryServiceExt = EventMeshExtensionFactory.getExtension(RegistryService.class, registryPluginType);
if (registryServiceExt == null) {
String errorMsg = "can't load the registry plugin, please check.";
log.error(errorMsg);
throw new RuntimeException(errorMsg);
}
log.info("build registry plugin [{}] by type [{}] success", registryServiceExt.getClass().getSimpleName(),
registryPluginType);
return registryServiceExt;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package org.apache.eventmesh.registry;

public interface RegistryListener {
void onChange(Object data);
void onChange(NotifyEvent event) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

@EventMeshSPI(eventMeshExtensionType = EventMeshExtensionType.REGISTRY)
public interface RegistryService {
String ConfigurationKey = "registry";
void init() throws RegistryException;

void shutdown() throws RegistryException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.AbstractEventListener;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.config.ConfigService;
import org.apache.eventmesh.common.utils.ConfigurationContextUtil;
import org.apache.eventmesh.registry.NotifyEvent;
import org.apache.eventmesh.registry.QueryInstances;
import org.apache.eventmesh.registry.RegisterServerInfo;
import org.apache.eventmesh.registry.RegistryListener;
Expand All @@ -28,35 +30,43 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

@Slf4j
public class NacosDiscoveryService implements RegistryService {
private final AtomicBoolean initFlag = new AtomicBoolean(false);

private CommonConfiguration configuration;
private final AtomicBoolean initFlag = new AtomicBoolean(false);

private NacosRegistryConfiguration nacosConf;

private NamingService namingService;

private final Map<String, Map<RegistryListener, EventListener>> listeners = new HashMap<>();

private static final Executor notifyExecutor = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(20), r -> {
Thread t = new Thread(r);
t.setName("org.apache.eventmesh.registry.nacos.executor");
t.setDaemon(true);
return t;
}, new ThreadPoolExecutor.DiscardOldestPolicy()
);

private final Lock lock = new ReentrantLock();
private static final String GROUP_NAME = "admin";


@Override
public void init() throws RegistryException {
if (!initFlag.compareAndSet(false, true)) {
return;
}
configuration = ConfigurationContextUtil.get(RegistryService.ConfigurationKey);
if (configuration == null ) {
throw new RegistryException("registry config instance is null");
}
nacosConf = ConfigService.getInstance().buildConfigInstance(NacosRegistryConfiguration.class);
if (nacosConf == null) {
log.info("nacos registry configuration is null");
Expand All @@ -73,12 +83,13 @@ public void init() throws RegistryException {

private Properties buildProperties() {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, configuration.getRegistryAddr());
properties.setProperty(PropertyKeyConst.USERNAME, configuration.getEventMeshRegistryPluginUsername());
properties.setProperty(PropertyKeyConst.PASSWORD, configuration.getEventMeshRegistryPluginPassword());
if (nacosConf == null) {
return properties;
}
properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosConf.getRegistryAddr());
properties.setProperty(PropertyKeyConst.USERNAME, nacosConf.getEventMeshRegistryPluginUsername());
properties.setProperty(PropertyKeyConst.PASSWORD, nacosConf.getEventMeshRegistryPluginPassword());

String endpoint = nacosConf.getEndpoint();
if (Objects.nonNull(endpoint) && endpoint.contains(":")) {
int index = endpoint.indexOf(":");
Expand All @@ -87,7 +98,8 @@ private Properties buildProperties() {
} else {
Optional.ofNullable(endpoint).ifPresent(value -> properties.put(PropertyKeyConst.ENDPOINT, endpoint));
String endpointPort = nacosConf.getEndpointPort();
Optional.ofNullable(endpointPort).ifPresent(value -> properties.put(PropertyKeyConst.ENDPOINT_PORT, endpointPort));
Optional.ofNullable(endpointPort).ifPresent(value -> properties.put(PropertyKeyConst.ENDPOINT_PORT,
endpointPort));
}
String accessKey = nacosConf.getAccessKey();
Optional.ofNullable(accessKey).ifPresent(value -> properties.put(PropertyKeyConst.ACCESS_KEY, accessKey));
Expand All @@ -96,7 +108,8 @@ private Properties buildProperties() {
String clusterName = nacosConf.getClusterName();
Optional.ofNullable(clusterName).ifPresent(value -> properties.put(PropertyKeyConst.CLUSTER_NAME, clusterName));
String logFileName = nacosConf.getLogFileName();
Optional.ofNullable(logFileName).ifPresent(value -> properties.put(UtilAndComs.NACOS_NAMING_LOG_NAME, logFileName));
Optional.ofNullable(logFileName).ifPresent(value -> properties.put(UtilAndComs.NACOS_NAMING_LOG_NAME,
logFileName));
String logLevel = nacosConf.getLogLevel();
Optional.ofNullable(logLevel).ifPresent(value -> properties.put(UtilAndComs.NACOS_NAMING_LOG_LEVEL, logLevel));
Integer pollingThreadCount = nacosConf.getPollingThreadCount();
Expand All @@ -122,19 +135,54 @@ public void subscribe(RegistryListener listener, String serviceName) {
lock.lock();
try {
ServiceInfo serviceInfo = ServiceInfo.fromKey(serviceName);
Map<RegistryListener, EventListener> eventListenerMap = listeners.computeIfAbsent(serviceName, k -> new HashMap<>());
Map<RegistryListener, EventListener> eventListenerMap = listeners.computeIfAbsent(serviceName,
k -> new HashMap<>());
if (eventListenerMap.containsKey(listener)) {
log.warn("already use same listener subscribe service name {}" ,serviceName);
log.warn("already use same listener subscribe service name {}", serviceName);
return;
}
EventListener eventListener = listener::onChange;
List<String> clusters ;
EventListener eventListener = new AbstractEventListener() {
@Override
public Executor getExecutor() {
return notifyExecutor;
}

@Override
public void onEvent(Event event) {
if (!(event instanceof NamingEvent)) {
log.warn("received notify event type isn't not as expected");
return;
}
try {
NamingEvent namingEvent = (NamingEvent) event;
List<Instance> instances = namingEvent.getInstances();
List<RegisterServerInfo> list = new ArrayList<>();
if (instances != null) {
for (Instance instance : instances) {
RegisterServerInfo info = new RegisterServerInfo();
info.setAddress(instance.getIp() + ":" + instance.getPort());
info.setMetadata(instance.getMetadata());
info.setHealth(instance.isHealthy());
info.setServiceName(
ServiceInfo.getKey(NamingUtils.getGroupedName(namingEvent.getServiceName(),
namingEvent.getGroupName()),
namingEvent.getClusters()));
list.add(info);
}
}
listener.onChange(new NotifyEvent(list));
} catch (Exception e) {
log.warn("");
}
}
};
List<String> clusters;
if (serviceInfo.getClusters() == null || serviceInfo.getClusters().isEmpty()) {
clusters = new ArrayList<>();
} else {
clusters = Arrays.stream(serviceInfo.getClusters().split(",")).collect(Collectors.toList());
}
namingService.subscribe(serviceInfo.getName(),serviceInfo.getGroupName(), clusters, eventListener);
namingService.subscribe(serviceInfo.getName(), serviceInfo.getGroupName(), clusters, eventListener);
eventListenerMap.put(listener, eventListener);
} catch (Exception e) {
log.error("subscribe service name {} fail", serviceName, e);
Expand All @@ -152,7 +200,7 @@ public void unsubscribe(RegistryListener registryListener, String serviceName) {
if (map == null) {
return;
}
List<String> clusters ;
List<String> clusters;
if (serviceInfo.getClusters() == null || serviceInfo.getClusters().isEmpty()) {
clusters = new ArrayList<>();
} else {
Expand All @@ -177,14 +225,18 @@ public List<RegisterServerInfo> selectInstances(QueryInstances queryInstances) {
if (StringUtils.isNotBlank(serviceInfo.getClusters())) {
clusters.addAll(Arrays.asList(serviceInfo.getClusters().split(",")));
}
List<Instance> instances = namingService.selectInstances(serviceInfo.getName(), serviceInfo.getGroupName(), clusters, queryInstances.isHealth());
List<Instance> instances = namingService.selectInstances(serviceInfo.getName(),
serviceInfo.getGroupName(), clusters,
queryInstances.isHealth());
if (instances != null) {
instances.forEach(x -> {
RegisterServerInfo instanceInfo = new RegisterServerInfo();
instanceInfo.setMetadata(x.getMetadata());
instanceInfo.setHealth(x.isHealthy());
instanceInfo.setAddress(x.getIp() + ":" + x.getPort());
instanceInfo.setServiceName(ServiceInfo.getKey(NamingUtils.getGroupedName(x.getServiceName(), serviceInfo.getGroupName()), x.getClusterName()));
instanceInfo.setServiceName(
ServiceInfo.getKey(NamingUtils.getGroupedName(x.getServiceName(),
serviceInfo.getGroupName()), x.getClusterName()));
list.add(instanceInfo);
});
}
Expand Down Expand Up @@ -228,7 +280,9 @@ public boolean unRegister(RegisterServerInfo eventMeshRegisterInfo) throws Regis
return false;
}
ServiceInfo serviceInfo = ServiceInfo.fromKey(eventMeshRegisterInfo.getServiceName());
namingService.deregisterInstance(serviceInfo.getName(), serviceInfo.getGroupName(), ipPort[0], Integer.parseInt(ipPort[1]), serviceInfo.getClusters());
namingService.deregisterInstance(serviceInfo.getName(), serviceInfo.getGroupName(), ipPort[0],
Integer.parseInt(ipPort[1]),
serviceInfo.getClusters());
return true;
} catch (Exception e) {
log.error("unregister instance service {} fail", eventMeshRegisterInfo, e);
Expand Down
Loading
Loading