Flink + Kafka, java.lang.OutOfMemoryError when parallelism > 1
I have a toy Flink job that reads from 3 Kafka topics and then unions all three streams. That's all, no extra work.
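For reference, here is a minimal sketch of the job; the topic names, group id, and broker address are placeholders, not the real ones:
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class ToyUnionJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "toy-union-job");        // placeholder

        // One consumer per topic; FlinkKafkaConsumer010 matches Kafka 0.10 / Flink 1.5
        DataStream<String> a = env.addSource(
                new FlinkKafkaConsumer010<>("topic-a", new SimpleStringSchema(), props));
        DataStream<String> b = env.addSource(
                new FlinkKafkaConsumer010<>("topic-b", new SimpleStringSchema(), props));
        DataStream<String> c = env.addSource(
                new FlinkKafkaConsumer010<>("topic-c", new SimpleStringSchema(), props));

        // Union the three streams; no further processing
        a.union(b, c).print();

        env.execute("toy-union-job");
    }
}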
With parallelism 1 the job runs fine, but as soon as I set parallelism > 1 it fails with:
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:693)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
at sun.nio.ch.IOUtil.read(IOUtil.java:195)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
Why does it work with parallelism 1 but not with parallelism > 1?
Is this related to a Kafka server-side setting, or to the consumer settings in my Java code (I haven't added any special config yet)?
I know the information provided here may not be sufficient, but I'm not able to touch the Kafka cluster; I just hope someone has run into the same error before and can share some suggestions.
I'm using Kafka 0.10 and Flink 1.5.
Many thanks.
apache-kafka apache-flink
asked Mar 7 at 14:15 by gfytd
How do you set parallelism? env.setParallelism? – Soheil Pourbafrani, Mar 7 at 14:25
1 Answer
As you can see in the error log, the error is raised in the Kafka networking code. It occurs when the direct buffer memory a JVM has reserved exceeds its limit: direct buffers are allocated outside the heap as the application requires them, but their total is capped by -XX:MaxDirectMemorySize, which by default equals the maximum heap size assigned to the JVM. When you use parallelism > 1, multiple Flink tasks, min(number of Flink slots, number of Kafka partitions), consume data from Kafka at the same time, which uses more memory than parallelism 1 does, and so the error appears. The standard solution is to increase the heap size available to the Kafka brokers by setting the KAFKA_HEAP_OPTS variable in the Kafka env file or as an OS environment variable. For example, add the following line to set the heap size to 2 GB:
export KAFKA_HEAP_OPTS="-Xms2G -Xmx2G"
But in your case, since you have no access to the Kafka brokers (according to your question), you can instead decrease the number of records returned by a single call to poll(), which reduces the memory needed. (This is not the standard solution; I recommend it only to make the error go away.)
From this answer:
Kafka consumers handle the data backlog with the following two parameters:
max.poll.interval.ms
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before the expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. The default value is 300000.
max.poll.records
The maximum number of records returned in a single call to poll(). The default value is 500.
Failing to set these two parameters appropriately can lead to polling more data than the consumer can handle with the available resources, leading to OutOfMemory errors or occasional failures to commit the consumer offset. Hence, it is always advisable to tune the max.poll.records and max.poll.interval.ms parameters.
So, as a test, decrease the value of max.poll.records to, for example, 250 and check whether the error still occurs:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BOOTSTRAPSERVERS);
properties.setProperty("group.id", ID);
properties.setProperty("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("max.poll.records", "250"); // default is 500

// FlinkKafkaConsumer010 matches the Kafka 0.10 / Flink 1.5 versions in the question
FlinkKafkaConsumer010<String> myConsumer =
        new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties);
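If it helps, this is roughly how the tuned consumer would be wired into the job to rerun the test; the parallelism value is just an example, anything > 1 reproduced the error before:
// Illustrative wiring only: reuse the consumer configured above and
// re-enable parallelism > 1 to verify the error is gone.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3); // example value

env.addSource(myConsumer).print();
env.execute("max-poll-records-test");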
edited Mar 7 at 16:41, answered Mar 7 at 15:01 by Soheil Pourbafrani

Excellent answer, thank you for the detailed explanation! I just tried the max.poll.records setting and no longer see the OOM. One more question: is there any side effect to setting this to a number smaller than the default (500)? – gfytd, Mar 8 at 3:48
Nice to see it helped! There is no troubling side effect, except that reducing the number of records returned per call will decrease your application's throughput; it depends on what your application needs. If this setting lets you read data from Kafka and process it in Flink fast enough, it may be the optimal choice for you (though you should then ask yourself why you are using a distributed platform at all; maybe a sequential approach would suffice). Otherwise, tune the settings further or use more powerful hardware. – Soheil Pourbafrani, Mar 8 at 9:18