Blake Smith

create. code. learn.

»

How to use Parquet Java without Hadoop

If you want to use the official Parquet Java library implementation, you’ll quickly see that it brings along Hadoop as a large, cumbersome transitive dependency. This makes it complicated to use parquet in small systems and simple use cases.

In this post, I’ll show you how you can eliminate almost all of the Hadoop dependency. I’m using this technique in production systems that need to export data with a clean, structured data schema. It works for both reading and writing parquet files using the official Java implementation. In one case, cutting out most of the Hadoop transitive dependencies reduced a shaded jar’s size by over 85%.

This technique can be summarized by:

  1. Switch to using all non-Hadoop Parquet interfaces.
  2. Remove all Hadoop imports from your code.
  3. Explicitly bring in transitive dependencies that are still part of the parquet import graph. Exclude all other Hadoop dependencies.

For our method, where we want to write parquet files to S3 object storage, we’re also going to use AWS’ Java NIO FileSystem SPI project. It makes it easy to do I/O to S3 without having to implement the more intricate details of reading and writing to S3 directly.

Here are these steps in more detail.

Switch to non-Hadoop Parquet interfaces

Many of the reader and writer interfaces in parquet-java (formerly parquet-mr) are explicitly coupled to Hadoop classes. If all you want to do is read and write parquet files, you’re at a minimum going to interact with ParquetReader, ParquetWriter and their subclasses such as AvroParquetWriter or ProtoParquetWriter. All of these classes have constructors that take in instances of org.apache.hadoop.conf.Configuration to configure the writer, and will also specify input and output paths with a Hadoop filesystem Path.

You need to ensure that none of your code depends on these interfaces. You need to:

  • Replace Hadoop Configuration with an instance of ParquetConfiguration, most likely PlainParquetConfiguration. All parquet readers and writers should have constructor interfaces that take in this configuration object, instead of the Hadoop variant.
  • Replace all instances of Hadoop Path with either a parquet InputFile for reads, or an OutputFile for writes. If you want to write to your local filesystem, you can use the LocalInputFile or LocalOutputFile implementations.

Concretely, if you were writing out Parquet files using an AvroParquetWriter to export to a local FileSystem, you would change your code from something like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

final Path hadoopPath = new Path("file:///tmp/file.parquet");
final Configuration hadoopConfiguration = new Configuration();
final ParquetWriter<T> writer = AvroParquetWriter.<T>builder(hadoopPath)
    .withConf(hadoopConfiguration)
    .withSchema(schema)
    .build();

To something like this instead:

import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.io.LocalOutputFile;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Path;

final FileSystem fs = FileSystems.getFileSystem(new URI("file", null, "/", null, null));
final ParquetConfiguration parquetConf = new PlainParquetConfiguration();
// FileSystem#getPath takes a plain path string, not a URI string.
final Path nioOutputPath = fs.getPath("/tmp/file.parquet");
final LocalOutputFile outputFile = new LocalOutputFile(nioOutputPath);
final ParquetWriter<T> writer = AvroParquetWriter.<T>builder(outputFile)
    .withConf(parquetConf)
    .withSchema(schema)
    .build();

Notice how we’ve switched to using Java’s built in NIO interfaces for file I/O.
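As a small illustration of those NIO interfaces on their own, here is a minimal sketch using only the JDK. It shows two ways to obtain a java.nio.file.Path: from a plain path string on the default file system, or from a file: URI via Paths.get(URI). The class and method names (NioPathLookup, fromPlainString, fromUri) are hypothetical and exist only for this example.

```java
import java.net.URI;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper class, for illustration only.
final class NioPathLookup {

    // Resolve a plain path string against the default (local) file system.
    static Path fromPlainString(String path) {
        return FileSystems.getDefault().getPath(path);
    }

    // Resolve a URI through whichever installed provider handles its scheme.
    static Path fromUri(String uri) {
        return Paths.get(URI.create(uri));
    }

    public static void main(String[] args) {
        Path fromString = fromPlainString("/tmp/file.parquet");
        Path fromUri = fromUri("file:///tmp/file.parquet");
        System.out.println(fromString + " and " + fromUri);
        System.out.println("same file name: "
            + fromString.getFileName().equals(fromUri.getFileName()));
    }
}
```

Either Path can then be handed to LocalOutputFile. The URI form matters later, because it is the route by which non-local providers get selected.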

In a later section, we’ll see how we can use these same interfaces to do direct I/O to S3 instead of local file system operations.

After you’ve moved over to these interfaces, you should remove all unused org.apache.hadoop.* imports from your code, before proceeding to the next step.

Dependency Changes

Even though none of your code will reference anything from Hadoop, we still can’t exclude every transitive Hadoop dependency. The core reader / writer classes we’re using, such as ParquetWriter and friends, still import Hadoop classes, so those classes must be present on the classpath even though our technique never exercises the Hadoop code paths.

I’m guessing that when parquet moves to 2.0, the parquet team will remove this explicit Hadoop coupling, but to do so any sooner would bring breaking interface changes. Until then, we’ll need to do some transitive dependency surgery ourselves.

We explicitly bring in hadoop-common, and intentionally only bring in dependencies that are referenced through the import graph. Everything else is excluded. There might be a better way to do this with maven, but this has been working so far.

For parquet-java version 1.14.1, here are the relevant maven pom dependencies and exclusions:

  <!-- BEGIN LINGERING HADOOP TRANSITIVE DEPENDENCIES
      There are still some lingering classes in parquet-mr that are coupled to Hadoop via their public interfaces.
      Remove this entirely once Hadoop is correctly made optional in parquet-mr for another
      bunch of jar size savings.
      NOTE: None of *our* code explicitly calls into any Hadoop code. However, some of the Parquet classes
      we depend on will import Hadoop dependencies (but not use them in our core code paths). Once parquet-mr
      makes Hadoop optional (probably sometime in 2.0.0, since these would break certain public interfaces),
      we can remove everything between these Hadoop dependencies.
      On wildcard exclusions: These are the bare-minimum jars that must be brought into the classpath to satisfy import dependencies.
      (The import graph from the classes we *do* use, such as ParquetWriter, etc.). We explicitly avoid bringing in any transitive dependencies, and just bring
      in the exact jars we need.
      -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
      <exclusions>
        <exclusion>
          <groupId>*</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.woodstox</groupId>
      <artifactId>woodstox-core</artifactId>
      <version>5.4.0</version>
      <exclusions>
        <exclusion>
          <groupId>*</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.codehaus.woodstox</groupId>
      <artifactId>stax2-api</artifactId>
      <version>4.2.1</version>
      <exclusions>
        <exclusion>
          <groupId>*</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>commons-collections</groupId>
      <artifactId>commons-collections</artifactId>
      <version>3.2.2</version>
      <exclusions>
        <exclusion>
          <groupId>*</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>${hadoop.version}</version>
      <exclusions>
        <exclusion>
          <groupId>*</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop.thirdparty</groupId>
      <artifactId>hadoop-shaded-guava</artifactId>
      <version>1.2.0</version>
      <exclusions>
        <exclusion>
          <groupId>*</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- END LINGERING HADOOP TRANSITIVE DEPENDENCIES
         delete everything between these two blocks once parquet-mr makes Hadoop entirely optional.
    -->

Like I mentioned, cutting out all these transitives, along with hadoop-aws, took one production shaded jar from ~657MB down to ~96MB.

If upstream parquet removes all Hadoop interface coupling, we’d be able to get rid of this ugly maven hack altogether.
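When trimming dependencies this aggressively, it can help to check early that the classes you rely on still load. Below is a minimal sketch of such a smoke check, using only the JDK. The ClasspathSmokeCheck class and its missingClasses method are hypothetical names, and the list of classes to probe is just an example. The check is also shallow: Class.forName without initialization catches a missing class or a missing superclass, but not every class referenced from method bodies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical smoke check, for illustration only.
final class ClasspathSmokeCheck {

    // Returns the subset of the given class names that cannot be loaded.
    static List<String> missingClasses(List<String> classNames) {
        List<String> missing = new ArrayList<>();
        ClassLoader loader = ClasspathSmokeCheck.class.getClassLoader();
        for (String name : classNames) {
            try {
                // initialize=false: load the class without running static initializers.
                Class.forName(name, false, loader);
            } catch (ClassNotFoundException | LinkageError e) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Example probe list; adjust to the classes your own code paths touch.
        List<String> required = List.of(
            "org.apache.hadoop.conf.Configuration",
            "org.apache.parquet.hadoop.ParquetWriter");
        System.out.println("Missing classes: " + missingClasses(required));
    }
}
```

Running something like this from a unit test against the shaded jar gives a quick signal if an exclusion removed a jar the import graph still needs. An end-to-end write and read of a small parquet file remains the more thorough check.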

S3 Reading and Writing

So now we theoretically can read and write parquet files from the local filesystem using Java’s built-in NIO interfaces, but what about direct I/O to S3? One of the niceties of using the Hadoop FileSystem abstraction was that we could read and write parquet files directly from blob storage, and we’d like to recreate that.

Let’s fix that by leveraging AWS’ Java NIO SPI for S3. This isn’t strictly necessary, but its implementation already can handle producing a seekable byte channel that implements I/O buffering for us. If you’d rather not bring this library into scope, you’re at a minimum going to need to recreate InputFile and OutputFile implementations that can buffer I/O to and from S3 to your liking.

If you follow the README directions to configure your credentials for S3, you’re 95% of the way to being able to just plug in directly to Parquet reading and writing. Instead of looking up java.nio.file.Path objects from our local file FileSystem, we instead look up java.nio.file.Path objects from the provided s3 filesystem implementation.
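The path lookup itself goes through the standard NIO provider mechanism, which can be sketched with the JDK alone. The example below checks whether a provider is installed for a URI’s scheme before resolving the Path. ProviderCheck, hasProvider and resolve are hypothetical names for this sketch, and I’m assuming the S3 library registers itself under the "s3" scheme once it is on the classpath, so check its README for the exact URI format.

```java
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.spi.FileSystemProvider;

// Hypothetical helper class, for illustration only.
final class ProviderCheck {

    // True if some installed NIO provider handles the given URI scheme.
    static boolean hasProvider(String scheme) {
        if (scheme == null) {
            return false;
        }
        for (FileSystemProvider provider : FileSystemProvider.installedProviders()) {
            if (provider.getScheme().equalsIgnoreCase(scheme)) {
                return true;
            }
        }
        return false;
    }

    // Resolve a URI to a Path, failing with a readable message if no provider matches.
    static Path resolve(String uri) {
        URI parsed = URI.create(uri);
        if (!hasProvider(parsed.getScheme())) {
            throw new IllegalStateException(
                "No NIO FileSystemProvider installed for scheme: " + parsed.getScheme());
        }
        return Paths.get(parsed);
    }

    public static void main(String[] args) {
        System.out.println("file provider installed: " + hasProvider("file"));
        // Assumption: true only when the S3 NIO provider jar is on the classpath.
        System.out.println("s3 provider installed: " + hasProvider("s3"));
        System.out.println(resolve("file:///tmp/file.parquet"));
    }
}
```

With the S3 provider present, the same resolve call would be handed an s3 URI instead of a file URI, and the rest of the code keeps working with plain java.nio.file.Path objects.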

  • For reading and writing, look up Path objects from the S3FileSystem.
  • For writing, the upstream LocalOutputFile implementation works just fine with S3 paths.
  • For reading, you’ll need to use a different InputFile implementation that works correctly with nio interfaces, and doesn’t fall back to legacy File operations.

Here’s an implementation that should work with our S3 NIO FileSystem implementation, that only depends on NIO compatible interfaces:

package me.blakesmith.parquet;

import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * An {@link org.apache.parquet.io.InputFile} implementation that only uses
 * java.nio.* interfaces for I/O operations. The LocalInputFile implementation in
 * upstream parquet-mr currently falls back to the old-school java file I/O APIs
 * (via Path#toFile) which won't work with nio remote FileSystems such as an S3
 * FileSystem implementation.
 */
public class NioInputFile implements InputFile {
  private final Path path;
  private long length = -1;

  public NioInputFile(Path file) {
    path = file;
  }

  @Override
  public long getLength() throws IOException {
    if (length == -1) {
      length = Files.size(path);
    }
    return length;
  }

  @Override
  public SeekableInputStream newStream() throws IOException {

    return new SeekableInputStream() {

      private final SeekableByteChannel byteChannel = Files.newByteChannel(path);
      private final ByteBuffer singleByteBuffer = ByteBuffer.allocate(1);

      @Override
      public int read() throws IOException {
        // There has to be a better way to do this?
        singleByteBuffer.clear();
        final int numRead = read(singleByteBuffer);
        if (numRead >= 0) {
          int value = (int)singleByteBuffer.get(0) & 0xFF;
          return value;
        } else {
          return -1;
        }
      }

      @Override
      public long getPos() throws IOException {
        return byteChannel.position();
      }

      @Override
      public void seek(long newPos) throws IOException {
        byteChannel.position(newPos);
      }

      @Override
      public void readFully(byte[] bytes) throws IOException {
        readFully(bytes, 0, bytes.length);
      }

      @Override
      public void readFully(byte[] bytes, int start, int len) throws IOException {
        // Wrap only the requested window so that exactly len bytes are read.
        readFully(ByteBuffer.wrap(bytes, start, len));
      }

      @Override
      public int read(ByteBuffer buf) throws IOException {
        return byteChannel.read(buf);
      }

      @Override
      public void readFully(ByteBuffer buf) throws IOException {
        // Keep reading until the buffer is full; short reads are normal for channels.
        while (buf.hasRemaining()) {
          if (read(buf) == -1) {
            throw new java.io.EOFException(
                "Reached end of file with " + buf.remaining() + " bytes left to read");
          }
        }
      }

      @Override
      public void close() throws IOException {
        byteChannel.close();
      }
    };
  }
}

I’d like to get this implementation pushed upstream, either to replace the LocalInputFile implementation, or to sit alongside it for use cases like this where we want to plug in NIO FileSystem implementations that can’t fall back to legacy File interfaces.
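On the “there has to be a better way” comment in the single-byte read() above: one possible alternative, sketched here with only the JDK, is to wrap the channel with java.nio.channels.Channels.newInputStream and let the resulting InputStream handle single-byte reads. I haven’t benchmarked this against the one-byte ByteBuffer approach, and the ChannelByteReads class and its method are hypothetical names for the example.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

// Hypothetical demo class, for illustration only.
final class ChannelByteReads {

    // Reads three times from a file through a channel-backed InputStream and
    // returns { first read, second read, third read, channel position afterwards }.
    static long[] readThreeTimes(Path file) throws IOException {
        try (SeekableByteChannel channel = Files.newByteChannel(file)) {
            InputStream in = Channels.newInputStream(channel);
            int first = in.read();  // unsigned byte value, or -1 at end of file
            int second = in.read();
            int third = in.read();
            return new long[] {first, second, third, channel.position()};
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("channel-demo", ".bin");
        try {
            Files.write(tmp, new byte[] {(byte) 0xFF, 0x01});
            System.out.println(Arrays.toString(readThreeTimes(tmp)));
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

Because the stream reads straight from the channel, seek and getPos can keep operating on the same SeekableByteChannel as in NioInputFile.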

Conclusion

We can now read / write Parquet files, using the official Java implementation, without any explicit dependencies on Hadoop, but still read and write directly to S3 blob storage. Not only does this reduce our jar sizes, but it also cuts down on classpath dependency sprawl. You can embed parquet functions inside smaller codebases where carrying around a prohibitively cumbersome Hadoop dependency would be a complete non-starter.

Parquet is an amazing file format, that’s going to be here for a long time, especially in our current age of cheap blob storage. One of the biggest things holding parquet java back from being ubiquitously usable is issues like this where the implementation bloats your codebases and deployables. I’m eager to both help and support however I can to reduce parquet-java’s dependency on Hadoop, and help bring the benefits of parquet to more code bases.


»

You Are The Business

“We need to deliver that feature for the business”, words you might have heard at a daily standup meeting.

or

“The business needs this data analysis so it can make a decision”, as you get a CSV dump of the latest sales transactions and hand it off somewhere.

This othering kind of language gives away your power. Treating “the business” like it’s some other entity that’s “over there” that you’re beneath diminishes your ability to succeed in your software craft. You’ve put yourself on the wrong side of a false equation, and made a false dichotomy in your mind.

As a software engineer, you are the business, just as much as anyone in sales, marketing or HR at your company. The business isn’t something that happens somewhere else. It’s happening right now, with you doing your work, with you delivering useful things to customers that need what you’re making.

Try this language instead:

“We need to deliver that feature for our biggest medical customers”

“Our marketing team needs this data analysis so they can plan our next release launch”

The funny thing is: Once you change your language like this, your mindset and your actions change too. You’re taking responsibility for the outcomes you’re driving towards. You have a human connection to the recipients of your work. But most importantly, you’ve recognized your own power, and not given it away to an unnamed “business person” who doesn’t actually exist at your company.


»

How to Run NixOS Tests (Flake Edition)

This post builds on my last post, which covered how to develop your own custom NixOS test for custom packages and services.

If you want to contribute to or work on upstream nixpkgs NixOS module tests, here are a few pointers I picked up while experimenting with a flake-based, trial-and-error workflow.

To run an existing test:

  1. Find a test in nixpkgs/nixos/tests/all-tests.nix.
  2. cd to your local nixpkgs git checkout.
  3. Run the test with: nix run '.#nixosTests.$TEST_NAME.driver', replacing $TEST_NAME with the name of the test.

For example, to run the nginx NixOS tests via flakes, run:

$ nix run '.#nixosTests.nginx.driver'

You can also run the tests with an interactive shell via:

$ nix run '.#nixosTests.nginx.driverInteractive'

Nix package derivations often will link a related test suite via a passthru.tests attribute, so you can execute affected tests when you update or change a package. For example, the gotosocial package derivation links the tests like this:

buildGoModule rec {
  ...
  passthru.tests.gotosocial = nixosTests.gotosocial;
  ...
}

So another way to run the tests is via the linked attribute in the package derivation like so:

$ nix run '.#gotosocial.tests.gotosocial.driver'

Just like in my previous post, a QEMU VM will boot and execute the test suite and print its results to your terminal window.

