Skip to content

Commit

Permalink
ImlCopy: using write_feature
Browse files Browse the repository at this point in the history
Signed-off-by: qGYdXbY2 <47661341+qGYdXbY2@users.noreply.github.com>
  • Loading branch information
qGYdXbY2 committed Oct 23, 2024
1 parent a0fbf3a commit 4bb9647
Show file tree
Hide file tree
Showing 9 changed files with 762 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.here.xyz.hub.task.SpaceConnectorBasedHandler;
import com.here.xyz.psql.query.IterateChangesets;
import com.here.xyz.responses.ChangesetsStatisticsResponse;
import com.here.xyz.responses.XyzResponse;
import com.here.xyz.responses.changesets.Changeset;
import com.here.xyz.responses.changesets.ChangesetCollection;
import com.here.xyz.util.service.HttpException;
Expand Down Expand Up @@ -80,14 +81,15 @@ private void getChangeset(RoutingContext context) {
long version = getVersionFromPathParam(context);
IterateChangesetsEvent event = buildIterateChangesetsEvent(context, version, version);
//TODO: Add static caching to this endpoint, once the execution pipelines have been refactored.
SpaceConnectorBasedHandler.execute(getMarker(context),
SpaceConnectorBasedHandler.<IterateChangesetsEvent,ChangesetCollection>execute(getMarker(context),
space -> Authorization.authorizeManageSpacesRights(context, space.getId(), space.getOwner()).map(space), event)
.onSuccess(result -> {
ChangesetCollection changesets = (ChangesetCollection) result;
if (changesets.getVersions().isEmpty())
sendErrorResponse(context, new HttpException(NOT_FOUND, "No changeset was found for version " + version));
else
sendResponse(context, changesets.getVersions().get(version).withNextPageToken(changesets.getNextPageToken()));
ChangesetCollection changesets = (ChangesetCollection) result;
if (changesets.getVersions().isEmpty())
sendErrorResponse(context, new HttpException(NOT_FOUND, "No changeset was found for version " + version));
else
sendResponse(context, changesets.getVersions().get(version).withNextPageToken(changesets.getNextPageToken()));

})
.onFailure(t -> sendErrorResponse(context, t));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.here.xyz.jobs.Job;
import com.here.xyz.jobs.steps.compiler.ExportToFiles;
import com.here.xyz.jobs.steps.compiler.CopySpaceToSpace;
import com.here.xyz.jobs.steps.compiler.ImportFromFiles;
import com.here.xyz.jobs.steps.compiler.JobCompilationInterceptor;
import com.here.xyz.util.Async;
Expand All @@ -40,7 +41,8 @@ public class JobCompiler {
static {
registerCompilationInterceptor(ImportFromFiles.class);
registerCompilationInterceptor(ExportToFiles.class);
//registerCompilationInterceptor(CopySpaceToSpace.class);
//registerCompilationInterceptor(_CopySpaceToSpace.class);
registerCompilationInterceptor(CopySpaceToSpace.class);
}

public Future<StepGraph> compile(Job job) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (C) 2017-2024 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* License-Filename: LICENSE
*/

package com.here.xyz.jobs.steps.compiler;

import com.here.xyz.jobs.Job;
import com.here.xyz.jobs.datasets.DatasetDescription;
import com.here.xyz.jobs.datasets.filters.Filters;
import com.here.xyz.jobs.datasets.filters.SpatialFilter;
import com.here.xyz.jobs.steps.CompilationStepGraph;
import com.here.xyz.jobs.steps.impl.transport._CopySpace;
import com.here.xyz.models.hub.Ref;

public class _CopySpaceToSpace implements JobCompilationInterceptor {

@Override
public boolean chooseMe(Job job) {
return job.getSource() instanceof DatasetDescription.Space && job.getTarget() instanceof DatasetDescription.Space;
}

@Override
public CompilationStepGraph compile(Job job) {
final String sourceSpaceId = job.getSource().getKey();
final String targetSpaceId = job.getTarget().getKey();

Filters filters = ((DatasetDescription.Space) job.getSource()).getFilters();
Ref versionRef = ((DatasetDescription.Space) job.getSource()).getVersionRef();

_CopySpace copySpaceStep = new _CopySpace()
.withSpaceId(sourceSpaceId)
.withTargetSpaceId(targetSpaceId)
.withSourceVersionRef(versionRef);

if(filters != null) {
//filters.context is not supported
//TODO: work with propertiesQueryObject
//copySpaceStep.setPropertyFilter(filters.getPropertyFilter());

SpatialFilter spatialFilter = filters.getSpatialFilter();
if (spatialFilter != null) {
copySpaceStep.setGeometry(spatialFilter.getGeometry());
copySpaceStep.setRadius(spatialFilter.getRadius());
copySpaceStep.setClipOnFilterGeometry(spatialFilter.isClip());
}
}

return (CompilationStepGraph) new CompilationStepGraph()
.addExecution(copySpaceStep);
}
}
15 changes: 0 additions & 15 deletions xyz-jobs/xyz-job-service/src/main/resources/openapi-recipes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,3 @@ recipes:
- type: value
path: servers.0.url
replace: ${SERVER_URL}
- type: key
path: paths.'/spaces/{spaceId}/jobs'
replace: paths.'/hub/spaces/{spaceId}/jobs'
- type: key
path: paths.'/spaces/{spaceId}/jobs/{jobId}'
replace: paths.'/hub/spaces/{spaceId}/jobs/{jobId}'
- type: key
path: paths.'/spaces/{spaceId}/jobs/{jobId}/inputs'
replace: paths.'/hub/spaces/{spaceId}/jobs/{jobId}/inputs'
- type: key
path: paths.'/spaces/{spaceId}/jobs/{jobId}/outputs'
replace: paths.'/hub/spaces/{spaceId}/jobs/{jobId}/outputs'
- type: key
path: paths.'/spaces/{spaceId}/jobs/{jobId}/status'
replace: paths.'/hub/spaces/{spaceId}/jobs/{jobId}/status'
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ private SQLQuery wrapQuery(SQLQuery stepQuery) {
.withQueryFragment("jobId", getJobId())
.withQueryFragment("stepId", getId())
.withQueryFragment("stepQuery", stepQuery)
.withContext(stepQuery.getContext())
.withQueryFragment("successCallback", buildSuccessCallbackQuery())
.withQueryFragment("failureCallback", buildFailureCallbackQuery());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@
import com.here.xyz.jobs.steps.Config;
import com.here.xyz.jobs.steps.execution.db.Database;
import com.here.xyz.jobs.steps.execution.db.DatabaseBasedStep;
import com.here.xyz.jobs.steps.impl.transport.CopySpace;
import com.here.xyz.jobs.steps.impl.transport._CopySpace;
import com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles;
import com.here.xyz.jobs.steps.impl.transport.CopySpace;
import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace;
import com.here.xyz.models.hub.Space;
import com.here.xyz.models.hub.Tag;
Expand All @@ -50,7 +51,8 @@
@JsonSubTypes.Type(value = DropIndexes.class),
@JsonSubTypes.Type(value = AnalyzeSpaceTable.class),
@JsonSubTypes.Type(value = MarkForMaintenance.class),
@JsonSubTypes.Type(value = CopySpace.class)
@JsonSubTypes.Type(value = _CopySpace.class),
@JsonSubTypes.Type(value = CopySpace.class),
})
public abstract class SpaceBasedStep<T extends SpaceBasedStep> extends DatabaseBasedStep<T> {
private static final Logger logger = LogManager.getLogger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import com.here.xyz.responses.StatisticsResponse;
import com.here.xyz.util.db.SQLQuery;
import com.here.xyz.util.service.BaseHttpServerVerticle.ValidationException;
import com.here.xyz.util.web.XyzWebClient.WebClientException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -91,8 +93,6 @@ public class CopySpace extends SpaceBasedStep<CopySpace> {
@JsonView({Internal.class, Static.class})
private int estimatedSeconds = -1;

private final int PSEUDO_NEXT_VERSION = 0;

//Existing Space in which we copy to
private String targetSpaceId;

Expand Down Expand Up @@ -283,10 +283,10 @@ protected void onAsyncSuccess() throws WebClientException,
logger.info( "Loading space config for target-space " + getTargetSpaceId());
Space targetSpace = loadSpace(getTargetSpaceId());
logger.info("Getting storage database for space "+getSpaceId());
Database db = loadDatabase(targetSpace.getStorage().getId(), WRITER);
// Database db = loadDatabase(targetSpace.getStorage().getId(), WRITER);

//@TODO: Add ACU calculation
runWriteQueryAsync(buildCopySpaceNextVersionUpdate(getSchema(db), getRootTableName(targetSpace)), db, 0, false);
// runWriteQueryAsync(buildCopySpaceNextVersionUpdate(getSchema(db), getRootTableName(targetSpace)), db, 0, false);
}

@Override
Expand All @@ -302,65 +302,31 @@ public void resume() throws Exception {
logger.info("Copy - onAsyncSuccess");
}

private SQLQuery buildCopySpaceQuery(String schema, String sourceTableName, Space sourceSpace, String targetTableName,
boolean isEnableHashedSpaceIdActivated, boolean targetVersioningEnabled)
throws SQLException {
private SQLQuery buildCopySpaceQuery(String schema, String sourceTableName, Space sourceSpace, String targetTableName, boolean isEnableHashedSpaceIdActivated, boolean targetVersioningEnabled) throws SQLException {

final Map<String, Object> queryContext = new HashMap<>(Map.of(
"schema", schema,
"table", targetTableName,
"context", "'DEFAULT'" ,
"historyEnabled", targetVersioningEnabled
));

return new SQLQuery(
"""
WITH ins_data as
(INSERT INTO ${schema}.${table} (jsondata, operation, author, geo, id, version, next_version )
SELECT idata.jsondata, CASE WHEN idata.operation in ('I', 'U') THEN (CASE WHEN edata.id isnull THEN 'I' ELSE 'U' END) ELSE idata.operation END AS operation, idata.author, idata.geo, idata.id,
(SELECT nextval('${schema}.${versionSequenceName}')) AS version,
CASE WHEN edata.id isnull THEN max_bigint() ELSE ${{pseudoNextVersion}} END as next_version
FROM
(
select write_feature( (idata.jsondata || jsonb_build_object('geometry',st_asgeojson(idata.geo)::json))::text, idata.author, null, null , null, null, false, (SELECT nextval('${schema}.${versionSequenceName}')), false)
from
(${{contentQuery}} ) idata
LEFT JOIN ${schema}.${table} edata ON (idata.id = edata.id AND edata.next_version = max_bigint())
RETURNING id, version, (coalesce(pg_column_size(jsondata),0) + coalesce(pg_column_size(geo),0))::bigint as bytes_size
),
upd_data as
(UPDATE ${schema}.${table}
SET next_version = (SELECT version FROM ins_data LIMIT 1)
WHERE ${{targetVersioningEnabled}}
AND next_version = max_bigint()
AND id IN (SELECT id FROM ins_data)
AND version < (SELECT version FROM ins_data LIMIT 1)
RETURNING id, version
),
del_data AS
(DELETE FROM ${schema}.${table}
WHERE not ${{targetVersioningEnabled}}
AND id IN (SELECT id FROM ins_data)
AND version < (SELECT version FROM ins_data LIMIT 1)
RETURNING id, version
)
--SELECT count(1) AS rows_uploaded, sum(bytes_size)::BIGINT AS bytes_uploaded, 0::BIGINT AS files_uploaded,
-- (SELECT count(1) FROM upd_data) AS version_updated,
-- (SELECT count(1) FROM del_data) AS version_deleted
--FROM ins_data l
SELECT 1 INTO dummy_output FROM del_data
"""
).withVariable("schema", schema)
.withVariable("table", targetTableName)
.withQueryFragment("targetVersioningEnabled", "" + targetVersioningEnabled)
.withVariable("versionSequenceName", sourceTableName + "_version_seq")
.withQueryFragment("pseudoNextVersion", PSEUDO_NEXT_VERSION + "" )
.withQueryFragment("contentQuery", buildCopyContentQuery(sourceSpace, isEnableHashedSpaceIdActivated));
}

private SQLQuery buildCopySpaceNextVersionUpdate(String schema, String targetTableName) throws SQLException {
/* adjust next_version to max_bigint(), except in case of concurrency set it to concurrent inserted version */
//TODO: case of extern concurency && same id in source & target && non-versiond layer a duplicate id can occure with next_version = concurrent_inserted.version
return new SQLQuery(
"""
UPDATE
${schema}.${table} t
set next_version = coalesce(( select version from ${schema}.${table} i where i.id = t.id and i.next_version = max_bigint() ), max_bigint())
where
next_version = ${{pseudoNextVersion}}
select count(1) into dummy_output from ins_data
"""
).withVariable("schema", schema)
.withVariable("table", targetTableName)
.withQueryFragment("pseudoNextVersion", PSEUDO_NEXT_VERSION + "" );
)
.withVariable("schema", schema)
.withVariable("versionSequenceName", targetTableName + "_version_seq")
.withQueryFragment("contentQuery", buildCopyContentQuery(sourceSpace, isEnableHashedSpaceIdActivated))
.withContext( queryContext );

}

private SQLQuery buildCopyContentQuery(Space space, boolean isEnableHashedSpaceIdActivated) throws SQLException {
Expand Down
Loading

0 comments on commit 4bb9647

Please sign in to comment.