Importieren Sie eine CSV- oder JSON-Datei in DynamoDB

7

Ich habe 1000 CSV-Dateien. Jede CSV-Datei ist zwischen 1 und 500 MB groß und wird auf dieselbe Weise formatiert (dh in derselben Spaltenreihenfolge). Ich habe eine Header-Datei für Spaltenüberschriften, die mit den Spaltennamen meiner DynamoDB-Tabelle übereinstimmen. Ich muss diese Dateien in eine DynamoDB-Tabelle importieren. Was ist der beste Weg / das beste Werkzeug dafür?

Ich kann diese CSV-Dateien zu einer einzigen riesigen Datei verketten (ich würde es jedoch lieber vermeiden) oder sie bei Bedarf in JSON konvertieren. Ich bin mir der Existenz von BatchWriteItem bewusst, daher würde eine gute Lösung das Schreiben von Stapeln beinhalten.


Beispiel:

  • Die DynamoDB-Tabelle enthält zwei Spalten: Vorname, Nachname
  • Die Header-Datei enthält nur: first_name,last_name
  • Eine CSV-Datei sieht aus wie

::

John,Doe
Bob,Smith
Alice,Lee
Foo,Bar
Franck Dernoncourt
quelle
Wie steht es mit der Leistung, wie lange dauert das Laden von 1 Million Zeilendaten?
FU USF

Antworten:

12

Am Ende habe ich eine Python-Funktion codiert import_csv_to_dynamodb(table_name, csv_file_name, colunm_names, column_types), die eine CSV in eine DynamoDB-Tabelle importiert. Spaltennamen und Spalten müssen angegeben werden. Es verwendet Boto und lässt sich von diesem Kern sehr inspirieren . Unten finden Sie die Funktion sowie eine Demo ( main()) und die verwendete CSV-Datei. Getestet unter Windows 7 x64 mit Python 2.7.5, sollte aber auf jedem Betriebssystem mit Boto und Python funktionieren.

import boto

MY_ACCESS_KEY_ID = 'copy your access key ID here'
MY_SECRET_ACCESS_KEY = 'copy your secrete access key here'


def do_batch_write(items, table_name, dynamodb_table, dynamodb_conn):
    '''
    From https://gist.github.com/griggheo/2698152#file-gistfile1-py-L31
    '''
    batch_list = dynamodb_conn.new_batch_write_list()
    batch_list.add_batch(dynamodb_table, puts=items)
    while True:
        response = dynamodb_conn.batch_write_item(batch_list)
        unprocessed = response.get('UnprocessedItems', None)
        if not unprocessed:
            break
        batch_list = dynamodb_conn.new_batch_write_list()
        unprocessed_list = unprocessed[table_name]
        items = []
        for u in unprocessed_list:
            item_attr = u['PutRequest']['Item']
            item = dynamodb_table.new_item(
                    attrs=item_attr
            )
            items.append(item)
        batch_list.add_batch(dynamodb_table, puts=items)


def import_csv_to_dynamodb(table_name, csv_file_name, colunm_names, column_types):
    '''
    Import a CSV file to a DynamoDB table
    '''        
    dynamodb_conn = boto.connect_dynamodb(aws_access_key_id=MY_ACCESS_KEY_ID, aws_secret_access_key=MY_SECRET_ACCESS_KEY)
    dynamodb_table = dynamodb_conn.get_table(table_name)     
    BATCH_COUNT = 2 # 25 is the maximum batch size for Amazon DynamoDB

    items = []

    count = 0
    csv_file = open(csv_file_name, 'r')
    for cur_line in csv_file:
        count += 1
        cur_line = cur_line.strip().split(',')

        row = {}
        for colunm_number, colunm_name in enumerate(colunm_names):
            row[colunm_name] = column_types[colunm_number](cur_line[colunm_number])

        item = dynamodb_table.new_item(
                    attrs=row
            )           
        items.append(item)

        if count % BATCH_COUNT == 0:
            print 'batch write start ... ', 
            do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)
            items = []
            print 'batch done! (row number: ' + str(count) + ')'

    # flush remaining items, if any
    if len(items) > 0: 
        do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)


    csv_file.close() 


def main():
    '''
    Demonstration of the use of import_csv_to_dynamodb()
    We assume the existence of a table named `test_persons`, with
    - Last_name as primary hash key (type: string)
    - First_name as primary range key (type: string)
    '''
    colunm_names = 'Last_name First_name'.split()
    table_name = 'test_persons'
    csv_file_name = 'test.csv'
    column_types = [str, str]
    import_csv_to_dynamodb(table_name, csv_file_name, colunm_names, column_types)


if __name__ == "__main__":
    main()
    #cProfile.run('main()') # if you want to do some profiling

test.csvInhalt (muss sich im selben Ordner wie das Python-Skript befinden):

John,Doe
Bob,Smith
Alice,Lee
Foo,Bar
a,b
c,d
e,f
g,h
i,j
j,l
Franck Dernoncourt
quelle
0

Die vorherige Antwort wurde geringfügig geändert, um das CSV-Modul zu verwenden, sodass Ihre CSV-Datei Zeichenfolgen mit Anführungszeichen unterstützen kann.

import boto
from csv import reader

MY_ACCESS_KEY_ID = 'copy your access key ID here'
MY_SECRET_ACCESS_KEY = 'copy your secrete access key here'


def do_batch_write(items, table_name, dynamodb_table, dynamodb_conn):
    '''
    From https://gist.github.com/griggheo/2698152#file-gistfile1-py-L31
    '''
    batch_list = dynamodb_conn.new_batch_write_list()
    batch_list.add_batch(dynamodb_table, puts=items)
    while True:
        response = dynamodb_conn.batch_write_item(batch_list)
        unprocessed = response.get('UnprocessedItems', None)
        if not unprocessed:
            break
        batch_list = dynamodb_conn.new_batch_write_list()
        unprocessed_list = unprocessed[table_name]
        items = []
        for u in unprocessed_list:
            item_attr = u['PutRequest']['Item']
            item = dynamodb_table.new_item(
                    attrs=item_attr
            )
            items.append(item)
        batch_list.add_batch(dynamodb_table, puts=items)


def import_csv_to_dynamodb(table_name, csv_file_name, colunm_names,     column_types):
    '''
    Import a CSV file to a DynamoDB table
    '''        
    dynamodb_conn =     boto.connect_dynamodb(aws_access_key_id=MY_ACCESS_KEY_ID, aws_secret_access_key=MY_SECRET_ACCESS_KEY)
    dynamodb_table = dynamodb_conn.get_table(table_name)     
    BATCH_COUNT = 2 # 25 is the maximum batch size for Amazon DynamoDB

    items = []

    count = 0
    csv_file = open(csv_file_name, 'r')
    for cur_line in reader(csv_file):
        count += 1

        row = {}
        for colunm_number, colunm_name in enumerate(colunm_names):
            row[colunm_name] = column_types[colunm_number]    (cur_line[colunm_number])

        item = dynamodb_table.new_item(
                    attrs=row
            )           
        items.append(item)

        if count % BATCH_COUNT == 0:
            print 'batch write start ... ', 
            do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)
            items = []
            print 'batch done! (row number: ' + str(count) + ')'

    # flush remaining items, if any
    if len(items) > 0: 
        do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)


    csv_file.close() 


def main():
    '''
    Demonstration of the use of import_csv_to_dynamodb()
    We assume the existence of a table named `test_persons`, with
    - Last_name as primary hash key (type: string)
    - First_name as primary range key (type: string)
    '''
    colunm_names = 'facebookID age ethnicity gender hometown name party sfw url'.split()
    table_name = 'OneAmericaDB'
    csv_file_name = 'test_data.csv'
    column_types = [str, str, str, str, str, str, str, str, str]
    import_csv_to_dynamodb(table_name, csv_file_name, colunm_names, column_types)


if __name__ == "__main__":
    main()
    #cProfile.run('main()') # if you want to do some profiling
John Tubert
quelle
0

Ich empfehle Ihnen, den AWS Database Migration Service (DMS) zu verwenden.

Wie in diesem Artikel beschrieben: https://aws.amazon.com/es/blogs/database/migrate-delimited-files-from-amazon-s3-to-an-amazon-dynamodb-nosql-table-using-aws- Datenbank-Migrations-Service-und-aws-Cloudformation / Sie können S3 als Ursprung und DynamoDB als Ziel verwenden, um CSV-Dateien mit vielen Tupeln zu importieren.

Ich habe erfolgreich einen vollständigen Importprozess von S3 in DynamoDB implementiert und bin der einfachste und schnellste Weg, dies zu tun.

Im Wesentlichen müssen Sie:

  • Haben Sie einen Bucket, in dem Sie Ihre CSV-Dateien ablegen können, mit mindestens zwei Ordnerebenen (die erste bezieht sich auf "Schema" und die zweite auf "Tabellenname").
  • Haben Sie eine DynamoDB-Tabelle mit mindestens dem gleichen Hash-Schlüssel wie in CSV-Dateien.
  • Erstellen Sie ein Ursprungselement in DMS, das auf S3 zeigt, und ordnen Sie die CSV-Struktur zu.
  • Erstellen Sie ein Zielelement in DMS, das auf die DynamoDB-Tabelle zeigt, und ordnen Sie es dem zugeordneten Ursprung zu.
  • Erstellen Sie eine Replikationsinstanz (achten Sie auf die kostenlose Schicht) in DMS.
  • Erstellen Sie eine Replikationsaufgabe in DMS, die vom Ursprung und vom Ziel erstellte Elemente verwendet.
  • Aufgabe ausführen.

Durch Ändern des DynamoDB-Tabellenturchsatzes auf 25 Lesekapazitätseinheiten und 150 Schreibkapazitätseinheiten konnte ich in weniger als 7 Minuten mehr als 124.000 Tupel einfügen, einschließlich Ihrer Vorbereitungsaufgaben.

Die Hauptempfehlung von AWS für diese Aufgabe ist die Verwendung des Datenpipeline-Dienstes. Ich habe ihn jedoch verwendet und er ist teurer. Die zugrunde liegende Initialisierung des EMR-Culsters ist ein sehr langsamer Prozess. Wenn Sie diese Importaufgabe nicht wiederholen möchten, verwenden Sie DMS stattdessen.

Juan Manuel Ruiz Fernández
quelle