Skip to content

Commit

Permalink
Merge branch 'main' into tristanvuong-replace-separate-inserts-with-1…
Browse files Browse the repository at this point in the history
…-large-insert-per-table
  • Loading branch information
tristanvuong2021 authored Mar 8, 2024
2 parents 2c7a414 + c9e982c commit bd7fe76
Showing 1 changed file with 79 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1384,8 +1384,20 @@ class MetricsService(
}
}

grpcRequire(request.hasMetric()) { "Metric is not specified." }

val batchGetReportingSetsResponse =
batchGetInternalReportingSets(
parentKey.measurementConsumerId,
listOf(request.metric.reportingSet),
)

val internalCreateMetricRequest: InternalCreateMetricRequest =
buildInternalCreateMetricRequest(principal.resourceKey.measurementConsumerId, request)
buildInternalCreateMetricRequest(
principal.resourceKey.measurementConsumerId,
request,
batchGetReportingSetsResponse.reportingSetsList.first(),
)

val internalMetric =
try {
Expand Down Expand Up @@ -1446,9 +1458,48 @@ class MetricsService(
"Duplicate metric IDs in the request."
}

val internalCreateMetricRequestsList: List<InternalCreateMetricRequest> =
request.requestsList.map { createMetricRequest ->
buildInternalCreateMetricRequest(parentKey.measurementConsumerId, createMetricRequest)
val reportingSetNames =
request.requestsList
.map {
grpcRequire(it.hasMetric()) { "Metric is not specified." }

it.metric.reportingSet
}
.distinct()

val callRpc: suspend (List<String>) -> BatchGetReportingSetsResponse = { items ->
batchGetInternalReportingSets(parentKey.measurementConsumerId, items)
}

val reportingSetNameToInternalReportingSetMap: Map<String, InternalReportingSet> = buildMap {
submitBatchRequests(reportingSetNames.asFlow(), BATCH_GET_REPORTING_SETS_LIMIT, callRpc) {
response ->
response.reportingSetsList
}
.collect { reportingSetsList ->
for (reportingSet in reportingSetsList) {
putIfAbsent(
ReportingSetKey(parentKey.measurementConsumerId, reportingSet.externalReportingSetId)
.toName(),
reportingSet,
)
}
}
}

val internalCreateMetricRequestsList: List<Deferred<InternalCreateMetricRequest>> =
coroutineScope {
request.requestsList.map { createMetricRequest ->
async {
buildInternalCreateMetricRequest(
parentKey.measurementConsumerId,
createMetricRequest,
reportingSetNameToInternalReportingSetMap.getValue(
createMetricRequest.metric.reportingSet
),
)
}
}
}

val internalMetrics =
Expand All @@ -1457,7 +1508,7 @@ class MetricsService(
.batchCreateMetrics(
internalBatchCreateMetricsRequest {
cmmsMeasurementConsumerId = parentKey.measurementConsumerId
requests += internalCreateMetricRequestsList
requests += internalCreateMetricRequestsList.awaitAll()
}
)
.metricsList
Expand Down Expand Up @@ -1486,12 +1537,11 @@ class MetricsService(
}

/** Builds an [InternalCreateMetricRequest]. */
private suspend fun buildInternalCreateMetricRequest(
private fun buildInternalCreateMetricRequest(
cmmsMeasurementConsumerId: String,
request: CreateMetricRequest,
internalReportingSet: InternalReportingSet,
): InternalCreateMetricRequest {
grpcRequire(request.hasMetric()) { "Metric is not specified." }

grpcRequire(request.metricId.matches(RESOURCE_ID_REGEX)) { "Metric ID is invalid." }
grpcRequire(request.metric.reportingSet.isNotEmpty()) {
"Reporting set in metric is not specified."
Expand All @@ -1517,9 +1567,6 @@ class MetricsService(
}
grpcRequire(request.metric.hasMetricSpec()) { "Metric spec in metric is not specified." }

val internalReportingSet: InternalReportingSet =
getInternalReportingSet(cmmsMeasurementConsumerId, request.metric.reportingSet)

// Utilizes the property of the set expression compilation result -- If the set expression
// contains only union operators, the compilation result has to be a single component.
if (
Expand Down Expand Up @@ -1582,35 +1629,32 @@ class MetricsService(
}
}

/** Gets an [InternalReportingSet] based on a reporting set name. */
private suspend fun getInternalReportingSet(
/** Batch get [InternalReportingSet]s based on [ReportingSet] names. */
private suspend fun batchGetInternalReportingSets(
cmmsMeasurementConsumerId: String,
reportingSetName: String,
): InternalReportingSet {
val reportingSetKey =
grpcRequireNotNull(ReportingSetKey.fromName(reportingSetName)) {
"Invalid reporting set name $reportingSetName."
}
reportingSetNames: List<String>,
): BatchGetReportingSetsResponse {
val externalReportingSetIds: List<String> =
reportingSetNames.map {
val reportingSetKey =
grpcRequireNotNull(ReportingSetKey.fromName(it)) { "Invalid reporting set name $it." }

if (reportingSetKey.cmmsMeasurementConsumerId != cmmsMeasurementConsumerId) {
failGrpc(Status.PERMISSION_DENIED) { "No access to the reporting set [$it]." }
}

if (reportingSetKey.cmmsMeasurementConsumerId != cmmsMeasurementConsumerId) {
failGrpc(Status.PERMISSION_DENIED) { "No access to the reporting set [$reportingSetName]." }
}
reportingSetKey.reportingSetId
}

return try {
internalReportingSetsStub
.batchGetReportingSets(
batchGetReportingSetsRequest {
this.cmmsMeasurementConsumerId = cmmsMeasurementConsumerId
this.externalReportingSetIds += reportingSetKey.reportingSetId
}
)
.reportingSetsList
.first()
} catch (e: StatusException) {
throw Exception(
"Unable to retrieve ReportingSet using the provided name [$reportingSetName].",
e,
internalReportingSetsStub.batchGetReportingSets(
batchGetReportingSetsRequest {
this.cmmsMeasurementConsumerId = cmmsMeasurementConsumerId
this.externalReportingSetIds += externalReportingSetIds
}
)
} catch (e: StatusException) {
throw Exception("Unable to retrieve ReportingSets using the provided names.", e)
}
}

Expand Down

0 comments on commit bd7fe76

Please sign in to comment.