Skip to content

Commit

Permalink
[backend] Fix size computation
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-julien committed Oct 8, 2024
1 parent 421d11c commit e504e70
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions opencti-platform/opencti-graphql/src/manager/ingestionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const asArray = (data: unknown) => {
interface UpdateInfo {
state?: any
buffering?: boolean
messages_number?: number
messages_size?: number
}
const updateBuiltInConnectorInfo = async (context: AuthContext, user_id: string | undefined, id: string, opts: UpdateInfo = {}) => {
// Patch the related connector
Expand All @@ -70,7 +70,7 @@ const updateBuiltInConnectorInfo = async (context: AuthContext, user_id: string
run_and_terminate: false,
buffering: opts.buffering ?? false,
queue_threshold: 0,
queue_messages_size: opts.messages_number ?? 0
queue_messages_size: (opts.messages_size ?? 0) / 1000000 // In Mb
},
connector_user_id: user_id,
};
Expand Down Expand Up @@ -249,7 +249,7 @@ const rssExecutor = async (context: AuthContext, turndownService: TurndownServic
for (let i = 0; i < ingestions.length; i += 1) {
const ingestion = ingestions[i];
// If ingestion have remaining messages in the queue, dont fetch any new data
const { messages_number } = await queueDetails(connectorIdFromIngestId(ingestion.id));
const { messages_number, messages_size } = await queueDetails(connectorIdFromIngestId(ingestion.id));
if (messages_number === 0) {
const ingestionPromise = rssDataHandler(context, httpGet, turndownService, ingestion)
.catch((e) => {
Expand All @@ -259,7 +259,7 @@ const rssExecutor = async (context: AuthContext, turndownService: TurndownServic
ingestionPromises.push(ingestionPromise);
} else {

Check warning on line 260 in opencti-platform/opencti-graphql/src/manager/ingestionManager.ts

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/manager/ingestionManager.ts#L252-L260

Added lines #L252 - L260 were not covered by tests
// Update the state
const ingestionPromise = updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { buffering: true, messages_number });
const ingestionPromise = updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { buffering: true, messages_size });
ingestionPromises.push(ingestionPromise);
}

Check warning on line 264 in opencti-platform/opencti-graphql/src/manager/ingestionManager.ts

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/manager/ingestionManager.ts#L262-L264

Added lines #L262 - L264 were not covered by tests
}
Expand Down Expand Up @@ -390,7 +390,7 @@ const taxiiExecutor = async (context: AuthContext) => {
for (let i = 0; i < ingestions.length; i += 1) {
const ingestion = ingestions[i];
// If ingestion have remaining messages in the queue, dont fetch any new data
const { messages_number } = await queueDetails(connectorIdFromIngestId(ingestion.id));
const { messages_number, messages_size } = await queueDetails(connectorIdFromIngestId(ingestion.id));
if (messages_number === 0) {
const taxiiHandler = TAXII_HANDLERS[ingestion.version];
if (!taxiiHandler) {
Expand All @@ -404,7 +404,7 @@ const taxiiExecutor = async (context: AuthContext) => {
ingestionPromises.push(ingestionPromise);
} else {

Check warning on line 405 in opencti-platform/opencti-graphql/src/manager/ingestionManager.ts

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/manager/ingestionManager.ts#L393-L405

Added lines #L393 - L405 were not covered by tests
// Update the state
const ingestionPromise = updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { buffering: true, messages_number });
const ingestionPromise = updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { buffering: true, messages_size });
ingestionPromises.push(ingestionPromise);

Check warning on line 408 in opencti-platform/opencti-graphql/src/manager/ingestionManager.ts

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/manager/ingestionManager.ts#L407-L408

Added lines #L407 - L408 were not covered by tests
}
}
Expand Down Expand Up @@ -473,7 +473,7 @@ const csvExecutor = async (context: AuthContext) => {
for (let i = 0; i < ingestions.length; i += 1) {
const ingestion = ingestions[i];
// If ingestion have remaining messages in the queue, dont fetch any new data
const { messages_number } = await queueDetails(connectorIdFromIngestId(ingestion.id));
const { messages_number, messages_size } = await queueDetails(connectorIdFromIngestId(ingestion.id));
if (messages_number === 0) {
const ingestionPromise = csvDataHandler(context, ingestion)
.catch((e) => {
Expand All @@ -483,7 +483,7 @@ const csvExecutor = async (context: AuthContext) => {
ingestionPromises.push(ingestionPromise);
} else {

Check warning on line 484 in opencti-platform/opencti-graphql/src/manager/ingestionManager.ts

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/manager/ingestionManager.ts#L476-L484

Added lines #L476 - L484 were not covered by tests
// Update the state
const ingestionPromise = updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { buffering: true, messages_number });
const ingestionPromise = updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { buffering: true, messages_size });
ingestionPromises.push(ingestionPromise);
}

Check warning on line 488 in opencti-platform/opencti-graphql/src/manager/ingestionManager.ts

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/manager/ingestionManager.ts#L486-L488

Added lines #L486 - L488 were not covered by tests
}
Expand Down

0 comments on commit e504e70

Please sign in to comment.