I'm building a real-time data streaming project using Kafka, Elasticsearch, Kibana, and Postgres with Docker Compose, plus Flink.
My data flows like this:
Kafka -> Flink -> Elasticsearch and Postgres.
When I sink Kafka data into Elasticsearch and query it in the Kibana Dev Tools (GET index/_search or GET index), I can't see any new data until I cancel the Flink job.
Flink job starts -> can't see new data in Kibana -> cancel the Flink job -> now I can see the new data in Kibana.
Part of my code:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

DataStream<Transaction> transactionStream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");

transactionStream.sinkTo(
        new Elasticsearch7SinkBuilder<Transaction>()
                .setHosts(new HttpHost("localhost", 9200, "http"))
                .setEmitter((transaction, runtimeContext, requestIndexer) -> {
                    // Serialize the Transaction POJO to JSON and index it by its transaction id
                    String json = convertTransactionToJson(transaction);
                    IndexRequest indexRequest = Requests.indexRequest()
                            .index("transactions")
                            .id(transaction.getTransactionId())
                            .source(json, XContentType.JSON);
                    requestIndexer.add(indexRequest);
                })
                .build()
).name("Elasticsearch Sink");
The Postgres updates work fine.
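For comparison, the Postgres side is a plain JDBC sink, roughly like the sketch below; the SQL, columns (e.g. getAmount), and credentials are simplified placeholders from my local setup. Note that JdbcExecutionOptions flushes on a batch interval, which may be why Postgres looks live while Elasticsearch doesn't:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

// Sketch: table, columns, and credentials are placeholders, not my exact code.
transactionStream.addSink(
        JdbcSink.sink(
                "INSERT INTO transactions (transaction_id, amount) VALUES (?, ?)",
                (statement, transaction) -> {
                    statement.setString(1, transaction.getTransactionId());
                    statement.setDouble(2, transaction.getAmount());
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200) // flushes at least every 200 ms
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:postgresql://localhost:5432/postgres")
                        .withDriverName("org.postgresql.Driver")
                        .withUsername("postgres")
                        .withPassword("postgres")
                        .build()
        )
).name("Postgres Sink");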
I'm on a Mac with:
- Java version: 11
- Flink: 1.18.0
- flink-connector-kafka: 3.0.1-1.18
- flink-sql-connector-elasticsearch7: 3.0.1-1.17
What I tried:
- Added the setBulkFlushInterval(30000) option (see the sketch after this list), because I found this log:
WARN org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter - Writer was closed before all records were acknowledged by Elasticsearch.
But then another error occurred:
Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 200 OK}
- Cloned the original repository. My code is exactly the same as this one:
https://github.com/airscholar/FlinkCommerce
https://www.youtube.com/watch?v=deepQRXnniM
I cloned it and ran it, but the same problem happens. In the YouTube video this problem doesn't occur.
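Here is the sketch of the flush-related settings mentioned above. I'm not sure I understand the flush model correctly: as far as I can tell from the connector docs, the sink buffers bulk requests and flushes when a bulk threshold is hit or on checkpoint, and I don't enable checkpointing anywhere. The values below are examples I'm experimenting with, not a known fix:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// If the sink only flushes on checkpoint, enabling checkpointing should make flushes periodic.
env.enableCheckpointing(5000);

// Bulk-flush knobs on the sink builder itself:
Elasticsearch7SinkBuilder<Transaction> builder = new Elasticsearch7SinkBuilder<Transaction>()
        .setHosts(new HttpHost("localhost", 9200, "http"))
        .setBulkFlushMaxActions(1)     // per the docs, emit after every element instead of buffering
        .setBulkFlushInterval(30000);  // the interval I already tried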
What can I do?