Skip to content

Transferir dados para o Redshift

O Amazon Redshift é um data warehouse popular que é executado no Amazon Web Services juntamente com o Amazon S3. Os dados do Braze do Currents são estruturados para transferência direta para o Redshift.

O seguinte descreve como transferir dados do Amazon S3 para o Redshift através de um processo de Extração, Transformação e Carga (ETL). Para o código-fonte completo, consulte os exemplos do Currents repositório do GitHub.

Visão geral do carregador S3 para Redshift

O script s3loader.py usa uma tabela de manifesto separada no mesmo banco de dados do Redshift para acompanhar os arquivos que já foram copiados. A estrutura geral é a seguinte:

  1. Liste todos os arquivos no S3, depois identifique os novos arquivos desde a última vez que você executou s3loader.py comparando a lista com o conteúdo na tabela de manifesto.
  2. Crie um arquivo manifesto contendo os novos arquivos.
  3. Execute uma consulta COPY para copiar os novos arquivos do S3 para o Redshift usando o arquivo de manifesto.
  4. Insira os nomes dos arquivos copiados na tabela de manifesto separada no Redshift.
  5. Confirme (commit).

Dependências

Você precisa instalar o AWS Python SDK e o Psycopg para executar o carregador:

1
2
pip install boto3
pip install psycopg2

Permissões

Função do Redshift com acesso de leitura ao S3

Se você ainda não fez isso, siga a documentação da AWS para criar uma função que possa executar comandos COPY nos seus arquivos no S3.

Regras de entrada da VPC do Redshift

Se o seu cluster do Redshift estiver em uma VPC, você precisa configurar a VPC para permitir conexões do servidor em que está executando o carregador S3. Acesse o seu cluster do Redshift e selecione a entrada de grupos de segurança da VPC à qual deseja que o carregador se conecte. Em seguida, adicione uma nova regra de entrada: Type = Redshift, Protocol = TCP, Port = a porta do seu cluster, Source = o IP do servidor que executa o carregador (ou “Anywhere” para testes).

Usuário do IAM (Identity and Access Management) com permissões mínimas ao S3

O carregador S3 precisa de permissões de leitura (e, em geral, de listagem) nos objetos do bucket onde estão os arquivos do Currents, além de permissões de gravação (e leitura) no prefixo ou bucket em que ele grava os arquivos de manifesto usados nos comandos COPY do Redshift. Evite anexar a política administrada AmazonS3FullAccess: ela concede acesso amplo a todos os buckets e aumenta o risco se as credenciais forem expostas. Prefira uma política IAM personalizada que restrinja s3:GetObject e s3:ListBucket (com condição de prefixo em s3:prefix, quando aplicável) ao caminho dos dados do Currents e s3:PutObject / s3:GetObject (e s3:DeleteObject, se necessário) somente ao local do manifesto. Quando possível, use uma função IAM com credenciais temporárias em vez de chaves de longo prazo.

Crie um usuário do IAM no console do IAM, anexe apenas essa política restrita e salve as credenciais, pois você precisará passá-las ao carregador.

Você pode passar as credenciais de acesso ao carregador por meio de variáveis de ambiente, do arquivo de credenciais compartilhado (~/.aws/credentials) ou do arquivo de configuração da AWS. Como alternativa, você pode incluí-las diretamente no carregador atribuindo-as aos campos aws_access_key_id e aws_secret_access_key dentro de um objeto S3LoadJob, mas não recomendamos codificar credenciais diretamente no seu código-fonte.

Uso

Exemplo de uso

O programa de exemplo a seguir carrega dados do evento users.messages.contentcard.Impression do S3 para a tabela content_card_impression no Redshift.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
if __name__ == '__main__':
    host = '{YOUR_CLUSTER}.redshift.amazonaws.com'
    port = 5439
    database = '{YOUR_DATABASE}'
    user = '{YOUR_USER}'
    password = '{YOUR_PASSWORD}'
    role = '{YOUR_REDSHIFT_ROLE_ARN}'

    # Do not hard code these credentials.
    aws_access_key_id = None
    aws_secret_access_key = None

    # Content Card Impression Avro fields:
    #   id            - string
    #   user_id       - string
    #   external_user_id - string (nullable)
    #   app_id        - string
    #   content_card_id  - string
    #   campaign_id   - string (nullable)
    #   send_id       - string (nullable)
    #   time          - int
    #   platform      - string (nullable)
    #   device_model  - string (nullable)

    print('Loading Content Card Impression...')
    cc_impression_s3_bucket = '{YOUR_CURRENTS_BUCKET}'
    cc_impression_s3_prefix = '{YOUR_CURRENTS_PREFIX}'
    cc_impression_redshift_table = 'content_card_impression'
    cc_impression_redshift_column_def = [
        ('id', 'text'),
        ('user_id', 'text'),
        ('external_user_id', 'text'),
        ('app_id', 'text'),
        ('content_card_id', 'text'),
        ('campaign_id', 'text'),
        ('send_id', 'text'),
        ('time', 'integer'),
        ('platform', 'text'),
        ('device_model', 'text')
    ]

    cc_impression_redshift = RedshiftEndpoint(host, port, database, user, password,
        cc_impression_redshift_table, cc_impression_redshift_column_def)
    cc_impression_s3 = S3Endpoint(cc_impression_s3_bucket, cc_impression_s3_prefix)

    cc_impression_job = S3LoadJob(cc_impression_redshift, cc_impression_s3, role,
        aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
    cc_impression_job.perform()

Credenciais

Para executar o carregador, você precisa primeiro fornecer o host, a port e o database do seu cluster do Redshift, bem como o user e a password de um usuário do Redshift que possa executar consultas COPY. Além disso, você precisa fornecer o ARN da função do Redshift com acesso de leitura ao S3 que você criou em uma seção anterior.

1
2
3
4
5
6
host = '{YOUR_CLUSTER}.redshift.amazonaws.com'
port = 5439
database = '{YOUR_DATABASE}'
user = '{YOUR_USER}'
password = '{YOUR_PASSWORD}'
role = '{YOUR_REDSHIFT_ROLE_ARN}'

Configuração do job

Você precisa fornecer o bucket S3 e o prefixo dos seus arquivos de eventos, bem como o nome da tabela do Redshift para a qual deseja executar o COPY.

Além disso, para executar o COPY de arquivos Avro com a opção “auto” conforme exigido pelo carregador, a definição de colunas na sua tabela do Redshift deve corresponder aos nomes dos campos no esquema Avro, conforme mostrado no programa de exemplo, com o mapeamento de tipos apropriado (por exemplo, string para text, int para integer).

Você também pode passar uma opção batch_size ao carregador se achar que demora muito para copiar todos os arquivos de uma vez. Passar um batch_size permite que o carregador copie e confirme incrementalmente um lote por vez, sem precisar copiar tudo ao mesmo tempo. O tempo necessário para carregar um lote depende do batch_size, do tamanho dos seus arquivos e do tamanho do seu cluster do Redshift.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# Content Card Impression Avro fields:
#   id            - string
#   user_id       - string
#   external_user_id - string (nullable)
#   app_id        - string
#   content_card_id  - string
#   campaign_id   - string (nullable)
#   send_id       - string (nullable)
#   time          - int
#   platform      - string (nullable)
#   device_model  - string (nullable)
cc_impression_s3_bucket = '{YOUR_CURRENTS_BUCKET}'
cc_impression_s3_prefix = '{YOUR_CURRENTS_PREFIX}'
cc_impression_redshift_table = 'content_card_impression'
cc_impression_redshift_column_def = [
    ('id', 'text'),
    ('user_id', 'text'),
    ('external_user_id', 'text'),
    ('app_id', 'text'),
    ('content_card_id', 'text'),
    ('campaign_id', 'text'),
    ('send_id', 'text'),
    ('time', 'integer'),
    ('platform', 'text'),
    ('device_model', 'text')
]
cc_impression_batch_size = 1000
New Stuff!