Hadoop Analysis of Apache Logs Using Flume-NG, Hive and Pig

Posted on December 27, 2012

Big Data is the hotness, there is no doubt about it.  Every year its just gotten bigger and bigger and shows no sign of slowing.  There is a lot out there about big data, but despite the hype, there isn’t a lot of good technical content for those who want to get started.  The lack of technical how-to info is made worse by the fact that many Hadoop projects have moved their documentation around over time and Google searches commonly point to obsolete docs.  My intent here is to provide some solid guidance on how to actually get started with practical uses of Hadoop and to encourage others to do the same.

From an SA perspective, the most interesting Hadoop sub-projects have been those for log transport, namely Scribe, Chukwa, and Flume.  Lets examine each.

Log Transport Choices

Scribe was created at Facebook and got a lot of popularity early on due to adoption at high profile sites like Twitter, but development has apparently ceased  and word is that Facebook stopped using it themselves.  So Scribe is off my list.

Chukwa is a confusing beast, its said to be distributed with Hadoop’s core but its just an old version in the same sub-directory of the FTP site, the actual current version is found under the incubator sub-tree.  It is a very comprehensive solution, including a web interface for log analysis, but that functionality is based on HBase, which is fine if you want to use HBase but may be a bit more than you wish to chew off for simple Hive/Pig analysis.  Most importantly, the major Hadoop distributions from HortonWorks, MapR, and Cloudera use Flume instead.  So if your looking for a comprehensive toolset for log analysis, Chukwa is worth checking out, but if you simply need to efficiently get data into Hadoop for use by other Hadoop components, Flume is the clear choice.

That brings us to Flume, more specifically Flume-NG.  The first thing to know about Flume is that there were major changes to Flume pre and post 1.0, major enough that they took to refering to pre 1.0 as “Flume OG” (“Old generation” or “Origonal Gangsta” depending on your mood) and the new post 1.0 releases as “Flume NG”.  Whenever looking at documentation or help on the web about Flume be certain as to which you are looking at!  In particular, stay away from the Flume CWiki pages,  refer only to the flume.apache.org.  I say that because there is so much old cruft in the CWiki pages that you can be easily mislead and become frustrated, so just avoid it.

Now that we’ve thinned out the available options, what can we do with Flume?

Getting Started with Flume

Flume is a very sophisticated tool for transporting data.  We are going to focus on log data, however it can transport just about anything you throw at it.  For our purposes we’re going to use it to transport Apache log data from a web server back to our Hadoop cluster and store it in HDFS where we can then operate on it using other Hadoop tools.

Flume NG is a java application that, like other Hadoop tools, can be downloaded, unpacked, configured and run, without compiling or other forms of tinkering.  Download the latest “bin” tarball and untar it into /opt and rename or symlink to “/opt/flume” (it doesn’t matter where you put it, this is just my preference).  You will need to have Java already installed.

Before we can configure Flume its important to understand its architecture.  Flume runs as an agent.  The agent is sub-divided into 3 categories: sources, channels, and sinks.  Inside the Flume agent process there is a pub-sub flow between these 3 components.  A source accepts or retrieves data and sends it into a channel.  Data then queues in the channel.  A sink takes data from the channel and does something with it.  There can be multiple sources, multiple channels, and multiple sinks per agent.  The only important thing to remember is that a source can write to multiple channels, but a sink can draw from only one channel.

Lets take an example.  A “source” might tail a file.  New log lines are sent into a channel where they are queued up.  A “sink” then extracts the log lines from the channel and writes them into HDFS.

At first glance this might appear overly complicated, but the distinct advantage  here is that the channel de-couples input and output, which is important if you have performance slowdowns in the sinks.  It also allows the entire system to be plugin-based.  Any number of new sinks can be created to do something with data… for instance, Casandra sinks are available, there is an IRC sink for writing data into an IRC channel.  Flume is extremely flexible thanks to this architecture.

In the real world we want to collect data from a local file, send it across the network and then store it centrally.  In Flume we’d accomplish this by chaining agents together.  The “sink” of one agent sends to the “source” of another.  The standard method of sending data across the network with Flume is using Avro.  For our purposes here you don’t need to know anything about Avro except one of the things it can do is to move data over the network.  Here is what this ultimately looks like:

So on our web server, we create a /opt/flume/conf/flume.conf that looks like this:

## Flume NG Apache Log Collection
## Refer to https://cwiki.apache.org/confluence/display/FLUME/Getting+Started
##
# http://flume.apache.org/FlumeUserGuide.html#exec-source
agent.sources = apache
agent.sources.apache.type = exec
agent.sources.apache.command = gtail -F /var/log/httpd/access_log
agent.sources.apache.batchSize = 1
agent.sources.apache.channels = memoryChannel
agent.sources.apache.interceptors = itime ihost itype
# http://flume.apache.org/FlumeUserGuide.html#timestamp-interceptor
agent.sources.apache.interceptors.itime.type = timestamp
# http://flume.apache.org/FlumeUserGuide.html#host-interceptor
agent.sources.apache.interceptors.ihost.type = host
agent.sources.apache.interceptors.ihost.useIP = false
agent.sources.apache.interceptors.ihost.hostHeader = host
# http://flume.apache.org/FlumeUserGuide.html#static-interceptor
agent.sources.apache.interceptors.itype.type = static
agent.sources.apache.interceptors.itype.key = log_type
agent.sources.apache.interceptors.itype.value = apache_access_combined

# http://flume.apache.org/FlumeUserGuide.html#memory-channel
agent.channels = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100

## Send to Flume Collector on 1.2.3.4 (Hadoop Slave Node)
# http://flume.apache.org/FlumeUserGuide.html#avro-sink
agent.sinks = AvroSink
agent.sinks.AvroSink.type = avro
agent.sinks.AvroSink.channel = memoryChannel
agent.sinks.AvroSink.hostname = 1.2.3.4
agent.sinks.AvroSink.port = 4545

## Debugging Sink, Comment out AvroSink if you use this one
# http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
#agent.sinks = localout
#agent.sinks.localout.type = file_roll
#agent.sinks.localout.sink.directory = /var/log/flume
#agent.sinks.localout.sink.rollInterval = 0
#agent.sinks.localout.channel = memoryChannel

This configuration looks overwhelming at first, but it breaks down simply into an “exec” source, a “memory” channel, and an “Avro” sink, with additional parameters specified for each. The syntax for each is in the following form:

agent_name.sources = source1 source2 ...
agent_name.sources.source1.type = exec
...

agent_name.channel = channel1 channel2 ...
agent_name.channel.channel1.type = memory
...

agent_name.sinks = sink1 sink2 ...
agent_name.sinks.sink1.type = avro
...

In my example the agent name was “agent”, but you can name it anything you want. You will specify the agent name when you start the agent, like this:

$ cd /opt/flume
$ bin/flume-ng agent -f conf/flume.conf -n agent

Now that our agent is running on the web server, we need to setup the other agent which will deposit logs lines into HDFS. This type of agent is commonly called a “collector”. Here is the config:

## Sources #########################################################
## Accept Avro data In from the Edge Agents
# http://flume.apache.org/FlumeUserGuide.html#avro-source
collector.sources = AvroIn
collector.sources.AvroIn.type = avro
collector.sources.AvroIn.bind = 0.0.0.0
collector.sources.AvroIn.port = 4545
collector.sources.AvroIn.channels = mc1 mc2

## Channels ########################################################
## Source writes to 2 channels, one for each sink (Fan Out)
collector.channels = mc1 mc2

# http://flume.apache.org/FlumeUserGuide.html#memory-channel
collector.channels.mc1.type = memory
collector.channels.mc1.capacity = 100

collector.channels.mc2.type = memory
collector.channels.mc2.capacity = 100

## Sinks ###########################################################
collector.sinks = LocalOut HadoopOut

## Write copy to Local Filesystem (Debugging)
# http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
collector.sinks.LocalOut.type = file_roll
collector.sinks.LocalOut.sink.directory = /var/log/flume
collector.sinks.LocalOut.sink.rollInterval = 0
collector.sinks.LocalOut.channel = mc1

## Write to HDFS
# http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
collector.sinks.HadoopOut.type = hdfs
collector.sinks.HadoopOut.channel = mc2
collector.sinks.HadoopOut.hdfs.path = /flume/events/%{log_type}/%{host}/%y-%m-%d
collector.sinks.HadoopOut.hdfs.fileType = DataStream
collector.sinks.HadoopOut.hdfs.writeFormat = Text
collector.sinks.HadoopOut.hdfs.rollSize = 0
collector.sinks.HadoopOut.hdfs.rollCount = 10000
collector.sinks.HadoopOut.hdfs.rollInterval = 600

This configuration is a little different because the source accepts Avro network events and then sends them into 2 memory channels (“fan out”) which feed 2 different sinks, one for HDFS and another for a local log file (for debugging). We start this agent like so:

# bin/flume-ng agent -f conf/flume.conf -n collector

Once both sides are up, you should see data moving. Use “hadoop fs -lsr /flume” to examine files there and if you included the file_roll sink, look in /var/log/flume.

# hadoop fs -lsr /flume/events
drwxr-xr-x   - root supergroup          0 2012-12-24 06:17 /flume/events/apache_access_combined
drwxr-xr-x   - root supergroup          0 2012-12-24 06:17 /flume/events/apache_access_combined/cuddletech.com
drwxr-xr-x   - root supergroup          0 2012-12-24 09:50 /flume/events/apache_access_combined/cuddletech.com/12-12-24
-rw-r--r--   3 root supergroup     224861 2012-12-24 06:17 /flume/events/apache_access_combined/cuddletech.com/12-12-24/FlumeData.1356329845948
-rw-r--r--   3 root supergroup      85437 2012-12-24 06:27 /flume/events/apache_access_combined/cuddletech.com/12-12-24/FlumeData.1356329845949
-rw-r--r--   3 root supergroup     195381 2012-12-24 06:37 /flume/events/apache_access_combined/cuddletech.com/12-12-24/FlumeData.1356329845950

Flume Tunables & Gotcha’s

There are a lot of tunables to play with and carefully consider in the example configs above. I included the documentation links for each component and I highly recommend you review it. Lets specifically look at some things that might cause you frustration while getting started.

First, interceptors. If you look at our HDFS sink path, you’ll see the path includes “log_type”, “host”, and a date. That data is associated with an event when the source grabs it, it is meta-data headers on each event. You associate that data with the event using an “interceptor”. So look back at the source where we ‘gtail’ our log file and you’ll see that we’re using interceptors to associate the log_type, “host”, and date with each event.

Secondly, by default Flume’s HDFS sink writes out SequenceFiles. This seems fine until you run Pig or Hive and get inconsistent or usual results back. Ensure that you specify the “fileType” as “DataStream” and the “writeFormat” as “Text”.

Lastly, there are 3 triggers that will cause Flume to “roll” the HDFS output file: size, count, and interval. When Flume writes data, if any one of the triggers is true it will roll to use a new file. By default the count is 30 (seconds), size is 1024 (bytes), and count is 10. Think about that, if any of those is true the file is rolled. So you end up with a LOT of HDFS files, which may or may not be what you want. Setting any value to 0 disables that type of rolling.

Analysis using Pig

Pig is a great tool for the Java challenged. Its quick, easy, and repeatable. The only real challenge is in accurately describing the data your asking it to chew on.

The PiggyBank library can provide you with a set of loaders which can save you from regex hell. The following is an example of using Pig on my Flume ingested Apache combined format logs using the PiggyBank “CombinedLogLoader”:

# cd /opt/pig
# ./bin/pig 
2012-12-23 10:32:56,053 [main] INFO  org.apache.pig.Main - Apache Pig version 0.10.0-SNAPSHOT (r: unknown) compiled Dec 23 2012, 10:29:56
2012-12-23 10:32:56,054 [main] INFO  org.apache.pig.Main - Logging error messages to: /opt/pig-0.10.0/pig_1356258776048.log
2012-12-23 10:32:56,543 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://10.12.29.198/
2012-12-23 10:32:57,030 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: 10.12.29.198:9101

grunt> REGISTER /opt/pig-0.10.0/contrib/piggybank/java/piggybank.jar;
grunt> raw = LOAD '/flume/events/apache_access_combined/cuddletech.com/12-12-24/'' 
    USING org.apache.pig.piggybank.storage.apachelog.CombinedLogLoader 
    AS (remoteAddr, remoteLogname, user, time, method, uri, proto, status, bytes, referer, userAgent); 
grunt> agents = FOREACH raw GENERATE userAgent;
grunt> agents_uniq = DISTINCT agents;
grunt> DUMP agents_uniq;
...

(-)
(Motorola)
(Mozilla/4.0)
(RSSaggressor)
(Java/1.6.0_24)
(restkit/4.1.2)
(Blogtrottr/2.0)
(Mozilla/5.0 ())
(Recorded Future)
...

While Pig is easy enough to install (unpack and run), you must build the Piggybank JAR, which means you’ll need a JDK and Ant. On a SmartMachine with Pig installed in /opt/pig, it’d look like this:

# pkgin in sun-jdk6-6.0.26 apache-ant
# cd /opt/pig/
# ant
....
# cd /opt/pig/contrib/piggybank/java
# ant
....
jar:
     [echo]  *** Creating pigudf.jar ***
      [jar] Building jar: /opt/pig-0.10.0/contrib/piggybank/java/piggybank.jar

BUILD SUCCESSFUL
Total time: 5 seconds

Analysis using Hive

Similar to Pig, the challenge with Hive is really just describing the schema around the data. Thankfully there is assistance out there for just this problem.

[root@hadoop02 /opt/hive]# bin/hive
Logging initialized using configuration in jar:file:/opt/hive-0.9.0-bin/lib/hive-common-0.9.0.jar!/hive-log4j.properties
Hive history file=/tmp/root/hive_job_log_root_201212241029_318322444.txt
hive> 
hive> CREATE EXTERNAL TABLE access(
    >   host STRING,
    >   identity STRING,
    >   user STRING,
    >   time STRING,
    >   request STRING,
    >   status STRING,
    >   size STRING,
    >   referer STRING,
    >   agent STRING)
    > ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
    > WITH SERDEPROPERTIES (
    >   "input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?",
    >   "output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s"
    > )
    > STORED AS TEXTFILE
    > LOCATION '/flume/events/apache_access_combined/cuddletech.com/12-12-24/';
OK
Time taken: 7.514 seconds
hive>

Now you can query to your hearts content. Please note that in the above example if you omit the “EXTERNAL” keyword when creating the table that Hive will move your data into its own data warehouse directory, which may not be what you want.

Next Steps

Hadoop provides an extremely powerful set of tools to solve very big problems. Pig and Hive are easy to use and very powerful. Flume-NG is an excellent tool for reliably moving data and extremely extensible. There is a lot I’m not getting into here, like using file-backed or database backed channels in Flume to protect against node failure thus increasing delivery reliability, or using multi-tiered aggregation by using intermediate Flume agents (meaning, Avro Source to Avro Sink)… there is a lot of fun things to explore here. My hope is that I’ve provided you with an additional source of data to help you on your way.

If you start getting serious with Hadoop, I highly recommend you buy the following O’Reilly books for Hadoop, which are very good and will save you a lot of time wasted in trial-and-error:

A Friendly Warning

In closing, I feel it necessarily to point out the obvious. For most people there is no reason to do any of this. Hadoop is a Peterbilt for data. You don’t use a Peterbilt for a job that can be done with a Ford truck, its not worth the time, money and effort.

When I’ve asked myself “How big must data be for it to be big data?” I’ve come up with the following rule: If a “grep” of a file takes more than 5 minutes, its big. If the file can not be reasonably sub-divided to be smaller files or any query requires examining multiple files, then it might be Hadoop time.

For most logging applications, I strongly recommend either Splunk (if you can afford it) or using Rsyslog/Logstash and ElasticSearch, they are far more suited to the task with less hassle, less complexity and much more functionality.