English 中文(简体)
Kafka
原标题:Rest API for Kafka

I need to write a REST API for kafka which can read or write data from consumer/producer respectively. How can I do so?

问题回答

这是来自秘密的AtentAPIC(Rest Proxy)法样本。 我必须加以分类,这样它可能含有一些错误。 我希望这能帮助你。

( Producer using REST API written in Python )

import requests
import base64
import json

url = "http://restproxy:8082/topics/my_topic"
headers = {
"Content-Type" : "application/vnd.kafka.binary.v1 + json"
   }
# Create one or more messages
payload = {"records":
       [{
           "key":base64.b64encode("firstkey"),
           "value":base64.b64encode("firstvalue")
       }]}
# Send the message
r = requests.post(url, data=json.dumps(payload), headers=headers)
if r.status_code != 200:
   print "Status Code: " + str(r.status_code)
   print r.text

<>strong>(消费者,使用Amart制成的书刊),

import requests
import base64
import json
import sys

#Base URL for interacting with REST server
baseurl = "http://restporxy:8082/consumers/group1"

#Create the Consumer instance
print "Creating consumer instance"
payload {
    "format": "binary"
    }
headers = {
"Content-Type" : "application/vnd.kafka.v1+json"
    }
r = requests.post(baseurl, data=json.dumps(payload), headers=headers)

if r.status_code !=200:
    print "Status Code: " + str(r.status_code)
    print r.text
    sys.exit("Error thrown while creating consumer")

# Base URI is used to identify the consumer instance
base_uri = r.json()["base_uri"]

#Get the messages from the consumer
headers = {
    "Accept" : "application/vnd.kafka.binary.v1 + json"
    }

# Request messages for the instance on the Topic
r = requests.get(base_uri + "/topics/my_toopic", headers = headers, timeout =20)

if r.status_code != 200: 
    print "Status Code: " + str(r.status_code)
    print r.text
    sys.exit("Error thrown while getting message")

#Output all messages
for message in r.json():
    if message["key"] is not None:
        print "Message Key:" + base64.b64decode(message["key"])
    print "Message Value:" + base64.b64decode(message["value"])

# When we re done, delete the consumer
headers = {
    "Accept" : "application/vnd.kafka.v1+json"
    }

r = requests.delete(base_uri, headers=headers)

if r.status_code != 204: 
    print "Status Code: " + str(r.status_code)
    print r.text

I guess that your question was more about how to write the "server" REST API interface not the client-side (which at the end is just making HTTP requests). You can use the Strimzi HTTP bridge for example (https://github.com/strimzi/strimzi-kafka-bridge) which works stand-alone or even in Kubernetes is you are willing to deploy the cluster there (then you can use Strimzi project, https://strimzi.io/).





相关问题
Allow RESTful DELETE method in asp.net mvc?

im currently setting up asp.net to accept DELETE http verb in the application. However, when i send "DELETE /posts/delete/1" i always get a 405 Method not allow error. I tried to take a look at ...

Most appropriate API for URL shortening service

I ve just finished an online service for shortening URLs (in php5 with Zend Framework); you can enter an URL and you get an short URL (like tinyurl and such sites). I m thinking about the API for ...

Use HTTPClient or HttpUrlConnection? [closed]

We re implementing a REST client on JRE 1.4. Seems two good options for a client REST framework are HttpClient and HttpUrlConnection. Is there a reason to use HttpClient over the JRE s ...

Why can t I find the truststore for an SSL handshake?

I m using the Spring RESTTemplate on the client side to make calls to a REST endpoint. The client in this case is a Spring app and Tomcat is the servlet container. I m running into issues making a ...

Which Http redirects status code to use?

friendfeed.com uses 302. bit.ly uses 301. I had decided to use 303. Do they behave differently in terms of support by browsers ?

Three Step Buyonline The RESTful way

We are re-developing our buyonline functionality and we are doing it the RESTful way. The process is a three step one and the customer is asked to enter data at each step. Let s say the three URL s ...

热门标签