

DynamoDB Go SDK: How to Use the Scan and Batch Operations Efficiently


Source: towardsdatascience.com

Parallel scan with the DynamoDB Go SDK (image by author)

Learn with practical code examples

The DynamoDB Scan API accesses every item in a table (or secondary index). It is the equivalent of a select * from query. One of the things I will cover in this blog is how to use the Scan API with the DynamoDB Go SDK.

To scan a table, we need some data to begin with! So in the process, I will also go into how to use the Batch API to write bulk data to DynamoDB. You can use the BatchWriteItem API to create or delete items in batches (of up to twenty-five), and you can combine these operations across multiple tables.

We will start simple and gradually improve our approach to use the APIs efficiently. I will also go over some of the basic tests that I ran to demonstrate incremental improvements. Finally I will wrap up by highlighting some of the considerations while using these operations.

You can refer to the code on GitHub

Before you proceed…

… make sure to create a DynamoDB table called users with:

  • partition key email (data type String) and
  • On-Demand capacity mode.
DynamoDB table (image by author)

Also, I want to call out a few things to set the context:

  • The table was created in us-east-1 and tests were executed from an EC2 instance in us-east-1 as well
  • Since these are general tests instead of specialised benchmarks, I did not do any special tuning (at any level). These are just Go functions that were executed with different inputs, keeping things as simple as possible.
  • The tests include marshalling (converting Go struct to DynamoDB data types) for BatchWriteItem operations and un-marshalling (converting from DynamoDB data types back to Go struct) for Scan operation.

Let's start off by exploring the BatchWriteItem API. This way we will also have data to work with for the Scan operations.

Win-win!

Importing data in batches

Since you can combine up to 25 items in a single invocation, using a batch approach for bulk data imports is much better than invoking PutItem in a loop (or even in parallel).

Here is a basic example of how you would use BatchWriteItem:

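import (
    "context"
    "log"
    "math/rand"
    "time"

    "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
    "github.com/google/uuid"
)

// Assumes these are defined elsewhere in the package: client (*dynamodb.Client),
// table (the table name, "users") and a User struct with Email, Age and City fields.
func basicBatchImport() {
    startTime := time.Now()

    cities := []string{"NJ", "NY", "ohio"}
    batch := make(map[string][]types.WriteRequest)
    var requests []types.WriteRequest

    for i := 1; i <= 25; i++ {
        user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}
        item, err := attributevalue.MarshalMap(user)
        if err != nil {
            log.Fatal("marshal map failed: ", err)
        }
        requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
    }

    // RequestItems maps each table name to its write requests.
    batch[table] = requests

    op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
        RequestItems: batch,
    })
    if err != nil {
        log.Fatal("batch write error: ", err)
    }
    log.Println("batch insert done")

    // UnprocessedItems is keyed by table name, so count the requests for our
    // table rather than the number of map keys.
    unprocessed := len(op.UnprocessedItems[table])
    if unprocessed != 0 {
        log.Println("there were", unprocessed, "unprocessed records")
    }

    log.Println("inserted", 25-unprocessed, "records in", time.Since(startTime).Seconds(), "seconds")
}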

With BatchWriteItemInput, we can define the operations we want to perform in the batch. Here we are just going to perform PutRequests (each of which is wrapped in another type called WriteRequest).

We assemble the WriteRequests in a slice and finally put them in a map with key being the table name - this is exactly what the RequestItems attribute in BatchWriteItemInput needs.

In this case we are dealing with a single table but you could execute operations on multiple tables.

In this example we just dealt with one batch of 25 records (maximum permitted batch size). If we want to import more records, all we need to do is split them into batches of 25 and execute them one (sub)batch at a time. Simple enough — here is an example:

func basicBatchImport2(total int) {
    startTime := time.Now()

    cities := []string{"NJ", "NY", "ohio"}
    batchSize := 25
    processed := total

    for num := 1; num <= total; num = num + batchSize {
        batch := make(map[string][]types.WriteRequest)
        var requests []types.WriteRequest

        // The last batch may hold fewer than batchSize records.
        start := num
        end := num + batchSize - 1
        if end > total {
            end = total
        }

        for i := start; i <= end; i++ {
            user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}
            item, err := attributevalue.MarshalMap(user)
            if err != nil {
                log.Fatal("marshal map failed: ", err)
            }
            requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
        }

        batch[table] = requests

        op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
            RequestItems: batch,
        })
        if err != nil {
            log.Fatal("batch write error: ", err)
        }

        // UnprocessedItems is keyed by table name; count the requests, not the keys.
        processed = processed - len(op.UnprocessedItems[table])
    }

    log.Println("all batches finished. inserted", processed, "records in", time.Since(startTime).Seconds(), "seconds")

    if processed != total {
        log.Println("there were", total-processed, "unprocessed records")
    }
}
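
The batch-splitting step in the function above can also be factored out into a small helper. The following is a minimal sketch, not part of the original code: the name chunk is hypothetical, and it assumes Go 1.18 or later for generics. It splits any slice into consecutive sub-slices of at most the given size, with the last one holding the remainder.

```go
package main

// chunk splits items into consecutive slices of at most size elements.
// The final slice holds the remainder and may be shorter.
// It returns nil when items is empty or size is not positive.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var out [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		// Sub-slices share the backing array of items; nothing is copied.
		out = append(out, items[start:end])
	}
	return out
}
```

With a helper like this, you could build all the WriteRequests up front and then loop over chunk(requests, 25), issuing one BatchWriteItem call per sub-slice.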

I tried this with 50000 records (which means 2000 batches) and it took approximately 15 seconds. But we can do much better!

Parallel batch import

Instead of processing each batch sequentially, we can spin up a goroutine for each batch:

// Additionally needs the "sync" and "sync/atomic" imports.
func parallelBatchImport(numRecords int) {
    startTime := time.Now()

    cities := []string{"NJ", "NY", "ohio"}
    batchSize := 25

    var wg sync.WaitGroup

    // Updated by many goroutines at once, so it is only touched via sync/atomic.
    var unprocessed int64

    for num := 1; num <= numRecords; num = num + batchSize {
        // The last batch may hold fewer than batchSize records.
        start := num
        end := num + batchSize - 1
        if end > numRecords {
            end = numRecords
        }

        wg.Add(1)

        go func(s, e int) {
            defer wg.Done()

            batch := make(map[string][]types.WriteRequest)
            var requests []types.WriteRequest

            for i := s; i <= e; i++ {
                user := User{Email: uuid.NewString() + "@foo.com", Age: rand.Intn(49) + 1, City: cities[rand.Intn(len(cities))]}

                item, err := attributevalue.MarshalMap(user)
                if err != nil {
                    log.Fatal("marshal map failed: ", err)
                }
                requests = append(requests, types.WriteRequest{PutRequest: &types.PutRequest{Item: item}})
            }

            batch[table] = requests

            op, err := client.BatchWriteItem(context.Background(), &dynamodb.BatchWriteItemInput{
                RequestItems: batch,
            })
            if err != nil {
                log.Fatal("batch write error: ", err)
            }

            // UnprocessedItems is keyed by table name; count the requests, not the keys.
            atomic.AddInt64(&unprocessed, int64(len(op.UnprocessedItems[table])))
        }(start, end)
    }

    log.Println("waiting for all batches to finish....")
    wg.Wait()

    processed := numRecords - int(atomic.LoadInt64(&unprocessed))
    log.Println("all batches finished. inserted", processed, "records in", time.Since(startTime).Seconds(), "seconds")

    if processed != numRecords {
        log.Println("there were", numRecords-processed, "unprocessed records")
    }
}

The results improved by a good margin. Here is what I got, on average:

  • 50000 records were inserted in ~2.5 seconds
  • 100000 records were inserted in ~4.5 to 5 seconds
  • 150000 records were inserted in less than 9.5 seconds
  • 200000 records were inserted in less than 11.5 seconds

There may be unprocessed records in a batch. This example detects these records, but the retry logic has been skipped to keep things simple. Ideally you should have an (exponential back-off based) retry mechanism for handling unprocessed records as well.
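
To make the retry idea more concrete, here is a minimal sketch of an exponential back-off loop. It is not taken from the original code and deliberately avoids the SDK types: the write callback is a hypothetical stand-in for a function that calls BatchWriteItem and returns whatever came back in UnprocessedItems, and the names writeWithRetry, maxAttempts and baseDelay are assumptions made for illustration.

```go
package main

import (
	"errors"
	"time"
)

// writeWithRetry calls write with the pending items and resubmits whatever
// comes back as unprocessed, doubling the wait between attempts.
// write is a hypothetical stand-in for a BatchWriteItem call that returns
// the requests the service did not process.
func writeWithRetry[T any](items []T, write func([]T) ([]T, error), maxAttempts int, baseDelay time.Duration) error {
	delay := baseDelay
	pending := items
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		unprocessed, err := write(pending)
		if err != nil {
			return err
		}
		if len(unprocessed) == 0 {
			return nil // everything was written
		}
		pending = unprocessed
		if attempt < maxAttempts {
			time.Sleep(delay)
			delay *= 2 // exponential back-off
		}
	}
	return errors.New("items still unprocessed after all attempts")
}
```

In a real import, each goroutine could wrap its BatchWriteItem call in such a loop. Adding some random jitter to the delay is a common refinement when many workers retry at the same time.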

To insert more data, I ran the parallelBatchImport function (above) in loops. For example:

for i := 1; i <= 100; i++ {
    parallelBatchImport(50000)
}

Alright, let’s move ahead. Now that we have some data, let’s try …

… the Scan API

This is what basic usage looks like:

// Additionally needs the "github.com/aws/aws-sdk-go-v2/aws" import.
func scan() {
    startTime := time.Now()

    op, err := client.Scan(context.Background(), &dynamodb.ScanInput{
        TableName:              aws.String(table),
        ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
    })
    if err != nil {
        log.Fatal("scan failed: ", err)
    }

    for _, item := range op.Items {
        var u User
        err := attributevalue.UnmarshalMap(item, &u)
        if err != nil {
            log.Fatal("unmarshal failed: ", err)
        }
    }

    // A non-empty LastEvaluatedKey means the response was cut off before
    // the whole table was read.
    if len(op.LastEvaluatedKey) != 0 {
        log.Println("not all items have been scanned")
    }
    log.Println("scanned", op.ScannedCount, "items in", time.Since(startTime).Seconds(), "seconds")
    log.Println("consumed capacity", *op.ConsumedCapacity.CapacityUnits)
}

Just provide the table (or secondary index) name and you are good to go! However, you might not get all the items back because of API limits (1 MB worth of data per invocation). In my case it took about 0.5 seconds for approximately 15000 records. The rest of the items were not returned because the 1 MB limit was reached.

Using Pagination

To handle this limit, the Scan API returns LastEvaluatedKey in its output, pointing to the last processed record. All you need to do is invoke Scan again with the ExclusiveStartKey attribute set to the LastEvaluatedKey value, and repeat until LastEvaluatedKey comes back empty.

With the paginated scan approach, it took me approximately 100 seconds to scan ~7.5 million records.
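
The pagination loop itself is short. The sketch below shows only its shape and is not the code used for the test above: page is a simplified, hypothetical stand-in for the Scan output, with a plain string key instead of the SDK's map[string]types.AttributeValue, and fetch stands in for a function that calls client.Scan with ExclusiveStartKey set.

```go
package main

// page is a simplified, hypothetical stand-in for a Scan response: the items
// returned plus the key to resume from (empty when the scan is complete).
type page struct {
	Items            []string
	LastEvaluatedKey string
}

// scanAll keeps calling fetch, feeding each LastEvaluatedKey back in as the
// next exclusive start key, until a page comes back without one.
func scanAll(fetch func(exclusiveStartKey string) (page, error)) ([]string, error) {
	var all []string
	startKey := "" // the first call starts from the beginning of the table
	for {
		p, err := fetch(startKey)
		if err != nil {
			return nil, err
		}
		all = append(all, p.Items...)
		if p.LastEvaluatedKey == "" {
			return all, nil // no more pages
		}
		startKey = p.LastEvaluatedKey
	}
}
```

With the real SDK the loop looks the same, just with ScanInput and ScanOutput in place of these types. The SDK also ships a ScanPaginator (created with dynamodb.NewScanPaginator) that wraps this loop for you.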

Parallel Scan

Pagination helps, but it’s still a sequential process. There is a lot of scope for improvement. Thankfully, Scan allows you to adopt a parallelized approach, i.e. you can use multiple workers (goroutines in this case) to process data in parallel!

// Additionally needs the "sync" and "sync/atomic" imports.
func parallelScan(pageSize, totalWorkers int) {
    log.Println("parallel scan with page size", pageSize, "and", totalWorkers, "goroutines")
    startTime := time.Now()

    // Updated by every segment goroutine, so it is only touched via sync/atomic.
    var total int64

    var wg sync.WaitGroup
    wg.Add(totalWorkers)

    for i := 0; i < totalWorkers; i++ {
        // start a goroutine for each segment
        go func(segId int) {
            var segTotal int

            defer wg.Done()

            lastEvaluatedKey := make(map[string]types.AttributeValue)

            scip := &dynamodb.ScanInput{
                TableName:     aws.String(table),
                Limit:         aws.Int32(int32(pageSize)),
                Segment:       aws.Int32(int32(segId)),
                TotalSegments: aws.Int32(int32(totalWorkers)),
            }

            for {
                // resume this segment from where the previous page ended
                if len(lastEvaluatedKey) != 0 {
                    scip.ExclusiveStartKey = lastEvaluatedKey
                }
                op, err := client.Scan(context.Background(), scip)
                if err != nil {
                    log.Fatal("scan failed: ", err)
                }

                segTotal = segTotal + int(op.Count)

                for _, item := range op.Items {
                    var u User
                    err := attributevalue.UnmarshalMap(item, &u)
                    if err != nil {
                        log.Fatal("unmarshal failed: ", err)
                    }
                }

                // an empty LastEvaluatedKey means this segment is exhausted
                if len(op.LastEvaluatedKey) == 0 {
                    log.Println("[ segment", segId, "] finished")
                    atomic.AddInt64(&total, int64(segTotal))
                    log.Println("total records processed by segment", segId, "=", segTotal)
                    return
                }

                lastEvaluatedKey = op.LastEvaluatedKey
            }
        }(i)
    }

    log.Println("waiting...")
    wg.Wait()

    log.Println("done...")
    log.Println("scanned", atomic.LoadInt64(&total), "items in", time.Since(startTime).Seconds(), "seconds")
}

The Segment and TotalSegments attributes are the key to how the Scan API enables parallelism. TotalSegments is the number of threads/goroutines/worker-processes that need to be spawned, and Segment is a unique identifier for each of them (a zero-based index from 0 to TotalSegments - 1).

In my tests, the Scan performance remained (almost) constant at 37-40 seconds (average) for ~7.5 million records (I tried a variety of page size and goroutine combinations).

How many TotalSegments do I need to configure?

To tune the appropriate number of parallel threads/workers, you might need to experiment a bit. A lot might depend on your client environment.

  • Do you have enough compute resources?
  • Some environments/runtimes might have managed thread-pools, so you will have to comply with those

So, you will need to try things out to find the optimum parallelism for your use case. One way to think about it could be to choose one segment (single worker/thread/goroutine) per unit of data (say, a segment for every GB of data you want to scan).
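
As a starting point for that experimentation, the "one segment per unit of data" rule of thumb can be written down as a small calculation. This is only a sketch under assumptions: the function name suggestedSegments and the maxSegments cap (standing in for whatever your client can comfortably run) are hypothetical, and the table size would have to come from somewhere such as the TableSizeBytes value reported by DescribeTable, which is only an approximation.

```go
package main

// suggestedSegments returns one segment per bytesPerSegment of table data
// (rounded up), clamped to the range [1, maxSegments].
// maxSegments is an assumed cap reflecting what the client can handle.
func suggestedSegments(tableSizeBytes, bytesPerSegment int64, maxSegments int) int {
	if tableSizeBytes <= 0 || bytesPerSegment <= 0 || maxSegments < 1 {
		return 1
	}
	n := (tableSizeBytes + bytesPerSegment - 1) / bytesPerSegment // ceiling division
	if n > int64(maxSegments) {
		return maxSegments
	}
	return int(n)
}
```

For example, with 1 GB per segment and a cap of 100, a 5 GB table would get 5 segments. Treat the result as a first guess to measure against, not as a tuned value.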

Wrap up — API considerations

Both Batch and Scan APIs are quite powerful, but there are nuances you should be aware of. My advice is to read the API documentation thoroughly.

With Batch APIs:

  • No more than 25 requests in a batch
  • An individual item in a batch should not exceed 400KB
  • Total size of items in a single BatchWriteItem cannot be more than 16MB
  • BatchWriteItem cannot update items
  • You cannot specify conditions on individual put and delete requests
  • It does not return deleted items in the response
  • If there are failed operations, you can access them via the UnprocessedItems response parameter

Use Scan wisely

Since a Scan operation goes over the entire table (or secondary index), it's highly likely that it consumes a large chunk of the provisioned throughput, especially if it's a large table. That being said, Scan should be your last resort. Check whether Query API (or BatchGetItem) works for your use-case.

The same applies to parallel Scan.

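You can further narrow down the results by using a Filter Expression, a Limit parameter (as demonstrated earlier) or a ProjectionExpression to return only a subset of attributes. Keep in mind that a filter expression is applied after the items have been read, so it reduces the data returned but not the read capacity consumed.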

That’s all for this blog. I hope you found it useful.

Until next time, Happy coding!


DynamoDB Go SDK: How to Use the Scan and Batch Operations Efficiently was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story.
