Skip to content

Commit

Permalink
Generate and publish ClickHouse node configs
Browse files Browse the repository at this point in the history
  • Loading branch information
charW committed Aug 15, 2024
1 parent 1d9674e commit 117b2fb
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 1 deletion.
5 changes: 5 additions & 0 deletions orion-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@
<artifactId>route53</artifactId>
<version>2.17.273</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>2.17.273</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.pinterest.orion.core.actions.aws;

import java.util.Map;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public class S3Utils {

public static S3Client getDefaultS3Client() {
return S3Client.builder().build();
}

public static void putObject(
S3Client s3,
String bucket,
String key,
byte[] object,
Map<String, String> metadata) throws S3Exception {
PutObjectRequest putReq = PutObjectRequest.builder()
.bucket(bucket)
.key(key)
.metadata(metadata)
.build();

s3.putObject(putReq, RequestBody.fromBytes(object));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package com.pinterest.orion.core.actions.clickhouse;

import java.util.logging.Logger;
import java.util.Map;
import java.util.List;
import java.util.HashMap;

import java.io.File;
import java.io.StringWriter;

import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.OutputKeys;

import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.w3c.dom.Element;

import com.pinterest.orion.core.actions.Action;
import com.pinterest.orion.core.actions.ActionEngine;
import com.pinterest.orion.core.actions.generic.GenericClusterWideAction;
import com.pinterest.orion.core.Cluster;
import com.pinterest.orion.core.Node;
import com.pinterest.orion.core.clickhouse.ClickHouseNodeInfo;
import com.pinterest.orion.core.clickhouse.ClickHouseCluster;
import com.pinterest.orion.core.actions.aws.S3Utils;

import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.S3Client;


public class PublishAllNodeConfigAction extends GenericClusterWideAction.ClusterAction {
private static final Logger logger =
Logger.getLogger(PublishAllNodeConfigAction.class.getCanonicalName());

private static final String SERVERS_TAG = "remote_servers";
private static final String REPLICA_TAG = "replica";
private static final String SHARD_TAG = "shard";
private static final String HOST_TAG = "host";
private static final String PORT_TAG = "port";

private static final String SHARD_PLACEHOLDER = "${SHARD}";
private static final String REPLICA_PLACEHOLDER = "${REPLICA}";

private Element getClusterSection(Document config, String cluster) {
NodeList clusters = config.getElementsByTagName(cluster);
if (clusters.getLength() == 0) {
Element serversSection =
(Element)(config.getElementsByTagName(SERVERS_TAG).item(0));
Element clusterSection = config.createElement(cluster);
serversSection.appendChild(clusterSection);
clusters = config.getElementsByTagName(cluster);
}
return (Element)(clusters.item(0));
}

private Element getShardSection(
Document config, Element clusterSection, int shardNum) {
NodeList shards = clusterSection.getElementsByTagName(SHARD_TAG);
int shardsLength = shards.getLength();
for (int i = 0; i < shardNum - shardsLength; ++i) {
Element shardSection = config.createElement(SHARD_TAG);
clusterSection.appendChild(shardSection);
}

// assuming shard numbers start from 1
Element shardSection = (Element)
((clusterSection.getElementsByTagName(SHARD_TAG)).item(shardNum-1));
return shardSection;
}

private Element getReplicaSection(Document config, String host, int port) {
Element hostSection = config.createElement(HOST_TAG);
hostSection.appendChild(config.createTextNode(host));

Element portSection = config.createElement(PORT_TAG);
portSection.appendChild(config.createTextNode(Integer.toString(port)));

Element replicaSection = config.createElement(REPLICA_TAG);
replicaSection.appendChild(hostSection);
replicaSection.appendChild(portSection);
return replicaSection;
}

private void addReplicaToConfig(
Document config,
int shardNum,
int replicaNum,
String host,
int port,
String cluster) throws Exception {
Element clusterSection = getClusterSection(config, cluster);
Element shardSection = getShardSection(config, clusterSection, shardNum);
Element replicaSection = getReplicaSection(config, host, port);
shardSection.appendChild(replicaSection);
}

private String getConfigString(Document config) throws Exception {
TransformerFactory transformerFactory = TransformerFactory.newInstance();
transformerFactory.setAttribute("indent-number", 4);
Transformer transformer = transformerFactory.newTransformer();
transformer.setOutputProperty(OutputKeys.INDENT, "yes");

DOMSource input = new DOMSource(config);
StringWriter sw = new StringWriter();
transformer.transform(input, new StreamResult(sw));
return sw.toString();
}

@Override
public void runAction() throws Exception {
ActionEngine engine = getEngine();
ClickHouseCluster cluster = (ClickHouseCluster)engine.getCluster();
Map<String, Node> nodeMap = cluster.getNodeMap();

String configTemplatePath =
cluster.getAttribute(cluster.CONFIG_TEMPLATE_PATH).getValue();
String configS3Bucket =
cluster.getAttribute(cluster.CONFIG_S3_BUCKET).getValue();

File configTemplate = new File(configTemplatePath);
DocumentBuilder docBuilder =
DocumentBuilderFactory.newInstance().newDocumentBuilder();
Document config = docBuilder.parse(configTemplate);
config.getDocumentElement().normalize();

for (Node node : nodeMap.values()) {
ClickHouseNodeInfo nodeInfo = (ClickHouseNodeInfo)node.getCurrentNodeInfo();
String hostname = nodeInfo.getHostname();
int port = nodeInfo.getServicePort();
List<String> logicalClusters = nodeInfo.getLogicalClusters();
if (logicalClusters.isEmpty()) {
markFailed("Did not find any clusters for node " + hostname);
return;
}
// right now, assume there is only one logical cluster and all
// nodes belong to that
String clusterName = logicalClusters.get(0);
int shardNum = nodeInfo.getShardNum(clusterName);
int replicaNum = nodeInfo.getReplicaNum(clusterName);

// add this node to the config as a replica under the right shard
addReplicaToConfig(config, shardNum, replicaNum, hostname, port, clusterName);
}

String configStr = getConfigString(config);
S3Client s3 = S3Utils.getDefaultS3Client();
for (Node node : nodeMap.values()) {
ClickHouseNodeInfo nodeInfo = (ClickHouseNodeInfo)node.getCurrentNodeInfo();
String clusterName = nodeInfo.getLogicalClusters().get(0);
int shardNum = nodeInfo.getShardNum(clusterName);
int replicaNum = nodeInfo.getReplicaNum(clusterName);

// for the config of each node, sub in the shard and replica number for that node
String nodeConfig = configStr.replace(SHARD_PLACEHOLDER, Integer.toString(shardNum))
.replace(REPLICA_PLACEHOLDER, Integer.toString(replicaNum));

String hostname = nodeInfo.getHostname();
String key = hostname + "/config.xml";
try {
logger.info("Pushing updated config to S3 for node " + hostname);
S3Utils.putObject(
s3,
configS3Bucket,
key,
nodeConfig.getBytes(),
new HashMap<String, String>());
} catch (S3Exception e) {
markFailed("Pushing config to S3 for node " + hostname + " failed: " + e);
}
}
s3.close();
}

@Override
public String getName() {
return "PublishAllNodeConfigAction";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.pinterest.orion.core.automation.operator.clickhouse;

import com.pinterest.orion.core.Cluster;
import com.pinterest.orion.core.automation.operator.Operator;
import com.pinterest.orion.core.clickhouse.ClickHouseCluster;

public abstract class ClickHouseOperator extends Operator {

@Override
public final void operate(Cluster cluster) throws Exception {
if(cluster instanceof ClickHouseCluster){
operate((ClickHouseCluster) cluster);
}
}

public abstract void operate(ClickHouseCluster cluster) throws Exception;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.pinterest.orion.core.automation.operator.clickhouse;

import java.util.logging.Logger;

import com.pinterest.orion.core.Cluster;
import com.pinterest.orion.core.automation.operator.Operator;
import com.pinterest.orion.core.clickhouse.ClickHouseCluster;
import com.pinterest.orion.core.actions.clickhouse.PublishAllNodeConfigAction;

public class PublishConfigOperator extends ClickHouseOperator {
private static final Logger logger =
Logger.getLogger(PublishConfigOperator.class.getCanonicalName());

@Override
public void operate(ClickHouseCluster cluster) throws Exception {
logger.info("Initializing PublishAllNodeConfigAction for ClickHouse cluster...");
PublishAllNodeConfigAction action = new PublishAllNodeConfigAction();
dispatch(action);
}

@Override
public String getName() {
return "PublishConfigOperator";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import com.pinterest.orion.core.PluginConfigurationException;
import com.pinterest.orion.core.clickhouse.ClickHouseCluster;
import com.pinterest.orion.core.clickhouse.ClickHouseNodeInfo;
import com.pinterest.orion.core.actions.clickhouse.PublishAllNodeConfigAction;

public class ClickHouseClusterSensor extends ClickHouseSensor {

public static final String CLUSTER_COL = "cluster";
public static final String SHARD_NUM_COL = "shard_num";
public static final String SHARD_WEIGHT_COL = "shard_weight";
public static final String REPLICA_NUM_COL = "replica_num";
public static final String PORT_COL = "port";
public static final String HOST_NAME_COL = "host_name";

public static final String CLUSTERS_QUERY = "SELECT * FROM system.clusters WHERE is_local=1";
Expand Down Expand Up @@ -95,9 +97,11 @@ private void queryShardReplicaInfo(
int shard = r.getValue(SHARD_NUM_COL).asInteger();
int shardWeight = r.getValue(SHARD_WEIGHT_COL).asInteger();
int replicaNum = r.getValue(REPLICA_NUM_COL).asInteger();
int servicePort = r.getValue(PORT_COL).asInteger();
String hostName = r.getValue(HOST_NAME_COL).asString();

nodeInfo.setHostname(hostName);
nodeInfo.setServicePort(servicePort);
nodeInfo.addShardReplicaInfo(cluster, shard, shardWeight, replicaNum);
}
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class ClickHouseCluster extends Cluster {
private static final String DEFAULT_PASSWORD = "";

public static final String SERVERSET_PATH = "serversetPath";
public static final String CONFIG_S3_BUCKET = "configS3Bucket";
public static final String CONFIG_TEMPLATE_PATH = "configTemplatePath";

private Map<String, Object> config;

Expand All @@ -56,6 +58,8 @@ protected void bootstrapClusterInfo(Map<String, Object> config) throws PluginCon
setAttribute(USER, config.getOrDefault(USER, DEFAULT_USER));
setAttribute(PASSWORD, config.getOrDefault(PASSWORD, DEFAULT_PASSWORD));
setAttribute(SERVERSET_PATH, config.get(SERVERSET_PATH));
setAttribute(CONFIG_S3_BUCKET, config.get(CONFIG_S3_BUCKET));
setAttribute(CONFIG_TEMPLATE_PATH, config.get(CONFIG_TEMPLATE_PATH));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

import java.io.Serializable;
import java.util.Map;
import java.util.List;
import java.util.HashMap;
import java.util.ArrayList;

class ShardReplicaInfo {
public int shardNum;
Expand All @@ -30,6 +32,10 @@ public void addShardReplicaInfo(String cluster, int shardNum, int shardWeight, i
infoByCluster.put(cluster, new ShardReplicaInfo(shardNum, shardWeight, replicaNum));
}

public List<String> getLogicalClusters() {
return new ArrayList<String>(infoByCluster.keySet());
}

public int getShardNum(String cluster) {
return infoByCluster.get(cluster).shardNum;
}
Expand Down
20 changes: 19 additions & 1 deletion orion-server/src/test/resources/configs/clickhouse-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,27 @@ clusterConfigs:
configuration:
serversetPath: /opt/orion-server/discovery.testclickhouse.test

clusterConfigs:
- clusterId: testclickhouse
type: clickhouse
configuration:
serversetPath: /opt/orion-server/discovery.testclickhouse.test
configS3Bucket: test_config_bucket
configTemplatePath: /opt/orion-server/conf/config.xml

plugins:
sensorConfigs:
- key: clusterSensor
class: com.pinterest.orion.core.automation.sensor.clickhouse.ClickHouseClusterSensor
interval: 60
enabled: true
enabled: true

actionConfigs:
- key: publishAllNodeConfig
class: com.pinterest.orion.core.actions.clickhouse.PublishAllNodeConfigAction
enabled: true

operatorConfigs:
- key: publishConfigOperator
class: com.pinterest.orion.core.automation.operator.clickhouse.PublishConfigOperator
enabled: true

0 comments on commit 117b2fb

Please sign in to comment.