Some Bits on PXF Plugins

“Occasionally it becomes desirable and necessary…to make real what currently is merely imaginary”
By Kyle Dunn

If you’ve not heard already, Pivotal eXtensible Framework, or PXF (for those of you with leftover letters in your alphabet soup), is a unified (and parallel) means of accessing a variety of data formats stored in HDFS, via a REST interface. The code base is a part of Apache HAWQ, where it was originally conceived to bridge the gap between HAWQ’s lineage (Greenplum DB on Hadoop) and the ever-growing menu of storage formats in the larger Hadoop ecosystem. Both Greenplum DB and HAWQ use binary storage formats derived from PostgreSQL 8.2 (as of this writing), whereas Hadoop supports a slew of popular formats: plain text delimited, binary, and JSON document, just to name a few too many. To restate more concisely, PXF is an API abstraction layer on top of disparate HDFS data storage formats.

To someone like me, one of the operative terms in the name, “Extensible”, looks like a challenge: just how extensible are we talking?

In the following post I’ll explain my wanderings through the PXF and HAWQ code bases to find out; more concretely, to add a new storage format type to PXF by way of the PXF plugin abstractions.

What: Read data written by an old version of HAWQ (1.x) from a new version of HAWQ (2.x), using PXF.

Why: 100+TB data migrations (read: export and reload) between major HAWQ versions are a hard sell, and quite painful if you DO manage to sell it.

How: Weld together the Java library for reading HAWQ’s binary format (HAWQInputFormat) and the PXF plugin Java abstractions.

So where to begin? Enter Pivotal Docs for aspiring PXF developers, conveniently hyperlinked here.

PXF wants a developer to tell it the 1’s and 0’s of three concepts: where to find data, how to divide said data into records, and how to interpret those records in Java land. These concepts manifest themselves as the Fragmenter, Accessor, and Resolver, respectively.
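
Before getting into specifics, it helps to see roughly how those three concepts translate into code. The following is only a skeleton sketch, pieced together from the org.apache.hawq.pxf.api base classes and interfaces (Fragmenter, ReadAccessor, ReadResolver, plus the Plugin and InputData utilities); the class names are placeholders and the interesting bodies get filled in over the rest of this post:

import java.util.List;

import org.apache.hawq.pxf.api.Fragment;
import org.apache.hawq.pxf.api.Fragmenter;
import org.apache.hawq.pxf.api.OneField;
import org.apache.hawq.pxf.api.OneRow;
import org.apache.hawq.pxf.api.ReadAccessor;
import org.apache.hawq.pxf.api.ReadResolver;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.Plugin;

// Where to find the data: emit one Fragment per chunk of the underlying file(s).
// (In a real plugin each class lives in its own source file.)
public class HAWQBinaryFragmenter extends Fragmenter {
    public HAWQBinaryFragmenter(InputData metaData) { super(metaData); }

    @Override
    public List<Fragment> getFragments() throws Exception {
        // populate the inherited 'fragments' list, as shown below
        return fragments;
    }
}

// How to divide said data into records.
public class HAWQBinaryAccessor extends Plugin implements ReadAccessor {
    public HAWQBinaryAccessor(InputData metaData) { super(metaData); }

    @Override
    public boolean openForRead() throws Exception {
        // open a record reader on the split handed to us
        return true;
    }

    @Override
    public OneRow readNextObject() throws Exception {
        // return the next record, or null at end-of-split
        return null;
    }

    @Override
    public void closeForRead() throws Exception {
        // close the reader
    }
}

// How to interpret those records in Java land.
public class HAWQBinaryResolver extends Plugin implements ReadResolver {
    public HAWQBinaryResolver(InputData metaData) { super(metaData); }

    @Override
    public List<OneField> getFields(OneRow row) throws Exception {
        // deserialize the row into a typed list of fields, shown below
        return null;
    }
}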

For our task, the Fragmenter requirements mapped somewhat nicely to HAWQInputFormat’s getSplits() function, which essentially takes a YAML metadata file describing a HAWQ table (see hawq extract in the appendix) and returns a list of input splits:

List<InputSplit> splits
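
In rough terms, the Fragmenter hands the extracted YAML file to HAWQInputFormat and asks for splits. Here is a sketch of that handshake; the setInput(conf, metadataFile) overload, the package name, the metadata file path, and the throwaway Hadoop JobContext are assumptions from memory of the HAWQInputFormat MapReduce API, not verbatim from my plugin:

// Point HAWQInputFormat (com.pivotal.hawq.mapreduce) at the YAML produced by 'hawq extract'.
// Assumption: a setInput(conf, metadataFile) overload exists for exactly this purpose.
Configuration conf = new Configuration();
HAWQInputFormat.setInput(conf, "/path/to/table-metadata.yaml");   // hypothetical path

// getSplits() expects a Hadoop JobContext, so fabricate a throwaway one
HAWQInputFormat inputFormat = new HAWQInputFormat();
List<InputSplit> splits = inputFormat.getSplits(new JobContextImpl(conf, new JobID()));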

Once we have these splits, we iterate over them and pluck out some of the pertinents, mainly where that data is in HDFS land, and which datanodes can lend a hand in materializing it.

<BEGIN LOOP>
This metadata extraction is (not so elegantly) handled like this:

String filepath = ((HAWQAOSplit) split).getPath().toUri().getPath();
String[] hosts = split.getLocations();

We are also responsible for packing a metadata payload into the new Fragment object, containing the byte offset of the split within the actual HDFS file, how many bytes the split spans, which hosts have a copy, and so on:

private static byte[] prepareFragmentMetadata(HAWQAOSplit fsp)
        throws IOException {
    // Serialize the split's vital statistics so the Accessor can
    // reconstruct it on the other end.
    ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
    ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
    objectStream.writeLong(fsp.getStart());          // byte offset of the split within the file
    objectStream.writeLong(fsp.getLength());         // length of the split in bytes
    objectStream.writeObject(fsp.getLocations());    // hosts holding a replica of this data
    objectStream.writeBoolean(fsp.getChecksum());    // whether the table uses checksums
    objectStream.writeInt(fsp.getBlockSize());       // block size of the underlying storage
    objectStream.writeObject(fsp.getCompressType()); // compression type, if any
    return byteArrayStream.toByteArray();
}

The last step in this iteration cycle is to instantiate the Fragment object, opportunistically gate check it (to save on company expenses), and send the package over the prairie to its sunny destination – the Accessor.

byte[] fragmentMetadata = prepareFragmentMetadata((HAWQAOSplit) split);
Fragment fragment = new Fragment(filepath, hosts, fragmentMetadata);
fragments.add(fragment);
<END LOOP>
return fragments;
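
Stitched together, getFragments() ends up being little more than that loop. Here is a rough composite of the pieces above (same assumptions as the earlier sketches, error handling omitted):

@Override
public List<Fragment> getFragments() throws Exception {
    // 'inputFormat' and the throwaway JobContext are set up as in the earlier sketch
    List<InputSplit> splits = inputFormat.getSplits(jobContext);

    for (InputSplit split : splits) {
        // where the split lives in HDFS, and which datanodes hold a replica
        String filepath = ((HAWQAOSplit) split).getPath().toUri().getPath();
        String[] hosts = split.getLocations();

        // serialize offset, length, hosts, etc. for the trip to the Accessor
        byte[] fragmentMetadata = prepareFragmentMetadata((HAWQAOSplit) split);

        fragments.add(new Fragment(filepath, hosts, fragmentMetadata));
    }
    return fragments;
}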

Now that we’ve made it to the Accessor, we head to baggage claim to make sure TSA didn’t tamper with our 18 year…

I mean data… data of the sizable variety.

A PXF Accessor functions perfectly well, so long as you tell it how to open, pour a glass, and re-seal without anyone noticing. Err, take a split from the list of them (sent to us by our Fragmenter friend), read it into a generic Java object, and put the reader back the way it was found (closed). This [ part | all ] isn’t particularly interesting, so I’ll show the small piece of code for reading here, with a rough sketch of the open and close halves after it:

/**
 * Fetches one record from the fragment.
 */
@Override
public OneRow readNextObject() throws Exception {
    if (reader.nextKeyValue()) {
        return new OneRow(null, (Object) reader.getCurrentValue());
    }
    else {
        return null;
    }
}
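
For completeness, the open and close halves look something like the following. This is an approximation rather than my exact code: it assumes the serialized metadata comes back via InputData.getFragmentMetadata() (the mirror image of prepareFragmentMetadata() above) and that 'reader' is whatever record reader the Accessor holds as a field:

@Override
public boolean openForRead() throws Exception {
    // Unpack the metadata the Fragmenter serialized for this split
    byte[] meta = inputData.getFragmentMetadata();   // assumption: provided by InputData
    ObjectInputStream stream = new ObjectInputStream(new ByteArrayInputStream(meta));
    long start = stream.readLong();              // byte offset of the split
    long length = stream.readLong();             // length of the split in bytes
    String[] hosts = (String[]) stream.readObject();
    boolean checksum = stream.readBoolean();
    int blockSize = stream.readInt();
    Object compressType = stream.readObject();

    // ... rebuild an HAWQAOSplit from these values and open 'reader' on it ...
    return true;
}

@Override
public void closeForRead() throws Exception {
    // put the reader back the way it was found
    if (reader != null) {
        reader.close();
    }
}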

Each of the records we emit from the Accessor ends up in the bit bucket of the Resolver. The following Resolver implementation is especially verbose and HAWQ-centric, but in a nutshell, we use the schema (provided by the YAML metadata file) to deserialize each generic row object into a typed record object, which is, coincidentally, a list of field objects:

List<org.apache.hawq.pxf.api.OneField> record

Ok, ok, since I know you’re not into hand waving, here’s how that looks for something like HAWQ, which inherits a slew of possible datatypes from its fellow Postgres bits:

/**
 * For a given field in the HAWQ record we extract its value and insert it
 * into the output {@code List<OneField>} record. A HAWQ field can be a
 * primitive type or an array type.
 *
 * @param record list of fields to be populated
 * @param value HAWQRecord containing the actual data read
 * @param fieldType field data type (enum)
 * @param index position of the field within the HAWQ record
 * @return the number of populated fields
 */
int populateRecord(List<OneField> record, HAWQRecord value, PrimitiveType fieldType,
        int index) throws HAWQException {

    int ret = 0;
    String sValue = null;

    switch (fieldType) {
    case BOOL:
        ret = addOneFieldToRecord(record, DataType.BOOLEAN, value.getBoolean(index));
        break;
    case BIT:
        ret = addOneFieldToRecord(record, DataType.BIT, value.getBit(index));
        break;
    case VARBIT:
        ret = addOneFieldToRecord(record, DataType.VARBIT, value.getVarbit(index));
        break;

    // ... the remaining primitive types are handled the same way ...

    case INET:
        ret = addOneFieldToRecord(record, DataType.INET, value.getInet(index));
        break;
    case CIDR:
        ret = addOneFieldToRecord(record, DataType.CIDR, value.getCidr(index));
        break;
    case XML:
        sValue = (value != null) ? String.format("%s", value.getString(index))
                : null;
        ret = addOneFieldToRecord(record, DataType.XML, sValue);
        break;
    default:
        break;
    }
    return ret;
}
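
The addOneFieldToRecord() helper doing the grunt work above isn’t shown; a hypothetical minimal version (the OneField constructor and DataType.getOID() come from the PXF API) just wraps the value with its type OID and appends it:

private int addOneFieldToRecord(List<OneField> record, DataType type, Object val) {
    // wrap the raw value and its PXF type OID in a OneField, then append it
    record.add(new OneField(type.getOID(), val));
    return 1;   // one field populated
}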

And, we’re done. Magic. No, really though, after bumping into the usual suspects of Java sample code – i.e. implementations that lean on wildcard imports like:

import org.veryprestigous.ultrahighperformance.library.*;

rather than spelling out their dependencies:

import org.veryprestigous.ultrahighperformance.library.totallyradfeature;
import org.veryprestigous.ultrahighperformance.library.somelegacyglue;

Implementing a [mostly] net-new PXF plugin wasn’t too involved – even for a make-believe Java programmer like myself. It’s also worth noting the PXF API has some additional capabilities, like exposing statistics about the underlying data to the query optimizer, plus writable equivalents of each of these classes. Those are available too, and are left as an exercise for the reader.