Apache Spark com Amazon S3

Otimizando Apache Spark com S3 (e outras dicas)

Há alguns meses tivemos que reestruturar nossa arquitetura de big data aqui na Social Miner para fins de escalabilidade. Passado alguns meses após a implementação da arquitetura, resolvi escrever esse post listando aqui 7 pontos de otimização que acho vitais para quem esteja estruturando uma arquitetura de big data com Apache Spark e S3, partindo de coisas mais simples e triviais, chegando até tópicos não tão difundidos. Então bora lá? 🙂

1. Execute os jobs na mesma região de seus buckets

Não importa se você está rodando seus jobs Spark em um cluster no Amazon EMR (Elastic MapReduce) ou em uma instalação standalone no EC2, a execução desses jobs precisa ocorrer na mesma região dos buckets nos quais os dados serão lidos/escritos.

Essa inclusive, seria uma sugestão geral para qualquer operação de I/O em cloud: aproxime a fonte do dado, de seu consumidor, evitando assim qualquer latência de rede que possa ocorrer entre essas duas pontas.

2. Use o client S3A como seu scheme de URL

Se você possui uma arquitetura de big data “legada” ou se está lendo sobre o assunto de fontes um pouco antigas, as chances são de que você possa estar utilizando um client de conexão com o S3 obsoleto, tais como S3 e S3N.

Esses clients são definidos através do scheme da URL, logo, basta alterar seus apontamentos do S3 de “s3n://nome-do-bucket/” para “s3a://nome-do-bucket/” para passar a utilizar o novo client.

Dentre as vantagens estão: ganho de performance, suporte a arquivos maiores (até 5TB) e manutenção de bugs. Além disso, a partir da versão 3.0+ do Hadoop, apenas funcionará o scheme S3A.

Há apenas uma exceção à essa recomendação. Se você utiliza EMR, apenas é possível a utilização do scheme “s3://”, já que o mesmo se refere à um client próprio da AWS.

3. Utilize o formato Apache Parquet

Apache Parquet é um formato de arquivo colunar, auto-descritivo, o qual incorpora o schema (estrutura) dentro do próprio dado e os valores em cada coluna são fisicamente armazenados em locais de memória adjacentes. Mas na prática, o que isso significa? Significa que:

  • Queries que buscam valores de colunas específicas não precisam ler a linha por completo, mas somente as colunas requerentes, reduzindo assim I/O de disco e rede;
  • Dados repetidos entre linhas de uma mesma coluna não são persistidos, gerando uma deduplicação dos dados e resultando em menores arquivos;
  • Diferentes técnicas de encoding podem ser aplicada por entre as colunas;
  • Podem ser aplicadas técnicas de compressão específicas de um tipo, já que os valores das colunas tendem a ser do mesmo tipo;

Algumas dessas características são inerentes aos formatos colunares em si. Por conta disso, este tópico poderia ser “Utilize um formato colunar (ORC, Avro, Kudu, CarbonData, etc)”. Isso já seria o bastante para se observar um ganho expressivo em termos de performance, se comparado com formatos lineares baseados em texto, tais como CSV e JSON. Contudo, quase tão importante quanto o desempenho é a adoção de mercado e hoje o Parquet é o formato standard no ecossistema big data.

Mais informações sobre o formato e como ele se relaciona com o Apache Spark podem ser conferidas no (ótimo) talk com Emily Curtin e Robbie Strickland para o Spark Summit de 2017.

3.1. Ordene os dados a serem salvos quando utilizando Parquet

Considerando a característica de deduplicação de dados do Parquet, é recomendado – sempre que possível – que seja feita a ordenação dos mesmos antes de sua persistência, reduzindo assim o tamanho final do arquivo.

4. Consolide muitos arquivos pequenos em poucos arquivos maiores

Há um anti-padrão do mundo do Big Data – e o Spark não é exceção – conhecido como “small files problem”, ou em tradução direta: “problema dos arquivos pequenos”.

Este problema consiste no processamento de um grande conjunto de dados difundido por entre diversos pequenos arquivos, onde a maior parte do tempo de processamento acaba sendo gasta com I/O para listagem e carregamento desses arquivos.

Mas qual seria o tamanho ideal de um arquivo? Não há uma resposta exata, mas considerando a arquitetura de processamento de blocos do Spark, uma regra geral seria buscar um tamanho de partição de arquivo de, em média, 128MB.

Uma forma de atingir este tamanho de arquivo é agrupando arquivos menores, seja através de ETLs customizadas ou de ferramentas de merge, como o groupBy do S3DistCp. Esse último, no entanto, sendo incompatível com o Parquet devido a estrutura de metadados internas do formato. Caso você esteja usando um serviço de ingestão externa de dados, como o Amazon Data Migration Service (DMS) ou Amazon Kinesis Firehose, uma solução seria ampliar o threshold de replicação de dados ou aumentar o tempo de trigger dos jobs, em casos como AWS Glue ETL Jobs.

5. Estruture seu data lake com uma árvore de “diretórios” rasa

Diferente de file systems (ex.: HDFS), operações de listagem de arquivos e diretórios* em object stores como S3 são altamente caras em termos de performance. Quanto maior e mais profunda a sua estrutura de diretórios, mais destas operações terão de ser executadas, reduzindo assim sua capacidade de processamento.

Exemplo: caso seu data lake esteja estruturado por data, favoreça o formato “/2018-03-03” a “/2018/03/03”

*Diretórios no S3 são apenas uma convenção, dado que o S3 é um object store em que, cada objeto é uma combinação de um bucket (ex: “meu-bucket”) com uma chave específica (ex: “/caminho/dos/meus/objetos/objeto-x.parquet”) cujo possui o conteúdo do objeto como seu valor.

Pense na store como um repositório de chave-valor onde “s3://meu-bucket/purchases/2018-03-03/2018-03-03-23-42.parquet” é a chave e o conteúdo do arquivo é o valor. As barras depois do nome do bucket (meu-bucket) não significam nada para o S3, elas existem apenas para a conveniência do usuário.

6. Tire proveito do schema de particionamento através dos nomes de diretório

Como dito no item anterior, listagens em object stores são caras, e um dos maiores responsáveis por execuções de listagens no Spark são filtragens de arquivos através de globbing.

Globbing é o processo de utilização de wildcards para matching da nomenclatura de arquivos. Ex.: se você possui a seguinte estrutura de diretórios no seu data lake /{date}/{log_type}/{file_name}.parquet, você poderia buscar todos os logs de pagamentos e compras entre os dias 01 e 03 de março de 2018, utilizando o seguinte glob:

 

Contudo, este seria um processo custoso para o Spark, já que envolveria uma completa navegação recursiva e pattern match de todos os caminhos listados.

Uma melhor forma de estruturar esses diretórios seria explicitando o nome da partição (diretório) através de um “alias”.O resultado seria:

/date={date}/log_type={log_type}/{file_name}.parquet

Com isso, podemos realizar o mesmo filtro de antes, porém sem a utilização de caracteres de glob:

 

Desta forma, o Spark apenas precisaria percorrer a sua estrutura de diretórios uma única vez por execução, mantendo os caminhos das partições em cache (FileStatusCache) e retornará apenas aqueles itens que corresponderem a query (partition pruning).

Além do ganho de performance no carregamento dos arquivos, a utilização do schema de partição também traz outros benefícios:

  1. Diretórios/arquivos inexistentes não mais geram erro na tentativa de carregamento, passando apenas a retornar um dataframe vazio;
  2. As partições (nomes dos diretórios) passam a fazer parte do seu dataframe como colunas adicionais, estando disponíveis para visualização/manipulação como qualquer outro dado de seu arquivo;
  3. Código baseado em queries mais claros e expressivos do que strings de padrões Glob.

6.1. Use partitionBy para escrita de partições

A fim de evitar múltiplas operações de agrupamento e escrita individuais, pode-se usar o método partitionBy do DataFrameWriter. Ao efetuar a operação de escrita por essa função, o Spark irá previamente agrupar (particionar) e persistir os dados do dataframe em suas respectivas partições. Ex.:

 

Resultará em:

/date={date}/log_type={log_type}/{file_name}.parquet

PS: as colunas que serão usadas como partições precisam estar presente com a mesma nomenclatura no dataframe.

PS2: após particionada, as colunas usadas como partições são removidas do arquivo final, já que como mencionado anteriormente, após a leitura desses arquivos, as partições são incluídas como colunas adicionais no dataframe carregado.

7. Use Datasource Table com um catálogo externo (Hive Metastore)

Se você está usando o schema de particionamento e realizando partition pruning como apresentado no item anterior, as chances são de que seu datalake possua muitas partições.

Toda vez que é realizada a leitura de um conjunto de arquivos através do spark.read.parquet(file_path), é disparado um processo de descoberta de partições no Spark (partition discovery), através da classe InMemoryFileIndex. Esta classe tem por objetivo listar e filtrar os arquivos do datalake através dos filtros fornecidos.

Como a nomenclatura sugere, o processo de filtragem via InMemoryFileIndex ocorre em memória, ou seja, o Spark precisa listar e percorrer toda a estrutura de diretórios, trazer as partições para os workers (executors) e então aplicar o filtro fornecido, para então obter os paths finais que serão carregados. Todo esse processo é custoso, principalmente para arquiteturas baseadas em múltiplas partições. Contudo, ele pode ser evitado utilizando-se de um Hive Metastore.

O que é Hive Metastore

Hive Metastore é o repositório central dos meta dados do Apache Hive. É responsável por armazenar informações das tabelas Hive (como seus schemas e localizações) e partições, em uma base de dados relacional.

Dentre os benefícios proporcionados por um Metastore estão:

  • Gerenciamento de partições: ao realizar o carregamento dos arquivos do datalake utilizando partition pruning, o Spark precisa precisa percorrer toda a estrutura de diretórios para catalogalização das partições existentes, para então filtrá-las. Com um Metastore essa etapa é eliminada, pois as partições serão gerenciadas pelo Metastore, mapeando diretamente para seus caminhos finais.
  • Gerenciamento de schema: ao realizar a leitura de um conjunto de dados no datalake, o Spark tem de inferir o schema do conjunto para cada operação de carregamento. Assim como o partition discovery, essa pode ser uma operação cara a depender da arquitetura e/ou volume de dados. Com um Metastore, o schema é persistido previamente e fornecido no momento da leitura dos dados.

Utilizando um Hive Metastore com Spark

A utilização de um Metastore para carregamento de dados no Spark se dá através de Datasource Tables (Spark 2.1+), sendo muito similar ao carregamento direto de arquivos.

Utilizando o exemplo do item anterior, mudaríamos de:

 

Para:

 

Ponto! Isso já é o suficiente para eliminar a inferência de schema e a descoberta de partições no carregamento do dataframe. Nos benchmarks do Silvio Fiorito (Solutions Architect na Databricks), houve uma redução de ~43 segundos para ~0.29 segundo por entre ambas abordagens.

Mas nem tudo são flores. Há alguns pontos de atenção que se devem ser considerados antes da implementação dessa solução:

  1. Um Metastore é necessário: por mais que o Hive Metastore opere sobre uma base de dados SQL JDBC, o mesmo precisa ser propriamente configurado e disponibilizado para uso. Uma alternativa é utilizar o AWS Glue Data Catalog como Metastore. Apenas um detalhe: o Data Catalog somente é acessível através do EMR, e essa opção precisa ser habilitada no momento do deployment do cluster.
  2. As partições precisam ser atualizadas no Metastore: para cada nova partição criada no datalake, é necessário também atualizar o repositório com essa mesma partição. Seja através de dataframe/SQL, ou seja através de uma Crawler ou API do boto3 quando usando Glue Data Catalog.

Conclusão

Como dito no começo do post, não existe solução milagrosa e cada cenário possui suas próprias particularidades, esses foram alguns pontos que funcionaram para o nosso modelo e espero que funcione também para você. 😉

Eai, alguma dica dessa foi útil pra você? Tem alguma que ficou de fora dessa lista e gostaria de compartilhar? Então comenta ai!

Abraços e até a próxima!

Published by

Felipe Lopes

Amante de café, fotografia e cultura indie. E programador nas horas vagas.

Leave a Reply

Your email address will not be published. Required fields are marked *