AWS SQS, SNS, and Kafka
Credits Educative.io
Here we demo some message queue and pub/sub platforms:
- AWS SQS
- AWS SNS
- Kafka
We first create a standard SQS queue, send a message to it, and print the message ID. Then we create an AWS SNS topic; SNS is great for prototyping and smaller projects. Finally, we demo Kafka, a fast, large-scale industry standard.
AWS SQS
We first create a standard SQS queue, then send a message to it and print the message ID. Note that a consumer must delete each message from the queue after processing it; SQS does not do this automatically. A received message is hidden temporarily but reappears once its visibility timeout expires. Send a message via Python:
import json
import boto3

# Create an SQS client
client = boto3.client('sqs', region_name='us-east-1', aws_access_key_id="XYZ",
                      aws_secret_access_key="XYZ")

# URL of the queue
queue_url = 'https://sqs.us-east-1.amazonaws.com/XYZ/MyQueue'

# Send message to SQS queue
response = client.send_message(
    QueueUrl=queue_url,
    MessageAttributes={
        'Subject': {
            'DataType': 'String',
            'StringValue': 'Sample Message'
        },
    },
    MessageBody=(
        'This is a sample message'
    )
)
print(response['MessageId'])
Delete all messages:
# Remove every message currently in the queue
delete_response = client.purge_queue(
    QueueUrl=queue_url
)
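The receive-then-delete cycle described above can be sketched as follows. This is a minimal illustration, not part of the original demo: the helper name process_messages and its handler argument are hypothetical, and it assumes client is a boto3 SQS client like the one created earlier.

```python
def process_messages(client, queue_url, handler, max_messages=10, wait_seconds=10):
    """Receive one batch from the queue, handle each message, then delete it."""
    response = client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,  # SQS returns at most 10 per call
        WaitTimeSeconds=wait_seconds,      # long polling: wait for messages to arrive
    )
    processed = []
    # The 'Messages' key is absent when the queue returned nothing
    for message in response.get('Messages', []):
        handler(message['Body'])
        # Deletion uses the receipt handle from this receive call
        client.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message['ReceiptHandle'],
        )
        processed.append(message['MessageId'])
    return processed
```

A call might look like process_messages(client, queue_url, print). If the handler raises, the message is not deleted and becomes visible again after the visibility timeout, so it can be retried.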
SNS
A Lambda function will be the publisher (we will trigger it with the EventBridge scheduler), and we’ll have two subscribers: an SQS queue and an email address. Next, set up the IAM role: add SNSPublishManagedPolicy to the IAM role and assign the role to the Lambda function.
import json
import boto3

def lambda_handler(event, context):
    client = boto3.client('sns')
    Topic_ARN = "<SNS Topic ARN Here>"  # looks like this: arn:aws:sns:us-east-1:<XYZ>:ReminderTopic

    # Publish three reminders to the same topic
    response_one = client.publish(
        TargetArn=Topic_ARN,
        Message=json.dumps({'Reminder-Type': "Weekly Reminder", 'Reminder': "Reminder 1", 'Destination': "Email"}),
    )
    response_two = client.publish(
        TargetArn=Topic_ARN,
        Message=json.dumps({'Reminder-Type': "Weekly Reminder", 'Reminder': "Reminder 2", 'Destination': "Email"}),
    )
    response_three = client.publish(
        TargetArn=Topic_ARN,
        Message=json.dumps({'Reminder-Type': "Daily Reminder", 'Reminder': "Reminder 3", 'Destination': "SQS"}),
    )
    return {
        'statusCode': 200,
        'body': json.dumps({'response_one': response_one, 'response_two': response_two, 'response_three': response_three})
    }
Publish
By default, only the owner of the SNS topic can publish messages to the topic, so we need to change SNS access policy:
{
  "Version": "2012-10-17",
  "Id": "ReminderID",
  "Statement": [
    {
      "Sid": "SNS topic policy",
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "SNS:Publish",
      "Resource": "<Provide Lambda Function ARN, e.g. arn:aws:lambda:us-east-1:XYZ:function:ReminderFunction>",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "<Provide Account ID without Dashes>"
        }
      }
    }
  ]
}
Subscribe
We will subscribe the SQS queue and an email address. Add them under the SNS topic's subscriptions tab: the queue by its ARN and the email by its address.
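The same subscriptions could also be created from code instead of the console. The sketch below assumes a boto3 SNS client is passed in; the function name subscribe_endpoints and its parameters are hypothetical and not part of the original walkthrough.

```python
def subscribe_endpoints(sns_client, topic_arn, queue_arn, email_address):
    """Subscribe an SQS queue and an email address to one SNS topic."""
    # (protocol, endpoint) pairs: SQS is addressed by ARN, email by address
    subscriptions = [('sqs', queue_arn), ('email', email_address)]
    subscription_arns = []
    for protocol, endpoint in subscriptions:
        response = sns_client.subscribe(
            TopicArn=topic_arn,
            Protocol=protocol,
            Endpoint=endpoint,
            ReturnSubscriptionArn=True,  # return an ARN even while confirmation is pending
        )
        subscription_arns.append(response['SubscriptionArn'])
    return subscription_arns
```

An email subscription typically stays pending until the recipient confirms it from the confirmation email, and the SQS queue needs an access policy that lets the topic send messages to it.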
Schedule EventBridge to invoke Lambda
We can add a trigger to send messages regularly. The schedule is written as cron(minutes hours “day of the month” month “day of the week” year):
datetime | cron |
---|---|
27th December, 2023, at 5:00 PM | cron(0 17 27 DEC ? 2023) |
Every 30 minutes on every Sunday | cron(0/30 * ? * SUN *) |
Every Monday to Friday, at 8:30 PM | cron(30 20 ? * MON-FRI *) |
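To make the six-field layout concrete, here is a small sketch that assembles such an expression from its parts. The helper aws_cron is a hypothetical name introduced for illustration; the only rule it checks is that the day-of-month and day-of-week fields cannot both be given a value, so one of them has to be "?".

```python
def aws_cron(minutes, hours, day_of_month, month, day_of_week, year):
    """Build an EventBridge-style cron expression from its six fields."""
    # One of the two day fields must be '?' when the other is specified
    if day_of_month != '?' and day_of_week != '?':
        raise ValueError("use '?' for either day_of_month or day_of_week")
    fields = (minutes, hours, day_of_month, month, day_of_week, year)
    return "cron(" + " ".join(str(field) for field in fields) + ")"

# 27th December 2023 at 5:00 PM
print(aws_cron(0, 17, 27, 'DEC', '?', 2023))
# Monday to Friday at 8:30 PM
print(aws_cron(30, 20, '?', '*', 'MON-FRI', '*'))
```

The resulting string is what would be pasted into the schedule expression field of the trigger.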
Apache Kafka and Apache Zookeeper
We first download the Kafka and Zookeeper project.
Then edit the server.properties file so that this line is active (uncommented):
advertised.listeners=PLAINTEXT://localhost:9092
Zookeeper is a naming registry that is used in distributed systems for service synchronization.
In Kafka, Zookeeper is responsible for:
- Managing
- Tracking the status of the Kafka cluster’s nodes, topics, and messages
We need to run the Zookeeper shell script and pass our properties to start our Zookeeper server.
./zookeeper-server-start.sh ../config/zookeeper.properties
Next, in a separate terminal, we start the Kafka server:
./kafka-server-start.sh ../config/server.properties
Next we create a topic:
./kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4
Next we make a producer that will publish to a topic:
./kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
Then we create a consumer that will receive the messages:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
That’s it. Any message now sent by the producer to test-topic will be received by every consumer subscribed to test-topic.
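The console producer can also be replaced by a small script. The sketch below assumes the third-party kafka-python package and its KafkaProducer class; the helper publish_lines is a hypothetical name. The producer is passed in as an argument, so the function itself does not import anything.

```python
def publish_lines(producer, topic, lines):
    """Send each line to the topic as a UTF-8 encoded message and return the count."""
    count = 0
    for line in lines:
        # Kafka message values are bytes, so encode the text first
        producer.send(topic, value=line.encode('utf-8'))
        count += 1
    # Block until the buffered messages have been sent
    producer.flush()
    return count

# Example wiring, assuming kafka-python is installed and the broker above is running:
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers='localhost:9092')
# publish_lines(producer, 'test-topic', ['hello', 'world'])
```

Messages published this way should show up in the console consumer started earlier, since it reads the same test-topic.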