The journey of upgrading to Flink 1.14.3 to stamp out log4j security risk

Norman He
Feb 21, 2022

Log4j versions before 2.17.1 have a severe security bug. Upgrading to Flink 1.14.3 together with the log4j 2.17.1 library fixes the issue. To do so, we first needed to upgrade all the Flink clusters managed by docker-swarm from version 1.12.1 to version 1.14.3.

We use the Flink sql-client to manage almost all of our Flink streaming jobs by utilizing metadata [data sources and data sinks registered in a catalog]. Let us touch upon the metadata management part and the limitation in Flink 1.14.3 when joining a static table in a streaming job.
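To illustrate what registering a data source in a catalog can look like, here is a minimal sketch of a Kafka-backed table definition. The table name, columns, topic, broker address, and group id are all hypothetical placeholders and not taken from our setup; it also assumes the Kafka connector jar is available to the sql-client.

```sql
-- Hypothetical Kafka source table; every name and value below is a placeholder.
CREATE TABLE example_events (
  user_id STRING,
  event_type STRING,
  event_time TIMESTAMP(3),
  -- Tolerate events arriving up to 5 seconds out of order.
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'example-events',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'example-group',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
```

Once a table like this is created while a persistent catalog is in use, later sql-client sessions can query it by name without repeating the definition.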

We chose the hive catalog as our current metadata store. Integrating with the hive catalog is straightforward: just spin up a hive cluster using docker, with Postgres initialized as the metadata database storage.

During the upgrade, we found that Flink 1.14.3 is much easier to use because the sql-client init file uses the same format as all other Flink SQL commands, like the following:

create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/opt/hive_conf');

use catalog myhive;

use wap;

You first create the catalog and use it, then create the database wap, then use wap. Since the wap database [our database name] only needs to be created once, the sql.ini file above does not contain a create database wap line.
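For a fresh environment, the one-time setup could be sketched as follows. This is an assumption about how the first run might look, not a copy of our actual bootstrap script; the IF NOT EXISTS clause makes the statement safe to repeat.

```sql
-- One-time bootstrap sketch: same catalog settings as in sql.ini above.
CREATE CATALOG myhive WITH ('type' = 'hive', 'hive-conf-dir' = '/opt/hive_conf');
USE CATALOG myhive;

-- Only needed the first time; a no-op if wap already exists.
CREATE DATABASE IF NOT EXISTS wap;
USE wap;

-- Quick check that the catalog and database are reachable.
SHOW TABLES;
```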

Here are the other parts of the sql.ini file that we adopted.

-- Properties that change the fundamental execution behavior of a table program.

SET 'execution.runtime-mode' = 'streaming';

SET 'sql-client.execution.result-mode' = 'table';

SET 'sql-client.execution.max-table-result.rows' = '10000';

SET 'parallelism.default' = '4';

SET 'pipeline.auto-watermark-interval' = '200';

SET 'pipeline.max-parallelism' = '10';

SET 'restart-strategy' = 'fixed-delay';

-- Configuration options for adjusting and tuning table programs.

SET 'table.optimizer.join-reorder-enabled' = 'true';

SET 'table.exec.spill-compression.enabled' = 'true';

SET 'table.exec.spill-compression.block-size' = '128kb';

SET 'table.exec.state.ttl' = '86400000';

SET 'table.dynamic-table-options.enabled' = 'true';

SET 'table.exec.source.idle-timeout' = '360000';

SET 'table.exec.sink.buffer-flush.interval' = '120 s';

SET 'table.exec.sink.buffer-flush.max-rows' = '0';

SET 'table.optimizer.distinct-agg.split.enabled' = 'true';

SET 'table.exec.mini-batch.enabled' = 'true';

SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';

SET 'table.exec.mini-batch.size' = '10000';

SET 'table.exec.mini-batch.allow-latency' = '50 s';

SET 'execution.checkpointing.interval' = '180000';

SET 'execution.checkpointing.min-pause' = '60000';

SET 'execution.checkpointing.timeout' = '1800000';

SET 'execution.checkpointing.mode' = 'AT_LEAST_ONCE';

SET 'state.backend.rocksdb.predefined-options' = 'FLASH_SSD_OPTIMIZED';

SET 'state.backend' = 'rocksdb';

SET 'state.backend.incremental' = 'true';

SET 'state.backend.local-recovery' = 'true';

SET 'state.checkpoints.num-retained' = '3';

The syntax is the standard SET statement, which you can also run at the sql-client command prompt. This is much better than maintaining a YAML file, as was needed before 1.14, because the YAML format is different from the Flink SQL format.
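As a small sketch of using the same statements interactively, the following could be typed at the sql-client prompt to override a value for the current session, inspect the session configuration, and then undo the override. The value '2' is only an example.

```sql
-- Override one property for the current session only.
SET 'parallelism.default' = '2';

-- List the properties currently set in this session.
SET;

-- Put the property back to its default.
RESET 'parallelism.default';
```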

The next step is to put the correct jars into the /lib or /jars directory. For the hive catalog integration with Postgres, we put ours into /opt/flink/jars.

You can now run sql-client.sh embedded -l jars -i /youmounteddisk/sql.ini

The next nice improvement from the Flink team that we get in 1.14.3 is that you can set the job name when you submit your SQL, like the following:

SET 'pipeline.name' = 'first insert into vip';

insert into my_vip select * from kafa_flatter;
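Because a SET stays in effect for the rest of the session, it seems safest to set the name again before each submission. A minimal sketch, assuming a second, hypothetical sink table called my_other_sink:

```sql
SET 'pipeline.name' = 'first insert into vip';
INSERT INTO my_vip SELECT * FROM kafa_flatter;

-- Rename before the next job; otherwise it would reuse the previous name.
-- my_other_sink is a hypothetical table used only for illustration.
SET 'pipeline.name' = 'second insert into other sink';
INSERT INTO my_other_sink SELECT * FROM kafa_flatter;

-- Optionally go back to the default job naming afterwards.
RESET 'pipeline.name';
```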

Next, joining a static table in a streaming job. Flink 1.14.3 currently has a limitation here: when a static table [like a Postgres table] is joined in a streaming job, the static table source in the DAG finishes, and after that no more checkpoints are taken for the job. Since a streaming job needs checkpointing to function properly, a streaming job with a static table join will not progress further.
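To make the problematic pattern concrete, here is a sketch of the kind of job that runs into this limitation. The Postgres table, its columns, the connection settings, the my_vip_enriched sink, and the assumption that kafa_flatter has a user_id column are all hypothetical; it also assumes the JDBC connector and Postgres driver jars are available.

```sql
-- Hypothetical static dimension table backed by Postgres via the JDBC connector.
CREATE TABLE pg_vip_levels (
  user_id STRING,
  vip_level INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://postgres:5432/exampledb',
  'table-name' = 'vip_levels',
  'username' = 'example_user',
  'password' = 'example_password'
);

-- Regular join between the unbounded Kafka stream and the bounded Postgres scan.
-- The Postgres source reads the table once and then finishes, which is the
-- situation described above.
INSERT INTO my_vip_enriched
SELECT k.*, d.vip_level
FROM kafa_flatter AS k
JOIN pg_vip_levels AS d ON k.user_id = d.user_id;
```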

The current workaround is to write a user-defined function [UDF] and use it in the streaming job.

We will cover UDF for Flink in the next article.
