Chapter 4. Free-Form Query Import

The previous chapters covered the use cases where you had an input table on the source database system and you needed to transfer the table as a whole or one part at a time into the Hadoop ecosystem. This chapter, on the other hand, will focus on more advanced use cases where you need to import data from more than one table or where you need to customize the transferred data by calling various database functions.

For this chapter we’ve slightly altered the test table cities (see Table 4-1), normalizing the country name to a standalone table called countries (see Table 4-2). The normalized variant of the table cities is called normcities and will be created and populated automatically via the script mysql.tables.sql as described in Chapter 2.

Table 4-1. Normalized cities

id  country_id  city
1   1           Palo Alto
2   2           Brno
3   1           Sunnyvale

Table 4-2. Countries

country_id  country
1           USA
2           Czech Republic

Importing Data from Two Tables

Problem

You need to import one main table; however, this table is normalized. The important values are stored in the referenced dictionary tables, and the main table contains only numeric foreign keys pointing to the values in the dictionaries rather than to natural keys as in the original cities table. You would prefer to resolve the values prior to running Sqoop and import the real values rather than the numerical keys for the countries.

Solution

Instead of using table import, use free-form query import. In this mode, Sqoop will allow you to specify any query for importing data. Instead of the parameter --table, use the parameter --query with the entire query for obtaining the data you would like to transfer.

Let’s look at an example with the normalized table normcities and its dictionary countries. In order to achieve the same output as with importing the denormalized table cities, you could use the following Sqoop command:

sqoop import \
  --connect jdbc:mysql://mysql.example.com/sqoop \
  --username sqoop \
  --password sqoop \
  --query 'SELECT normcities.id, \
                  countries.country, \
                  normcities.city \
                  FROM normcities \
                  JOIN countries USING(country_id) \
                  WHERE $CONDITIONS' \
  --split-by id \
  --target-dir cities

Discussion

The free-form query import is one of the advanced features of Sqoop. As with all advanced software features, it gives you great power. With great power comes significant responsibility.

There is a lot to be aware of when using free-form query imports. With a query import, Sqoop can’t use the database catalog to fetch the metadata. This is one of the reasons why using table import might be faster than the equivalent free-form query import. You also have to manually specify some additional parameters that would otherwise be populated automatically. In addition to the --query parameter, you need to specify the --split-by parameter with the column that should be used for slicing your data into multiple parallel tasks. In a table-based import this parameter defaults to the table’s primary key, but for a free-form query there is no such default, so you must set it yourself. The third required parameter is --target-dir, which specifies the directory on HDFS where your data should be stored.

Caution

The free-form query import can’t be used in conjunction with the --warehouse-dir parameter.

Sqoop performs highly efficient data transfers by inheriting Hadoop’s parallelism. To help Sqoop split your query into multiple chunks that can be transferred in parallel, you need to include the $CONDITIONS placeholder in the where clause of your query. Sqoop will automatically substitute this placeholder with the generated conditions specifying which slice of data should be transferred by each individual task. While you could skip $CONDITIONS by forcing Sqoop to run only one job using the --num-mappers 1 parameter, such a limitation would have a severe performance impact.
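
To make the substitution more concrete, here is a rough sketch of the per-mapper queries that Sqoop might issue for the example above when running with two mappers. The exact boundary values and formatting depend on your data and Sqoop version, so treat this purely as an illustration:

-- Mapper 1: $CONDITIONS replaced by a half-open range on the --split-by column
SELECT normcities.id, countries.country, normcities.city
FROM normcities
JOIN countries USING(country_id)
WHERE ( id >= 1 ) AND ( id < 3 );

-- Mapper 2: the final slice uses an inclusive upper bound
SELECT normcities.id, countries.country, normcities.city
FROM normcities
JOIN countries USING(country_id)
WHERE ( id >= 3 ) AND ( id <= 3 );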

Sqoop will run several instances of your query at the same time, each processing a different slice of data. With a single straightforward join like this one, that is not a problem, but it can become an issue for more complex queries.

Note

If your query needs more than a few seconds in order to start sending data, it might not be suitable for the free-form query import. If this is the case, you can always run the expensive query once prior to Sqoop import and save its output in a temporary table. Then you can use table import to transfer the data into Hadoop.
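
For example, on MySQL the workaround might look like the following sketch; the staging table name cities_staging is just an illustrative choice:

-- Run once on the database side to materialize the expensive join
CREATE TABLE cities_staging AS
  SELECT normcities.id, countries.country, normcities.city
  FROM normcities
  JOIN countries USING(country_id);

Once the result is materialized, a plain table import can pick it up:

sqoop import \
  --connect jdbc:mysql://mysql.example.com/sqoop \
  --username sqoop \
  --password sqoop \
  --table cities_staging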

Using Custom Boundary Queries

Problem

You found free-form query import to be very useful for your use case. Unfortunately, prior to starting any data transfer in MapReduce, Sqoop takes a long time to retrieve the minimum and maximum values of the column specified in the --split-by parameter that are needed for breaking the data into multiple independent tasks.

Solution

You can specify any valid query to fetch minimum and maximum values of the --split-by column using the --boundary-query parameter:

sqoop import \
  --connect jdbc:mysql://mysql.example.com/sqoop \
  --username sqoop \
  --password sqoop \
  --query 'SELECT normcities.id, \
                  countries.country, \
                  normcities.city \
                  FROM normcities \
                  JOIN countries USING(country_id) \
                  WHERE $CONDITIONS' \
  --split-by id \
  --target-dir cities \
  --boundary-query "select min(id), max(id) from normcities"

Discussion

In order to partition data into multiple independent slices that will be transferred in a parallel manner, Sqoop needs to find the minimum and maximum value of the column specified in the --split-by parameter. In a table-based import, Sqoop uses the table’s primary key by default and generates the query select min(col), max(col) from tbl (for table tbl and split column col). In the case of the free-form query import, there is no table that Sqoop can use for fetching those values; instead, it will use the entire query specified on the command line as a subquery in place of the table name, resulting in a query select min(col), max(col) from ($YOUR_QUERY). Such a query is highly inefficient, as it requires materialization of the output result set prior to moving any data just for the purpose of getting the import boundaries.
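
For the example query used throughout this chapter, the automatically generated boundary query would therefore look roughly like the following; the (1 = 1) placeholder and the derived-table alias are shown only to illustrate the shape of what Sqoop produces:

SELECT MIN(id), MAX(id) FROM (
  SELECT normcities.id, countries.country, normcities.city
  FROM normcities
  JOIN countries USING(country_id)
  WHERE ( 1 = 1 )
) AS t1;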

Without understanding your query and the underlying data, there aren’t many optimizations that Sqoop can automatically apply. Sqoop does offer the parameter --boundary-query, with which a custom query can override the generated query. The only requirement for this query is to return exactly one row with exactly two columns. The first column will be considered the lower bound, while the second column will be the upper bound. Both values are inclusive and will be imported. The type of both columns must be the same as the type of the column used in the --split-by parameter. Knowing your data and the purpose of your query allows you to easily identify the main table, if there is one, and select the boundaries from this table without any additional join or data transformations.

The query used for fetching boundaries can indeed be arbitrary. Let’s walk through a few examples. If you happen to know the boundaries prior to running Sqoop, you can select them directly without opening a single table using a constant boundary query like SELECT 1, 500. If you’re storing the minimum and maximum values in different tables for accounting purposes, you can fetch the data from there as well. There is no requirement to reference any table used in the --query parameter inside the --boundary-query parameter. As the output of the boundary query serves as the basis for importing data, it is imperative that the return value not skew the import process.
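
For instance, if you already know that the id values fall between 1 and 500, or if you keep the current boundaries in a separate accounting table, either of the following variants would do; the table and column names in the second one (stats_boundaries, min_id, max_id) are purely hypothetical:

  --boundary-query "SELECT 1, 500"

  --boundary-query "SELECT min_id, max_id FROM stats_boundaries"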

Renaming Sqoop Job Instances

Problem

You run several concurrent free-form query imports from various databases at the same time on your Hadoop cluster. All MapReduce jobs are named QueryResult.jar, so it’s very hard to see which MapReduce job belongs to which imported query.

Solution

You can use the command-line parameter --mapreduce-job-name to specify the name of the generated MapReduce job. This name will then show up in the JobTracker web UI. To name your job normcities, you would use the following command:

sqoop import \
  --connect jdbc:mysql://mysql.example.com/sqoop \
  --username sqoop \
  --password sqoop \
  --query 'SELECT normcities.id, \
                  countries.country, \
                  normcities.city \
                  FROM normcities \
                  JOIN countries USING(country_id) \
                  WHERE $CONDITIONS' \
  --split-by id \
  --target-dir cities \
  --mapreduce-job-name normcities

Discussion

Sqoop follows the default behavior of Hadoop in using the submitted JAR name for the MapReduce job name. In a table import, the JAR is named after the table name, resulting in unique JAR and therefore also MapReduce job names. In the free-form query import case, with no single table involved, Sqoop will use QueryResult as the base name for the JAR. All query imports will look exactly the same on the JobTracker web UI. You can use the --mapreduce-job-name parameter to choose a name for your job.

Importing Queries with Duplicated Columns

Problem

You have more than one table that you’re joining in your free-form query. Your Sqoop import is failing with an error message about duplicate columns, similar to the following one:

Imported Failed: Duplicate Column identifier specified: 'id'

Solution

You might need to use SQL projection to rename columns in the query so that each column in the output result set has a unique name. You can do that using the AS syntax. For example, to import city names from the tables cities and normcities, you can use the following query:

--query "SELECT \
    cities.city AS first_city \
    normcities.city AS second_city \
  FROM cities \
  LEFT JOIN normcities USING(id)"

Discussion

During initial preparation and before submitting the MapReduce job, Sqoop performs several actions. One of them is to fetch metadata about the transferred columns and their associated types. During this step, Sqoop generates a Java class that contains one attribute for each column, named after the column itself. Java attributes must be unique; therefore, all columns in your query must have unique names.

While databases generally enforce unique column names within a table, it is quite common for two columns from different tables to share a name in a join. The output result set then contains two columns with the same name. This is especially problematic if your query selects all columns from all joined tables using fragments like select table1.*, table2.*. In that case, you must break the wildcard down, list each column explicitly, and use the AS clause to rename the duplicates so that the output result set has unique column names.
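
Putting the pieces together, a complete import of such a join with every column listed explicitly and the duplicates renamed might look like the following sketch. Selecting cities.id keeps a unique column available for splitting, and the target directory name is an arbitrary choice; the usual free-form requirements ($CONDITIONS, --split-by, --target-dir) still apply:

sqoop import \
  --connect jdbc:mysql://mysql.example.com/sqoop \
  --username sqoop \
  --password sqoop \
  --query 'SELECT cities.id, \
                  cities.city AS first_city, \
                  normcities.city AS second_city \
                  FROM cities \
                  LEFT JOIN normcities USING(id) \
                  WHERE $CONDITIONS' \
  --split-by id \
  --target-dir cities_with_aliases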
