Skip to content

Instantly share code, notes, and snippets.

Last active May 23, 2021 05:32
Show Gist options
  • Save oza/470e961ff10b60778772 to your computer and use it in GitHub Desktop.
Save oza/470e961ff10b60778772 to your computer and use it in GitHub Desktop.
Apache Tez Source code reading

Tez source code reading


  • This document is distributed under Apache 2 Licenses.
  • All source code described in this document is from Apache Tez project.


What's is Tez?

Quoting from

  • Empowering end users by:
    • Expressive dataflow definition APIs
    • Flexible Input-Processor-Output runtime model
    • Data type agnostic
    • Simplifying deployment
  • Execution Performance
    • Performance gains over MapReduce
    • Optimal resource management
    • Plan reconfiguration at runtime
    • Dynamic physical data flow decisions

Tez and Spark

common points

  • DAG-style processing framework
  • Apache Top Level Projects


Tez code base size

$ find . -name "*.java" | xargs wc -l
  • tez-api/tez-common/tez-dag/tez-runtime-internals(core of Tez)
  • tez-mapreduce/tez-mapreduce-examples(mapreduce APIs)

How to deploy

  1. Setup YARN and HDFS cluster
  2. Putting jar files into HDFS
  3. Configuring environment variables
  4. Changing properties:
  • Hive: -Dhive.execution.engine=tez (default: mr)
  • MapReduce: (default: yarn)
$ bin/hdfs -put tez-0.5.0 /apps/
$ bin/hadoop job wordcount.jar inputdir outputdir

Processing model

  • Input
  • Processor
  • Output


  • Vertex

    • Overview
      • Fetching data from Edge
      • Processing database
      • Output data to Edge
    • VertexManager
      • When to launch tasks for vertex?
        • ShuffleVertexManager: supports slowstart: pipelining map-side processing, fetching and reduce-side sort merge processing
  • Edge

    • How to pass data between Vertexs
      • Outputs
        1. UnorderedKVOutput
        2. UnorderedPartitionedKVOutput
        3. OrderedPartitionedKVOutput: Shuffle
      • Inputs
        1. OrderedGroupedKVInput
        2. OrderedGroupedMergedKVInput: Shuffle
      • EdgeProperty
        • DataMovementType
          1. SCATTER_GATHER: Shuffle
          2. ONE_TO_ONE: map to map
          3. BROADCAST
          4. CUSTOM
        • SchedulingType
          1. SEQUENTIAL: MapReduce
          2. CONCURRENT
  • Let's read


  • YarnTaskSchedulerService for distributed mode
  • NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner();
  • RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner();
  • NON_LOCAL_ASSIGNER = new NonLocalContainerAssigner();
  • LocalTaskSchedulerService for local mode

Reducer Auto parallelalizm

    • Collecting profiling information
  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
    // TODO handle duplicates from retries
    if (enableAutoParallelism) {
      // save output size
      VertexManagerEventPayloadProto proto;
      try {
        proto = VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(vmEvent.getUserPayload()));
      } catch (InvalidProtocolBufferException e) {
        throw new TezUncheckedException(e);
      long sourceTaskOutputSize = proto.getOutputSize();
      completedSourceTasksOutputSize += sourceTaskOutputSize;
      if (LOG.isDebugEnabled()) {
        LOG.debug("Received info of output size: " + sourceTaskOutputSize 
            + " numInfoReceived: " + numVertexManagerEventsReceived
            + " total output size: " + completedSourceTasksOutputSize);
  • Determining parallelism based on profiling information
  void determineParallelismAndApply() {
    if(numSourceTasksCompleted == 0) {
    if(numVertexManagerEventsReceived == 0) {
    int currentParallelism = pendingTasks.size();
    long expectedTotalSourceTasksOutputSize = 
    int desiredTaskParallelism = 
    if(desiredTaskParallelism < minTaskParallelism) {
      desiredTaskParallelism = minTaskParallelism;
    if(desiredTaskParallelism >= currentParallelism) {
    // most shufflers will be assigned this range
    int basePartitionRange = currentParallelism/desiredTaskParallelism;
    if (basePartitionRange <= 1) {
      // nothing to do if range is equal 1 partition. shuffler does it by default
    int numShufflersWithBaseRange = currentParallelism / basePartitionRange;
    int remainderRangeForLastShuffler = currentParallelism % basePartitionRange; 
    int finalTaskParallelism = (remainderRangeForLastShuffler > 0) ?
          (numShufflersWithBaseRange + 1) : (numShufflersWithBaseRange);"Reduce auto parallelism for vertex: " + getContext().getVertexName()
        + " to " + finalTaskParallelism + " from " + pendingTasks.size() 
        + " . Expected output: " + expectedTotalSourceTasksOutputSize 
        + " based on actual output: " + completedSourceTasksOutputSize
        + " from " + numVertexManagerEventsReceived + " vertex manager events. "
        + " desiredTaskInputSize: " + desiredTaskInputDataSize);
    if(finalTaskParallelism < currentParallelism) {
      // final parallelism is less than actual parallelism
      Map<String, EdgeManagerPluginDescriptor> edgeManagers =
          new HashMap<String, EdgeManagerPluginDescriptor>(bipartiteSources.size());
      for(String vertex : bipartiteSources.keySet()) {
        // use currentParallelism for numSourceTasks to maintain original state
        // for the source tasks
        CustomShuffleEdgeManagerConfig edgeManagerConfig =
            new CustomShuffleEdgeManagerConfig(
                currentParallelism, finalTaskParallelism, 
                getContext().getVertexNumTasks(vertex), basePartitionRange,
                ((remainderRangeForLastShuffler > 0) ?
                    remainderRangeForLastShuffler : basePartitionRange));
        EdgeManagerPluginDescriptor edgeManagerDescriptor =
        edgeManagers.put(vertex, edgeManagerDescriptor);
      getContext().setVertexParallelism(finalTaskParallelism, null, edgeManagers, null);

To be continued...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment