Monday, July 28, 2014

AF (analysis framework) and the task graph story

Analysis Framework is an application written on top of TaskCluster that provides a way to execute Telemetry MapReduce jobs.

AF comprises two modules: telemetry-analysis-base and AF itself.

telemetry-analysis-base

This module contains an example of a MapReduce node that can be executed over TaskCluster.
It contains:
  •  a custom specification for a Docker image
  •  a Makefile for creating a Docker image and pushing it to the registry
  •  a Vagrantfile useful to developers working on Mac OS X
  •  custom code for map/reduce jobs
  •  an encryption module used to decrypt STS temporary credentials

A task up and running

In order to run a task on TaskCluster you need a Docker container and custom code to be executed in the container.
A Docker container is a Docker image in execution. To obtain a custom image you can use the Docker specification in the telemetry-analysis-base repository.
As TaskCluster needs a container to run your task in, you need to push your image to the TaskCluster registry. The Makefile present in the repository takes care of creating the image and pushing it to the registry.

Because Telemetry jobs work with data that is not open to the public, you need a set of credentials to access the files in S3. The Docker container expects a set of encrypted temporary credentials in an environment variable called CREDENTIALS.
As these credentials are visible in the task description on TaskCluster, they are encrypted and base64 encoded. The temporary credentials used are STS Federation Token credentials. They expire after 36 hours and can be obtained only by AWS users that hold a policy allowing their generation.
After obtaining these credentials they are encrypted with a symmetric key. The symmetric key is itself encrypted with a public key, and both are sent together as an environment variable to the Docker instance. Inside the Docker container these credentials are decrypted and used to make calls to S3.
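The hybrid scheme above can be sketched as follows. This is a minimal illustration using the Python cryptography package, assuming Fernet for the symmetric layer and RSA-OAEP for wrapping the key; the actual key formats and algorithms used by AF may differ, and the keypair here is generated on the spot just for demonstration (in AF the private key lives inside the Docker image).

```python
import base64
import json

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa

# Hypothetical keypair for the sketch; in AF the private key
# resides in the running Docker container.
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()

OAEP = padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()),
                    algorithm=hashes.SHA256(), label=None)

def encrypt_credentials(creds: dict) -> str:
    """Encrypt STS credentials with a fresh symmetric key, wrap that
    key with the public key, and base64 the whole bundle (CREDENTIALS)."""
    sym_key = Fernet.generate_key()
    payload = Fernet(sym_key).encrypt(json.dumps(creds).encode())
    wrapped_key = public_key.encrypt(sym_key, OAEP)
    bundle = {"key": base64.b64encode(wrapped_key).decode(),
              "data": payload.decode()}
    return base64.b64encode(json.dumps(bundle).encode()).decode()

def decrypt_credentials(blob: str) -> dict:
    """What runs inside the container: unwrap the symmetric key with
    the private key, then decrypt the credentials themselves."""
    bundle = json.loads(base64.b64decode(blob))
    sym_key = private_key.decrypt(base64.b64decode(bundle["key"]), OAEP)
    return json.loads(Fernet(sym_key).decrypt(bundle["data"].encode()))
```

The indirection buys the usual hybrid-encryption trade: RSA can only encrypt a small payload, so it wraps a short symmetric key, and the arbitrarily sized credentials go under the symmetric cipher.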

Custom code

Inside the custom container resides the custom code, which receives as arguments a set of file names in S3.

Mapper
After decrypting the credentials, the mapper takes the file list and starts downloading a batch of files in parallel. As files finish downloading they are stored in a directory called s3/<path in S3> and their names are passed as arguments to mapperDriver.
mapperDriver first reads the specification of the job from analysis-tools.yml. The configuration file specifies whether the files need to be decompressed, which mapper function needs to run, and the language the mapper is written in.
Next, as the example provided in the repository is in Python, the driver spawns another process that executes python-helper-mapper, which reads the files, decompresses them, loads the mapper function, and sends the decompressed files line by line to the mapper function.
The mapper function writes its output to result.txt. This file is an artifact of the task run.
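Concretely, the mapper half of the pipeline can be sketched like this. The names mapper and run_mapper are illustrative, not AF's real API, and the real helper also handles the parallel S3 downloads and the analysis-tools.yml configuration, which are skipped here.

```python
import gzip

def mapper(line, output):
    # Hypothetical user-written mapper: keep non-empty lines.
    if line.strip():
        output.write(line)

def run_mapper(paths, result_path="result.txt", decompress=True):
    """Mimic python-helper-mapper: read each downloaded file,
    optionally decompress it, and feed it line by line to the
    mapper function; result.txt becomes the task's artifact."""
    opener = gzip.open if decompress else open
    with open(result_path, "w") as output:
        for path in paths:
            with opener(path, "rt") as f:
                for line in f:
                    mapper(line, output)
```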

Reducer
The reducer task requires an environment variable named INPUT_TASK_IDS specifying all the mapper task ids. With the list of all mappers, the reducer makes calls to fetch all the result files from the mappers. As the files finish downloading they are stored in a folder called mapperOutput.
reducerDriver then reads the specification of the job from analysis-tools.yml, which contains the reducer function name and the language it is written in.
In the example provided in the repository the reducer is also written in Python, so it uses an intermediary module called python-helper-reducer. This module loads the reducer, removes all empty lines from the result files, and feeds them to the reducer function.
The output is written to the result file, which is an artifact of the reducer task. After writing the result file, the reducer sends an email to the owner of the task containing a link to the output of the MapReduce job. The email address is given as an environment variable called OWNER.
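The reducer half can be sketched the same way. Again, reducer and run_reducer are illustrative names rather than AF's real API, and the email step at the end is omitted.

```python
import glob
import os

def reducer(lines):
    # Hypothetical user-written reducer: count the surviving lines.
    return "lines: %d\n" % len(lines)

def run_reducer(mapper_dir="mapperOutput", result_path="result.txt"):
    """Mimic python-helper-reducer: gather the downloaded mapper
    result files, drop empty lines, feed everything to the reducer
    function, and write the reducer task's artifact."""
    lines = []
    for path in sorted(glob.glob(os.path.join(mapper_dir, "*"))):
        with open(path) as f:
            lines.extend(line for line in f if line.strip())
    with open(result_path, "w") as out:
        out.write(reducer(lines))
    # AF would then email the address in OWNER a link to this artifact.
```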


AF

This module constructs a task graph and posts it to TaskCluster.
At this point a set of credentials is needed to run a task graph:
  • credentials in AWS allowing Federation Token generation. To obtain them you need a policy enabling STS credentials generation.
  • the public key associated with the private one residing in the running Docker container
  • symmetric key used to encrypt the Federation Token credentials
  • access to IndexDB
  • credentials to TaskCluster (in the future)

Example call:

./AF.js Filter.json "" '{"OWNER" : "", "BLACK" : "black"}'

AF takes as arguments a Filter.json, a Docker image, and optionally some other arguments that will be passed as environment variables to the Docker container.
AF executes the following:

  • makes a call for a Federation Token, encrypts the credentials with the public key, and provides the encrypted, base64-encoded credentials to the Docker container as the CREDENTIALS environment variable
  • using Filter.json, AF queries indexDB to get the specific file names and file sizes
  • creates skeletons for mapper tasks and adds their load to them (file names from indexDB)
  • pushes the task definitions to the graph skeleton
  • creates the reducer task and gives it as dependencies the labels of the tasks it depends on
  • posts the graph
  • gets the graph definition, prints it together with a link to a simple monitor page
  • as the graph finishes execution, the page will contain the links to the result page
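The graph-building steps above can be sketched roughly as follows. build_graph is a hypothetical helper and the field names are illustrative, not the exact TaskCluster task-graph schema; the point is only the shape: one task per batch of files, plus a reducer whose dependencies are all the mapper labels.

```python
def build_graph(file_batches, image, env):
    """Build a task-graph-like structure: one mapper task per batch
    of input files, then a reducer task that requires every mapper."""
    tasks = []
    for i, batch in enumerate(file_batches):
        tasks.append({
            "label": "mapper-%d" % i,
            "requires": [],
            "payload": {
                "image": image,
                # each mapper gets its load: the file names from indexDB
                "env": dict(env, FILES=",".join(batch)),
            },
        })
    tasks.append({
        "label": "reducer",
        # the reducer depends on the labels of all mapper tasks
        "requires": [t["label"] for t in tasks],
        "payload": {"image": image, "env": dict(env)},
    })
    return {"tasks": tasks}
```

In the real flow AF would also inject CREDENTIALS into each task's environment and, once the graph is posted, fill the reducer's INPUT_TASK_IDS with the task ids TaskCluster assigns to the mappers.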

Last but not least

Analysis Framework is a really interesting/fun project. It can easily be extended or reused, it is designed as an example that can be customized, and it has some documentation too. :p