Including custom Flume components in Cloudera Manager

21 Feb, 2016

I’m currently working on a Hadoop project using Cloudera’s stack. We’re running a couple of Flume jobs to move data around our cluster. Our Flume Metric Details page in Cloudera Manager looked like this:

[Screenshot: the Flume Metric Details page in Cloudera Manager]

You could infer from the image that we run a BarSource alongside our FooSource and BazSource, and you would be correct. However, the BarSource doesn’t show up in Cloudera Manager. Why not?

The FooSource and BazSource are standard source types included with the platform. The BarSource is a subclass of AbstractEventDrivenSource that we wrote ourselves to pull data from a customer-specific system.

How do you get a custom Flume source or sink included on this dashboard? It isn’t difficult: the secret is simply JMX. Unfortunately, the documentation is a bit thin. At the time of writing, the Flume Developer Guide doesn’t mention JMX at all.
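If JMX is new to you, here is a minimal sketch of the mechanism using only the JDK, with no Flume on the classpath. The DemoCounter class, its DemoCounterMBean interface and the com.example.demo.source domain are all made up for illustration; they are not Flume's classes. The point is only that an object registered with the platform MBean server exposes its getters as attributes that a monitoring tool can read by name.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;

final class JmxCounterSketch {

    /** Management interface: JMX exposes each getter as a read-only attribute. */
    public interface DemoCounterMBean {
        long getEventReceivedCount();
        long getEventAcceptedCount();
    }

    /** Hypothetical stand-in for a counter; not a Flume class. */
    public static final class DemoCounter implements DemoCounterMBean {
        private final AtomicLong received = new AtomicLong();
        private final AtomicLong accepted = new AtomicLong();

        void incrementEventReceivedCount() { received.incrementAndGet(); }
        void incrementEventAcceptedCount() { accepted.incrementAndGet(); }

        @Override public long getEventReceivedCount() { return received.get(); }
        @Override public long getEventAcceptedCount() { return accepted.get(); }
    }

    /** Registers a counter, bumps it, and reads the values back through JMX. */
    static long[] registerCountAndRead(String componentName) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Illustrative domain only; Flume uses its own naming scheme.
        ObjectName name = new ObjectName("com.example.demo.source:type=" + componentName);
        DemoCounter counter = new DemoCounter();
        server.registerMBean(counter, name);
        try {
            counter.incrementEventReceivedCount();
            counter.incrementEventReceivedCount();
            counter.incrementEventAcceptedCount();
            // Attribute names are the getter names without the "get" prefix.
            long received = (Long) server.getAttribute(name, "EventReceivedCount");
            long accepted = (Long) server.getAttribute(name, "EventAcceptedCount");
            return new long[] {received, accepted};
        } finally {
            server.unregisterMBean(name);
        }
    }

    public static void main(String[] args) throws Exception {
        long[] counts = registerCountAndRead("barSource");
        // prints "received=2 accepted=1"
        System.out.println("received=" + counts[0] + " accepted=" + counts[1]);
    }
}
```

You don't have to write any of this plumbing yourself in a Flume component; the sketch is only there to show what sits underneath.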

The flume-core package includes JMX MBeans for each of the component types: SourceCounter, ChannelCounter and SinkCounter. If you include the appropriate counter MBean in your custom Flume component, that component will appear in Cloudera Manager. Here’s a simple example of SourceCounter in use:

package com.xebia.blog.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractEventDrivenSource;

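/**
 * Demonstrates the use of the {@code SourceCounter} in Flume-NG.
 *
 * The helpers connectToDataSourceWithCallback(), disconnect() and convertToFlumeEvent()
 * stand in for your own integration code and are not shown here.
 */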
public class DemoSource extends AbstractEventDrivenSource {

    private SourceCounter counter;

    @Override
    protected void doConfigure(Context context) throws FlumeException {

        // Counter MBeans are created in the configure method, with the component name we've been provided.
        this.counter = new SourceCounter(this.getName());
    }

    @Override
    protected void doStart() throws FlumeException {
        // You start the counter in start()
        this.counter.start();

        // This example is an event-driven source, so we'll typically have some sort of connection and callback method.
        connectToDataSourceWithCallback(this);
        this.counter.setOpenConnectionCount(1);
    }

    @Override
    protected void doStop() throws FlumeException {
        // Disconnect from the data source...
        disconnect();
        this.counter.setOpenConnectionCount(0);

        // ...and stop the counter.
        this.counter.stop();
    }

    /**
     * Callback handler for our example data source.
     */
    public void onIncomingData(Object dataSourceEvent) {
        // Count how many events we receive...
        this.counter.incrementEventReceivedCount();

        // ...do whatever processing it is we do...
        Event flumeEvent = convertToFlumeEvent(dataSourceEvent);

        // ...and count how many are successfully forwarded.
        getChannelProcessor().processEvent(flumeEvent);
        this.counter.incrementEventAcceptedCount();
    }
}

The SourceCounter MBean has some other metrics that you can increment as needed. The ChannelCounter and SinkCounter MBeans work the same way. In lieu of full documentation, the Flume source code is the best place to look for examples: the bundled sources, channels and sinks all use these counters.
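To check that a counter has actually been registered, you can list the MBeans in the JVM. The sketch below uses only the JDK. The pattern org.apache.flume.*:type=* is my assumption about how Flume names its counter MBeans (a domain per component type, with the component name as the type key); check MonitoredCounterGroup in your Flume version before relying on it. FlumeMBeanLister is a hypothetical helper name.

```java
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.management.MBeanServer;
import javax.management.ObjectName;

final class FlumeMBeanLister {

    // Assumed naming pattern; confirm it against your Flume version.
    static final String ASSUMED_FLUME_PATTERN = "org.apache.flume.*:type=*";

    /** Returns the sorted canonical names of all MBeans matching the pattern. */
    static List<String> listNames(MBeanServer server, String pattern) throws Exception {
        List<String> names = new ArrayList<>();
        for (ObjectName name : server.queryNames(new ObjectName(pattern), null)) {
            names.add(name.getCanonicalName());
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) throws Exception {
        // Only sees MBeans registered in this JVM, so outside a Flume agent
        // this loop normally prints nothing.
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        for (String name : listNames(server, ASSUMED_FLUME_PATTERN)) {
            System.out.println(name);
        }
    }
}
```

Because the platform MBean server is per JVM, the lookup has to run inside the agent process, for example as a temporary log statement in your component, or you can browse the same names with a JMX client if remote JMX is enabled on the agent.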

This article was originally published on the Xebia Blog.
