Realtime data doesn't show up on Elasticsearch, Kibana

I tried a realtime data streaming project using Kafka, Elasticsearch, Kibana, and Postgres with Docker Compose, plus Flink.
My data flows like this:
Kafka -> Flink -> Elasticsearch and Postgres.

When I send data from Kafka into Elasticsearch and query it in the Kibana dev tools (GET index/_search or GET index), I cannot see the new data until I cancel the Flink job.

Flink job starts -> can't see new data in Kibana -> cancel the Flink job -> now I can see the new data in Kibana.

Part of my code:

DataStream<Transaction> transactionStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");

        transactionStream.sinkTo(
                new Elasticsearch7SinkBuilder<Transaction>()
                        .setHosts(new HttpHost("localhost", 9200, "http"))
                        .setEmitter((transaction, runtimeContext, requestIndexer) -> {
                            String json = convertTransactionToJson(transaction);

                            IndexRequest indexRequest = Requests.indexRequest()
                                    .index("transactions")
                                    .id(transaction.getTransactionId())
                                    .source(json, XContentType.JSON);
                            requestIndexer.add(indexRequest);
                        })
                        .build()
        ).name("Elasticsearch Sink");

Postgres updates are fine.

I use a Mac, and my versions are:
Java version: 11
flink : 1.18.0
flink connector kafka : 3.0.1-1.18
flink sql connector elasticsearch7 : 3.0.1-1.17

What I tried:

  1. Added the setBulkFlushInterval(30000) option, because I found this log: WARN org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter - Writer was closed before all records were acknowledged by Elasticsearch.

But then another error occurred:
Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 200 OK}

  2. Cloned the original code repository.

My code is exactly the same as this repository: https://github.com/airscholar/FlinkCommerce
https://www.youtube.com/watch?v=deepQRXnniM

So I cloned it and ran it, but the same problem happens. In his YouTube video this problem doesn't occur.

What can I do?

Answer

The Elasticsearch sink either commits on checkpoint, or flushes its buffer once 1,000 actions have accumulated. Enabling checkpointing is probably the best solution.

https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#elasticsearch-sinks-and-fault-tolerance
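The fix above can be sketched against the code in the question. This is a minimal sketch, not a tested solution: it assumes the asker's existing `Transaction` type, `transactionStream`, and `convertTransactionToJson` helper, and the checkpoint interval and flush thresholds are illustrative values, not recommendations.

```java
// Enable checkpointing so the sink flushes its buffered bulk requests on
// every checkpoint (here every 5 seconds) instead of only when 1,000
// actions accumulate or when the job is cancelled.
env.enableCheckpointing(5000);

transactionStream.sinkTo(
        new Elasticsearch7SinkBuilder<Transaction>()
                .setHosts(new HttpHost("localhost", 9200, "http"))
                // Also flush sooner between checkpoints: after 50 actions
                // or after 1 second, whichever comes first (illustrative).
                .setBulkFlushMaxActions(50)
                .setBulkFlushInterval(1000)
                // AT_LEAST_ONCE ties flushes to checkpoints as well;
                // it requires checkpointing to be enabled.
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setEmitter((transaction, context, indexer) ->
                        indexer.add(Requests.indexRequest()
                                .index("transactions")
                                .id(transaction.getTransactionId())
                                .source(convertTransactionToJson(transaction),
                                        XContentType.JSON)))
                .build()
).name("Elasticsearch Sink");
```

With this, new documents should become searchable in Kibana within a flush interval or a checkpoint interval of being produced, rather than only after the job is cancelled.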




