Bulk Data Load Using Apache Beam JDBC
Sometimes we have to load data from a relational database into BigQuery. In this article I will describe how to do this using MySQL as a data source, Apache Beam, and Google Dataflow. Moreover, we will do this in parallel.
Let's start. First of all, we will create an Apache Beam project:
Apache Beam quick start guide
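All of the Java snippets below assume a Pipeline object named p. Here is a minimal sketch of how it could be created (this setup is illustrative and not from the original post):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Minimal pipeline setup; pass --runner=DataflowRunner on the command line
// (with the Dataflow runner dependency on the classpath) to run on Google Dataflow.
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);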
Then we will do it step by step:
1) Add a dependency on the Beam JDBC IO module:
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-jdbc</artifactId>
    <version>${beam.version}</version>
</dependency>
2) Because we will load the data in parallel, it is a good idea to have a connection pool.
I chose the c3p0 connection pool:
<dependency>
    <groupId>c3p0</groupId>
    <artifactId>c3p0</artifactId>
    <version>0.9.1</version>
</dependency>

Here is the connection pool configuration:
ComboPooledDataSource dataSource = new ComboPooledDataSource();
dataSource.setDriverClass("com.mysql.cj.jdbc.Driver");
dataSource.setJdbcUrl("jdbc connection string");
dataSource.setUser("youruser");
dataSource.setPassword("yourpassword");
dataSource.setMaxPoolSize(10);
dataSource.setInitialPoolSize(6);
JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(dataSource);

3) Now we have to define the BigQuery table schema:
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("book_id").setType("STRING"));
fields.add(new TableFieldSchema().setName("book_name").setType("STRING"));
fields.add(new TableFieldSchema().setName("description").setType("STRING"));
fields.add(new TableFieldSchema().setName("author").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);

4) Let's create the MySQL table:
CREATE TABLE `tutorials_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `author` varchar(100) COLLATE utf8_unicode_ci NOT NULL,
  `book_id` varchar(25) COLLATE utf8_unicode_ci NOT NULL,
  `book_name` varchar(25) COLLATE utf8_unicode_ci NOT NULL,
  `description` varchar(300) COLLATE utf8_unicode_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6424339 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

5) I will use the field 'id' as a key for pagination.
I divide the whole range of ids into chunks of 1000 elements.
Below is the code which retrieves the count of elements and produces a PCollection of Ranges.
In the "Distribute" step I create the PCollection of ranges.
Then I do a simple reshuffle to break 'fusion' and achieve parallelism.
You can read about 'fusion' here: fusion optimisation
PCollection<Range> ranges = p
    .apply(JdbcIO.<Long>read()
        .withDataSourceConfiguration(config)
        .withCoder(AvroCoder.of(Long.class))
        .withQuery(String.format("select count(id) from %s", "tutorials_tbl"))
        .withRowMapper(new JdbcIO.RowMapper<Long>() {
            @Override
            public Long mapRow(ResultSet resultSet) throws Exception {
                long count = resultSet.getLong(1);
                return count;
            }
        }))
    // RANGE is the chunk size, e.g. private static final long RANGE = 1000L;
    .apply("Distribute", ParDo.of(new DoFn<Long, Range>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Long count = c.element();
            int ranges = (int) (count / RANGE);
            for (int i = 0; i < ranges; i++) {
                Range range = new Range();
                range.setFrom(i * RANGE);
                range.setTo((i + 1) * RANGE);
                c.output(range);
            }
            // Emit the final, partially filled chunk if count is not a multiple of RANGE.
            if (count > ranges * RANGE) {
                Range range = new Range();
                range.setFrom(ranges * RANGE);
                range.setTo(ranges * RANGE + count % RANGE);
                c.output(range);
            }
        }
    }))
    // Attach an arbitrary key, reshuffle to break fusion, then drop the key again.
    .apply("Add key", ParDo.of(new Repartition.AddArbitraryKey(new Extract())))
    .apply("Reshuffle", Reshuffle.of())
    // The key type (String here) is assumed; it depends on the AddArbitraryKey helper.
    .apply("Rem Key", ParDo.of(new DoFn<KV<String, Range>, Range>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(c.element().getValue());
        }
    }));
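The Range class, as well as the Repartition.AddArbitraryKey and Extract helpers, are the author's own and are not shown in the post. Here is a minimal sketch of what Range could look like, inferred from how it is used above (everything beyond the from/to accessors is an assumption):

import java.io.Serializable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

// Hypothetical sketch of the author's Range class: a half-open interval [from, to) of ids.
@DefaultCoder(AvroCoder.class)
public class Range implements Serializable {
    private long from;
    private long to;

    public long getFrom() { return from; }
    public void setFrom(long from) { this.from = from; }
    public long getTo() { return to; }
    public void setTo(long to) { this.to = to; }
}

As an aside, recent Beam releases ship Reshuffle.viaRandomKey(), which performs the same add-key/reshuffle/remove-key dance as the last three transforms above in a single step.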
6) Final step: now I can read all Ranges in parallel:

PCollection<Keyword> rows = ranges.apply(JdbcIO.<Range, Keyword>readAll()
    .withDataSourceConfiguration(config)
    .withFetchSize(1000)
    .withCoder(AvroCoder.of(Keyword.class))
    .withParameterSetter(new JdbcIO.PreparedStatementSetter<Range>() {
        @Override
        public void setParameters(Range element, PreparedStatement preparedStatement) throws Exception {
            preparedStatement.setLong(1, element.getFrom());
            preparedStatement.setLong(2, element.getTo());
        }
    })
    .withQuery("select * from tutorials_tbl where id >= ? and id < ?")
    .withRowMapper(new MyRowMapper()));
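The post shows neither the Keyword class, nor MyRowMapper, nor the final write to BigQuery. Below is a minimal sketch of how these pieces could look, using the schema from step 3 and the rows PCollection from step 6. The shape of Keyword, its field names, and the "your-project:your_dataset.tutorials" table reference are assumptions, not from the original post:

import java.io.Serializable;
import java.sql.ResultSet;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

// Hypothetical POJO mirroring the tutorials_tbl columns.
public class Keyword implements Serializable {
    String bookId;
    String bookName;
    String description;
    String author;
}

// Assumed row mapper: turns one JDBC result row into a Keyword.
public class MyRowMapper implements JdbcIO.RowMapper<Keyword> {
    @Override
    public Keyword mapRow(ResultSet resultSet) throws Exception {
        Keyword k = new Keyword();
        k.bookId = resultSet.getString("book_id");
        k.bookName = resultSet.getString("book_name");
        k.description = resultSet.getString("description");
        k.author = resultSet.getString("author");
        return k;
    }
}

// Convert the Keyword objects to TableRows and write them with the schema from step 3.
rows.apply("To TableRow", ParDo.of(new DoFn<Keyword, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Keyword k = c.element();
            c.output(new TableRow()
                .set("book_id", k.bookId)
                .set("book_name", k.bookName)
                .set("description", k.description)
                .set("author", k.author));
        }
    }))
    .apply(BigQueryIO.writeTableRows()
        .to("your-project:your_dataset.tutorials") // placeholder table reference
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

p.run();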