Bulk Data Load Using Apache Beam JDBC

Sometimes we have to load data from a relational database into BigQuery.
In this article I will describe how to do this using
MySQL as a data source, Apache Beam and Google Dataflow.
Moreover, we will do it in parallel.
Let's start. First of all, we will create an Apache Beam project (see the
Apache Beam quick start guide).
Then we will do it step by step:

1)  Add a dependency on the Beam JDBC IO module:
<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-sdks-java-io-jdbc</artifactId>
   <version>${beam.version}</version>
</dependency>
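Since we will also write to BigQuery and connect to MySQL, you will most likely need these dependencies as well (a sketch; the version properties are assumptions, pick versions matching your setup):
<!-- BigQueryIO lives in the GCP IO module; the MySQL driver is needed at runtime -->
<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
   <version>${beam.version}</version>
</dependency>
<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <version>${mysql.connector.version}</version>
</dependency>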

2)  Because we will load data in parallel, it's a good idea to use a connection pool.
I chose the c3p0 connection pool.
<dependency>
   <groupId>c3p0</groupId>
   <artifactId>c3p0</artifactId>
   <version>0.9.1</version>
</dependency>
Here is the connection pool configuration. Note that JdbcIO.DataSourceConfiguration.create(DataSource) requires the DataSource to be Serializable, which c3p0's ComboPooledDataSource is:
ComboPooledDataSource dataSource = new ComboPooledDataSource();
dataSource.setDriverClass("com.mysql.cj.jdbc.Driver");
dataSource.setJdbcUrl("jdbc connection string");
dataSource.setUser("youruser");
dataSource.setPassword("yourpassword");
dataSource.setMaxPoolSize(10);
dataSource.setInitialPoolSize(6);
JdbcIO.DataSourceConfiguration config 
   = JdbcIO.DataSourceConfiguration.create(dataSource);
3) Now we have to define the BigQuery table schema
(TableFieldSchema and TableSchema come from com.google.api.services.bigquery.model):
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("book_id").setType("STRING"));
fields.add(new TableFieldSchema().setName("book_name").setType("STRING"));
fields.add(new TableFieldSchema().setName("description").setType("STRING"));
fields.add(new TableFieldSchema().setName("author").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
4)  Let's create the MySQL table:
CREATE TABLE `tutorials_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `author` varchar(100) COLLATE utf8_unicode_ci NOT NULL,
  `book_id` varchar(25) COLLATE utf8_unicode_ci NOT NULL,
  `book_name` varchar(25) COLLATE utf8_unicode_ci NOT NULL,
  `description` varchar(300) COLLATE utf8_unicode_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6424339 DEFAULT CHARSET=utf8
COLLATE=utf8_unicode_ci;
5)  I will use the field 'id' as a key for pagination.
I divide the whole range of ids into chunks of 1000 elements (RANGE = 1000).
Below is the code which retrieves the count of rows and produces a PCollection of Ranges.
In the "Distribute" step I create the PCollection of ranges.
Note that this assumes the ids are roughly contiguous starting from zero;
if the table has large gaps, you would derive the upper bound from max(id) instead of count(id).
Then I do a simple reshuffle to break 'fusion' and to achieve parallelism.
You can read about 'fusion' here: fusion optimisation
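Range here is not a Beam class but my own small serializable POJO. A minimal sketch of what it looks like (the exact shape is an assumption based on how it is used; the no-arg constructor keeps it AvroCoder-friendly):
@DefaultCoder(AvroCoder.class)
public class Range implements Serializable {
    private long from;  // inclusive lower bound of the id chunk
    private long to;    // exclusive upper bound of the id chunk

    public Range() {}   // required by AvroCoder

    public long getFrom() { return from; }
    public void setFrom(long from) { this.from = from; }
    public long getTo() { return to; }
    public void setTo(long to) { this.to = to; }
}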

PCollection<Range> ranges = p.apply(
        JdbcIO.<Long>read()
           .withDataSourceConfiguration(config)
           .withCoder(AvroCoder.of(Long.class))
           .withQuery(String.format("select count(id) from %s", "tutorials_tbl"))
           .withRowMapper(new JdbcIO.RowMapper<Long>() {
               @Override
               public Long mapRow(ResultSet resultSet) throws Exception {
                   return resultSet.getLong(1);
               }
           }))
   .apply("Distribute", ParDo.of(new DoFn<Long, Range>() {
       @ProcessElement
       public void processElement(ProcessContext c) {
           Long count = c.element();
           int ranges = (int) (count / RANGE);
           for (int i = 0; i < ranges; i++) {
               Range range = new Range();
               range.setFrom(i * RANGE);
               range.setTo((i + 1) * RANGE);
               c.output(range);
           }
           if (count > ranges * RANGE) {
               Range range = new Range();
               range.setFrom(ranges * RANGE);
               range.setTo(ranges * RANGE + count % RANGE);
               c.output(range);
           }
       }
   }))
   .apply("Add key", ParDo.of(new Repartition.AddArbitraryKey<Range>(new Extract())))
   .apply("Reshuffle", Reshuffle.of())
   .apply("Rem Key", ParDo.of(new DoFn<KV<Integer, Range>, Range>() {
       @ProcessElement
       public void processElement(ProcessContext c) {
           c.output(c.element().getValue());
       }
   }));
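Repartition.AddArbitraryKey and Extract are small helpers of mine, not Beam built-ins. A minimal sketch of what they can look like (the Integer key type and the modulo keying are assumptions; Beam's built-in Reshuffle.viaRandomKey() achieves the same effect in a single step):
public class Repartition {
    // Wraps each element in a KV so that Reshuffle has a key to redistribute on.
    public static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
        private final SerializableFunction<T, Integer> keyFn;

        public AddArbitraryKey(SerializableFunction<T, Integer> keyFn) {
            this.keyFn = keyFn;
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(KV.of(keyFn.apply(c.element()), c.element()));
        }
    }
}

// Hypothetical key extractor: any function that spreads elements over many keys works.
public class Extract implements SerializableFunction<Range, Integer> {
    @Override
    public Integer apply(Range range) {
        return (int) (range.getFrom() % 100);
    }
}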
6) Final step - now I can read all Ranges in parallel:
PCollection<Keyword> rows = ranges.apply(JdbcIO.<Range, Keyword>readAll()
        .withDataSourceConfiguration(config)
        .withFetchSize(1000)
        .withCoder(AvroCoder.of(Keyword.class))
        .withParameterSetter(new JdbcIO.PreparedStatementSetter<Range>() {
            @Override
            public void setParameters(Range element,
                          PreparedStatement preparedStatement) throws Exception {
                preparedStatement.setLong(1, element.getFrom());
                preparedStatement.setLong(2, element.getTo());
            }
        })
        .withQuery("select * from tutorials_tbl where id >= ? and id < ?")
        .withRowMapper(new MyRowMapper()));
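MyRowMapper and Keyword are my classes as well. A minimal sketch, assuming Keyword is a plain Avro-serializable POJO whose fields mirror the table columns (the getter/setter names here are assumptions):
public class MyRowMapper implements JdbcIO.RowMapper<Keyword> {
    @Override
    public Keyword mapRow(ResultSet resultSet) throws Exception {
        Keyword keyword = new Keyword();
        keyword.setBookId(resultSet.getString("book_id"));
        keyword.setBookName(resultSet.getString("book_name"));
        keyword.setDescription(resultSet.getString("description"));
        keyword.setAuthor(resultSet.getString("author"));
        return keyword;
    }
}
All that is left is to write the rows into BigQuery using the schema from step 3. A sketch of that step (the target table reference and the field mapping are assumptions):
rows.apply("To TableRow", ParDo.of(new DoFn<Keyword, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Keyword k = c.element();
            c.output(new TableRow()
                    .set("book_id", k.getBookId())
                    .set("book_name", k.getBookName())
                    .set("description", k.getDescription())
                    .set("author", k.getAuthor()));
        }
    }))
    .apply(BigQueryIO.writeTableRows()
            .to("your-project:your_dataset.books")
            .withSchema(schema)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));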
