Data Pipeline use cases

There are plenty of use cases where AWS Data Pipeline can save a fortune and speed up business decisions. First of all, it is serverless, which means you do not have to run anything on your own servers if you do not want to. Everything runs entirely on the AWS side, and you pay only per execution.

  1. Analyse daily user behaviour by extracting data from logs
  2. Analyse transactions for a payment system
  3. Analyse stock exchange reports. And many more.

So, Data Pipeline allows you to spin up the entire infrastructure needed for a Hadoop cluster, run all the logic you want to process your data with, and shut it down. The main steps are:

  • Bootstrapping the cluster. At this point you define how many core and slave instances will be launched, what instance types to use, and how much memory the JVMs will get while running Hadoop tasks.
// Creates a PipelineObject that defines the EMR cluster properties
static PipelineObject getEMRCluster() {

    Field type = new Field()
            .withKey("type")
            .withStringValue(PipelineConstants.EMR_CLUSTER)
    Field amiVersion = new Field()
            .withKey("amiVersion")
            .withStringValue("#{${PipelineConstants.MY_AMI_VERSION}}")
    Field masterInstanceType = new Field()
            .withKey("masterInstanceType")
            .withStringValue("#{${PipelineConstants.MY_MASTER_INSTANCE_TYPE}}")
    Field coreInstanceType = new Field()
            .withKey("coreInstanceType")
            .withStringValue("#{${PipelineConstants.MY_CORE_INSTANCE_TYPE}}")
    Field coreInstanceCount = new Field()
            .withKey("coreInstanceCount")
            .withStringValue("#{${PipelineConstants.MY_CORE_INSTANCE_COUNT}}")
    Field region = new Field()
            .withKey("region")
            .withStringValue("#{${PipelineConstants.MY_DDBREGION}}")
    Field terminateAfter = new Field()
            .withKey("terminateAfter")
            .withStringValue("50 Minutes")
    Field bootstrapAction = new Field()
            .withKey("bootstrapAction")
            .withStringValue("s3://elasticmapreduce" +
            "/bootstrap-actions/configure-hadoop, " +
            "--yarn-key-value,yarn.nodemanager.resource.memory-mb=11520," +
            "--yarn-key-value,yarn.scheduler.maximum-allocation-mb=11520," +
            "--yarn-key-value,yarn.scheduler.minimum-allocation-mb=1440," +
            "--yarn-key-value,yarn.app.mapreduce.am.resource.mb=2880," +
            "--mapred-key-value,mapreduce.map.memory.mb=5760," +
            "--mapred-key-value,mapreduce.map.java.opts=-Xmx4608M," +
            "--mapred-key-value,mapreduce.reduce.memory.mb=2880," +
            "--mapred-key-value,mapreduce.reduce.java.opts=-Xmx2304m," +
            "--mapred-key-value,mapreduce.map.speculative=false")

    List fieldsList = Lists.newArrayList(type,
            amiVersion,
            masterInstanceType,
            coreInstanceCount,
            coreInstanceType,
            region,
            terminateAfter,
            bootstrapAction)

    return new PipelineObject()
            .withName(PipelineConstants.EMR_CLUSTER_NAME)
            .withId(PipelineConstants.EMR_CLUSTER_NAME)
            .withFields(fieldsList)
}
  • Running the steps defined in the settings. In Data Pipeline terminology, a step is a Hadoop JAR application.
// Example of a step that imports (restores) S3 data into a DynamoDB table
fieldsList << new Field()
        .withKey("inputImport")
        .withRefValue(PipelineConstants.INPUT_NODE_NAME)
fieldsList << new Field()
        .withKey("outputImport")
        .withRefValue(PipelineConstants.OUTPUT_NODE_NAME)
stepValue = "s3://dynamodb-emr-#{${PipelineConstants.MY_DDBREGION}}/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar," +
        "org.apache.hadoop.dynamodb.tools.DynamoDbImport," +
        "#{inputImport.${PipelineConstants.INPUT_NODE_DIRECTORY_NAME}}," +
        "#{outputImport.tableName}," +
        "#{outputImport.${PipelineConstants.WRITE_THROUGHPUT_PERCENT}}"

fieldsList << new Field()
        .withKey("step_${step.name}")
        .withStringValue(stepValue)

objects << PipelineImportObjectCreator.getS3SourceLocation()
objects << PipelineImportObjectCreator.getDDBDestinationTable()
// Example of a step that exports a DynamoDB table to S3
stepValue = "#{${PipelineConstants.MY_EMRSTEP + step.id}}"
if (step.operation == PipelineStepOperation.ARCHIVE_OPERATION) {

    fieldsList << new Field()
            .withKey("input")
            .withRefValue(PipelineConstants.DDBSOURCE_TABLE)
    fieldsList << new Field()
            .withKey("output")
            .withRefValue(PipelineConstants.S3_BACKUP_LOCATION)
    stepValue = "s3://dynamodb-emr-#{myDDBRegion}/emr-ddb-storage-" +
            "handler/2.1.0/emr-ddb-2.1.0.jar," +
            "org.apache.hadoop.dynamodb.tools.DynamoDbExport," +
            "#{output.${PipelineConstants.INPUT_NODE_DIRECTORY_NAME}}," +
            "#{input.tableName}," +
            "#{input.readThroughputPercent}"

    objects << PipelineExportObjectCreator.getDDBSourceTable()
    objects << PipelineExportObjectCreator.getS3BackupLocation()
}
  • Completion event, at which the cluster is terminated.

Keep in mind that each of these steps can be combined with the SNS service. This is very powerful, because it lets you connect your app to the pipeline's progress and results. Every step, whether it succeeds or fails, generates an event that sends an HTTP request to your app server informing it of that step's status. The app, in turn, can take the appropriate action, such as informing users that results are ready or starting another Data Pipeline based on the results of the previous one. It is also important to remember that a data pipeline can run once or on a schedule. This means your Death Star cluster can start every midnight, do its important work, and shut itself down.
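The scheduled run described above is itself a pipeline object of type Schedule. A minimal sketch in the same style as the snippets above, with the name, id, and a daily period chosen here for illustration (other objects then point at it through a "schedule" field with a ref value):

```groovy
// Sketch: a Schedule object that runs the pipeline once a day,
// starting from the moment the pipeline is activated.
static PipelineObject getDailySchedule() {

    Field type = new Field()
            .withKey("type")
            .withStringValue("Schedule")
    Field period = new Field()
            .withKey("period")
            .withStringValue("1 days")
    Field startAt = new Field()
            .withKey("startAt")
            .withStringValue("FIRST_ACTIVATION_DATE_TIME")

    return new PipelineObject()
            .withName("DailySchedule")
            .withId("DailySchedule")
            .withFields(Lists.newArrayList(type, period, startAt))
}
```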

Steps can vary and be of any complexity you wish. In simple words, every step is a Hadoop Java application. This also gives an advantage: each step can be run separately in a testing environment, making sure it is properly tested and ready to run on live data.

Now, embedding it into your app can be fairly simple if you know what to do. The steps are:

  1. Create empty pipeline.
    DataPipelineClient client = new DataPipelineClient(credentials) 
    CreatePipelineRequest request = new CreatePipelineRequest()
    request.setName(pipelineName)
    request.setUniqueId(pipelineUniqueId)
    CreatePipelineResult result = client.createPipeline(request)
    String pipelineId = result.pipelineId 
  2. Put the pipeline definition. This is where you put in all the details and configure what should actually happen. An important point: when you subscribe your pipeline to SNS to track what happens, your app has to be able to confirm the SNS subscription.
     PutPipelineDefinitionRequest putPipelineDefinition = new PutPipelineDefinitionRequest()
            .withPipelineId(pipelineId)
            .withParameterValues(parameterValues)
            .withPipelineObjects(pipelineObjectList)
    PutPipelineDefinitionResult putPipelineResult = dataPipelineClient.putPipelineDefinition(putPipelineDefinition) 
  3. Activate pipeline.
    CampaignPipelineActivity activity = CampaignPipelineActivity.get(activityId)
    PipelineCreator creator = new PipelineCreator()
    ActivatePipelineResult result = creator.activatePipeline(client, activity.pipelineId)
    
    
  4. Enjoy
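The SNS subscription confirmation mentioned in step 2 boils down to reading the SubscribeURL out of the SubscriptionConfirmation message that SNS posts to your endpoint and issuing a GET request to it. A minimal sketch using only the JDK; the regex-based field extraction is a simplification for illustration, and a real endpoint should use a proper JSON parser and verify the message signature:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SnsConfirmer {

    // Pulls a top-level string value such as "Type" or "SubscribeURL"
    // out of the SNS JSON payload. Simplified; use a real JSON parser
    // in production.
    static String extract(String json, String key) {
        Matcher m = Pattern.compile("\"" + key + "\"\\s*:\\s*\"([^\"]+)\"")
                .matcher(json);
        return m.find() ? m.group(1).replace("\\/", "/") : null;
    }

    // Confirms the subscription by visiting SubscribeURL, which is how
    // SNS verifies that the endpoint really wants the messages.
    static void handleSnsMessage(String json) throws Exception {
        if ("SubscriptionConfirmation".equals(extract(json, "Type"))) {
            HttpRequest request = HttpRequest
                    .newBuilder(URI.create(extract(json, "SubscribeURL")))
                    .GET()
                    .build();
            HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.discarding());
        }
    }
}
```

Once the subscription is confirmed, every subsequent POST from SNS carries a message with "Type" set to "Notification", which is where the step status lands.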

For further details and consultations, please get in touch.
