Our main target is to develop a system for sending large numbers of emails and then tracking users’ interaction with a received mail (e.g. opens, link clicks, unsubscribes, etc.) Users with large amounts of daily sends prefer use scheduling for sending use.
Why we need scheduling
Using scheduling, instead of send-immediately flows, gives us several advantages.
- Better control for sending process
- If a user needs to send emails after his workday is finished, then he won’t be able to monitor the sending process and fix them if something goes wrong. With scheduling, users can prepare all emails for sending before the actual send date, and our ESP (Email Service Provider) handles the rest.Less resources required
- If you already have content for emails that you need to send (for example tomorrow), you are able to send the emails gradually. For example, if you need to send 5M emails at 8 PM you can start sending at 10 AM and continue sending in small batches the whole day. We collect the emails from our side and cover the delivery in time.No personal CRON.
If you have a notification email that needs to be sent after some time (for example, a subscription reminder), just send the notification to our ESP and we cover the scheduling. You don’t need to handle that case with personal Cron scripts.
Problems that we faced when building scheduling
We propose pretty simple API for working with scheduling, but it’s not easy to organize simultaneously sending even 1M emails. We needed to think about scalability and our system in the future. We had to solve several problems.
- Proper storage for scheduled emails
- Limited throughput of the network for one process
- Split scheduled emails that were ready to send for different workers
- Syncing statuses of scheduled emails and avoiding duplication on send
Proper storage for scheduled emails
We should able to filter “ready for send” emails as fast as possible among large amounts of records that don’t need to send yet. At the time we already used ClickHouse database as storage for our analytics events (unsubscribe, clicks, open, etc.).
Clickhouse is a column-oriented database that is used mostly for analytics. But with the read/write throughput that ClickHouse provides for us we are able to use the solution as scheduling storage.
The most valuable storage requirements for us were the following:
- Local and distributed joins
- Data compression
- Parallel and distributed query execution
- Cross-datacenter replication
- High availability
- SQL support
- Fetch speed not less than 500K email records per second.
We made some performance tests and got results that completely satisfied our requirements:
- Read speed is 1.2 GB/s
- Write speed is 50 to 200 MB/s
- Finding 335K scheduled emails on a dataset with the size of 5M takes 0.061 second
As a result, with network latency for transfers, we were able to fetch 3.5M emails per second. That was a very good result for us.
Limited throughput of the network for one process
We resolved the problem of choosing our data storage and then faced another issue. We used NodeJS as our main platform, the platform, together with SQS queues, didn’t provide the desired performance for us. Then we wrote simple prototypes for testing using Golang and Elixir (we hoped that Erlang with its lightweight threads helps us) we got some throughput increases, but still weren’t reaching the desired results and scalable solutions.
Results of our benchmark’s queue inserting time for 50K messages to queue was as follows:
- NodeJS 1 thread = 95.408 sec
- NodeJS 2 thread = 75.757 sec
- Go = 35.564 sec
Elixir was about 2 times slower than NodeJS with 2 threads and the same kind of problem. So obviously, we needed to find ways to scale horizontally.
Split scheduled emails that are ready to send for different workers
Obviously, we needed to make our scheduling scalable and found a way to split all scheduled emails for workers. Workers should have the same role and know nothing about each other. It’s allowing us to run more workers on more servers when we need to process the largest amount of emails.
When workers are fully independent from each other, we decided to create one more component in our system.
The component is named Ranger and is responsible for splitting ranges for our workers. Each range shouldn’t intersect with other ranges.
With Ranger and Clickhouse working together, we created the following architecture for our scheduling feature:
- Emails send to REST API, and if a parameter with a future send date exists, API puts the email to Messaging Queue.
- Write to the consumer processing the Queue and save all emails inside Clickhouse database.
- Ranger runs query for fetching emails and the query contains the following steps: Records filtered by send date and each time we fetch all records that are ready in that moment. Filter records by time-created range, we need only those that weren’t affected by the previous step. For that, we fetch only records that have a date greater than biggest date from previous execution. Aggregate results by CreatedTime and COUNT function.
- Set first fetched date as DateFrom for query.
- Wait for full batch size and set last date as DateTo.
- Update Date to as last processed range.
- Send range to Queue for processing by worker.
- Consumer pulls a Queue with ranges and fetches by CreatedTime emails for sending to user.
- Each email sends to target user.
AWS SQS service promises not less than one delivery for a Standard Queue. this means that sometimes we can obtain one message on two workers and send our email twice. Since each message can contain multiple emails (our current batch size contains 5K) we need to organize duplicate handling.
For that, we have the Redis database and we store each SQS messageID as a Redis key. The keys have an expiration date that's equal to message visibility timeout in the Queue. When a consumer obtains a new message from SQS, he checks for an existing messageID in Redis. If the message already exists, the consumer skips this one.
Using ClickHouse and AWS Simple Queue Service, we build a scalable scheduling system for delivering emails. These flows and tools can be applied to any actions that could be scheduled, not just for emails.
At the moment, we can fetch 335K records per second, filtering by ready date takes 0.1 seconds for 5M database set (including network latency), and workers that process the emails can easily scale horizontally for the desired throughput.