Hadoop is an open-source software framework used for distributed storage and processing of large datasets on clusters of commodity hardware. Used in a variety of applications, including data warehousing, data processing, machine learning, and more, Hadoop is the backbone of data engineering. If you are preparing for big data roles, be interview-ready with this list of top Hadoop interview questions and answers, carefully curated by industry experts and meant for beginners, intermediate, and expert professionals in the field of big data. Get ready to answer questions on Hadoop applications, how Hadoop differs from other parallel processing engines, the difference between nodes, HDFS, JobTracker, configuration files, popular commands, YARN, scheduling, LDAP, directories and more. We have put together a detailed list of big data Hadoop interview questions that will help you become the Hadoop developer, Java developer, or Big Data engineer the industry talks about.
$ hadoop fs -copyToLocal
$ hadoop fs -copyFromLocal
$ hadoop fs -put
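For instance (the file and directory names below are placeholders, not taken from the original answer), copying a file from the local file system into HDFS and back again looks like this:
$ hadoop fs -copyFromLocal /home/hadoop/sample.txt /user/hadoop/sample.txt
$ hadoop fs -put /home/hadoop/sample2.txt /user/hadoop/
$ hadoop fs -copyToLocal /user/hadoop/sample.txt /home/hadoop/sample_copy.txt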
Below are the main tasks of JobTracker:
Following are the three configuration files in Hadoop:
NameNode - It is also known as the Master node. It maintains the file system tree and the metadata for all the files and directories present in the system. The NameNode is a highly available server that manages the File System Namespace and controls access to files by clients. It records the metadata of all the files stored in the cluster, i.e. the location of stored blocks, the size of the files, the hierarchy, permissions, etc.
NameNode is the master daemon that manages and maintains all the DataNodes (slave nodes).
There are two files associated with the metadata: the fsimage file and the edits (edit log) file.
Checkpoint node - The Checkpoint node is the new implementation of the Secondary NameNode. It is used to create periodic checkpoints of the file system metadata by merging the edits file with the fsimage file, and finally it uploads the new image back to the active NameNode.
Its directory structure is the same as the NameNode's, and it stores the latest checkpoint.
Backup Node - The Backup node is an extended Checkpoint node that performs checkpointing and also supports online streaming of file system edits.
Its main role is to act as the dynamic backup for the File System Namespace (metadata) of the primary NameNode in the Hadoop ecosystem.
The Backup node keeps an in-memory, up-to-date copy of the file system namespace which is always synchronized with the active NameNode state.
The Backup node does not need to download the fsimage and edits files from the active NameNode to create a checkpoint, as it already has an up-to-date state of the namespace in its own main memory. So, creating a checkpoint on the Backup node is just saving a copy of the file system metadata (namespace) from main memory to its local file system.
It is worth mentioning that this is one of the most frequently asked Hadoop interview questions and answers for freshers in recent times.
ubuntu@ubuntu-VirtualBox:~$ hadoop distcp hdfs://namenodeA/apache_hadoop hdfs://namenodeB/Hadoop
Linux file system
*****************
Hadoop Distributed file system
*****************************
High Availability of the cluster was introduced in Hadoop 2 to solve the single point of failure problem that the NameNode posed in Hadoop 1.
The High Availability NameNode architecture provides an opportunity to have two NameNodes, an Active NameNode and a Passive/Standby NameNode. So, both NameNodes run at the same time in a High Availability cluster.
Whenever the Active NameNode goes down, due to a server crash or a graceful failover during a maintenance period, control goes to the Passive/Standby NameNode automatically, which reduces the cluster downtime. There are two problems in maintaining consistency in the HDFS High Availability cluster:
As discussed above, there are two types of failover: A. Graceful failover: in this case, we manually initiate the failover for routine maintenance. B. Automatic failover: in this case, the failover is initiated automatically when the NameNode fails or crashes.
In either case of a NameNode failure, the Passive/Standby NameNode can acquire an exclusive lock in ZooKeeper, indicating that it wants to become the next Active NameNode.
In an HDFS High Availability cluster, Apache ZooKeeper is the service that provides automatic failover. While the NameNode is active, ZooKeeper maintains a session with it. Whenever the Active NameNode fails, the session expires and ZooKeeper informs the Passive/Standby NameNode to initiate the failover process.
The ZKFailoverController (ZKFC) is a ZooKeeper client that also monitors and manages the NameNode status. Each of the NameNodes also runs a ZKFC, which is responsible for periodically monitoring the health of its NameNode.
When ZooKeeper is installed in your cluster, you should make sure that the processes or daemons listed below are running on the Active NameNode, Standby NameNode and DataNodes.
When you run JPS (Java Virtual Machine Process Status Tool) on the Active NameNode you should get the below daemons:
When you run JPS (Java Virtual Machine Process Status Tool) on the Standby NameNode you should get the below daemons:
When you run JPS (Java Virtual Machine Process Status Tool) on a DataNode you should get the below daemons:
The complexity of the answer makes this a must-know for anyone looking for top Hadoop interview questions. Not to mention this is one of the most frequently asked Hadoop interview questions.
It is a facility provided by the Hadoop MapReduce framework to access small files needed by an application during its execution. These files are small, typically a few KBs or MBs in size, and are mainly text, archive or jar files. Because the files are small, they are kept in cache memory, which is one of the fastest memories. Applications which need to use the distributed cache to distribute a file should make sure that the file is available and can be accessed via a URL. The URL can be either hdfs:// or http://.
Once the file is present on the mentioned URL, the Map-Reduce framework will copy the necessary files on all the nodes before initiation of the tasks on those nodes. In case the files provided are archives, these will be automatically unarchived on the nodes after transfer.
Example: in a Hadoop cluster, we have three DataNodes and 30 tasks to run in the cluster, so each node gets 10 tasks. The nature of the task is such that it needs some information or a particular jar to be available before its execution. To fulfil this, we can cache the files which contain that information or the jar files. Before execution of the job, the cache files are copied to the application master on each slave node. The application master then reads the files and starts the tasks. The tasks can be mappers or reducers, and the cached files are read-only. By default, the Hadoop distributed cache size is 10 GB; if you want to change it, you have to modify the size in mapred-site.xml. A question that may come to mind is why cache memory is required to perform the tasks: why can't we keep the file in HDFS on each DataNode and have the application read it? Here there are a total of 30 tasks, and in real time it could be more than 100 or 1000 tasks. If we put the files in HDFS, then to perform 30 tasks the application has to access the HDFS location 30 times and read from it, but HDFS is not very efficient at accessing small files this many times. This is the reason why we use cache memory: it reduces the number of reads from HDFS locations.
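As a quick, hedged illustration of pushing a file into the distributed cache from the command line (the jar name, driver class and paths are hypothetical, and the generic -files option assumes the driver uses ToolRunner/GenericOptionsParser):
$ hadoop jar wordcount.jar WordCountDriver -files hdfs://namenode/cache/lookup.txt /user/pbibhu/input /user/pbibhu/output
The file lookup.txt is then available in the working directory of every map and reduce task.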
In a Hadoop cluster, the DataNode is where the actual data is kept. DataNodes send a heartbeat message to the NameNode every 3 seconds to confirm that they are active. If the NameNode does not receive a heartbeat from a particular DataNode for 10 minutes, it considers that DataNode to be dead. The NameNode then initiates replication of the dead DataNode's blocks to other DataNodes which are active. DataNodes can talk to each other to rebalance the data, move and copy data around, and keep the replication factor maintained across the cluster. You can get the block report using the HDFS commands below.
Example:
hadoop fsck / ==> Filesystem check on HDFS
# hadoop fsck /hadoop/container/pbibhu
FSCK ended at Thu Oct 20 20:49:59 CET 2011 in 7516 milliseconds
The filesystem under path '/hadoop/container/pbibhu' is HEALTHY
The NameNode is the node which stores the file system metadata. The metadata includes information such as the list of file names, owner, permissions, timestamps, size, replication factor, the list of blocks for each file, which file maps to which block locations, and which blocks are stored on which DataNode. When a DataNode stores a block of information, it maintains a checksum for each block as well. When data is written to HDFS, the checksum value is written simultaneously, and when the data is read, by default the same checksum value is verified.
The DataNodes update the NameNode with the block information periodically and verify the checksum value before updating. If the checksum value is not correct for a particular block, we consider it disk-level corruption for that block; the DataNode skips that block's information while reporting to the NameNode. In this way the NameNode gets to know about the disk-level corruption on that DataNode and takes the necessary steps, for example replicating the block from its alternate locations to other active DataNodes to bring the replication factor back to the normal level. DataNodes can be listed in the dfs.hosts file, which contains a list of hosts that are permitted to connect to the NameNode.
Example:
Add this property to hdfs-site.xml:
<property>
  <name>dfs.hosts</name>
  <value>/home/hadoop/includes</value>
</property>
The includes file contains:
hostname1
hostname2
hostname3
If the includes file is empty then all hosts are permitted, but it is not a definitive list of active DataNodes; the NameNode only considers those DataNodes from which it receives heartbeats.
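After editing the includes file, the NameNode can be told to re-read it without a restart (a standard administration command; the file path above is only illustrative):
$ hdfs dfsadmin -refreshNodes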
LVM stands for Logical Volume Management. It is a system for managing logical volumes, or filesystems, that is much more advanced and flexible than the traditional method of partitioning a disk into one or more segments and formatting that partition with a filesystem. Today disks are huge (> 1 TB) and LVM is the right tool to dynamically allocate and resize partitions of these huge disks.
If you are using Linux to deploy Hadoop nodes, master or slave, it is strongly recommended that you do not use LVM in Linux, because of the points below.
HDFS data might not always be distributed uniformly across DataNodes, for different reasons: some DataNodes may have less disk space available for use by HDFS, disk utilization on the DataNode machines may become uneven during normal or heavy usage, or new DataNodes may be added to an existing cluster, at which point their utilization differs from that of the existing nodes. To mitigate this problem a balancer is required.
A balancer is a tool that balances disk space usage on an HDFS cluster; it analyzes block placement and balances data across the DataNodes. The balancer moves blocks until the cluster is deemed to be balanced, which means that the utilization of every DataNode is more or less equal. The balancer does not balance between individual volumes on a single DataNode.
HDFS balancer [-policy <policy>]
The two supported policies are blockpool and datanode. Setting the policy to blockpool means that the cluster is balanced if each block pool in each node is balanced, while the datanode policy means that the cluster is balanced if each DataNode is balanced. The default policy is datanode.
hdfs balancer [-threshold <threshold>] specifies a number in [1.0, 100.0] representing the acceptable threshold, as a percentage of storage capacity, so that storage utilization outside the average +/- the threshold is considered over/underutilized. The default threshold is 10.0.
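For example (the 5% threshold here is only illustrative), running the balancer with a tighter threshold and the per-DataNode policy looks like this:
$ hdfs balancer -threshold 5 -policy datanode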
When we talk about a rack, it is a collection of multiple servers grouped based on your requirements. All these servers are connected to the same network switch, and if that switch goes down then all machines in that rack are out of service; we say the rack is in a down state.
To mitigate this, Rack Awareness was introduced in Hadoop by Apache. With Rack Awareness, the NameNode chooses DataNodes that are in the same rack or a nearby rack when placing blocks. The NameNode maintains the rack ID of each DataNode to get the rack information, and based on the rack ID the NameNode communicates with the DataNode. In Hadoop, when we maintain racks we have to follow certain rules, as mentioned below.
Below are some points explaining why we follow Rack Awareness in Hadoop. Please find the details as mentioned below:
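As a quick check of how DataNodes are mapped to racks on a live cluster (a standard command; the output depends on your topology configuration), you can run:
$ hdfs dfsadmin -printTopology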
There are two types of tables which HIVE supports.
Hive Managed Tables:
A Hive managed table is also known as an internal table. When we create a table in Hive, a managed table is created by default and Hive manages the data as well. This means that Hive stores the data in its warehouse directory. A managed table is stored under the hive.metastore.warehouse.dir path property, and the default location of the table is /apps/hive/warehouse/<db_name>.db/<table_name>. This path is modifiable. If a managed table or partition is dropped, then the data and the corresponding metadata of the table or partition are deleted. If you do not specify the PURGE option then the data is moved to a trash folder for a certain period and is deleted permanently after that.
Example:
1. Create Table
hive> create table univercity_db.school_table(name string, roll_no int) row format delimited fields terminated by ',';
OK
Time taken: 0.202 seconds
2. Describe table
hive> describe formatted univercity_db.school_table;
OK
You will get extra information, like whether the table is managed or external, when the table was created, the file format, the location of the data path in HDFS, and whether the object is a table or a view.
3. Load the data to table from the local path
hive>load data local inpath '/home/pbibhu/Desktop/blog/school' into table univercity_db.school_table;
After loading from the local path you can further use hive commands to select/count/describe etc
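For instance (a hedged sketch; the query itself is only illustrative), you can verify the load from the shell:
$ hive -e "select count(*) from univercity_db.school_table;"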
Hive External Tables:
While creating an external table, the location of the data path is not the usual warehouse path; you have to provide an HDFS path outside of the warehouse directory, and the LOCATION clause is mandatory in the create syntax. If the structure or partitioning of an external table is changed, an MSCK REPAIR TABLE table_name statement can be used to refresh the metadata information. Basically, for an external table we do not load the table from a local path; you load data from HDFS by mentioning the path.
Use external tables when files are present in the remote locations, and the files should remain even if the external table is dropped.
Example:
1. Create Table
CREATE EXTERNAL TABLE IF NOT EXISTS univercity_db.school_table( student_ID INT, FirstName STRING, LastName STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION 'hdfs/pbibhu/school';
2. Create partition Table
CREATE EXTERNAL TABLE IF NOT EXISTS univercity_db.school_table( FirstName STRING, LastName STRING) PARTITIONED BY (student_ID INT) STORED AS ORC LOCATION 'hdfs/pbibhu/school';
3. Insert the data into the internal table from the external table; the data structure should be the same for both tables.
hive> CREATE TABLE IF NOT EXISTS office(EmployeeID INT,FirstName STRING, Title STRING, State STRING, Laptop STRING) STORED AS ORC;
OK
hive> CREATE EXTERNAL TABLE IF NOT EXISTS Office_text( EmployeeID INT,FirstName STRING, Title STRING, State STRING, Laptop STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION '/user/pbibhu/office';
OK
hive> INSERT OVERWRITE TABLE office SELECT * FROM office_text;
Resource allocation within the queues is controlled separately. Within a queue:
The FairScheduler can apply any of the FIFO policy, the fair policy or the DominantResourceFairness (DRF) policy.
The CapacityScheduler can apply either the FIFO policy or the fair policy.
The Fair Scheduler can use different scheduling policies. The default scheduling policy is fair sharing, using memory as the resource. There is also a FIFO (first in, first out) policy, which is not of much use. It is quite common to use the third type of scheduling policy, DRF, which allocates both memory and CPU resources to applications. DRF is similar to fair scheduling, but it is important to keep in mind that it applies primarily to the allocation of resources among queues, an activity which is already dominated by queue weights. Thus, the most important thing about DRF is that it considers multiple resources, rather than that it attempts to provide equal resource allocation.
Initially, TCS and Wipro each have some resources allocated to jobs in their respective queues and only 10 GB remains in the cluster. Each queue is requesting to run a map task requiring 20 GB, so 30 GB of memory is available in total and the rest of the required resources will be taken from CPU. WIPRO currently holds 15 GB of resources; another 10 GB is required for its mapper task, so the Fair Scheduler will award a container with the requested 10 GB of memory to WIPRO. Now the available memory for TCS is 5 GB, and it requires another 20 GB to run its mapper task. In this case there is no more memory available for TCS, so DRF will try to use the 5 GB of memory and the rest of the requirement will be taken from CPU.
A staple in HDFS interview questions, be prepared to answer this using your hands-on experience. This is also one of the top interview questions to ask a Hadoop engineer.
Basically, we store files under folders in HDFS; most of the time the folder name is based on the application name. When we talk about small files, they are smaller than the block size: for example, if the block size is 64 MB or 128 MB, then files smaller than the block size are considered small files. If the files are smaller than the block size then we face problems at the HDFS level as well as at the MapReduce level.
When we store files/directories in HDFS, the corresponding metadata is stored in the NameNode; each file, directory and block metadata entry occupies approximately 150 bytes. Suppose you have 1 million files, each using approximately a block or less than a block; the metadata of the corresponding files/directories then occupies approximately 300 MB of memory. In such a case a lot of memory is occupied in the NameNode, after some time a threshold is reached, and beyond that it becomes a problem for the current hardware. Performance will certainly degrade.
During the execution of MapReduce, when the file size is less than or equal to the block size, one mapper is launched per block (or equivalent split), so a large number of mappers are launched for a large number of small files, and processing time increases because each file holds only a small chunk of data. When we read and write a large number of small files, seek time increases, which impacts performance, and seeks are generally an expensive operation. Since Hadoop is designed to run over your entire dataset, it is best to minimize seeks by using large files.
Remediation plan:
We can merge all the small files into a big file using the HDFS getmerge command. The getmerge command copies all the files available in an HDFS folder to a single concatenated file on the local system; after concatenation on the local system, you can place that file back from local to HDFS using the HDFS put command. Please find the example mentioned below.
hadoop fs -getmerge /hdfs_path/pbibhu/school_info_* /local_path/pbibhu/school_inf.txt
hadoop fs -put school_inf.txt /hdfs_path/pbibhu/school_inf.txt
Below are the file formats which Hadoop supports.
Text format was very common prior to Hadoop and it is still very common in a Hadoop environment. Data is presented as lines, each line terminated by a newline character (\n), with fields often separated by a tab character (\t).
CSV stands for comma-separated values, so data fields are separated or delimited by a comma. For example, we have the below values in an Excel sheet:
Name | class | section | subject |
---|---|---|---|
Bibhu | 7 | A | English |
The above data will be present in a CSV formatted file as follows.
Bibhu,7, A, English
JSON stands for JavaScript Object Notation. It is a readable format for structuring data; basically, it is used to transfer data from a server to a web application. We can use it as an alternative to XML. In JSON, data is presented as key and value pairs. The key is always a string enclosed in quotation marks; the value can be a string, number, Boolean, array or object.
The basic syntax is a key, followed by a colon, followed by a value.
Example: "Name" : "Bibhu"
Avro stores its schema in JSON format, which makes it easy to read and understand. The data itself is stored in binary format, which makes it compact and efficient; each value is stored without any metadata other than a small schema identifier of 1 to 4 bytes. Avro has the capability to split a large dataset into subsets, which is very suitable for MapReduce processing.
In Hive following command is used to use AVRO.
Create table avro_school
(column_address)
stored as avro;
RC stands for Record Columnar; it is a type of binary file format which provides high compression on top of rows, or on multiple rows at a time, on which we want to perform some operation. RC files consist of binary key/value pairs. The RC file format first partitions the rows horizontally into row splits, and then each row split is stored vertically in a columnar way. Please find the example as mentioned below:
Step 1
First, partition the rows horizontally into Row split
501 | 502 | 503 | 504 |
505 | 506 | 507 | 508 |
509 | 510 | 511 | 512 |
513 | 514 | 515 | 516 |
Step 2
All the row splits are then presented vertically, in a columnar way (each column's values stored together):
501 | 505 | 509 | 513 |
502 | 506 | 510 | 514 |
503 | 507 | 511 | 515 |
504 | 508 | 512 | 516 |
RC file combines Multiple functions such as data storage formatting, data compression, and data access optimization. It is able to meet all the four below requirements of data storage.
The ORC file provides a more efficient way to store relational data than the RC file, reducing the data storage space by up to 75% of the original. Compared to the RC file, the ORC file takes less time to access the data and less space to store it; internally it divides the data again into stripes with a default size of 250 MB.
In Hive following command is used to use the ORC file.
CREATE TABLE ... STORED AS ORC
ALTER TABLE ... SET FILEFORMAT ORC
SET hive.default.fileformat=ORC
Parquet is another column-oriented storage format, like the RC and ORC formats, but it is very good at handling nested data as well as at query scans for particular columns in a table. In Parquet, new columns can be added at the end of the structure. It handles compression using Snappy and gzip; currently Snappy is the default. Parquet is supported by Cloudera and optimized for Cloudera Impala.
Hive Parquet File Format Example:
Create table parquet_school_table
(column_specs)
stored as parquet;
Both schedulers cannot be used in the same cluster. The two scheduling algorithms address specific use cases, and cluster-wise you have to set up the configuration file for either the Fair Scheduler or the Capacity Scheduler; you cannot set up both schedulers for one cluster.
You can choose the Fair Scheduler by setting the scheduler class in yarn-site.xml as mentioned below:
<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>
To use the Capacity Scheduler you have to configure the Resource Manager in the conf/yarn-site.xml as mentioned below:
yarn.resourcemanager.scheduler.class = org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler. While setting up the queues for the Capacity Scheduler, you need to make some changes in the etc/hadoop/capacity-scheduler.xml configuration file. The Capacity Scheduler has a predefined queue called root.
Whatever queues we create in the system are children of the root queue. Setting up further queues: configure the property yarn.scheduler.capacity.root.queues with a list of comma-separated child queues. Setting up sub-queues within a queue: configure the property yarn.scheduler.capacity.<queue-path>.queues, where queue-path is the full path of the queue's hierarchy, starting at root, with . (dot) as the delimiter.
Queue capacity is provided in percentage (%). The sum of capacities for all queues, at each queue level, must be equal to 100. If there are free resources in the queue then Applications in the queue may consume the required resources.
Capacity scheduler queue configuration example:
If there are two child queues under root, XYZ and ABC, and XYZ is further divided into technology and marketing queues, with XYZ given 60% of the cluster capacity and ABC given 40%, then please find the details mentioned below to set up your capacity-scheduler.xml.
<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>XYZ,ABC</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.XYZ.queues</name>
  <value>technology,marketing</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.XYZ.capacity</name>
  <value>60</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.ABC.capacity</name>
  <value>40</value>
</property>
Basically, Kafka is a messaging system which exchanges large volumes of streaming/log data between processes, applications and servers. Distributed messaging is based on a queue which can handle high volumes of data and allows you to pass messages from one end to another. Kafka is appropriate for both offline and online message consumption.
Before talking about Kafka further, we need to know about the components belonging to Kafka; below are the details.
Kafka Broker: A Kafka cluster consists of one or more servers, called Kafka brokers, on which Kafka is running. Producers are processes that publish data into Kafka topics within the brokers; consumers of topics then pull the messages off the Kafka topics.
A few basic points related to the Kafka broker:
Kafka Topics: A topic is a category or feed name to which messages are stored and published. All Kafka messages are organized into topics, so whenever you want to send a message you send it to a specific topic, and whenever you want to read messages you read them from a specific topic.
Kafka Topic Partitions: Kafka topics are divided into a number of partitions, and each partition contains messages in a sequence; the sequence is only applicable within a partition. Each message in a partition is recognized by its offset value; here, the offset is represented as an incremental ID which is maintained by ZooKeeper. Offsets are meaningful only within a partition; they do not have any value across partitions. A topic may contain any number of partitions. Basically, there is no rule determining which partition an incoming message is written to; however, there is an option to add a key to a message. If a producer publishes messages with a key, then all the messages with the same key will go to the same partition.
Kafka Producers: Basically, producers write data to a topic. While writing data, producers need to specify the topic name and one broker name to connect to; Kafka has its own mechanism to send the data to the right partition of the right broker automatically.
Producers have a mechanism whereby the producer can receive an acknowledgment of the data it writes. Below are the acknowledgment levels which the producer can receive.
Kafka Consumers: Basically, consumers read data from topics. As we know, topics are divided into multiple partitions, so a consumer reads data from each partition of a topic. Consumers need to mention the topic name as well as a broker. A consumer reads data from a partition in sequence; when a consumer connects to a broker, Kafka makes sure that it is connected to the entire cluster.
Kafka Consumer Groups: A consumer group consists of multiple consumer processes. Each consumer group has one unique group ID. One consumer instance in a consumer group reads data from one partition. If the number of consumers exceeds the number of partitions, then the extra consumers will be inactive. For example, if there are 6 partitions in total and 8 consumers in a single consumer group, there will be 2 inactive consumers.
In Kafka, two types of messaging patterns are available:
1. Point to point messaging system:
In a point-to-point messaging system, messages are kept in a queue. One or more consumers can read from the queue, but a particular message can be consumed by only one consumer at a time.
Basically, point-to-point messaging is used when a single message should be received by only one consumer. There may be multiple consumers reading the queue for the same message, but only one of them will receive it. There can be multiple producers as well: they send messages to the queue, but each message is received by only one receiver.
2. Publish subscribe messaging system:
In the publish-subscribe messaging system, message producers are called publishers and message consumers are called subscribers. Here a topic can have multiple receivers, and each receiver receives a copy of each message.
Below are a few points that explain the publish-subscribe messaging system.
Messages are shared through a channel called a topic. Topics are placed in a centralized place where producers can publish and consumers can read messages.
Each message is delivered to one or more consumers, called subscribers. The publisher or producer is not aware of which message or topic is being received by which consumer or subscriber. A single message created by one publisher may be copied and distributed to hundreds or thousands of subscribers.
Role of Zookeeper in Kafka:
ZooKeeper is a mandatory component in the Kafka ecosystem. It helps in managing Kafka brokers, in the leader election of partitions, and in maintaining cluster membership. For example, when a new broker is added or a broker is removed, when a new topic is added or a topic is deleted, or when a broker goes down or comes up, ZooKeeper manages such situations by informing Kafka. It also handles topic configurations, like the number of partitions a topic has and the leader of the partitions for a topic.
The sequence of starting the Kafka services:
This is the default ZooKeeper configuration file available with Kafka, which has the below properties:
dataDir=/tmp/zookeeper
clientPort=2183
[root@xxxx]# /bin/zookeeper-server-start.sh /config/zookeeper.properties
You can start the Kafka broker with the default configuration file. Below are its configuration properties:
broker.id=0
log.dir=/tmp/Kafka-logs
zookeeper.connect=localhost:2183
Here there is one broker whose ID is 0, and it connects to ZooKeeper using port 2183.
[root@xxxx]# /bin/kafka-server-start.sh /config/server.properties
Below is the example to create a topic with a single partition and replica
[root@xxxx]#/bin/Kafka-create-topic.sh -zookeeper localhost:2183 -replica 1 -partition 1 -topic examtopic
Here in the above example, we created a topic named examtopic.
[root@xxxx]#/bin/Kafka-console-producer.sh -broker-list localhost:9090 -topic examtopic
broker-list ==> this is the server and port information for the brokers; in the above example we provided the server as localhost and the port as 9090.
topic ==> the name of the topic; in the above example we provided examtopic.
On the command line we created the producer client, which accepts your input and publishes it to the cluster as messages, so that a consumer can then consume or read them.
Once the producer client is running, you can type something on the terminal where the producer is running, for example:
Hi Bibhu, How are you?
[root@xxxx]#/bin/Kafka-console-consumer.sh -zookeeper localhost:2183 -topic examtopic -from-beginning
The consumer runs with the default configuration properties as mentioned below; this information is in the consumer.properties file.
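A minimal sketch of what consumer.properties typically contains (the values below mirror the port used in the examples above and may differ in your installation):
zookeeper.connect=localhost:2183
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group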
Basically, Sqoop is used to get data from a relational database such as DB2, MySQL or Oracle and load it into Hadoop (HDFS, Hive, HBase, etc.), or vice versa; this process is called ETL, for Extract, Transform and Load. In other words, Sqoop can import and export data between relational databases and Hadoop.
Below are some of the important features which Sqoop has:
Internally, Sqoop creates a SQL query for each mapper which ingests data from the source table into HDFS. By default 4 mappers are generated, but you can modify the number of mappers based on your logic and requirements. The number of mappers influences the split-by column; the split-by column works based on a where condition, and each mapper gets a logical partition of the target table or directory. For example, suppose we use three mappers and a split-by column, and there are 1,000,000 records. Sqoop segregates the data using min and max calls to the database on the split-by column: the first mapper would pull records 1 to 333333, the second mapper would pull records 333334 to 666666, and the last would grab records 666667 to 1000000.
Sqoop runs a map-only job; as we know, the reduce phase is required in case of aggregations, but here in Apache Sqoop we just import and export the data, and it does not perform any aggregation. The map job launches multiple mappers depending on the number defined by the user (in the above example we are considering 3). For a Sqoop import, each mapper task is assigned a part of the data to be imported, and Sqoop distributes the input data equally among the mappers to get high performance. Each mapper then creates a connection with the database using JDBC, fetches the part of the data assigned by Sqoop, and writes it into HDFS, Hive or HBase based on the arguments provided in the CLI; the mappers drop the data in the target-dir in files named part-m-00000, part-m-00001, part-m-00002, and so on.
Here in this scenario, we will discuss how Sqoop imports data from a relational database into Hive. Sqoop can only import the data as a text file or sequence file into a Hive database. If you want to use the ORC file format then you must follow a two-stage approach: in the first stage, Sqoop gets the data into HDFS as a text file or sequence file, and in the second stage Hive converts the data into the ORC file format.
Please find the steps as mentioned below.
Example:
sqoop import --connect jdbc:mysql://db.bib.com/sales --table EMPLOYEES --username <username> --password-file ${user.home}/.password
Example:
sqoop import --connect jdbc:mysql://db.bib.com/sales --table EMPLOYEES --columns "employee_id,first_name,last_name,job_title" --where "start_date > '2010-01-01'" --num-mappers 8
Example:
sqoop import --query 'SELECT a.*, b.* FROM a JOIN b on (a.id == b.id) WHERE $CONDITIONS' --split-by a.id --target-dir /user/bib/sales
1. HDFS as target directory
sqoop import --query 'SELECT a.*, b.* FROM a JOIN b on (a.id == b.id) WHERE $CONDITIONS' --split-by a.id --target-dir /user/bib/sales
2. Hive as Target table
sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --hive-import
After job submission, job IDs are generated by the JobTracker in Hadoop 1, while in Hadoop 2/3 application IDs are generated. An application ID or job ID is a globally unique identifier for an application or job.
Example: job_1410450250506_0002 / application_1410450250506_0002
_1410450250506 ==> this is the start time of the Resource Manager, obtained from the "cluster timestamp"
_0002 ==> a counter used to keep track of occurrences of the job
Task IDs are formed by replacing the job or application prefix with the task prefix and appending a suffix identifying the task within the job.
Example: task_1410450250506_0002_m_000002
Here in the above example, _000002 is the third map task of the job "job_1410450250506_0002".
Tasks may be executed more than once due to task failure, so to identify different instances of task execution, task attempts are given unique IDs.
Example: attempt_1410450250506_0002_m_000002_0
_0 is the first attempt of the task task_1410450250506_0002_m_000002
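To check the status of such an application from the command line (a hedged example that reuses the ID above; the output depends on your cluster), you can run:
$ yarn application -status application_1410450250506_0002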
When you open the Job History web UI, you will see an image like the one below. In that image you can see the job state, i.e. whether the job succeeded or failed, how many mappers and reducers were launched, and whether all the mappers and reducers completed; you can find all these details there.
JOB HISTORY Server:
When you click the job ID on the Job History Server, you will see the image below, with more or less the same information as above.
Overview:
Hadoop Counters:
This is the most useful option for examining job performance. Hadoop provides several built-in counters, and you can also define custom counters as per your requirements. Counters help you to get the below kinds of information.
Hadoop provides three types of built-in counters:
In addition to these, Hadoop provides another 3 counter groups by default, such as:
File system counters:
Under the file system counters you can get information regarding read and write operations in both the local file system and HDFS. The total number of bytes read and written depends on the compression algorithms. Here are a few key counters.
FILE_BYTES_READ: the total number of bytes read from the local file system by the map-reduce tasks.
FILE_BYTES_WRITTEN: the total number of bytes written to the local file system. During the map phase, the mapper tasks write their intermediate results to the local file system, and during the shuffle phase the reducer tasks also write to the local file system when they spill intermediate results while sorting.
JOB Counters:
You will get job information related to the mappers and reducers under the job counters. The following are the key job counters.
MapReduce Framework counters:
You will get all the statistics of a MapReduce job under the MapReduce framework counters. They will help you to do performance tuning of the job.
Other counters are as follows:
Map-reduce jobs are limited by the bandwidth available on the cluster, hence it is beneficial if the data transferred between map and reduce tasks can be minimized. This can be achieved using a Hadoop Combiner. A combiner runs on the map output and its output forms the input to the reducer. It decreases the amount of data that needs to be transferred between the mapper and reducer, and improves the performance of a map-reduce job. A combiner can, however, only be used for functions that are commutative and associative.
The partitioner controls which partition a given key-value pair will go to. Partitioning ensures that all the values for each key are grouped together and that values having the same key go to the same reducer. The total number of partitions in a Hadoop job is equal to the number of reducers.
The partition phase takes place after the map phase and before the reduce phase. A map-reduce job having both a partitioner and a reducer works as follows: output from each mapper is written to a memory buffer and spilled to a local directory in case of overflow. The spilled data is partitioned according to the partitioner. Data in each partition is sorted and combined based on the logic in the combiner. The combined data is sent to the reducer based on the partition key.
A job consists of the following components: The client which submits map-reduce job, Resource manager which coordinates allocation of compute resources, Node managers which launch and monitor the compute containers, Hadoop Distributed File System (HDFS) which is used for sharing resources between the above components and Application Master which coordinates tasks running in map-reduce job.
The map-reduce job begins when the client/job submitter sends the request to the Resource Manager. It asks for a new application id to be allocated. It also checks whether the output directory specified exists or not, and computes input splits for the job as well. The resources needed to run the job including the application jar are copied to HDFS. Finally, the job submitter submits the job to Resource Manager.
The Resource Manager now allocates a container and launches the application master. The application master determines no of the mapper and reducer tasks that need to be launched and requests resource manager to launch containers for the same. Resource Manager, in turn, directs Node Managers to launch the containers where the tasks get run. Once the tasks are initiated, the application master keeps track of them. In case any task fails or gets stuck it relaunches them on another container. Requests for map tasks are made first and with a higher priority than those for reduce tasks, since all the map tasks must complete before the sort phase of the reduce can start. Once the mapper task completes, its output undergoes sorting, shuffling and partitioning (in case of multiple reducers), is sent to the combiner (if any) and finally sent to reducer(s). The output of reducer is written to HDFS.
The usual block size on HDFS is 128 MB. The size of the HDFS block is kept large enough to minimize the seek cost. When the block size is large enough, the time to transfer data will be significantly longer than the time to seek to the start of a block. As the data transfer rate is much higher than the disk seek rate, it is optimal to keep the block size large. The seek time is usually kept at 1% of the transfer time, e.g. if the seek time is around 10 ms and the data transfer rate is 100 MB/s then the block size comes to around 128 MB.
However, this doesn’t mean that the block size can be made indefinitely large. Map tasks operate on one block (assuming split size is equal to block size) at a time. Having a huge block size will result in fewer splits and hence less number of mappers which will reduce the advantage that can be gained by parallelly working on multiple blocks.
Having a block abstraction for a distributed file system has many benefits.
High availability in HDFS implies that the system does not have any single point of failure, is available 24/7 so that there is no or limited impact on client applications and is able to self-recover from failure without any manual intervention.
For implementing High Availability in HDFS, a pair of NameNodes is set up in an active-standby configuration. The passive node is kept in sync with the active node. Both active and passive nodes have access to shared storage space. When any namespace modification is performed by the Active node, it logs a record of the modification to an edit log file stored in the shared directory. The Standby node is constantly watching this directory for edits, and as it sees the edits, it applies them to its own namespace thereby keeping in sync with Active node.
In case of a failure of the active NameNode, the standby node takes over and starts servicing client requests. The transition from the active to the standby node is managed by the Failover Controller. It uses ZooKeeper to ensure that only one NameNode is active at a given time. Each NameNode runs a failover controller process that monitors its NameNode for failures using a heartbeat mechanism and triggers a failover in case of failure.
However, it needs to be ensured that only one NameNode is active at a given time; two active NameNodes at the same time would cause corruption of data. To avoid such a scenario, fencing is done, which ensures that only one NameNode is active at a given time. The JournalNodes perform fencing by allowing only one NameNode to be the writer at a time. The standby NameNode takes over the responsibility of writing to the JournalNodes and forbids any other NameNode from remaining active.
In the case of large data, it’s advised to use more than one reducer. In the case of multiple reducers, the thread spilling map output to disk first divides the data into partitions corresponding to the number of reducers. Within each partition, an in-memory sort on the data is performed. A combiner, if any, is applied to the output of the sort. Finally, the data is sent to reducer based on the partitioning key.
Partitioning ensures that all the values for each key are grouped together and that values having the same key go to the same reducer, thus allowing for even distribution of the map output over the reducers. The default partitioner in a map-reduce job is the HashPartitioner, which computes a hash value for the key and assigns the partition based on its result.
However, care must be taken to ensure that partitioning logic is optimal and data gets sent evenly to the reducers. In the case of a sub-optimal design, some reducers will have more work to do than others, as a result, the entire job will wait for that one reducer to finish its extra load share.
The replication factor in HDFS can be modified /overwritten in 2 ways-
$ hadoop fs -setrep -w 2 /my/sample.xml
sample.xml is the filename whose replication factor will be set to 2
$ hadoop fs -setrep -w 6 /my/sample_dir
sample_dir is the name of the directory and all the files in this directory will have a replication factor set to 6.
ubuntu@ubuntu-VirtualBox:~$ hdfs dfs -touchz /hadoop/sample ubuntu@ubuntu-VirtualBox:~$ hdfs dfs -ls /hadoop
Found 2 items
-rw-r--r--   2 ubuntu supergroup          0 2018-11-08 00:57 /hadoop/sample
-rw-r--r--   2 ubuntu supergroup         16 2018-11-08 00:45 /hadoop/test
fsck is a utility to check the health of the file system, and to find missing files, over-replicated, under-replicated and corrupted blocks.
Command for finding the blocks for a file:
$ hadoop fsck / -files -blocks -racks
Hadoop Distributed File System (HDFS) is the primary storage system of Hadoop. HDFS stores very large files running on a cluster of commodity hardware. It works on the principle of storing a small number of large files rather than a huge number of small files.
HDFS stores data reliably even in the case of hardware failure. It provides high throughput access to the application by accessing in parallel. Components of HDFS:
Update the network addresses in the dfs.include and mapred.include
$ hadoop dfsadmin -refreshNodes
$ hadoop mradmin -refreshNodes
Update the slaves file.
Start the DataNode and NodeManager on the added Node.
One should note that this is one of the most frequently asked Hadoop interview questions for freshers in recent times.
It displays the Access Control Lists (ACLs) of files and directories. If a directory has a default ACL, then getfacl also displays the default ACL.
ubuntu@ubuntu-VirtualBox:~$ hdfs dfs -getfacl /hadoop
# file: /hadoop
# owner: ubuntu
# group: supergroup
This exception means there is no communication between the client and the DataNode, due to any of the below reasons:
Expect to come across this, one of the most important Hadoop interview questions for experienced professionals in data management, in your next interviews.
You can provide dfs.block.size on command line :
hadoop fs -D dfs.block.size=<blocksizeinbytes> -cp /source /destination
hadoop fs -D dfs.block.size=<blocksizeinbytes> -put /source /destination
The below command is used to enter safe mode manually:
$ hdfs dfsadmin -safemode enter
Once the safe mode is entered manually, it should be removed manually.
The below command is used to leave safe mode manually:
$ hdfs dfsadmin -safemode leave
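You can also verify the current safe mode state with a standard check:
$ hdfs dfsadmin -safemode get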
The two popular utilities or commands to find the HDFS space consumed are hdfs dfs -du and hdfs dfsadmin -report.
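A short usage sketch (the path is only illustrative):
$ hdfs dfs -du -h /user/hadoop
$ hdfs dfsadmin -report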
HDFS provides reliable storage by copying data to multiple nodes. The number of copies it creates is usually referred to as the replication factor which is greater than one.
Hadoop task log files are stored on the local disk of the slave node running the task. In general, the log-related configuration properties are yarn.nodemanager.log-dirs and yarn.log-aggregation-enable. The yarn.nodemanager.log-dirs property determines where the container logs are stored on the node while the containers are running; its default value is ${yarn.log.dir}/userlogs. An application's localized log directory will be found at ${yarn.nodemanager.log-dirs}/application_${application_id}, and individual containers' log directories will be in subdirectories named container_${container_id}.
For MapReduce application, each container directory will contain the files STDERR, STDOUT and SYSLOG generated by the container.
The yarn.log-aggregation-enable property specifies whether to enable or disable log aggregation. If this function is disabled, then the node manager will keep the logs locally and not aggregate them.
Following properties are in force when log aggregation is enabled.
yarn.nodemanager.remote-app-log-dir: This location is found on the default file system (usually HDFS) and indicates where the NodeManager should aggregate logs. It should not be the local file system, otherwise a serving daemon such as the history server will not be able to serve the aggregated logs. The default value is /tmp/logs.
yarn.nodemanager.remote-app-log-dir-suffix: The remote log directory will be created at ${yarn.nodemanager.remote-app-log-dir}/${user}/${suffix}. The default suffix value is "logs".
yarn.log-aggregation.retain.seconds: This property defines how long to wait before deleting aggregated logs; -1 or any other negative value disables the deletion of aggregated logs.
yarn.log-aggregation.retain-check-interval-seconds: This property determines how long to wait between aggregated log retention checks. If its value is set to 0 or a negative value, then the value is computed as one-tenth of the aggregated log retention time. The default value is -1.
yarn.log.server.url: Once an application is done, NodeManagers redirect web UI users to this URL, where the aggregated logs are served; it points to the MapReduce-specific job history.
The following properties are used when log aggregation is disabled:
yarn.nodemanager.log.retain-seconds: The time in seconds to retain user logs on the individual nodes if log aggregation is disabled. The default is 10800.
yarn.nodemanager.log.deletion-threads-count: The number of threads used by the NodeManagers to clean up logs once the log retention time is hit for local log files when aggregation is disabled.
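Once aggregation is enabled and an application has finished, its aggregated logs can be fetched from the command line (a hedged example reusing the application ID format shown earlier):
$ yarn logs -applicationId application_1410450250506_0002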
YARN requires a staging directory for temporary files created by running jobs, and local directories for storing the various scripts that are generated to start up the job's containers (which run the map and reduce tasks).
Staging directory:
Local directory:
${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/application_${app_id}
When a client launches an application, the corresponding application master container is launched with ID 000001. The default size is 1 GB for each application master container, but sometimes the data size is larger. In that case the application master reaches the limit of its memory, the application fails, and you will get a message similar to the one below.
Application application_1424873694018_3023 failed 2 times due to AM Container for appattempt_1424873694018_3023_000002 exited with exitCode: 143 due to: Container
[pid=25108,containerID=container_1424873694018_3023_02_000001] is running beyond physical memory limits. Current usage: 1.0 GB of 1 GB physical memory used; 1.5 GB of 2.1 GB virtual memory used. Killing container.
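One common remedy, shown here only as a hedged sketch (the jar name, driver class, paths and the 2048 MB value are hypothetical, and the generic -D option assumes the driver uses ToolRunner), is to raise the application master memory for the job:
$ hadoop jar myjob.jar MyDriver -D yarn.app.mapreduce.am.resource.mb=2048 /user/pbibhu/input /user/pbibhu/output
The same property can also be set permanently in mapred-site.xml.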
One of the most frequently posed Hadoop scenario based interview questions, be ready for this conceptual question.
When configuring MapReduce 2 resource utilization on YARN, there are three aspects to be considered:
Physical RAM limit for each Map and Reduce Task
*********************************************
You can define the maximum memory each map and reduce task will take. Since each map and each reduce task runs in a separate container, these maximum memory settings should be at least equal to or more than the YARN minimum container allocation (yarn.scheduler.minimum-allocation-mb).
In mapred-site.xml:
<property>
  <name>mapreduce.map.memory.mb</name>
  <value>4096</value>
</property>
<property>
  <name>mapreduce.reduce.memory.mb</name>
  <value>8192</value>
</property>
The JVM heap size limit for each task
*********************************
The JVM heap size should be set to lower than the Map and Reduce memory defined above, so that they are within the bounds of the Container memory allocated by YARN.
In mapred-site.xml:
<property>
  <name>mapreduce.map.java.opts</name>
  <value>-Xmx3072m</value>
</property>
<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>-Xmx6144m</value>
</property>
The amount of virtual memory each task will get
********************************************
Virtual memory is determined as an upper limit on top of the physical RAM that each map and reduce task uses. The default ratio is 2.1; for example, if the total physical RAM allocated is 4 GB, then the virtual memory upper limit is 4 * 2.1 = 8.2 GB.
Under the Fair Scheduler, when a single application is running, that application may request the entire cluster (if needed). If additional applications are submitted, resources that are free are assigned "fairly" to the new applications so that each application gets roughly the same amount of resources. The Fair Scheduler organizes applications further into queues and shares resources fairly between these queues. The Fair Scheduler in YARN supports hierarchical queues, which means sub-queues can be created within a dedicated queue. All queues descend from a queue named "root". A queue's name starts with the names of its parents, with periods as separators; so a queue named "parent1" under the root queue would be referred to as "root.parent1", and a queue named "queue2" under a queue named "parent1" would be referred to as "root.parent1.queue2".
A join operation is used to combine two or more datasets. In MapReduce, joins are of two types: map side joins and reduce side joins.
A map side join is one in which the join between two tables is performed in the map phase without the involvement of the reduce phase. It can be used when one of the datasets is much smaller than the other and can easily be stored in the DistributedCache. One of the ways to store datasets in the DistributedCache is to do it in the setup() method of the Mapper. Since in a map side join the join is performed in the mapper phase itself, it reduces the cost incurred for sorting and merging data in the shuffle and reduce phase, thereby improving the performance of the task.
A reduce side join, on the other hand, works well for large datasets. Here the reducer is responsible for performing the join operation. This type of join is much simpler to implement, as data undergoes sorting and shuffling before reaching the reducer and values having identical keys are sent to the same reducer; therefore, by default, the data is organized for us. However, the I/O cost is much higher due to the data movement involved in the sorting and shuffling phase.
The map-reduce framework doesn’t guarantee that the combiner will be executed for every job run. The combiner is executed at each buffer spill. During a spill, the thread writing data to the disk first divides data into partitions corresponding to the number of reducers. Within each partition, the thread performs an in-memory sort on the data and applies the combiner function (if any) on the output of sort.
Various ways to reduce data shuffling in a map-reduce job are:
In a map-reduce job, the application master decides how many tasks need to be created for the job to be executed. The number of mapper tasks created is equal to the number of splits. These tasks are usually launched in different JVMs than the application master (governed by data locality).
However, if the job is small, the application master may decide to run the tasks in the same JVM as itself. In such a case, the overhead of allocating containers for new tasks and monitoring them outweighs the gain that would be had from running the tasks in parallel, compared to running them sequentially on the same node. Such a job is called an uber task.
So how does the application master determine if the job is small enough to be run as an uber task? By default, if the job requires fewer than 10 mappers, only 1 reducer, and the input size is less than the size of one HDFS block, then the application master may consider launching the job as an uber task.
If the job doesn’t qualify to be run as an uber task, then the application master requests containers for all map and reduce tasks from the resource manager.
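For reference, the uber-task thresholds mentioned above map to a handful of MapReduce properties. The snippet below is a hedged example of what could be set in mapred-site.xml; the values shown are the usual defaults, and uber-task support itself is off unless explicitly enabled. A related property, mapreduce.job.ubertask.maxbytes, defaults to the HDFS block size.
<property>
  <name>mapreduce.job.ubertask.enable</name>
  <value>true</value> <!-- disabled by default; must be enabled for uber tasks -->
</property>
<property>
  <name>mapreduce.job.ubertask.maxmaps</name>
  <value>9</value> <!-- "fewer than 10 mappers" -->
</property>
<property>
  <name>mapreduce.job.ubertask.maxreduces</name>
  <value>1</value>
</property>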
A file is read by a MapReduce job using an InputFormat, which defines how the file being read needs to be split up and read. The InputFormat, in turn, defines a RecordReader, which is responsible for reading actual records from the input files. Each split computed by the InputFormat is operated upon by a map task. The map task uses the RecordReader corresponding to the InputFormat to read the data within its split and create key-value pairs.
The various types of InputFormat in Hadoop are:
YARN stands for Yet Another Resource Negotiator. YARN takes care of the JobTracker's work, such as resource management, and as part of that YARN also works as a scheduler. It supports a variety of processing engines and applications. When we say different data processing engines, it means it supports graph processing, interactive processing, stream processing and batch processing to run and process the data stored in HDFS. Basically, the Resource Manager receives the job request from the client and accordingly launches the Application Master JVM, which by default is allocated 1 core and 2 GB of memory.
The Application Master will contact the Name Node and get the locations of the blocks. Based on the availability of the blocks on the Node Managers, it will check whether sufficient resources are available or not and inform the Resource Manager accordingly, and the Resource Manager will provide resources to the Node Managers to launch the JVMs for the job.
YARN also works as a scheduler, meaning the Scheduler is responsible for allocating resources to running applications. It does not monitor or track the applications, and it will not restart failed tasks, whether the failure is due to an application failure or a hardware failure.
The YARN Scheduler supports three types of schedulers:
1. FIFO scheduler
2. FAIR scheduler
3. Capacity Scheduler.
Based on the application requirement, the Hadoop admin will select either the FIFO, Fair or Capacity Scheduler.
FIFO scheduling is first in, first out; in current environments this is rarely used. Fair scheduling is a method where resources are distributed in such a way that they are more or less equally divided among jobs. With the Capacity Scheduler, you can make sure that a certain percentage of cluster resources is assigned to a queue based on your demand or computing need.
Before starting the YARN services, start the Resource Manager and Node Manager services. Between the Resource Manager and the Node Manager, make sure the Resource Manager starts before the Node Manager. Please start your YARN services in the sequence mentioned below.
# service hadoop-yarn-resourcemanager start
# service hadoop-yarn-nodemanager start
# service hadoop-mapreduce-historyserver start
Fundamentally, a snapshot is a read-only, point-in-time copy of the entire file system or of a subtree of the file system. Snapshots help to handle data corruption caused by users or applications as well as accidental deletes. It is always quicker to recover from a snapshot than to restore the whole FSImage, and it is easy to create a snapshot of an important directory before changing anything in it.
A snapshot can be taken on any directory once it is marked as "snapshottable"; to do so, you have to run the command "hdfs dfsadmin -allowSnapshot <path>". Once a snapshottable directory has been created, a subdirectory named .snapshot is created under it, which is the place where snapshots are stored. There is no limit on the number of snapshottable directories; any number of directories can be created, and a snapshottable directory can contain up to 65536 snapshots simultaneously. We can choose the name of a snapshot or use the default one (based on the timestamp: "s'yyyyMMdd-HHmmss.SSS"). If there are any snapshots in a snapshottable directory, then you can neither delete nor rename the directory; before deleting the snapshottable directory you have to delete all the snapshots under it. During an HDFS version upgrade, any path named ".snapshot" needs to first be renamed or deleted to avoid conflicting with the reserved path.
Snapshots are easily created with the hdfs dfs and hdfs dfsadmin commands. Please find a few commands related to snapshots below.
a. # Create directory structure
hdfs dfs -mkdir /my_dir_bibhu

b. # Allow snapshots creation for /my_dir_bibhu
hdfs dfsadmin -allowSnapshot /my_dir_bibhu
Allowing snaphot on /my_dir_bibhu succeeded

c. # Create the first snapshot
hdfs dfs -createSnapshot /my_dir_bibhu snaptest1
Created snapshot /my_dir_bibhu/.snapshot/snaptest1

d. # .snapshot can be read directly using below command
hdfs dfs -ls /my_dir_bibhu/.snapshot
Found 1 items
drwxr-xr-x - bibhu supergroup 0 2016-12-03 09:52 /my_dir/.snapshot/snaptest1

e. # Create new snapshot - this time for directory containing a file
hdfs dfs -createSnapshot /my_dir_bibhu snaptest2
Created snapshot /my_dir_bibhu/.snapshot/snaptest2

f. # This command serves to compare snapshots
hdfs snapshotDiff /my_dir_bibhu .snapshot/snaptest1 .snapshot/snaptest2

g. # Restore snapshot directory to a temporary place and check if file is there or not
hdfs dfs -cp /my_dir_bibhu/.snapshot/snaptest2 /tmp/dir_from_snapshot
hdfs dfs -ls /dir_from_snapshot
Usually, YARN takes all of the available resources on each machine in the cluster into consideration. Based on the available resources, YARN negotiates the resources requested by the applications or MapReduce jobs running in the cluster. YARN allocates containers based on how many resources the application requires. A container is the basic unit of processing capacity in YARN, and its resource elements include memory, CPU, etc. In a Hadoop cluster, it is necessary to balance the usage of memory (RAM), processors (CPU cores) and disks so that processing is not constrained by any one of these cluster resources. As a best practice, allowing for two containers per disk and per core gives the best balance for cluster utilization.
When you are considering the appropriate YARN and MapReduce memory configurations for a cluster node, it is ideal to consider the values below for each node.
Before calculating how much RAM, how many cores and how many disks are required, you have to be aware of the parameters below.
Basically "mapreduce.task.io.sort.mb" is the total amount of buffer memory which is to use while sorting files. It is representing in megabytes.
Tune or provide the io.sort.mb value in such a way that the number of spilled records equals, or is as close as possible to, the number of map output records.
A MapReduce job guarantees that the input to every reducer is sorted by key. The process by which the system performs the sort and then transfers the mapper output to the reducers as input is known as the shuffle. In a MapReduce job, the shuffle is an area of the code where fine-tuning and improvements are continually being made; in many ways, the shuffle is the heart of the MapReduce job. When the map function starts producing output, the process takes advantage of buffering: output is written in memory and some presorting is done for efficiency as well.
Each map task has a circular memory buffer to which it writes its output. The buffer is 100 MB by default, a size which can be tuned by changing the io.sort.mb property. When the contents of the buffer reach a certain threshold size (the default io.sort.spill.percent is 0.8, or 80%), a background thread starts to spill the contents to disk. Mapper output will continue to be written to the buffer while the spill takes place, but if the buffer fills up during this time, the map will block until the spill is complete. Spills are written in a round-robin fashion to a subdirectory of the directories specified by the mapred.local.dir property.
Each time the memory buffer reaches the spill threshold, a new spill file is created, so after the map task has written its last output record there could be several spill files before the task is finished. The spill files are merged into a single partitioned and sorted output file. The configuration property io.sort.factor controls the maximum number of streams to merge at once; the default value of io.sort.factor is 10.
To briefly explain how io.sort.factor works: when the mapper task is running, it continuously writes data into buffers, and to manage the buffer we have to set a parameter called io.sort.spill.percent.
The value of io.sort.spill.percent indicates the point after which data is written to disk instead of the buffer that is filling up. All of this spilling to disk is done in a separate thread so that the map can continue running. There may be multiple spill files on the task tracker after the map task finishes. Those files have to be merged into one single sorted file per partition, which is fetched by a reducer. The property io.sort.factor says how many of those spill files will be merged into one file at a time.
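Putting the three properties together, a hedged mapred-site.xml snippet might look like the one below; the values are examples only, and newer releases use the names mapreduce.task.io.sort.mb, mapreduce.map.sort.spill.percent and mapreduce.task.io.sort.factor for the same settings.
<property>
  <name>io.sort.mb</name>
  <value>256</value> <!-- buffer used for sorting map output, in MB -->
</property>
<property>
  <name>io.sort.spill.percent</name>
  <value>0.80</value> <!-- spill to disk once the buffer is 80% full -->
</property>
<property>
  <name>io.sort.factor</name>
  <value>64</value> <!-- number of spill files merged at once -->
</property>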
Basically, the dfs.hosts file contains all the data node details, and access is allowed only to the nodes mentioned in the dfs.hosts file. This is the default configuration used by the name node. dfs.hosts and dfs.hosts.exclude help to re-commission and decommission the data nodes.
Hadoop provides the decommission feature to exclude a set of existing data nodes. The nodes to be taken out should be included in the exclude file, and the exclude file name should be specified as the configuration parameter dfs.hosts.exclude. You can find the example mentioned below.
Examples:
Modify the conf/hdfs-site.xml, add:
<property>
  <name>dfs.hosts</name>
  <value>/opt/hadoop/Bibhu/conf/datanode-allow.list</value>
</property>
<property>
  <name>dfs.hosts.exclude</name>
  <value>/opt/hadoop/Bibhu/conf/datanode-deny.list</value>
</property>
Decommissioning cannot happen immediately because it requires replication of a potentially large number of blocks, and we do not want the cluster to be overwhelmed with just this one job. The decommission progress can be monitored on the name node web UI or Cloudera UI. Until all blocks are replicated, the status of the nodes will remain in the "Decommission in progress" state; when decommissioning is done, the state will change to "Decommissioned". The node can be removed once decommissioning is finished.
We can use the command below. Without creating a dfs.hosts file or making any entries, run hadoop dfsadmin -refreshNodes on the Name Node.
# $HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes
-refreshNodes updates the name node with the set of data nodes that are allowed to connect to the Name Node.
This, along with other Hadoop basic interview questions, is a regular feature in Hadoop interviews, be ready to tackle it with the approach mentioned above.
Java garbage collection is the process by which Java programs perform automatic memory management; it is a technique that automatically manages the allocation and deallocation of memory. Java programs compile to bytecode (the compiled format of a Java program) that can be run on a Java Virtual Machine; once a Java program has been converted to bytecode, it is executed by the JVM and can be transferred across a network. While Java programs are running on the JVM, the JVM consumes memory, called heap memory, to do so. Heap memory is the part of memory dedicated to the program.
A Hadoop mapper is a Java process, and every Java process has its own heap memory. The maximum heap allocation is configured via mapred.map.child.java.opts (or mapreduce.map.java.opts in Hadoop 2). If the mapper process runs out of heap memory, then the mapper throws a Java out of memory exception as shown below.
Error: java.lang.RuntimeException: java.lang.OutOfMemoryError
The Java heap size should be smaller than the Hadoop container memory limit because we need to reserve some memory for the Java code itself. Usually, it is recommended to reserve 20% of the memory for code. If the settings are correct, Java-based Hadoop tasks will never get killed by Hadoop, so you will not see a "Killing container" error.
To execute the actual map or reduce task, YARN runs a JVM within the container. The Hadoop property mapreduce.{map|reduce}.java.opts is passed to this JVM. This can include -Xmx to set the max heap size of the JVM.
Example: hadoop jar <jarName> -Dmapreduce.reduce.memory.mb=4096 -Dmapreduce.reduce.java.opts=-Xmx3276m
Hive works on structured data and provides an SQL-like layer on top of HDFS; a MapReduce job is executed for each Hive query that computes over HDFS data. Impala is a massively parallel processing SQL query engine that is capable of handling a huge volume of data. Impala is faster than Hive because Impala does not store intermediate query results on disk; it processes the SQL query in memory without running any MapReduce.
Below are the few Hive components
1. Hive Clients:
Hive clients help Hive to perform queries. There are three types of clients we can use to perform queries:
2. Hive Services
The compiler verifies the syntax with the help of the schema present in the metastore; then the optimizer generates the optimized logical plan in the form of a Directed Acyclic Graph of MapReduce and HDFS tasks. The Executor executes the tasks after the compilation and optimization steps, and it directly interacts with the Hadoop JobTracker for scheduling the tasks to be run.
Impala components are 1. Impala daemon(Impalad) 2. Impala State Store 3. Impala Catalog Service.
Whenever a query is submitted to any Impala daemon, that node is considered the "central coordinator node" for the query. After accepting the query, the Impalad logically divides the query into smaller parallel queries and distributes them to different nodes in the Impala cluster. All the Impalads gather their intermediate results and send them to the central coordinator node, which then constructs the final query output.
Example : INVALIDATE METADATA [[db_name.]table_name];
REFRESH [db_name.]table_name;
As we know, most Hive tables contain millions or billions of records, and every Hive query computation is processed with the help of mappers and reducers, which consumes considerable time and memory. A few optimization techniques will always help Hive queries perform better. Please find some of these techniques below.
1. Use Tez to Fasten the execution:
Apache Tez is an execution engine used for faster query execution. Tez allows you to launch a single Application Master per session for multiple jobs, on the condition that the jobs are comparatively small so that the Tez memory can be reused for them. You need to set Tez as the processing engine instead of the default MapReduce execution engine by providing the parameter below.
Set hive.execution.engine=tez;
If you are using Cloudera/Hortonworks, then you will find TEZ option in the Hive query editor as well.
2. Enable compression in Hive
Basically, compression techniques reduce the amount of data being transferred, so they reduce the data transfer between mappers and reducers. Compression is not advisable if your data is already compressed, because the output file size might end up larger than the original.
For better results, you need to enable compression at both the mapper and reducer side separately. There are many compression formats available, out of which gzip takes more CPU resources than Snappy or LZO but provides a higher compression ratio; gzip output, however, is not splittable.
Other formats are snappy, lzo, bzip, etc. You can set compression at mapper and reducer side using codes below:
set mapred.compress.map.output = true;
set mapred.output.compress= true;
Users can also set the following properties in hive-site.xml and map-site.xml to get permanent effects.
<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>
<property>
  <!-- mapred.map.output.compression.codec for MR1; mapreduce.map.output.compress.codec on YARN -->
  <name>mapred.map.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
3. Use ORC file format
ORC (Optimized Row Columnar) is an appropriate format for Hive performance tuning; query performance can be improved easily using the ORC file format. We can use the ORC file format for all kinds of tables, whether partitioned or not, and in return you get faster computation and a compressed file size.
4. Optimize your joins
If your table holds a large amount of data, then it is not advisable to just use the normal joins we use in SQL. There are many other joins, like map joins, bucket joins, etc., which will help improve Hive query performance.
5. Use Map Join
A map join is beneficial when one table is small compared to the other table taking part in the join, so that it can fit into memory. Hive has a property which can do an auto map join when enabled: set hive.auto.convert.join to true to enable it. We can set this from the command line as well as from the hive-site.xml file.
<property>
  <name>hive.auto.convert.join</name>
  <value>true</value>
  <description>Whether Hive enables the optimization about converting common join into mapjoin based on the input file size</description>
</property>
6. Bucketed Map Join
If tables are bucketed by a particular column, you can use bucketed map join to improve the hive query performance. You can set the below two property to enable the bucketed map to join in Hive.
<property>
  <name>hive.optimize.bucketmapjoin</name>
  <value>true</value>
  <description>Whether to try bucket mapjoin</description>
</property>
<property>
  <name>hive.optimize.bucketmapjoin.sortedmerge</name>
  <value>true</value>
  <description>Whether to try sorted bucket merge map join</description>
</property>
7. Use Partition
Partitioning is always helpful for huge data. It is used to segregate a large table based on certain columns so that the whole dataset can be divided into small chunks. When we say partition the table, it basically allows you to store the data under sub-directories inside the table directory.
Selecting the partition column is always a critical decision, and you need to take care of future data and the volume of data as well. For example, if you have data for particular locations, then you can partition the table based on state. You can also partition the data month-wise. You can define the partition column based on your requirement.
Here is the syntax to create a partition table
CREATE TABLE countrydata_partition (id INT, countryname STRING, population INT, description STRING) PARTITIONED BY (country VARCHAR(64), state VARCHAR(64)) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;
There are two types of partition in Hive.
By default, partitions are static in Hive. With a static partition, we usually specify the partition explicitly (the table being declared with, for example, "PARTITIONED BY (department STRING)") and provide the partition value while loading. When loading big files into Hive, static partitioning is preferred.
An insert into a partitioned table where Hive determines the partition values itself is known as a dynamic partition, and it can load data from a non-partitioned table. Dynamic partitioning is also suitable when you don't know in advance how many partition values your data contains. To use dynamic partitioning in Hive, you need to set the following properties.
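The properties themselves are not listed in the original; the commonly used settings are shown below as a hedged example.
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;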
8. Use Vectorization
A standard query is executed one row at a time. Vectorized query execution improves the performance of operations like scans, aggregations, filters and joins by processing 1024 rows at a time. To use vectorization you can set the parameters below.
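The parameters are not listed in the original; the usual settings are shown below as a hedged example.
set hive.vectorized.execution.enabled=true;
set hive.vectorized.execution.reduce.enabled=true;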
LDAP and Active Directory provide a centralized security system for managing both servers and users; they manage all user accounts and the associated privileges for your employees. Kerberos handles authentication: when a user tries to connect to any Hadoop service, Kerberos will authenticate the user first and then authenticate the service too. When you consider AD, LDAP and Kerberos together, Kerberos only provides authentication; all identity management is handled outside of Kerberos, that is, in AD and LDAP.
At a high level, when a new employee joins, his/her ID has to be added to Active Directory first, then LDAP and Kerberos, because AD is a directory service owned by Microsoft and AD supports several standard protocols such as LDAP and Kerberos.
LDAP and AD communicate with each other about which user ID belongs to which group; for example, which groups user Bibhu is a member of and what kind of access permission he has on different directories or files. This information is managed differently in AD and in Linux systems. In Windows, we have a concept called SIDs, or Windows security identifiers, and in Linux we have user IDs and group IDs. SSSD can use the SID of an AD user to algorithmically generate POSIX IDs in a process called ID mapping. ID mapping creates a map between SIDs in AD and UIDs/GIDs on Linux.
AD can also create and store POSIX attributes such as uidNumber, gidNumber, unixHomeDirectory, or loginShell.
There are two ways of mapping these SIDs and UIDs/GIDs using SSSD.
ldap_id_mapping = true
ldap_id_mapping = false
Below are a few concepts you need to know to understand the integration of AD/LDAP/Kerberos.
PAM: PAM stands for Pluggable Authentication Module, which allows the integration of authentication technologies such as Unix, Linux, LDAP, etc. into system services such as password, login, ssh, etc. In other words, when you're prompted for a password, that's usually PAM's doing. PAM provides an API through which authentication requests are mapped into technology-specific actions. This mapping is done by PAM configuration files, and an authentication mechanism is provided for each service.
NSS: NSS uses a common API and a configuration file (/etc/nsswitch.conf) in which the name service providers for every supported database are specified. Here Names include hostnames, usernames, group names such as /etc/passwd, /etc/group, and /etc/hosts.
Below are 3 ways of integrating Linux with AD for Authentication
Let’s understand clearly:
1. Using LDAP/Kerberos PAM and NSS Module:
PAM is configured to use Kerberos for authentication, and NSS is configured to use the LDAP protocol for querying UID or GID information. The nss_ldap, pam_ldap, and pam_krb5 modules are available to support this.
The problem here is that there is no caching of credentials and no offline support.
2. Using Winbind:
Samba Winbind was the traditional way of connecting Linux systems to AD. Basically, Winbind emulates a Windows client on a Linux system and is able to communicate with AD servers: the winbind daemon receives calls from PAM and NSS and translates them into corresponding Active Directory calls using either LDAP, Kerberos or remote procedure calls (RPC), depending on the requirement. The current versions of the System Security Services Daemon (SSSD) closed the feature gap between Samba Winbind and SSSD, so Samba Winbind is no longer the first choice in general.
3. Using SSSD that is system services daemon for Integrating with Active Directory:
The System Security Services Daemon (SSSD) is an intermediary between local clients and any remote directory or authentication mechanism. The local clients connect to SSSD, and SSSD then contacts the external providers, that is, the AD or LDAP server. So here SSSD works as a bridge which helps you access AD or LDAP.
Basically, system authentication is configured locally, which means services initially check with a local user store to determine users and credentials. SSSD allows a local service to check with a local cache in SSSD; the cached information might have been taken from an LDAP directory, AD, or a Kerberos realm.
Below are a few advantages of SSSD.
The sssd daemon provides different services for different purposes. We have a configuration file called sssd.conf which determines what tasks sssd can do. The file has two main parts, as we can see here:
[sssd]
domains = WIN.EXAMPLE.COM
services = nss, pam
config_file_version = 2
[domain/WINDOWS]
id_provider = ad
auth_provider = ad
access_provider = ad
In the first part, we clearly state which services on the system must use sssd; in the above example, nss and pam are mentioned. The second part, domain/WINDOWS, defines the directory service, also called the identity provider, for example an AD or LDAP server. SSSD connects to AD/LDAP for querying information, authentication, password changes, etc.
In brief, below are the steps describing how SSSD works.
Sentry provides role-based authorization to both data and metadata stored on a Hadoop cluster for a user. Before going deeper into Sentry, below are the components on which Sentry is based.
The Sentry server only helps you to get the metadata information. The actual authorization decision is made by the policy engine running within data processing applications such as Hive or Impala. Each component loads the Sentry plug-in; that means for each service like Hive/HDFS/Impala/Solr, a Sentry plug-in has to be installed to deal with the Sentry service, and the policy engine validates the authorization request.
Below are a few of the capabilities which Sentry has.
1. Fine-Grained Authorization:
It means permissions on object hierarchies, for example at the server level, database level, table level and view level (row/column level authorization), as well as on URIs, with a permission hierarchy of SELECT/INSERT/ALL. This is called fine-grained authorization.
2. Role-Based Authorization(RBAC):
Sentry provides role-based authorization, where it supports a set of privileges as well as role templates which combine multiple access rules for a large set of users and data objects (databases, tables, etc.).
For example, If Bibhu joins the Finance Department, all you need to do is add him to the finance-department group in Active Directory. This will give Bibhu access to data from the Sales and Customer tables.
You can create a role called Analyst and grant SELECT on tables Customer and Sales to this role.
Now Bibhu who is a member of the finance-department group gets the SELECT privilege to the Customer and Sales tables.
3. Multi-Tenant Administration or Delegated Admin responsibilities:
Sentry has the capability to delegate or assign admin responsibilities for a subset of resources. Delegated admin responsibility means that the Delegated-Admin privilege is assigned on a specific set of resources, for a specific set of users/groups, by a person who already has the Delegated-Admin privilege on that set of resources.
4. User Identity and Group Mapping: Sentry depends on Kerberos or LDAP to identify the user. It also uses the group mapping mechanism configured in Hadoop to ensure that Sentry sees the same group mapping as other components of the Hadoop ecosystem.
For example, consider that users Bibhu and Sibb belong to an Active Directory (AD) group called finance-department. Sibb also belongs to a group called finance-managers. In Sentry, create the roles first and then grant the required privileges to those roles. For example, you can create a role called Analyst and grant SELECT on tables Customer and Sales to this role.
The next step is to join these authentication entities (users and groups) to authorization entities (roles). This can be done by granting the Analyst role to the finance-department group. Now Bibhu and Sibb who are members of the finance-department group get the SELECT privilege to the Customer and Sales tables.
GRANT ROLE Analyst TO GROUP finance-department ;
Below are some scenarios where Hive, Impala, HDFS, and search activities are working with Sentry. Considering a few examples we will try to understand how it works.
1. Hive and Sentry :
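The query referred to in the next paragraph is not shown in the original; a minimal hypothetical example would be user Bibhu running a simple SELECT against the Status table:
SELECT * FROM Status;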
Here, for the query above, Hive identifies that user Bibhu is requesting SELECT access to the Status table. At this point, Hive asks the Sentry plugin to validate Bibhu's access request. The plugin retrieves Bibhu's privileges related to the Status table, and the policy engine determines whether the request is valid or not.
2. Impala and Sentry:
Authorization processing in Impala is more or less the same as in Hive. The main difference is the caching of privileges. Impala’s Catalog server manages the caching of roles and privileges (metadata) and spreads it to all Impala server nodes. As a result, an Impala daemon can authorize queries much faster by referring to the metadata in cache memory. The only drawback, performance-wise, is that it takes some time for privilege changes to take effect; it might take a few seconds.
3. Sentry-HDFS Synchronization:
When we talk about Sentry and HDFS authorization, it basically concerns Hive warehouse data, that is, Hive or Impala table data. The main objective is that when other components like Pig, MapReduce or Spark try to access a Hive table, a similar authorization check occurs. At this point, this feature does not replace HDFS ACLs; tables which are not associated with Sentry retain their old ACLs.
The mapping of Sentry privileges to HDFS ACL permissions is as follows:
The NameNode loads a Sentry plugin that caches Sentry privileges as well as Hive metadata. This helps HDFS keep file permissions and Hive table privileges in sync. The Sentry plugin periodically communicates with the Sentry server and the Metastore to keep the metadata changes in sync.
For example, if Bibhu runs a Pig job that reads from the Sales table, the data files are stored in HDFS. The Sentry plugin on the Name Node will figure out that the data files are part of Hive data and overlay the Sentry privileges on top of the file ACLs. This means HDFS applies the same privileges for this Pig client that Hive would have applied for a SQL query.
For HDFS-Sentry synchronization to work, you must use the Sentry service, not policy file authorization.
4. Search and Sentry:
Sentry can apply restrictions on search tasks coming from a browser, the command line, or the admin console.
With Search, Sentry stores its privilege policies in a policy file (for example, sentry-provider.ini) which is stored in an HDFS location such as hdfs://ha-nn-uri/user/solr/sentry/sentry-provider.ini.
Multiple policy files for multiple databases are not supported by Sentry with Search. However, you must use a separate policy file for each Sentry-enabled service.
5. Disabling Hive CLI:
To execute Hive queries you have to use Beeline. Hive CLI is not supported with Sentry and should be disabled, and direct access to the Hive Metastore should also be disabled. This is especially necessary if the Hive Metastore holds sensitive metadata.
To do the same, you have to modify the hadoop.proxyuser.hive.groups in core-site.xml on the Hive Metastore host.
For example, to give the hive user permission to members of the hive and hue groups, set the property to:
<property>
  <name>hadoop.proxyuser.hive.groups</name>
  <value>hive,hue</value>
</property>
More user groups that require access to the Hive Metastore can be added to the comma-separated list as needed.
High Availability of cluster was introduced in Hadoop 2 to solve the single point of Name node failure problem in Hadoop 1.
The High availability Name node architecture provides an opportunity to have two name nodes as Active name node and Passive/Standby name node. So, both are running Name Nodes at the same time in a High Availability cluster.
Whenever the Active Name Node goes down, due to a server crash or a graceful failover during a maintenance period, control goes to the Passive/Standby Name Node automatically, which reduces the cluster downtime. There are two problems in maintaining consistency in the HDFS High Availability cluster:
As discussed above, there are two types of failover:
A. Graceful Failover: In this case, we manually initiate the failover for routine maintenance.
B. Automatic Failover: In this case, the failover is initiated automatically in case of a Name Node failure or crash.
In either case of a Name Node failure, the Passive or Standby Name Node can take the exclusive lock in ZooKeeper, indicating that it wants to become the next Active Name Node.
In an HDFS High Availability cluster, Apache ZooKeeper is the service which provides automatic failover. When the Name Node is active, ZooKeeper maintains a session with it. If the active Name Node fails, the session expires and ZooKeeper informs the Passive or Standby Name Node to initiate the failover process.
The ZooKeeper Failover Controller (ZKFC) is a ZooKeeper client that also monitors and manages the Name Node status. Each of the Name Nodes runs a ZKFC as well. ZKFC is responsible for periodically monitoring the health of the Name Nodes.
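Automatic failover through ZKFC is switched on in configuration. The snippet below is a hedged example: dfs.ha.automatic-failover.enabled goes in hdfs-site.xml, ha.zookeeper.quorum goes in core-site.xml, and the ZooKeeper host names are placeholders.
<property>
  <name>dfs.ha.automatic-failover.enabled</name>
  <value>true</value>
</property>
<property>
  <name>ha.zookeeper.quorum</name>
  <value>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</value>
</property>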
When ZooKeeper is installed in your cluster, you should make sure that the below processes, or daemons, are running on the Active Name Node, Standby Name Node and Data Nodes.
When you do JPS (Java Virtual Machine Process Status Tool ) in Active NameNode you should get below Daemons:
When you do JPS (Java Virtual Machine Process Status Tool ) in Standby NameNode you should get below Daemons:
When you do JPS (Java Virtual Machine Process Status Tool ) in DataNode you should get below Daemons:
The complexity of the answer makes this a must-know for anyone looking for top Hadoop interview questions. Not to mention this is one of the most frequently asked Hadoop interview questions.
It is a facility provided by the Hadoop MapReduce framework to access small files needed by an application during its execution. These files are small, in the KBs or MBs in size, and are mainly text, archive or jar files. Because these files are small, they are kept in cache memory, which is one of the fastest memories. Applications which need to use the distributed cache to distribute a file should make sure that the file is available and can be accessed via URLs. URLs can be either hdfs:// or http://.
Once the file is present on the mentioned URL, the Map-Reduce framework will copy the necessary files on all the nodes before initiation of the tasks on those nodes. In case the files provided are archives, these will be automatically unarchived on the nodes after transfer.
Example: In a Hadoop cluster, we have three data nodes and 30 tasks to run in the cluster, so each node will get 10 tasks. Suppose the nature of the tasks is such that each needs some information or a particular jar to be available before its execution. To fulfil this, we can cache the files which contain that information or the jar files. Before execution of the job, the cache files are copied to each slave node by the application master; the tasks then read the files and start. The tasks can be mappers or reducers, and these are read-only files. By default the Hadoop distributed cache is 10 GB; if you want to change that, you have to modify the size in mapred-site.xml. A question that comes to mind is why a cache is required to perform the tasks: why can’t we keep the file in HDFS on each data node and have the application read it? There are a total of 30 tasks here, and in real time it could be more than 100 or 1000 tasks. If we put the files in HDFS, then to perform 30 tasks the application has to access the HDFS location 30 times and read it each time, but HDFS is not very efficient at accessing small files this many times. This is the reason why we use the cache: it reduces the number of reads from HDFS locations.
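As a hedged illustration of how an application registers cache files, the driver sketch below adds a lookup file and a jar to the distributed cache before submitting the job; the paths, class names and the referenced mapper are assumptions for the example, not from the original answer.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver: registers a small lookup file and a jar with the distributed
// cache so every node has a local copy before its tasks start.
public class CacheDemoDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "distributed-cache-demo");
        job.setJarByClass(CacheDemoDriver.class);
        job.setMapperClass(MapSideJoinMapper.class);   // mapper from the earlier sketch
        job.setNumReduceTasks(0);                      // map-only job
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // The '#departments.txt' fragment creates a symlink of that name in the task's
        // working directory, so the mapper can open it with plain Java file I/O.
        job.addCacheFile(new URI("/data/lookup/departments.txt#departments.txt"));
        // Jars added this way are placed on the task classpath.
        job.addFileToClassPath(new Path("/data/libs/lookup-helpers.jar"));

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}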
In a Hadoop cluster, the Data Node is where the actual data is kept. Data Nodes send a heartbeat message to the Name Node every 3 seconds to confirm that they are active. If the Name Node does not receive a heartbeat from a particular data node for 10 minutes, it considers that data node to be dead. The Name Node then initiates replication of the dead data node's blocks to other data nodes which are active. Data nodes can talk to each other to rebalance the data, move and copy data around and keep the replication active in the cluster. You can get the block report using the HDFS commands below.
Example:
hadoop fsck / ==> Filesystem check on HDFS
# hadoop fsck /hadoop/container/pbibhu
FSCK ended at Thu Oct 20 20:49:59 CET 2011 in 7516 milliseconds
The filesystem under path '/hadoop/container/pbibhu' is HEALTHY
The Name Node is the node which stores the file system metadata. Metadata includes information like the list of file names, owner, permissions, timestamps, size, replication factor, and the list of blocks for each file; it records which files map to which block locations and which blocks are stored on which data node. When data nodes store a block of information, they maintain a checksum for each block as well. When any data is written to HDFS, the checksum value is written simultaneously, and when the data is read, the same checksum value is verified by default.
The data nodes update the name node with the block information periodically and verify the checksum value before updating. If the checksum value is not correct for a particular block, we consider that block to have disk-level corruption; the data node skips that block's information while reporting to the name node. In this way the name node gets to know about the disk-level corruption on that data node and takes the necessary steps, such as replicating the block from its alternate locations to other active data nodes to bring the replication factor back to the normal level. Data nodes can be listed in the dfs.hosts file, which contains a list of hosts that are permitted to connect to the Name Node.
Example:
Add this property to hdfs-site.xml:
<property>
  <name>dfs.hosts</name>
  <value>/home/hadoop/includes</value>
</property>
includes:
hostname1
hostname2
hostname3
If the include file is empty, then all hosts are permitted, but it is not a definitive list of active data nodes; the Name Node considers only those data nodes from which it receives heartbeats.
LVM stands for Logical Volume Management. It is a system for managing logical volumes, or filesystems, that is much more advanced and flexible than the traditional method of partitioning a disk into one or more segments and formatting that partition with a filesystem. Today disks are huge (> 1 TB), and LVM is the right tool to dynamically allocate and resize partitions of these huge disks.
If you are using Linux to deploy Hadoop nodes, master or slaves, it is strongly recommended that you do not use LVM, because of the points below.
HDFS data might not always be distributed uniformly across DataNodes, for different reasons: some DataNodes may have less disk space available for use by HDFS, during normal or heavy usage the disk utilization on the DataNode machines may become uneven, or when new Data Nodes are added to an existing cluster their utilization starts out uneven. To mitigate this problem, the balancer is required.
A balancer is a tool that balances disk space usage on an HDFS cluster; it analyzes block placement and balances data across the DataNodes. The balancer moves blocks until the cluster is deemed to be balanced, which means that the utilization of every DataNode is more or less equal. The balancer does not balance between individual volumes on a single DataNode.
HDFS balancer [-policy <policy>]
The two supported policies are blockpool and datanode. Setting the policy to blockpool means that the cluster is balanced if each block pool on each node is balanced, while datanode means that the cluster is balanced if each DataNode is balanced. The default policy is datanode.
HDFS balancer [-threshold <threshold>] specifies a number in [1.0, 100.0] representing the acceptable threshold as a percentage of storage capacity, so that storage utilization outside the average +/- the threshold is considered over/underutilized. The default threshold is 10.0.
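For example, a hedged invocation that balances DataNodes until each is within 5% of the cluster's average utilization could look like this (the threshold and policy values are illustrative):
hdfs balancer -threshold 5 -policy datanode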
A rack is a collection of multiple servers, assembled based on your requirement. All these servers are connected to the same network switch, and if that network switch goes down then all machines in that rack are out of service; we say the rack is down.
To mitigate this, Rack Awareness was introduced for Hadoop by Apache. With Rack Awareness, the Name Node chooses a Data Node which is in the same rack as the Name Node, or in a nearby rack. The Name Node maintains the rack IDs of each Data Node to get the rack information, and based on the Rack ID the Name Node can communicate with the Data Node. In Hadoop, when we maintain racks we have to follow certain rules, as mentioned below.
Below are some of the reasons why we follow Rack Awareness in Hadoop:
There are two types of tables which HIVE supports.
Hive Managed Tables:
A Hive managed table is also known as an internal table. When we create a table in Hive, by default a managed table is created and Hive manages the data as well, meaning that Hive stores the data in its warehouse directory. A managed table is stored under the hive.metastore.warehouse.dir path property, and the default location of the table will be /apps/hive/warehouse/<db_name>.db/<table_name>. This path is modifiable. If a managed table or partition is dropped, then the data and corresponding metadata of the table or partition are deleted. If you do not specify the PURGE option, the data is moved to a trash folder for a certain period and is deleted permanently after that.
Example:
1. Create Table
hive> create table univercity_db.school_table(name string, rollno int) row format delimited fields terminated by ',';
OK
Time taken: 0.202 seconds
2. Describe table
hive> describe formatted univercity_db.school_table;
OK
You will get extra information, like whether the table is managed or external, when the table was created, the file format, the location of the data path in HDFS, and whether the object is a table or a view.
3. Load the data to table from the local path
hive>load data local inpath '/home/pbibhu/Desktop/blog/school' into table univercity_db.school_table;
After loading from the local path you can further use hive commands to select/count/describe etc
Hive External Tables:
While creating an external table, the location of the data path is not the usual warehouse path; you have to provide an HDFS path outside of the warehouse directory, and the location is mandatory in the CREATE syntax. If the structure or partitioning of an external table is changed, an MSCK REPAIR TABLE table_name statement can be used to refresh the metadata information. Basically, for an external table we do not load the table from a local path; you have to load data from HDFS, mentioning the path.
Use external tables when files are present in the remote locations, and the files should remain even if the external table is dropped.
Example:
1. Create Table
CREATE EXTERNAL TABLE IF NOT EXISTS univercity_db.school_table( student_ID INT, FirstName STRING, LastName STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION 'hdfs/pbibhu/school';
2. Create partition Table
CREATE EXTERNAL TABLE IF NOT EXISTS univercity_db.school_table( FirstName STRING, LastName STRING) PARTITIONED BY (student_ID INT) STORED AS ORC LOCATION 'hdfs/pbibhu/school';
3. Insert the data into an internal table from an external table; the data structure should be the same for both tables.
hive> CREATE TABLE IF NOT EXISTS office(EmployeeID INT,FirstName STRING, Title STRING, State STRING, Laptop STRING) STORED AS ORC;
OK
hive> CREATE EXTERNAL TABLE IF NOT EXISTS Office_text( EmployeeID INT,FirstName STRING, Title STRING, State STRING, Laptop STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION '/user/pbibhu/office';
OK
hive> INSERT OVERWRITE TABLE office SELECT * FROM office_text;
Resource allocation within the queues is controlled separately. Within a queue:
FairScheduler can apply any of FIFO policy, FairPolicy or DominantResourceFairnessPolicy.
CapacityScheduler can apply either FIFO policy or fair policy.
The Fair Scheduler can use different scheduling policies. The default scheduling policy is fair sharing, using memory as the resource. There is also a FIFO (first in, first out) policy, which is not of much use. It is quite common to use the third type of scheduling policy, DRF (Dominant Resource Fairness), which allocates both memory and CPU resources to applications. DRF is similar to fair scheduling, but it is important to keep in mind that it applies primarily to the allocation of resources among queues, an activity which is already dominated by queue weights. Thus, the most important thing about DRF is that it considers multiple resources, rather than that it attempts to provide equal resource allocation.
Initially, TCS and Wipro each have some resources allocated to jobs in their respective queues, and only 10 GB of memory remains free in the cluster. Each queue requests to run a map task requiring 20 GB, so 30 GB of memory is needed in total, and the rest of the required resources will be taken from CPU. WIPRO currently holds 15 GB of resources; another 10 GB is required for its mapper task, so the fair scheduler awards a container with the requested 10 GB of memory to WIPRO. Now only 5 GB of memory is available for TCS, and it requires another 20 GB to run its mapper task. In this case, there is not enough memory available for TCS, so under DRF the 5 GB of memory is used and the remaining requirement of around 20 GB is satisfied from CPU.
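For completeness, the scheduling policy is chosen in the Fair Scheduler allocation file. The snippet below is a hedged sketch showing DRF set as the default policy and for the WIPRO queue used in the example; it is not from the original answer.
<allocations>
  <defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>
  <queue name="WIPRO">
    <schedulingPolicy>drf</schedulingPolicy>
  </queue>
</allocations>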
A staple in HDFS interview questions, be prepared to answer this using your hands-on experience. This is also one of the top interview questions to ask a Hadoop engineer.
Basically, we store files under some folders in HDFS; most of the time the folder we choose is based on the application name. When we talk about small files, they are files smaller than the block size; for example, if the block size is 64 MB or 128 MB, then smaller files are those below the block size. If the files are smaller than the block size, then we face problems at the HDFS level as well as at the Map-Reduce level.
In HDFS, when we store files/directories, the corresponding metadata is stored in the Name Node; each file's, directory's and block's metadata occupies approximately 150 bytes. Suppose you have 1 million files and each uses approximately a block, or less than a block; then the metadata of the corresponding files/directories occupies approximately 300 MB of memory. In such a case, a lot of memory is occupied in the name node; after some time a threshold is reached, and beyond that it becomes a problem with the current hardware. Certainly, performance will degrade.
During the execution of Map-Reduce, when the file size is less than or equal to the block size, one mapper is launched for each block-sized (or split-sized) file, so a large number of mappers are launched for a large number of small files; in this case the processing time is longer, with each file holding only a small chunk of data. When we read and write a large number of small files, the seek time is also higher, which impacts performance, and seeks are generally an expensive operation. Since Hadoop is designed to run over your entire dataset, it is best to minimize seeks by using large files.
Remediation plan:
We can merge all the small files into a big file using the HDFS getmerge command. The getmerge command copies all the files available in an HDFS folder to a single concatenated file in the local system; after concatenating in the local system, you can place the file back into HDFS using the HDFS put command. Please find the example mentioned below.
hadoop fs -getmerge /hdfs_path/pbibhu/school_info_* /local_path/pbibhu/school_inf.txt hadoop fs -put school_inf.txt /hdfs_path/pbibhu/school_inf.txt
Below are the file formats which Hadoop supports.
Usually, text format was very common prior to Hadoop, and it is very common in a Hadoop environment as well. Data is presented as lines, each line terminated by a newline character (\n), with fields often tab separated (\t).
CSV stands for comma-separated values, so data fields are separated or delimited by commas. For example, we have the below values in an Excel sheet:
Name | class | section | subject |
---|---|---|---|
Bibhu | 7 | A | English |
The above data will be present in a CSV formatted file as follows.
Bibhu,7, A, English
JSON stands for JavaScript Object Notation. It is a readable format for structuring data; basically, it is used to transfer data from a server to a web application. We can use it as an alternative to XML. In JSON, data is presented as key and value pairs. The key is always a string enclosed in quotation marks. The value can be a string, number, boolean, array or object.
The basic syntax is a key, followed by a colon, followed by a value.
Example: "Name" : "Bibhu"
Avro stores its schema in JSON format, which is easy to read and understand. The data itself is stored in a binary format, which makes it compact and efficient; each value is stored without any metadata other than a small schema identifier of 1 to 4 bytes. It has the capability to split a large dataset into subsets, which makes it very suitable for Map-Reduce processing.
In Hive following command is used to use AVRO.
Create table avro_school
(column_address)
stored as avro;
RC stands for Record Columnar, which is a type of binary file format; it provides high compression on top of rows, or on multiple rows at a time, on which we want to perform some operation. RC files consist of binary key/value pairs. The RC file format first partitions the rows horizontally into row splits, and then each row split is stored vertically in a columnar way. Please find the example mentioned below:
Step 1
First, partition the rows horizontally into Row split
501 | 502 | 503 | 504 |
505 | 506 | 507 | 508 |
509 | 510 | 511 | 512 |
513 | 514 | 515 | 516 |
Step 2
The columns of each row split are then stored together (presented vertically in a columnar way), so the values of column 1, column 2, and so on are laid out contiguously:
501 | 505 | 509 | 513 |
502 | 506 | 510 | 514 |
503 | 507 | 511 | 515 |
504 | 508 | 512 | 516 |
The RC file combines multiple functions such as data storage formatting, data compression, and data access optimization. It is able to meet all four of the below requirements of data storage.
The ORC file provides a more efficient way to store relational data than the RC file, reducing the data storage size by up to 75% of the original. Compared to the RC file, the ORC file takes less time to access the data and takes less space to store it as well; it internally divides the data again, with a default stripe size of 250 MB.
In Hive following command is used to use the ORC file.
CREATE TABLE ... STORED AS ORC
ALTER TABLE ... SET FILEFORMAT ORC
SET hive.default.fileformat=ORC
It is another column-oriented storage format, like the RC and ORC formats, but it is very good at handling nested data as well as at query scans over particular columns in a table. In Parquet, new columns can be added at the end of the structure. It handles compression using Snappy, gzip, etc.; currently Snappy is the default. Parquet is supported by Cloudera and optimized for Cloudera Impala.
Hive Parquet File Format Example:
Create table parquet_school_table
(column_specs)
stored as parquet;
Both schedulers cannot be used in the same cluster. The two scheduling algorithms address specific use-cases, and cluster-wide you have to set up the configuration file for either the Fair Scheduler or the Capacity Scheduler; you cannot set up both schedulers for one cluster.
You can choose the Fair Scheduler by setting the scheduler class in yarn-site.xml as mentioned below:
<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>
To use the Capacity Scheduler you have to configure the Resource Manager in the conf/yarn-site.xml as mentioned below:
yarn.resourcemanager.scheduler.class = org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler
While setting up the queues in the Capacity Scheduler, you need to make some changes in the etc/hadoop/capacity-scheduler.xml configuration file. The Capacity Scheduler has a predefined queue called root.
Whatever queues we create in the system are children of the root queue. Setting up further queues: configure the property yarn.scheduler.capacity.root.queues with a list of comma-separated child queues. Setting up sub-queues within a queue: configure the property yarn.scheduler.capacity.<queue-path>.queues, where queue-path is the full path of the queue’s hierarchy, starting at root, with . (dot) as the delimiter.
Queue capacity is provided in percentage (%). The sum of capacities for all queues, at each queue level, must be equal to 100. If there are free resources in the queue then Applications in the queue may consume the required resources.
Capacity scheduler queue configuration example:
Suppose there are two child queues under root, XYZ and ABC, and XYZ further divides its queue into technology and marketing. XYZ is given 60% of the cluster capacity and ABC is given 40%. In this scenario, please find the details mentioned below to set up your configuration.
<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>XYZ,ABC</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.XYZ.queues</name>
  <value>technology,marketing</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.XYZ.capacity</name>
  <value>60</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.ABC.capacity</name>
  <value>40</value>
</property>
Basically, Kafka is a messaging system which exchanges large volumes of streaming/log data between processes, applications and servers. Distributed messaging is based on a queue which can handle a high volume of data and allows you to pass messages from one end to another. Kafka is appropriate for both offline and online message consumption.
Before talking about Kafka further, we need to know about its components; below are the details.
Kafka Broker: A Kafka cluster consists of one or more servers, called Kafka brokers, on which Kafka is running. Producers are processes that publish data into Kafka topics within the brokers; consumers of topics then pull the messages off the Kafka topics.
A few basic points related to Kafka Broker:
Kafka Topics: A topic is a category or feed name to which messages are stored and published. All Kafka messages are organized into topics. Whenever you want to send a message you send it to a specific topic, and whenever you want to read messages you read them from a specific topic.
Kafka Topic Partition: Kafka topics are divided into a number of partitions, and each partition contains messages in a sequence; the sequence is only applicable within a partition. Each message in a partition is recognized by its offset value, an incremental ID maintained by Zookeeper. Offsets are meaningful only within a partition; they have no value across partitions. A topic may contain any number of partitions. There is no fixed rule deciding which partition an incoming message is written to; however, there is an option to add a key to a message. If a producer publishes messages with a key, then all messages with the same key will go to the same partition.
Kafka Producers: Producers write data to a topic. While writing data, producers need to specify the topic name and at least one broker to connect to; Kafka has its own mechanism to send the data to the right partition of the right broker automatically.
Producers also have a mechanism to receive an acknowledgment for the data they write (for example acks=0 for no acknowledgment, acks=1 for a leader acknowledgment, and acks=all for acknowledgment from all in-sync replicas).
Kafka Consumer: Consumers read data from topics. As topics are divided into multiple partitions, a consumer reads data from each partition of a topic. Consumers need to mention the topic name as well as a broker. A consumer reads data from a partition in sequence; when a consumer connects to a broker, Kafka makes sure that it is connected to the entire cluster.
Kafka Consumer Group: A consumer group consists of multiple consumer processes and has one unique group ID. Each consumer instance in a consumer group reads data from one partition. If the number of consumers exceeds the number of partitions, then the extra consumers will be inactive. For example, if there are 6 partitions in total and 8 consumers in a single consumer group, there will be 2 inactive consumers.
Two types of messaging patterns are available in Kafka:
1. Point to point messaging system:
In a point-to-point messaging system, messages are kept in a queue. One or more consumers can read from the queue, but a particular message can be consumed by only one consumer at a time.
Point-to-point messaging is used when a single message should be received by only one consumer. There may be multiple consumers reading the queue for the same message, but only one of them will receive it. There can be multiple producers as well; they send messages to the queue, but each message is received by only one receiver.
2. Publish subscribe messaging system:
In a publish-subscribe messaging system, message producers are called publishers and message consumers are called subscribers. Here a topic can have multiple receivers, and each receiver receives a copy of each message.
Below are a few points that explain the publish-subscribe messaging system.
Messages are shared through a channel called a topic. Topics are a centralized place where producers can publish and consumers can read the messages.
Each message is delivered to one or more consumers, called subscribers. The publisher or producer is not aware of which consumer or subscriber is receiving which message or topic. A single message created by one publisher may be copied and distributed to hundreds or thousands of subscribers.
Role of Zookeeper in Kafka:
Zookeeper is a mandatory component in the Kafka ecosystem. It helps in managing Kafka brokers, in leader election of partitions and in maintaining cluster membership. For example, when a new broker is added or removed, a new topic is added or deleted, or a broker goes down or comes up, Zookeeper manages such situations and informs Kafka. It also handles topic configurations such as the number of partitions a topic has and the leader of the partitions for a topic.
The sequence of starting the Kafka services:
This is the default Zookeeper configuration file available in Kafka, with the following properties:
dataDir=/tmp/zookeeper
clientPort=2183
[root@xxxx]# /bin/zookeeper-server-start.sh /config/zookeeper.properties
You can start the Kafka broker with the default configuration file. Below are the configuration properties:
broker.id=0
log.dir=/tmp/Kafka-logs
zookeeper.connect=localhost:2183
Here there is one broker whose ID is 0, and it connects to Zookeeper on port 2183.
[root@xxxx]# /bin/kafka-server-start.sh /config/server.properties
Below is the example to create a topic with a single partition and replica
[root@xxxx]#/bin/Kafka-create-topic.sh -zookeeper localhost:2183 -replica 1 -partition 1 -topic examtopic
Here in the above example, we created a topic named examtopic.
[root@xxxx]#/bin/Kafka-console-producer.sh -broker-list localhost:9090 -topic examtopic
broker-list ==> the server and port information for the brokers; in the above example we have provided the server as localhost and the port as 9090
topic ==> the name of the topic; in the above example we have provided examtopic
On the command line we created a producer client that accepts your messages and publishes them to the cluster, so that a consumer can then consume or read the messages.
Once the producer client is running, you can type something on the terminal where the producer is running, for example:
Hi Bibhu, How are you?
[root@xxxx]#/bin/Kafka-console-consumer.sh -zookeeper localhost:2183 -topic examtopic -from-beginning
The consumer runs with the default configuration properties; this information is kept in the consumer.properties file.
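A minimal consumer.properties sketch is shown below; the exact defaults vary by Kafka version, and the port is only an assumption chosen to match the zookeeper examples above:
zookeeper.connect=localhost:2183
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group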
Basically, Sqoop is used to get data from a relational database such as DB2, MySQL or Oracle and load it into Hadoop (HDFS, Hive, HBase, etc.) or vice versa; this process is a form of ETL (Extract, Transform and Load). In other words, Sqoop can import and export data between relational databases and Hadoop.
Below are some of the important features of Sqoop:
Sqoop internally creates a SQL query for each mapper that ingests data from the source table into HDFS. By default 4 mappers are generated, but you can modify the number of mappers based on your logic and requirements. The number of mappers works together with the split-by column: the split-by column drives the WHERE condition, so each mapper handles a logical partition of the source data. For example, suppose we use three mappers and a split-by column over 1,000,000 records. Sqoop determines the range using min and max calls to the database on the split-by column; the first mapper would pull records from 0 to 333333, the second from 333334 to 666666, and the last from 666667 to 1000000.
Sqoop runs a map-only job; the Reduce phase is only required for aggregations, but Sqoop just imports and exports data and does not perform any aggregation. The map job launches multiple mappers depending on the number defined by the user (3 in the example above). For a Sqoop import, each mapper task is assigned a part of the data to be imported, and Sqoop distributes the input data equally among the mappers to get high performance. Each mapper then creates a connection with the database using JDBC, fetches the part of the data assigned by Sqoop and writes it into HDFS, Hive or HBase based on the arguments provided on the CLI; as a result the mappers drop the data in the target-dir in files named part-m-00000, part-m-00001, part-m-00002 and so on.
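A hedged sketch of such an import with three mappers (the connection string, table and column names are hypothetical):
sqoop import --connect jdbc:mysql://db.bib.com/sales --table EMPLOYEES --split-by employee_id --num-mappers 3 --target-dir /user/bib/employees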
Here we will discuss how Sqoop imports data from a relational database into Hive. Sqoop can only import the data as a text file or sequence file into a Hive database. If you want to use the ORC file format then you must follow a two-stage approach: in the first stage Sqoop gets the data into HDFS as a text file or sequence file, and in the second stage Hive converts the data into the ORC file format.
Please find the steps as mentioned below.
Example:
sqoop import --connect jdbc:mysql://db.bib.com/sales --table EMPLOYEES --username <username> --password-file ${user.home}/.password
Example:
sqoop import --connect jdbc:mysql://db.bib.com/sales --table EMPLOYEES --columns "employee_id,first_name,last_name,job_title" --where "start_date > '2010-01-01'"
--num-mappers 8
Example:
sqoop import --query 'SELECT a.*, b.* FROM a JOIN b on (a.id == b.id) WHERE $CONDITIONS' --split-by a.id --target-dir /user/bib/sales
1. HDFS as target directory
sqoop import --query 'SELECT a.*, b.* FROM a JOIN b on (a.id == b.id) WHERE $CONDITIONS' --split-by a.id --target-dir /user/bib/sales
2. Hive as Target table
sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --hive-import
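For the two-stage ORC approach described above, a hedged sketch of the second stage in Hive (the table names are hypothetical) could be:
CREATE TABLE employees_orc STORED AS ORC AS SELECT * FROM employees_text;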
After job submission, Job IDs are generated by the JobTracker in Hadoop 1, while in Hadoop 2/3 Application IDs are generated instead. An Application ID or Job ID is a globally unique identifier for an application or job.
Example: job_1410450250506_002 / application_1410450250506_002
_1410450250506 ==> the start time of the Resource Manager, derived from the "cluster timestamp"
_002 ==> a counter used to keep track of occurrences of the job
Task IDs are formed by replacing the job (or application) prefix of the Job ID with a task prefix and adding a suffix that identifies the task within the job.
Example: task_1410450250506_0002_m_000002
Here in the above example, _000002 is the third map task of the job "job_1410450250506_002"
Tasks may be executed more than once due to task failure so to identify different instances of task execution, Task attempts are given unique IDs.
Example: attempt_1410450250506_0002_m_0000002_0
_0 is the first attempt of the task task_1410450250506_002_m_0000002
When you open the Job History web UI, you can see the job state (whether the job succeeded or failed), how many mappers and reducers were launched, and whether all the mappers and reducers completed; all these details are available there.
JOB HISTORY Server:
When you click a job ID in the Job History server, you get a detail page with more or less the same information as described above.
Overview:
Hadoop Counters:
This is the most useful option to examine job performance. Hadoop provides several built-in counters, and you can also define custom counters as per your requirements. Counters help you to get the following kinds of information.
Hadoop counters provide three types of Built-in counters such as :
In addition to this Hadoop provides another 3 counters from other groups by default, such as:
File system counters:
Under the file system counters you can get information about read and write operations in both the local file system and HDFS. The total number of bytes read and written depends on the compression algorithms used. Here are a few key counters.
FILE_BYTES_READ: the total number of bytes read from the local file system by the map-reduce tasks.
FILE_BYTES_WRITTEN: the total number of bytes written to the local file system. During the map phase, the mapper tasks write their intermediate results to the local file system, and during the shuffle phase the reducer tasks also write to the local file system when they spill intermediate results while sorting.
JOB Counters:
You will get Job information related to Mapper and reducer under JOB Counters. The following are the key job counters.
MapReduce Framework counters:
You will get all the statistic of MapReduce job under MapReduce framework counter. It will help you to do the performance tuning of the job.
Other counters are as follows:
Map-reduce jobs are limited by the bandwidth available on the cluster, hence it is beneficial if the data transferred between map and reduce tasks can be minimized. This can be achieved using a Hadoop Combiner. A combiner runs on the map output, and its output forms the input to the reducer. It decreases the amount of data that needs to be transferred between the mapper and reducer and thus improves the performance of a map-reduce job. A combiner can, however, only be used for functions that are commutative and associative.
The partitioner controls which partition a given key-value pair will go to. Partitioning ensures that all the values for each key are grouped together and that values having the same key go to the same reducer. The total number of partitioners that run in a Hadoop job is equal to the number of reducers.
The partition phase takes place after the map phase and before the reduce phase. A map-reduce job having both a partitioner and a reducer works as follows: output from each mapper is written to a memory buffer and spilled to a local directory in case of overflow. The spilled data is partitioned according to the partitioner. Data in each partition is sorted and combined based on the logic in the combiner. The combined data is then sent to the reducer based on the partition key.
A job consists of the following components: The client which submits map-reduce job, Resource manager which coordinates allocation of compute resources, Node managers which launch and monitor the compute containers, Hadoop Distributed File System (HDFS) which is used for sharing resources between the above components and Application Master which coordinates tasks running in map-reduce job.
The map-reduce job begins when the client/job submitter sends the request to the Resource Manager. It asks for a new application id to be allocated. It also checks whether the output directory specified exists or not, and computes input splits for the job as well. The resources needed to run the job including the application jar are copied to HDFS. Finally, the job submitter submits the job to Resource Manager.
The Resource Manager now allocates a container and launches the application master. The application master determines no of the mapper and reducer tasks that need to be launched and requests resource manager to launch containers for the same. Resource Manager, in turn, directs Node Managers to launch the containers where the tasks get run. Once the tasks are initiated, the application master keeps track of them. In case any task fails or gets stuck it relaunches them on another container. Requests for map tasks are made first and with a higher priority than those for reduce tasks, since all the map tasks must complete before the sort phase of the reduce can start. Once the mapper task completes, its output undergoes sorting, shuffling and partitioning (in case of multiple reducers), is sent to the combiner (if any) and finally sent to reducer(s). The output of reducer is written to HDFS.
The usual block size on HDFS is 128 MB. The size of the HDFS block is kept large enough to minimize the seek cost: when the block size is large enough, the time to transfer the data is significantly longer than the time to seek to the start of the block. As the data transfer rate is much higher than the disk seek rate, it is optimal to keep the block size large. The seek time is usually kept at about 1% of the transfer time; e.g. if the seek time is around 10 ms and the data transfer rate is 100 MB/s, the block size comes to around 100 MB, which rounds up to the default of 128 MB.
However, this doesn’t mean that the block size can be made indefinitely large. Map tasks operate on one block (assuming split size is equal to block size) at a time. Having a huge block size will result in fewer splits and hence less number of mappers which will reduce the advantage that can be gained by parallelly working on multiple blocks.
Having a block abstraction for a distributed file system has many benefits.
High availability in HDFS implies that the system does not have any single point of failure, is available 24/7 so that there is no or limited impact on client applications and is able to self-recover from failure without any manual intervention.
For implementing High Availability in HDFS, a pair of NameNodes is set up in an active-standby configuration. The passive node is kept in sync with the active node. Both active and passive nodes have access to shared storage space. When any namespace modification is performed by the Active node, it logs a record of the modification to an edit log file stored in the shared directory. The Standby node is constantly watching this directory for edits, and as it sees the edits, it applies them to its own namespace thereby keeping in sync with Active node.
In case of a failure of the active NameNode, the standby node takes over and starts servicing client requests. The transition from the active to the standby node is managed by the Failover Controller, which uses Zookeeper to ensure that only one NameNode is active at a given time. Each NameNode runs a failover controller process that monitors its NameNode for failures using a heartbeat mechanism and triggers a failover in case of failure.
However, it needs to be ensured that only one NameNode is active at a given time; two active NameNodes at the same time would cause corruption of data. To avoid such a scenario, fencing is done, which ensures that only one NameNode is active at a given time. The JournalNodes perform fencing by allowing only one NameNode to be the writer at a time. During a failover, the standby NameNode takes over the responsibility of writing to the JournalNodes and forbids any other NameNode from remaining active.
In the case of large data, it’s advised to use more than one reducer. In the case of multiple reducers, the thread spilling map output to disk first divides the data into partitions corresponding to the number of reducers. Within each partition, an in-memory sort on the data is performed. A combiner, if any, is applied to the output of the sort. Finally, the data is sent to reducer based on the partitioning key.
Partitioning ensures that all the values for each key are grouped together and the values having the same key go to the same reducer, thus allowing for even distribution of the map output over the reducers. The default partitioner in a map-reduce job is the HashPartitioner, which computes a hash value for the key and assigns the partition based on its result.
However, care must be taken to ensure that partitioning logic is optimal and data gets sent evenly to the reducers. In the case of a sub-optimal design, some reducers will have more work to do than others, as a result, the entire job will wait for that one reducer to finish its extra load share.
The replication factor in HDFS can be modified /overwritten in 2 ways-
$hadoop fs –setrep –w 2 /my/sample.xml
sample.xml is the filename whose replication factor will be set to 2
$hadoop fs –setrep –w 6 /my/sample_dir
sample_dir is the name of the directory and all the files in this directory will have a replication factor set to 6.
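To verify the change, you can simply list the file; for a file, the second column of the listing shows its current replication factor:
$ hadoop fs -ls /my/sample.xml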
ubuntu@ubuntu-VirtualBox:~$ hdfs dfs -touchz /hadoop/sample ubuntu@ubuntu-VirtualBox:~$ hdfs dfs -ls /hadoop
Found 2 items
-rw-r--r-- 2 ubuntu supergroup 0 2018-11-08 00:57 /hadoop/sample
-rw-r--r-- 2 ubuntu supergroup 16 2018-11-08 00:45 /hadoop/test
fsck is a utility to check the health of the file system, and to find missing files, over-replicated, under-replicated and corrupted blocks.
Command for finding the blocks for a file:
$ hadoop fsck <path> -files -blocks -racks
Hadoop distributed file system (HDFS) is the primary storage system of Hadoop. HDFS stores very large files running on a cluster of commodity hardware. It works on the principle of storage of less number of large files rather than the huge number of small files.
HDFS stores data reliably even in the case of hardware failure. It provides high throughput access to the application by accessing in parallel. Components of HDFS:
Update the network addresses in the dfs.include and mapred.include files.
Run $ hadoop dfsadmin -refreshNodes and $ hadoop mradmin -refreshNodes.
Update the slaves file.
Start the DataNode and NodeManager on the added node.
One should note that this is one of the most frequently asked Hadoop interview questions for freshers in recent times.
It displays the Access Control Lists (ACLs) of files and directories. If a directory has a default ACL, then getfacl also displays the default ACL.
ubuntu@ubuntu-VirtualBox:~$ hdfs dfs -getfacl /hadoop
# file: /hadoop
# owner: ubuntu
# group: supergroup
This exception means there is no communication with the DataNode, due to any of the below reasons:
Expect to come across this, one of the most important Hadoop interview questions for experienced professionals in data management, in your next interviews.
You can provide dfs.block.size on command line :
hadoop fs -D dfs.block.size=<blocksizeinbytes> -cp /source /destination
hadoop fs -D dfs.block.size=<blocksizeinbytes> -put /source /destination
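For example, to write a file with a 256 MB block size (256 * 1024 * 1024 = 268435456 bytes); the paths here are hypothetical:
hadoop fs -D dfs.block.size=268435456 -put /home/bibhu/data.csv /user/bibhu/data.csv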
The below command is used to enter Safe Mode manually:
$ hdfs dfsadmin -safemode enter
Once the safe mode is entered manually, it should be removed manually.
The below command is used to leave Safe Mode manually:
$ hdfs dfsadmin -safemode leave
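The current state of Safe Mode can be checked with:
$ hdfs dfsadmin -safemode get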
The two popular utilities or commands to find HDFS space consumed are
HDFS provides reliable storage by copying data to multiple nodes. The number of copies it creates is referred to as the replication factor, which is greater than one (three by default).
Hadoop task log files are stored on the local disk of the slave node running the task. In general, the log-related configuration properties are yarn.nodemanager.log-dirs and yarn.log-aggregation-enable. The yarn.nodemanager.log-dirs property determines where the container logs are stored on the node while the containers are running; its default value is ${yarn.log.dir}/userlogs. An application's localized log directory will be found in ${yarn.nodemanager.log-dirs}/application_${application_id}, and individual containers' log directories will be in subdirectories named container_${container_id}.
For MapReduce application, each container directory will contain the files STDERR, STDOUT and SYSLOG generated by the container.
The yarn.log-aggregation-enable property specifies whether to enable or disable log aggregation. If this function is disabled, then the node manager will keep the logs locally and not aggregate them.
Following properties are in force when log aggregation is enabled.
yarn.nodemanager.remote-app-log-dir: This location is on the default file system (usually HDFS) and indicates where the node managers should aggregate logs. It should not be the local file system, otherwise a serving daemon such as the history server will not be able to serve the aggregated logs. The default value is /tmp/logs.
yarn.nodemanager.remote-app-log-dir-suffix: The remote log directory will be created at ${yarn.nodemanager.remote-app-log-dir}/${user}/${suffix}. The default suffix value is "logs".
yarn.log-aggregation.retain.seconds: This property defines how long to wait before deleting aggregated logs; -1 or any other negative value disables the deletion of aggregated logs.
yarn.log-aggregation.retain-check-interval-seconds: This property determines how long to wait between aggregated log retention checks. If its value is set to 0 or a negative value, the interval is computed as one-tenth of the aggregated log retention time. The default value is -1.
yarn.log.server.url: once an application is done, node managers redirect web UI users to this URL, where the aggregated logs are served; it points to the MapReduce-specific job history.
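As a hedged sketch, log aggregation is typically switched on in yarn-site.xml, and the aggregated logs of a finished application can then be fetched with the yarn logs command (the application ID below is a placeholder):
<property> <name>yarn.log-aggregation-enable</name> <value>true</value> </property>
yarn logs -applicationId application_1410450250506_0002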
The following properties are used when log aggregation is disabled:
yarn.nodemanager.log.retain-seconds: The time in seconds to retain user logs on the individual nodes if log aggregation is disabled. the default is 10800.
yarn.nodemanager.log.deletion-threads-count: The number of threads used by the node managers to clean up logs once the log retention time is hit for local log files when aggregation is disabled.
YARN requires a staging directory for temporary files created by running jobs, and local directories for storing the various scripts that are generated to start up the job's containers (which run the map and reduce tasks).
Staging directory:
Local directory:
${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/application_${app_id}
When a client launches an application, the corresponding application master container is launched with ID 000001. The default size is 1 GB for each application master container, but sometimes the data size is larger and the application master reaches the limit of its memory; in this case the application will fail and you will get a message similar to the one below.
Application application_1424873694018_3023 failed 2 times due to AM Container for appattempt_1424873694018_3023_000002 exited with exitCode: 143 due to: Container
[pid=25108,containerID=container_1424873694018_3023_02_000001] is running beyond physical memory limits. Current usage: 1.0 GB of 1 GB physical memory used; 1.5 GB of 2.1 GB virtual memory used. Killing container.
One of the most frequently posed Hadoop scenario based interview questions, be ready for this conceptual question.
When configuring MapReduce 2 resource utilization on YARN, there are three aspects to be considered:
Physical RAM limit for each Map and Reduce Task
*********************************************
You can define how much maximum memory each Map and Reduce task will take. Since each Map and each Reduce task will run in a separate container, these maximum memory settings should be at least equal to or more than the YARN minimum Container allocation(yarn.scheduler.minimum-allocation-mb).
In mapred-site.xml: <name>mapreduce.map.memory.mb</name> <value>4096</value> <name>mapreduce.reduce.memory.mb</name> <value>8192</value>
The JVM heap size limit for each task
*********************************
The JVM heap size should be set to lower than the Map and Reduce memory defined above, so that they are within the bounds of the Container memory allocated by YARN.
In mapred-site.xml: <name>mapreduce.map.java.opts</name> <value>-Xmx3072m</value> <name>mapreduce.reduce.java.opts</name> <value>-Xmx6144m</value>
The amount of virtual memory each task will get
********************************************
Virtual memory is determined as an upper-limit multiple of the physical RAM that each Map and Reduce task will use; the default ratio is 2.1. For example, if the total physical RAM allocated is 4 GB, then the virtual memory upper limit is 4 * 2.1 = 8.4 GB.
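These task-level settings sit inside the container limits configured in yarn-site.xml. A hedged sketch (the values are illustrative assumptions for a node reserving 40 GB for YARN):
In yarn-site.xml: <name>yarn.nodemanager.resource.memory-mb</name> <value>40960</value> <name>yarn.scheduler.minimum-allocation-mb</name> <value>2048</value> <name>yarn.scheduler.maximum-allocation-mb</name> <value>8192</value>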
Under the Fair Scheduler, when a single application is running, that application may request the entire cluster (if needed). If additional applications are submitted, free resources are assigned "fairly" to the new applications so that each application gets roughly the same amount of resources. The Fair Scheduler also organizes applications into queues and shares resources fairly between these queues. The Fair Scheduler in YARN supports hierarchical queues, which means sub-queues can be created within a dedicated queue. All queues descend from a queue named "root". A queue's name starts with the names of its parents, with periods as separators; so a queue named "parent1" under the root queue would be referred to as "root.parent1", and a queue named "queue2" under "parent1" would be referred to as "root.parent1.queue2".
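A hedged sketch of a Fair Scheduler allocation file (fair-scheduler.xml) describing the hierarchy above; the weights are assumptions, not recommendations:
<allocations> <queue name="parent1"> <weight>2.0</weight> <queue name="queue2"> <weight>1.0</weight> </queue> </queue> </allocations>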
A join operation is used to combine two or more datasets. In MapReduce, joins are of two types: map side joins and reduce side joins.
A map side join is one in which join between two tables is performed in the Map phase without the involvement of the Reduce phase. It can be used when one of the data sets is much smaller than other data set and can easily be stored in DistributedCache. One of the ways to store datasets in DistributedCache is to do it in setup() method of Mapper. Since in map side join, the join is performed in mapper phase itself, it reduces the cost that is incurred for sorting and merging data in the shuffle and reduce phase, thereby improving the performance of the task
A reduce side join, on the other hand, works well for large datasets. Here the reducer is responsible for performing the join operation. This type of join is simpler to implement than the map side join because the sorting and shuffling phase sends the values having identical keys to the same reducer, so by default the data is already organized for us. However, the I/O cost is much higher due to the data movement involved in the sorting and shuffling phase.
The map-reduce framework doesn’t guarantee that the combiner will be executed for every job run. The combiner is executed at each buffer spill. During a spill, the thread writing data to the disk first divides data into partitions corresponding to the number of reducers. Within each partition, the thread performs an in-memory sort on the data and applies the combiner function (if any) on the output of sort.
Various ways to reduce data shuffling in a map-reduce job are:
In a map-reduce job, the application master decides how many tasks need to be created for the job to be executed. The number of mapper tasks created is equal to the number of splits. These tasks are usually launched in different JVMs than the application master (governed by data locality).
However, if the job is small, the application master may decide to run the tasks in the same JVM as itself. In such a case, the overhead of allocating containers for new tasks and monitoring them outweighs the gain that would be had from running the tasks in parallel, compared to running the tasks sequentially on the same node. Such a job is called an uber task.
So how does the application master determine if the job is small enough to be run as an uber task? By default, if the job requires fewer than 10 mappers and only 1 reducer, and the input size is less than the size of one HDFS block, then the application master may consider launching the job as an uber task.
If the job doesn’t qualify to be run as an uber task then the app master requests for containers to be launched for all map and reduce tasks from the resource manager.
A file is read by a Map-Reduce job using an InputFormat. It defines how the file being read needs to be split up and read. InputFormat, in turn, defines a RecordReader which is responsible for reading actual records from the input files. The split computed by InputFormat is operated upon by map task. Map task uses Record Reader corresponding to InputFormat to read the data within each split and create key-value pairs.
The various types of InputFormat in Hadoop are:
YARN stands for Yet Another Resource Negotiator. YARN takes care of the JobTracker's work, such as resource management, and also acts as a scheduler. It supports a variety of processing engines and applications: graph processing, interactive and stream processing, and batch processing can all run and process data stored in HDFS. Basically, the Resource Manager receives the job request from the client and launches the Application Master JVM, by default with 1 core and 2 GB of memory.
The Application Master contacts the NameNode and gets the locations of the blocks. Based on the availability of blocks on the Node Managers, it checks whether sufficient resources are available and informs the Resource Manager accordingly; the Resource Manager then provides resources to the Node Managers to launch the JVMs for the job.
YARN also works as a scheduler: the scheduler is responsible for allocating resources to run the applications. It does not monitor or track the applications, and it will not restart a failed task, whether the failure was due to application failure or hardware failure.
YARN Scheduler supports three types of scheduler
1. FIFO scheduler
2. FAIR scheduler
3. Capacity Scheduler.
Based on the Application requirement Hadoop Admin will select either FIFO, FAIR or Capacity Scheduler.
FIFO scheduling is first in, first out and is rarely used in current environments. Fair scheduling is a method where resources are distributed in such a way that they are more or less equally divided among jobs. With the Capacity Scheduler you can make sure that a certain percentage of resources is assigned to a queue in the cluster based on your demand or computing need.
Before starting the YARN services, start the Resource Manager and Node Manager services; make sure the Resource Manager is started before the Node Manager. Start your YARN services in the sequence mentioned below.
#service hadoop-yarn-resourcemanager start
#service hadoop-yarn-nodemanager start
#service hadoop-mapreduce-historyserver start
Fundamentally, a snapshot is a read-only, point-in-time copy of the entire file system or of a subtree of the file system. Snapshots help to protect against data corruption caused by users or applications and against accidental deletes. It is always quicker to recover from a snapshot than to restore the whole FSImage, and it is easy to create a snapshot of an important directory before changing anything in it.
A snapshot can be taken of any directory once it has been marked as snapshottable; to do this you run the command "hdfs dfsadmin -allowSnapshot <path>". Once the snapshottable directory has been created, a subdirectory named .snapshot is created under it; this is where snapshots are stored. There is no limit on the number of snapshottable directories, and a snapshottable directory can contain up to 65536 snapshots simultaneously. We can name each snapshot or use the default name (based on the timestamp: "s'yyyyMMdd-HHmmss.SSS"). If there are any snapshots in a snapshottable directory then you can neither delete nor rename the directory; before deleting the snapshottable directory you have to delete all the snapshots under it. When upgrading the HDFS version, any ".snapshot" paths need to be renamed or deleted first to avoid conflicting with the reserved path.
Snapshots are easily created with hdfs dfsadmin command, Please find the few commands related to snapshot.
a. # Create directory structure
hdfs dfs -mkdir /my_dir_bibhu
b. # Allow snapshot creation for /my_dir_bibhu
hdfs dfsadmin -allowSnapshot /my_dir_bibhu
Allowing snapshot on /my_dir_bibhu succeeded
c. # Create the first snapshot
hdfs dfs -createSnapshot /my_dir_bibhu snaptest1
Created snapshot /my_dir_bibhu/.snapshot/snaptest1
d. # .snapshot can be read directly using the below command
hdfs dfs -ls /my_dir_bibhu/.snapshot
Found 1 items
drwxr-xr-x - bibhu supergroup 0 2016-12-03 09:52 /my_dir_bibhu/.snapshot/snaptest1
e. # Create a new snapshot - this time for the directory containing a file
hdfs dfs -createSnapshot /my_dir_bibhu snaptest2
Created snapshot /my_dir_bibhu/.snapshot/snaptest2
f. # This command serves to compare snapshots
hdfs snapshotDiff /my_dir_bibhu .snapshot/snaptest1 .snapshot/snaptest2
g. # Restore the snapshot directory to a temporary place and check whether the file is there
hdfs dfs -cp /my_dir_bibhu/.snapshot/snaptest2 /tmp/dir_from_snapshot
hdfs dfs -ls /tmp/dir_from_snapshot
Usually, YARN takes all of the available resources on each machine in the cluster into consideration. Based on the available resources, YARN negotiates the resources requested by the applications or MapReduce jobs running in the cluster, and allocates containers based on how many resources an application requires. A container is the basic unit of processing capacity in YARN and is an encapsulation of resource elements such as memory and CPU. In a Hadoop cluster, it is important to balance the usage of memory (RAM), processors (CPU cores) and disks so that processing is not constrained by any one of these cluster resources. As a best practice, allowing two containers per disk and per core gives the best balance for cluster utilization.
When you are considering the appropriate YARN and MapReduce memory configurations for a cluster node, it is ideal to consider the below values for each node.
Before calculating how much RAM, how many cores and how many disks are required, you have to be aware of the below parameters.
Basically "mapreduce.task.io.sort.mb" is the total amount of buffer memory which is to use while sorting files. It is representing in megabytes.
Tune or provide the io.sort.mb value in such a way that the number of spilled records equals or is as close to equal the number of map output records.
A map-reduce job guarantees that the input to every reducer is sorted by key. The process by which the system performs the sort and then transfers the mapper output to the reducers as input is known as the shuffle. In a map-reduce job, the shuffle is an area of the code where fine-tuning and improvements are continually being made; in many ways, the shuffle is the heart of the map-reduce job. When the map function starts producing output, the process takes advantage of buffering: output is written in memory, and some presorting is done for efficiency.
Each map task has a circular memory buffer to which it writes its output. The buffer is 100 MB by default, a size which can be tuned by changing the io.sort.mb property. When the contents of the buffer reach a certain threshold size (the default spill threshold, io.sort.spill.percent, is 0.8 or 80%), a background thread starts to spill the contents to disk. Mapper output continues to be written to the buffer while the spill takes place, but if the buffer fills up during this time the map blocks until the spill is complete. Spills are written in a round-robin fashion to the directories specified by the mapred.local.dir property, in a job-specific subdirectory.
Each time the memory buffer reaches the spill threshold, a new spill file is created, so after the map task has written its last output record there can be several spill files before the task is finished. The spill files are merged into a single partitioned and sorted output file. The configuration property io.sort.factor controls the maximum number of streams to merge at once; its default value is 10.
To briefly explain how io.sort.factor works: when the mapper task is running it continuously writes data into buffers, and to manage the buffer we set a parameter called io.sort.spill.percent.
The value of io.sort.spill.percent indicates the point after which data is written to disk instead of the buffer that is filling up. All of this spilling to disk is done in a separate thread so that the map can continue running. There may be multiple spill files on the task tracker after the map task finishes; those files have to be merged into one single sorted file per partition, which is then fetched by a reducer. The property io.sort.factor says how many of those spill files will be merged into one file at a time.
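A hedged sketch of tuning these settings with the Hadoop 2 property names in mapred-site.xml (the values are illustrative assumptions, not recommendations):
In mapred-site.xml: <name>mapreduce.task.io.sort.mb</name> <value>256</value> <name>mapreduce.map.sort.spill.percent</name> <value>0.80</value> <name>mapreduce.task.io.sort.factor</name> <value>50</value>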
Basically, the dfs.hosts file contains all the DataNode details, and the NameNode allows access only to the nodes mentioned in the dfs.hosts file; this is the default configuration used by the NameNode. dfs.hosts and dfs.hosts.exclude help to re-commission and decommission the DataNodes.
Hadoop provides the decommission feature to exclude a set of existing DataNodes; the nodes to be taken out should be included in the exclude file, and the exclude file name should be specified by the configuration parameter dfs.hosts.exclude. You can find an example below.
Examples:
Modify conf/hdfs-site.xml, add: <property> <name>dfs.hosts</name> <value>/opt/hadoop/Bibhu/conf/datanode-allow.list</value> </property> <property> <name>dfs.hosts.exclude</name> <value>/opt/hadoop/Bibhu/conf/datanode-deny.list</value> </property>
Decommission cannot happen immediately because it requires replication of potentially a large number of blocks and we do not want the cluster to be overwhelmed with just this one job. The decommission progress can be monitored on the name-node web UI or Cloudera UI. Till all blocks are replicated, the status of nodes will be in the "Decommission in progress" state. when decommission is done the state will change to "Decommissioned". The node can be removed whenever decommission is finished.
Alternatively, without creating a dfs.hosts file or making any entries, you can run the refreshNodes command on the NameNode.
# $HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes
-refreshNodes updates the NameNode with the set of DataNodes that are allowed to connect to it.
This, along with other Hadoop basic interview questions, is a regular feature in Hadoop interviews, be ready to tackle it with the approach mentioned above.
Java garbage collection is the process by which Java programs perform automatic memory management: a technique that automatically manages the allocation and deallocation of memory. Java programs compile to bytecode (the compiled format of a Java program) that can be run on a Java Virtual Machine; once a Java program has been converted to bytecode it is executed by the JVM and can be transferred across a network. While Java programs are running on the JVM, the JVM consumes memory, called heap memory, to do so. Heap memory is the part of memory dedicated to the program.
A Hadoop mapper is a Java process, and every Java process has its own heap memory. The maximum heap memory allocation is configured via mapred.map.child.java.opts (or mapreduce.map.java.opts in Hadoop 2). If the mapper process runs out of heap memory, the mapper throws a Java out-of-memory exception as shown below.
Error: java.lang.RuntimeException: java.lang.OutOfMemoryError
The Java heap size should be smaller than the Hadoop container memory limit because we need to reserve some memory for Java code; usually it is recommended to reserve 20% of the memory for code. If the settings are correct, Java-based Hadoop tasks will never get killed by Hadoop, and you will not see a "Killing container" error like the one above.
To execute the actual map or reduce task, YARN runs a JVM within the container. The Hadoop property mapreduce.{map|reduce}.java.opts is passed to this JVM. It can include -Xmx to set the max heap size of the JVM.
Example: hadoop jar<jarName> -Dmapreduce.reduce.memory.mb=4096 -Dmapreduce.map.java.opts=-Xmx3276
Hive works on structured data and provides a SQL-like layer on top of HDFS; a map-reduce job is executed for each Hive query that computes over HDFS data. Impala is a massively parallel processing SQL query engine that is capable of handling huge volumes of data. Impala is faster than Hive because Impala does not store intermediate query results on disk; it processes the SQL query in memory without running any map-reduce.
Below are the few Hive components
1. Hive Clients:
Hive clients help Hive to perform queries. There are three types of clients we can use to perform queries: Thrift clients, JDBC clients and ODBC clients.
2. Hive Services
The compiler verifies the syntax with the help of the schema present in the metastore, then the optimizer generates an optimized logical plan in the form of a Directed Acyclic Graph (DAG) of map-reduce and HDFS tasks. The executor runs the tasks after the compilation and optimization steps, interacting directly with the Hadoop JobTracker to schedule the tasks to be run.
Impala components are 1. Impala daemon(Impalad) 2. Impala State Store 3. Impala Catalog Service.
Whenever a query is submitted to any Impala daemon, that node becomes the "central coordinator node" for the query. After accepting the query, the Impalad logically divides it into smaller parallel queries and distributes them to different nodes in the Impala cluster. All the Impalads gather the intermediate results and send them to the central coordinator node, which then constructs the final query output.
Example : INVALIDATE METADATA [[db_name.]table_name];
REFRESH [db_name.]table_name;
As we know, most Hive tables contain millions or billions of records, and any Hive query computes over them with mappers and reducers, consuming a lot of time and memory. A few optimization techniques will always help Hive queries perform better; please find some of them below.
1. Use Tez to Fasten the execution:
Apache Tez is an execution engine used for faster query execution. Tez allows you to launch a single Application Master per session for multiple jobs, on the condition that the jobs are comparatively small so that the Tez memory can be reused for them. You need to set the processing engine to Tez instead of the default map-reduce execution engine using the parameter below.
Set hive.execution.engine=tez;
If you are using Cloudera/Hortonworks, then you will find TEZ option in the Hive query editor as well.
2. Enable compression in Hive
Compression techniques reduce the amount of data being transferred, so they reduce the data transfer between mappers and reducers. Compression is not advisable if your data is already compressed, because the output file size might become larger than the original.
For better results, you need to enable compression at both the mapper and the reducer side separately. There are many compression formats available; gzip takes more CPU resources than Snappy or LZO but provides a higher compression ratio, although its output is not splittable.
Other formats are snappy, lzo, bzip, etc. You can set compression at mapper and reducer side using codes below:
set mapred.compress.map.output = true;
set mapred.output.compress= true;
Users can also set the following properties in hive-site.xml and map-site.xml to get permanent effects.
<property> <name>mapred.compress.map.output</name> <value>true</value> </property> <property> <name>mapred.map.output.compression(for MR)/compress(for Yarn).codec</name> <value>org.apache.hadoop.io.compress.SnappyCodec</value> </property>
3. Use ORC file format
ORC (Optimized Row Columnar) is an appropriate format for Hive performance tuning; query performance can improve easily by using the ORC file format. We can use the ORC file format for all kinds of tables, whether partitioned or not, and in return you get faster computation and a compressed file size.
4. Optimize your joins
If your table holds large data then it is not advisable to just use the normal joins we use in SQL. There are other joins like map join and bucket join which help to improve Hive query performance.
5. Use Map Join
When we are talking about map join, it is beneficial when one table is small compared to the other table taking part in the join, so that the small table can fit into memory. Hive has a property which can do an auto map join when enabled: set hive.auto.convert.join to true to enable the auto map join. We can set this from the command line as well as from the hive-site.xml file.
<property> <name>hive.auto.convert.join</name> <value>true</value> <description>Whether Hive enables the optimization about converting common join into mapjoin based on the input file size</description> </property>
6. Bucketed Map Join
If tables are bucketed by a particular column, you can use a bucketed map join to improve Hive query performance. You can set the below two properties to enable the bucketed map join in Hive.
<property> <name>hive.optimize.bucketmapjoin</name> <value>true</value> <description>Whether to try bucket mapjoin</description> </property> <property> <name>hive.optimize.bucketmapjoin.sortedmerge</name> <value>true</value> <description>Whether to try sorted bucket merge map join</description> </property>
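For the bucketed map join to apply, both tables must be bucketed (and ideally sorted) on the join key; a hedged sketch with hypothetical table and column names:
CREATE TABLE sales_bucketed (customer_id INT, amount DOUBLE) CLUSTERED BY (customer_id) SORTED BY (customer_id) INTO 32 BUCKETS;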
7. Use Partition
Partitioning is always helpful for huge data. It is used to segregate a large table based on certain columns so that the whole data set can be divided into small chunks. Partitioning a table allows you to store the data in sub-directories inside the table directory.
Selecting the partition column is always a critical decision, and you need to take care of future data and data volume as well. For example, if you have data for particular locations then you can partition the table based on state. You can also partition the data month-wise. You can define the partition column based on your requirement.
Here is the syntax to create a partition table
CREATE TABLE countrydata_partition (id INT, countryname STRING, population INT, description STRING) PARTITIONED BY (country VARCHAR(64), state VARCHAR(64)) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;
There are two types of partition in Hive.
By default, the partition is static in Hive. In a static partition we usually provide the partition explicitly, as in "PARTITIONED BY (department String)" with the partition value supplied at load time. When loading big files into Hive, the static partition is preferred.
A single insert to a partitioned table that loads data from a non-partitioned table is known as a dynamic partition insert. Dynamic partitioning is also suitable if you do not know in advance how many partition values your data contains. To use dynamic partitioning in Hive, you need to set the properties shown below.
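These are the standard Hive dynamic-partition switches (a minimal sketch; nonstrict mode allows all partition columns to be determined dynamically):
set hive.exec.dynamic.partition = true;
set hive.exec.dynamic.partition.mode = nonstrict;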
8. Use Vectorization
A standard query is executed one row at a time. Vectorized query execution improves the performance of operations like scans, aggregations, filters and joins by processing 1024 rows at a time. To use vectorization you can set the parameters below.
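A minimal sketch using the standard Hive vectorization switches:
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;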
LDAP and Active Directory provide a centralized security system for managing both servers and users; they manage all user accounts and associated privileges for your employees. Kerberos handles authentication: when a user tries to connect to any Hadoop service, Kerberos authenticates the user first and then authenticates the service too. When you consider AD, LDAP and Kerberos together, Kerberos only provides authentication; all identity management is handled outside of Kerberos, that is, in AD and LDAP.
At a high level, when a new employee joins, his/her ID has to be added to Active Directory first and then to LDAP and Kerberos, because AD is a directory service owned by Microsoft and AD supports several standard protocols such as LDAP and Kerberos.
LDAP and AD communicate with each other about which user ID belongs to which group; for example, which groups user Bibhu is a member of and what kind of access permissions he has on different directories or files. This information is managed differently in AD and in Linux systems. In Windows we have a concept called SIDs, or Windows security identifiers, and in Linux we have user IDs and group IDs. SSSD can use the SID of an AD user to algorithmically generate POSIX IDs in a process called ID mapping; ID mapping creates a map between SIDs in AD and UIDs/GIDs on Linux.
AD can create and store POSIX attributes such as uidNumber, gidNumber, unixHomeDirectory, or login Shell
There are two ways of mapping these SIDs and UIDs/GIDs using SSSD.
ldap_id_mapping = true
ldap_id_mapping = False
Below are a few concepts you need to know to understand the integration of AD/LDAP/Kerberos.
PAM: PAM stands for Pluggable Authentication Module, which allows integration of authentication technologies such as Unix, Linux and LDAP into system services such as password, login and ssh. When you are prompted for a password, that is usually PAM's doing. PAM provides an API through which authentication requests are mapped into technology-specific actions; this mapping is done by PAM configuration files, and the authentication mechanism is provided per service.
NSS: NSS uses a common API and a configuration file (/etc/nsswitch.conf) in which the name service providers for every supported database are specified. Here Names include hostnames, usernames, group names such as /etc/passwd, /etc/group, and /etc/hosts.
Below are 3 ways of integrating Linux with AD for Authentication
Let’s understand clearly:
1. Using LDAP/Kerberos PAM and NSS Module:
PAM is configured to use Kerberos for authentication, and NSS is configured to use the LDAP protocol for querying UID or GID information. The nss_ldap, pam_ldap and pam_krb5 modules are available to support this.
The problem here is that there is no caching of credentials, and no offline support is available.
2. Using Winbind:
Samba Winbind was the traditional way of connecting Linux systems to AD. Basically, Winbind emulates a Windows client on a Linux system and is able to communicate with AD servers; the winbind daemon receives calls from PAM and NSS and translates them into corresponding Active Directory calls using either LDAP, Kerberos or remote procedure calls (RPC), depending on the requirement. The current versions of the System Security Services Daemon (SSSD) have closed the feature gap between Samba Winbind and SSSD, so Samba Winbind is no longer the first choice in general.
3. Using SSSD that is system services daemon for Integrating with Active Directory:
The System Security Services Daemon (SSSD) is an intermediary between local clients and any remote directory or authentication mechanism. The local clients connect to SSSD, and SSSD contacts the external providers, that is, the AD or LDAP server. So SSSD works as a bridge which helps you access AD or LDAP.
Basically, system authentication is configured locally, which means services initially check with a local user store to determine users and credentials. SSSD allows a local service to check with a local cache in SSSD; the cached information might have been taken from an LDAP directory, AD, or a Kerberos realm.
Below are the few advantages related to SSSD
The sssd daemon provides different services for different purposes. There is a configuration file called sssd.conf which determines what tasks sssd can do. The file has two main parts, as we can see here:
[sssd]
domains = WIN.EXAMPLE.COM
services = nss, pam
config_file_version = 2
[domain/WINDOWS]
id_provider = ad
auth_provider = ad
access_provider = ad
In the first part, we mention which services on the system must use SSSD; in the above example, nss and pam are listed. The second part, [domain/WINDOWS], defines the directory service, also called the identity provider, for example AD or an LDAP server. SSSD connects to AD/LDAP for querying information, authentication, password changes, etc.
In brief, below are the steps describing how SSSD works.
Sentry provides role-based authorization to both data and metadata stored on a Hadoop cluster. Before knowing more about Sentry, below are the components based on which Sentry works.
The Sentry server only stores and serves the authorization metadata. The actual authorization decision is made by a policy engine that runs in data-processing applications such as Hive or Impala. Each component loads the Sentry plug-in; that is, for each service like Hive, HDFS, Impala or Solr, a Sentry plug-in has to be installed to deal with the Sentry service, and the policy engine validates the authorization request.
Below are the few capabilities which sentry is having.
1. Fine-Grained Authorization:
It means permissions on object hierarchies, for example server level, database level, table level and view level (row/column-level authorization), plus URIs; the permission levels are SELECT, INSERT and ALL. This is called fine-grained authorization.
2. Role-Based Authorization(RBAC):
Sentry provides role-based authorization: it supports sets of privileges and role templates which combine multiple access rules for a large set of users and data objects (databases, tables, etc.).
For example, If Bibhu joins the Finance Department, all you need to do is add him to the finance-department group in Active Directory. This will give Bibhu access to data from the Sales and Customer tables.
You can create a role called Analyst and grant SELECT on tables Customer and Sales to this role.
Now Bibhu who is a member of the finance-department group gets the SELECT privilege to the Customer and Sales tables.
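A hedged sketch of the corresponding Sentry statements as issued through Hive or Impala (role, table and group names follow the example above; the hyphenated group name may need backticks depending on the SQL client):
CREATE ROLE Analyst;
GRANT SELECT ON TABLE Customer TO ROLE Analyst;
GRANT SELECT ON TABLE Sales TO ROLE Analyst;
GRANT ROLE Analyst TO GROUP finance-department;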
3. Multi-Tenant Administration or Delegated Admin Responsibilities:
Sentry can delegate admin responsibilities for a subset of resources. Delegated admin means that the Delegated-Admin privilege on a specific set of resources is granted to a specific set of users or groups by someone who already holds the Delegated-Admin privilege on those resources.
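A sketch of what delegation can look like in SQL, assuming Sentry's WITH GRANT OPTION support (the database and role names are hypothetical):
-- whoever holds finance_admin may grant SELECT on finance_db onward to other roles
GRANT SELECT ON DATABASE finance_db TO ROLE finance_admin WITH GRANT OPTION;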
4. User Identity and Group Mapping: Sentry depends on Kerberos or LDAP to identify the user. It also uses the group mapping mechanism configured in Hadoop to ensure that Sentry sees the same group mapping as other components of the Hadoop ecosystem.
For example, consider that users Bibhu and Sibb belong to an Active Directory (AD) group called finance-department, and Sibb also belongs to a group called finance-managers. In Sentry, you create the roles first and then grant the required privileges to those roles; for example, create a role called Analyst and grant SELECT on the Customer and Sales tables to it.
The next step is to join these authentication entities (users and groups) to authorization entities (roles). This is done by granting the Analyst role to the finance-department group. Now Bibhu and Sibb, as members of the finance-department group, get the SELECT privilege on the Customer and Sales tables.
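Written out in full with the Hive/Impala GRANT syntax used by Sentry, the role and privilege setup for this example looks like the following, ending with the GRANT ROLE statement shown on the next line:
-- create the role and give it read access to the two tables from the example
CREATE ROLE Analyst;
GRANT SELECT ON TABLE Customer TO ROLE Analyst;
GRANT SELECT ON TABLE Sales TO ROLE Analyst;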
GRANT ROLE Analyst TO GROUP finance-department ;
Below are some scenarios that show how Hive, Impala, HDFS, and Search work with Sentry. A few examples help to illustrate the flow.
1. Hive and Sentry:
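The query itself is not reproduced here; a representative statement that would trigger the check described below, assuming the Status table from the example, is:
SELECT * FROM Status;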
For the query above, Hive identifies that user Bibhu is requesting SELECT access to the Status table. At this point, Hive asks the Sentry plug-in to validate Bibhu's access request. The plug-in retrieves Bibhu's privileges related to the Status table, and the policy engine determines whether the request is valid.
2. Impala and Sentry:
Authorization processing in Impala is more or less the same as in Hive. The main difference is the caching of privileges: Impala's Catalog server manages the cached roles, privileges, and metadata and propagates them to all Impala daemon nodes. As a result, an Impala daemon can authorize queries much faster by consulting the cached metadata. The only drawback is that privilege changes take a little time, typically a few seconds, to take effect.
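For illustration, the same check applies when a statement is submitted through impala-shell (the daemon hostname below is a placeholder); the Impala daemon authorizes it against the privileges cached by the Catalog server rather than calling the Sentry server for every query:
$ impala-shell -i impalad-host.example.com -q "SELECT COUNT(*) FROM Sales;"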
3. Sentry-HDFS Synchronization:
Sentry-HDFS authorization is mainly concerned with Hive warehouse data, that is, the data behind Hive or Impala tables. The objective is that when other components such as Pig, MapReduce, or Spark access a Hive table's files, the same authorization checks are applied. This feature does not replace HDFS ACLs; tables that are not associated with Sentry retain their old ACLs.
The mapping of Sentry privileges to HDFS ACL permissions is straightforward: the SELECT privilege maps to read access on the underlying files, the INSERT privilege maps to write access, and ALL maps to both read and write.
The NameNode loads a Sentry plug-in that caches both Sentry privileges and Hive metadata, which helps HDFS keep file permissions and Hive table privileges in sync. The plug-in periodically polls Sentry and the Metastore so that metadata changes stay in sync.
For example, if Bibhu runs a Pig job that reads the Sales table's data files stored in HDFS, the Sentry plug-in on the NameNode recognizes that those files belong to Hive data and overlays the Sentry privileges on top of the file ACLs. HDFS therefore enforces the same privileges for the Pig client that Hive would have applied to a SQL query.
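One way to observe the synchronization, assuming the default Hive warehouse location, is to inspect the ACLs that HDFS reports for the table's directory:
# show the ACLs the Sentry plug-in has overlaid on the Sales table's files
$ hdfs dfs -getfacl /user/hive/warehouse/sales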
For HDFS-Sentry synchronization to work, you must use the Sentry service, not policy-file authorization.
4. Search and Sentry:
Sentry can restrict search tasks coming from a browser, the command line, or the admin console.
With Search, Sentry stores its privilege policies in a policy file (for example, sentry-provider.ini), which is stored in an HDFS location such as hdfs://ha-nn-uri/user/solr/sentry/sentry-provider.ini.
Sentry with Search does not support multiple policy files for multiple databases; however, you must use a separate policy file for each Sentry-enabled service.
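A rough sketch of such a policy file for Search follows; the group, role, and collection names are illustrative, and the exact syntax should be verified against the Sentry documentation:
[groups]
# map the OS/AD group to a Sentry role
finance-department = analyst_role
[roles]
# allow the role to query (but not update) the sales collection
analyst_role = collection = sales_collection->action=Query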
5. Disabling Hive CLI:
The Hive CLI is not supported with Sentry and must be disabled; Hive queries should be executed through Beeline instead. Direct access to the Hive Metastore should also be restricted, which is especially important if the Metastore contains sensitive metadata.
To do this, modify hadoop.proxyuser.hive.groups in core-site.xml on the Hive Metastore host.
For example, to allow the hive user to act on behalf of members of the hive and hue groups only, set the property to:
<property>
  <name>hadoop.proxyuser.hive.groups</name>
  <value>hive,hue</value>
</property>
More user groups that require access to the Hive Metastore can be added to this comma-separated list as needed.
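Since Beeline replaces the Hive CLI in this setup, a typical connection to a Kerberized HiveServer2 looks like the following (hostname, port, and principal are placeholders):
$ beeline -u "jdbc:hive2://hiveserver2-host.example.com:10000/default;principal=hive/_HOST@EXAMPLE.COM"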
Hadoop is an open-source framework widely adopted by organizations to store and process large amounts of structured and unstructured data using the MapReduce programming model. Many top-rated companies use the Apache Hadoop framework to deal with data volumes that keep growing every minute. In terms of Hadoop cluster size, Yahoo tops the list with around 4,500 nodes, followed by LinkedIn and Facebook.
Some of the world's most popular organizations that use Hadoop for production and research include Adobe, AOL, Alibaba, eBay, and Fox Audience Network.
If you are looking to build a career in big data, start by learning big data and Hadoop. You can also take up a big data and Hadoop certification and begin a career as a big data Hadoop professional solving large-scale data problems.
The questions above are the most frequently asked, scenario-based Hadoop interview questions. You will also see how to explain a Hadoop project in an interview, which carries a lot of weight.
These Hadoop developer interview questions have been designed specifically to familiarize you with the kind of questions you might face during your interview, and they will help you crack the Hadoop interview and land your dream career as a Hadoop developer. These top big data Hadoop interview questions, suggested by experts, will boost your confidence and prepare you to answer your interviewer's questions in the best possible manner. Turn yourself into a Hadoop developer with big data certifications, and live your dream career!