Skip to content

Commit

Permalink
Merge pull request #208 from cisagov/SQS-fixes
Browse files Browse the repository at this point in the history
Clean-up SQS code and fix visibility timeout
  • Loading branch information
schmelz21 authored Apr 30, 2024
2 parents 708a7ba + ee574c4 commit 555f630
Show file tree
Hide file tree
Showing 14 changed files with 115 additions and 159 deletions.
12 changes: 2 additions & 10 deletions backend/env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,7 @@ staging:
STAGE: staging
PE_FARGATE_CLUSTER_NAME: pe-staging-worker
PE_FARGATE_TASK_DEFINITION_NAME: pe-staging-worker
SHODAN_QUEUE_URL: ${ssm:/crossfeed/staging/SHODAN_QUEUE_URL}
DNSTWIST_QUEUE_URL: ${ssm:/crossfeed/staging/DNSTWIST_QUEUE_URL}
HIBP_QUEUE_URL: ${ssm:/crossfeed/staging/HIBP_QUEUE_URL}
INTELX_QUEUE_URL: ${ssm:/crossfeed/staging/INTELX_QUEUE_URL}
CYBERSIXGILL_QUEUE_URL: ${ssm:/crossfeed/staging/CYBERSIXGILL_QUEUE_URL}
QUEUE_URL: ${ssm:/crossfeed/staging/QUEUE_URL}
EMAIL_BUCKET_NAME: cisa-crossfeed-staging-html-email

prod:
Expand Down Expand Up @@ -105,11 +101,7 @@ prod:
STAGE: prod
PE_FARGATE_CLUSTER_NAME: pe-prod-worker
PE_FARGATE_TASK_DEFINITION_NAME: pe-prod-worker
SHODAN_QUEUE_URL: ${ssm:/crossfeed/prod/SHODAN_QUEUE_URL}
DNSTWIST_QUEUE_URL: ${ssm:/crossfeed/prod/DNSTWIST_QUEUE_URL}
HIBP_QUEUE_URL: ${ssm:/crossfeed/prod/HIBP_QUEUE_URL}
INTELX_QUEUE_URL: ${ssm:/crossfeed/prod/INTELX_QUEUE_URL}
CYBERSIXGILL_QUEUE_URL: ${ssm:/crossfeed/prod/CYBERSIXGILL_QUEUE_URL}
QUEUE_URL: ${ssm:/crossfeed/staging/QUEUE_URL}
EMAIL_BUCKET_NAME: cisa-crossfeed-staging-html-email

dev-vpc:
Expand Down
3 changes: 1 addition & 2 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,10 @@
"lint:fix": "eslint '**/*.{ts,tsx,js,jsx}' --fix",
"pesyncdb": "docker-compose exec -T backend npx ts-node src/tools/run-pesyncdb.ts",
"scan-exec": "docker-compose exec -T backend npx ts-node src/tools/run-scanExecution.ts",
"send-message": "node sendMessage.js",
"syncdb": "docker-compose exec -T backend npx ts-node src/tools/run-syncdb.ts",
"syncmdl": "docker-compose exec -T backend npx ts-node src/tools/run-syncmdl.ts",
"test": "jest --detectOpenHandles",
"test-python": "pytest"
},
"version": "1.0.0"
}
}
32 changes: 0 additions & 32 deletions backend/sendMessage.js

This file was deleted.

10 changes: 5 additions & 5 deletions backend/serverless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,35 +112,35 @@ resources:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:provider.stage}-shodan-queue
VisibilityTimeout: 300
VisibilityTimeout: 18000 # 5 hours
MaximumMessageSize: 262144 # 256 KB
MessageRetentionPeriod: 604800 # 7 days
DnstwistQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:provider.stage}-dnstwist-queue
VisibilityTimeout: 300
VisibilityTimeout: 18000 # 5 hours
MaximumMessageSize: 262144 # 256 KB
MessageRetentionPeriod: 604800 # 7 days
HibpQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:provider.stage}-hibp-queue
VisibilityTimeout: 300
VisibilityTimeout: 18000 # 5 hours
MaximumMessageSize: 262144 # 256 KB
MessageRetentionPeriod: 604800 # 7 days
IntelxQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:provider.stage}-intelx-queue
VisibilityTimeout: 300
VisibilityTimeout: 18000 # 5 hours
MaximumMessageSize: 262144 # 256 KB
MessageRetentionPeriod: 604800 # 7 days
CybersixgillQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:provider.stage}-cybersixgill-queue
VisibilityTimeout: 300
VisibilityTimeout: 18000 # 5 hours
MaximumMessageSize: 262144 # 256 KB
MessageRetentionPeriod: 604800 # 7 days

Expand Down
3 changes: 3 additions & 0 deletions backend/src/tasks/pesyncdb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6576,6 +6576,9 @@ VALUES ('WhoisXML', 'DNS lookpus', '2022-03-14');
INSERT INTO public.data_source(name, description, last_run)
VALUES ('findomain', 'Domain enumerator', '2022-03-14');
INSERT INTO public.data_source(name, description, last_run)
VALUES ('IntelX', 'Credentials', '2022-03-14');
INSERT INTO public.data_source(name, description, last_run)
VALUES ('Sublist3r', 'Domain Permutations', '2022-03-14');
Expand Down
93 changes: 58 additions & 35 deletions backend/src/tasks/scanExecution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { integer } from 'aws-sdk/clients/cloudfront';

const ecs = new AWS.ECS();
let docker: any;
const QUEUE_URL = process.env.QUEUE_URL!;
const SCAN_LIST = ['dnstwist', 'hibp', 'intelx', 'cybersixgill', 'shodan'];

if (process.env.IS_LOCAL) {
const Docker = require('dockerode');
Expand All @@ -15,19 +17,33 @@ const toSnakeCase = (input) => input.replace(/ /g, '-');
async function startDesiredTasks(
scanType: string,
desiredCount: integer,
queueUrl: string
shodanApiKeyList: string[] = []
) {
const queueUrl = QUEUE_URL + `${scanType}-queue`;
try {
// ECS can only start 10 tasks at a time. Split up into batches
const batchSize = 10;
let batchSize = 10;
if (scanType == 'shodan') {
batchSize = 1;
}
let remainingCount = desiredCount;
while (remainingCount > 0) {
let shodan_api_key = '';
if (shodanApiKeyList.length > 0) {
shodan_api_key = shodanApiKeyList[remainingCount - 1];
}
const currentBatchCount = Math.min(remainingCount, batchSize);

if (process.env.IS_LOCAL) {
// If running locally, use RabbitMQ and Docker instead of SQS and ECS
console.log('Starting local containers');
await startLocalContainers(currentBatchCount, scanType, queueUrl);
console.log(queueUrl);
await startLocalContainers(
currentBatchCount,
scanType,
queueUrl,
shodan_api_key
);
} else {
await ecs
.runTask({
Expand Down Expand Up @@ -55,6 +71,10 @@ async function startDesiredTasks(
{
name: 'SERVICE_QUEUE_URL',
value: queueUrl
},
{
name: 'PE_SHODAN_API_KEYS',
value: shodan_api_key
}
]
}
Expand All @@ -75,7 +95,8 @@ async function startDesiredTasks(
async function startLocalContainers(
count: number,
scanType: string,
queueUrl: string
queueUrl: string,
shodan_api_key: string = ''
) {
// Start 'count' number of local Docker containers
for (let i = 0; i < count; i++) {
Expand Down Expand Up @@ -117,7 +138,7 @@ async function startLocalContainers(
`SIXGILL_CLIENT_ID=${process.env.SIXGILL_CLIENT_ID}`,
`SIXGILL_CLIENT_SECRET=${process.env.SIXGILL_CLIENT_SECRET}`,
`INTELX_API_KEY=${process.env.INTELX_API_KEY}`,
`PE_SHODAN_API_KEYS=${process.env.PE_SHODAN_API_KEYS}`,
`PE_SHODAN_API_KEYS=${shodan_api_key}`,
`WORKER_SIGNATURE_PUBLIC_KEY=${process.env.WORKER_SIGNATURE_PUBLIC_KEY}`,
`WORKER_SIGNATURE_PRIVATE_KEY=${process.env.WORKER_SIGNATURE_PRIVATE_KEY}`,
`ELASTICSEARCH_ENDPOINT=${process.env.ELASTICSEARCH_ENDPOINT}`,
Expand All @@ -141,13 +162,16 @@ async function startLocalContainers(
export const handler: Handler = async (event) => {
let desiredCount: integer;
let scanType: string;

// Check if desired count was passed
if (event.desiredCount) {
desiredCount = event.desiredCount;
} else {
console.log('Desired count not found. Setting to 1.');
desiredCount = 1;
}

// Check if scan type was passed
if (event.scanType) {
scanType = event.scanType;
} else {
Expand All @@ -156,39 +180,38 @@ export const handler: Handler = async (event) => {
}

try {
// If scanType is shodan, check if API keys were passed and split up
if (scanType === 'shodan') {
await startDesiredTasks(
scanType,
desiredCount,
process.env.SHODAN_QUEUE_URL!
);
} else if (scanType === 'dnstwist') {
await startDesiredTasks(
scanType,
desiredCount,
process.env.DNSTWIST_QUEUE_URL!
);
} else if (scanType === 'hibp') {
await startDesiredTasks(
scanType,
desiredCount,
process.env.HIBP_QUEUE_URL!
);
} else if (scanType === 'intelx') {
await startDesiredTasks(
scanType,
desiredCount,
process.env.INTELX_QUEUE_URL!
);
} else if (scanType === 'cybersixgill') {
await startDesiredTasks(
scanType,
desiredCount,
process.env.CYBERSIXGILL_QUEUE_URL!
);
let shodanApiKeyList: string[];
if (event.apiKeyList) {
shodanApiKeyList = event.apiKeyList
.split(',')
.map((value) => value.trim());
} else {
console.error(
'apiKeyList must be provided for shodan and be a comma-separated string'
);
return 'Failed no apiKeyList';
}
// Check if there are enough keys for the desired number of tasks
if (shodanApiKeyList.length >= desiredCount) {
console.log(
'The number of API keys is greater than or equal to desiredCount.'
);
} else {
console.error(
'The number of API keys is less than desired Fargate tasks.'
);
return 'Failed no apiKeyList';
}
await startDesiredTasks(scanType, desiredCount, shodanApiKeyList);

// Run the rest of the scans normally
} else if (SCAN_LIST.includes(scanType)) {
await startDesiredTasks(scanType, desiredCount);
} else {
console.log(
'Shodan, DNSTwist, HIBP, and Cybersixgill are the only script types available right now.'
'Shodan, DNSTwist, HIBP, IntelX, and Cybersixgill are the only script types available right now. Must be all lowercase.'
);
}
} catch (error) {
Expand Down
44 changes: 41 additions & 3 deletions backend/src/tools/run-scanExecution.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,48 @@
// Script to execute the scanExecution function
import { handler as scanExecution } from '../tasks/scanExecution';
const amqp = require('amqplib');

async function localScanExecution() {
async function localScanExecution(scan_type, desired_count, apiKeyList = '') {
console.log('Starting...');
const payload = { scanType: 'dnstwist', desiredCount: 3 };
const payload = {
scanType: scan_type,
desiredCount: desired_count,
apiKeyList: apiKeyList
};
scanExecution(payload, {} as any, () => null);
}

localScanExecution();
async function sendMessageToQueue(message, queue) {
const connection = await amqp.connect('amqp://rabbitmq');
const channel = await connection.createChannel();

await channel.assertQueue(queue, { durable: true });

// Simulate sending a message to the queue
channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
persistent: true
});

console.log('Message sent:', message);

setTimeout(() => {
connection.close();
}, 500);
}

// Simulate sending a message
const SCAN_TYPE = 'dnstwist';
const DESIRED_COUNT = 1;
const ORG_LIST = ['DHS', 'DOI'];
const QUEUE = `staging-${SCAN_TYPE}-queue`;
const API_KEY_LIST = '';

for (const org of ORG_LIST) {
const message = {
scriptType: SCAN_TYPE,
org: org
};
sendMessageToQueue(message, QUEUE);
}

localScanExecution(SCAN_TYPE, DESIRED_COUNT, API_KEY_LIST);
4 changes: 2 additions & 2 deletions backend/worker/pe-worker-entry.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ while true; do
MESSAGE=$(echo "$RESPONSE" | jq -r '.[0].payload')
MESSAGE=${MESSAGE//\\\"/\"}
echo "MESSAGE: $MESSAGE"

else
echo "Running live SQS logic..."
MESSAGE=$(aws sqs receive-message --queue-url "$SERVICE_QUEUE_URL" --output json --max-number-of-messages 1)
echo "MESSAGE: $MESSAGE"
fi


# Check if there are no more messages. If no more, then exit Fargate container
if [ -z "$MESSAGE" ] || [ "$MESSAGE" == "null" ]; then
echo "No more messages in the queue. Exiting."
Expand Down Expand Up @@ -72,7 +72,7 @@ while true; do

# Run the pe-source command
eval "$COMMAND" \
&& cat /app/pe_reports_logging.log
&& cat /app/pe_reports_logging.log > /app/pe_reports_logging.log

# Delete the processed message from the queue
if [ "$IS_LOCAL" = true ]; then
Expand Down
6 changes: 1 addition & 5 deletions dev.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,8 @@ PE_DB_USERNAME=pe
PE_DB_PASSWORD=password


SHODAN_QUEUE_URL =shodanQueue
QUEUE_URL=staging-
PE_SHODAN_API_KEYS=
DNSTWIST_QUEUE_URL=dnstwistQueue
HIBP_QUEUE_URL=hibpQueue
INTELX_QUEUE_URL=intelxQueue
CYBERSIXGILL_QUEUE_URL=cybersixgillQueue

PE_FARGATE_CLUSTER_NAME=pe-staging-worker
PE_FARGATE_TASK_DEFINITION_NAME=pe-staging-worker
Loading

0 comments on commit 555f630

Please sign in to comment.