Flink + Kafka, java.lang.OutOfMemoryError when parallelism > 1



I have a toy Flink job which reads from 3 Kafka topics, then unions all 3 streams. That's all, no extra work.
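
For reference, a minimal sketch of what such a job looks like; the topic names, bootstrap servers, and group id are placeholders I've assumed, since the question doesn't include code:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class ToyUnionJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // anything > 1 triggers the error described below

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
        props.setProperty("group.id", "toy-union-job");         // placeholder

        // One Kafka source per topic; FlinkKafkaConsumer010 matches Kafka 0.10 on Flink 1.5.
        DataStream<String> a = env.addSource(
                new FlinkKafkaConsumer010<>("topic-a", new SimpleStringSchema(), props));
        DataStream<String> b = env.addSource(
                new FlinkKafkaConsumer010<>("topic-b", new SimpleStringSchema(), props));
        DataStream<String> c = env.addSource(
                new FlinkKafkaConsumer010<>("topic-c", new SimpleStringSchema(), props));

        // Union the three streams; no further processing.
        a.union(b, c).print();

        env.execute("toy union job");
    }
}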



If I use parallelism 1 for my Flink job, everything seems fine; as soon as I change parallelism to > 1, it fails with:



java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)


How come it works with parallelism 1 but not with parallelism > 1?



Is it related to a Kafka server-side setting, or to a consumer setting in my Java code (no special config in my code yet)?



I know the info provided here may not be sufficient, but I'm not able to touch the Kafka cluster. I just hope someone may have run into the same error before and can share some suggestions.



I'm using Kafka 0.10 and Flink 1.5.



Many thanks.

apache-kafka apache-flink

asked Mar 7 at 14:15 by gfytd
  • How do you set parallelism? env.setParallelism?

    – Soheil Pourbafrani
    Mar 7 at 14:25















1 Answer






As you can see in the error logs, this error comes from your Kafka cluster. The issue occurs when the direct buffer memory of the Kafka broker exceeds the heap size assigned to the JVM; direct buffer memory is allocated from a JVM's heap as the application requires it.

When you use parallelism > 1, multiple Flink tasks (min(number of Flink slots, number of Kafka partitions)) consume data from Kafka at the same time, using more of the Kafka brokers' heap than when parallelism equals one, and the error above occurs. The standard solution is to increase the heap size available to the Kafka brokers by adding the KAFKA_HEAP_OPTS variable to the Kafka env file or setting it as an OS environment variable. For example, add the following line to set the heap size to 2 GB:



export KAFKA_HEAP_OPTS="-Xms2G -Xmx2G"
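
For what it's worth, in the standard Kafka distribution kafka-server-start.sh only applies its own default heap settings (-Xmx1G -Xms1G) when KAFKA_HEAP_OPTS is unset, so exporting the variable before starting the broker is enough; the brokers must be restarted for the change to take effect.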


But in your case, where there is no access to the Kafka brokers (according to your question), you can decrease the number of records returned in a single call to poll(), which reduces the brokers' need for heap memory. (It's not a standard solution; I recommend it just to make the error go away.)



From this answer:




Kafka consumers handle the data backlog with the following two parameters:



max.poll.interval.ms

The maximum delay between invocations of a poll()
when using consumer group management. This places an upper bound on
the amount of time that the consumer can be idle before fetching more
records. If poll() is not called before the expiration of this timeout,
then the consumer is considered failed and the group will rebalance in
order to reassign the partitions to another member. Default value is
300000.



max.poll.records

The maximum number of records returned in a single
call to poll(). The default value is 500.



Failing to set the above two parameters appropriately could lead to
polling the maximum amount of data, which the consumer may not be able
to handle with the available resources, leading to an OutOfMemoryError
or, at times, failure to commit the consumer offset. Hence, it is
always advisable to tune the max.poll.records and max.poll.interval.ms
parameters.




So as a test, decrease the value of max.poll.records to, for example, 250 and check whether the error still happens.



Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BOOTSTRAPSERVERS); // your broker list
properties.setProperty("group.id", ID);                        // your consumer group id
properties.setProperty("key.deserializer", Serializer);        // placeholder deserializer class names
properties.setProperty("value.deserializer", Deserializer);
// Limit how many records a single poll() may return (default is 500):
properties.setProperty("max.poll.records", "250");

// With Kafka 0.10 (as in the question) use FlinkKafkaConsumer010 instead:
FlinkKafkaConsumer08<String> myConsumer =
        new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
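
To get a feel for why fewer records per poll reduces memory pressure, here is a rough back-of-envelope sketch; every number in it is an assumption for illustration, not something taken from the question:

public class PollMemoryEstimate {
    public static void main(String[] args) {
        int consumers = 3 * 2;          // 3 topics at parallelism 2: up to 6 consumers polling at once
        int recordsPerPoll = 250;       // max.poll.records after the suggested reduction
        int avgRecordBytes = 4 * 1024;  // assumed average record size of 4 KiB
        long inFlight = (long) consumers * recordsPerPoll * avgRecordBytes;
        // Prints roughly 5.9 MiB for these assumed numbers.
        System.out.printf("~%.1f MiB buffered per poll cycle%n", inFlight / (1024.0 * 1024.0));
        // With the default max.poll.records of 500 this doubles, which is how raising
        // the parallelism can push buffer usage past a fixed memory limit.
    }
}

Halving the records per poll roughly halves this worst case, which matches the observation that the error disappears after lowering max.poll.records.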





answered Mar 7 at 15:01, edited Mar 7 at 16:41 by Soheil Pourbafrani

  • Excellent answer, thank you for the detailed explanation! I just tried the max.poll.records setting and no longer see the OOM. One more question: are there any side effects of setting this to a number smaller than the default (500)?

    – gfytd
    Mar 8 at 3:48











    Nice to see it helped! There is no disturbing side effect, except that reducing the number of records returned in a single call will decrease your application's throughput. It depends on your application's needs. If this setting satisfies your needs for reading data from Kafka and processing it in Flink, maybe it's the optimal case for you (though you should ask yourself why you're using a distributed platform at all; maybe a sequential approach would suffice!). Otherwise, you should tune the settings further or use more powerful hardware.

    – Soheil Pourbafrani
    Mar 8 at 9:18











