Response payload is not completed using asyncio/aiohttp

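I have had plenty of successful trial runs with this and thought it was working perfectly, but recently I have started intermittently receiving the following error, and I am at a loss because there are very few reports or solutions for it on the web: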

aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed

Sample code:

import asyncio,aiohttp,aiofiles
from simple_salesforce import Salesforce
from xml.etree import ElementTree

#Establish a session using the simple_salesforce module
sf = Salesforce(username=username,
                password=password,
                security_token=securityToken,
                organizationId=organizationId)
sfAPIURL = "https://myinstance.salesforce.com/services/async/45.0/job/"
sfDataPath = "C:/Salesforce/Data/"

#Dictionary to store information for the object/job/batch while the script is executing
objectDictionary = {
    "Account": {"job": {"batch": {"id": "8596P00000ihwpJulI", "results": ["8596V00000Bo9iU"], "state": "Completed"},
                        "id": "8752R00000iUjtReqS"},
                "soql": "select Id,Name from Account"},

    "Contact": {"job": {"batch": {"id": "9874G00000iJnBbVgg", "results": ["7410t00000Ao9vp"], "state": "Completed"},
                        "id": "8800o00000POIkLlLa"},
                "soql": "select Id,Name from Contact"}}

async def retrieveResults(jobId, batchId, sfObject):
    headers = {"X-SFDC-Session": sf.session_id, "Content-Encoding": "gzip"}
    async with aiohttp.ClientSession() as session:
        async with session.get(url=f"{sfAPIURL}{jobId}/batch/{batchId}/result", headers=headers) as r:
            data = await r.text()
            batchResults = ElementTree.fromstring(data) #list of batch results
            for resultID in batchResults:
                async with session.get(url=f"{sfAPIURL}{jobId}/batch/{batchId}/result/{resultID.text}", headers=headers, timeout=None) as r:
                    async with aiofiles.open(f"{sfDataPath}{sfObject}_TEMP_JOB_{jobId}_BATCH_{batchId}_RESULT_{resultID.text}.csv", "wb") as outfile: #save in temporary file for manipulation later
                        while True:
                            chunk = await r.content.read(81920)
                            if not chunk:
                                break
                            await outfile.write(chunk)

async def asyncDownload():
    await asyncio.gather(*[retrieveResults(objectDictionary[sfObject]["job"]["id"], objectDictionary[sfObject]["job"]["batch"]["id"], sfObject) for sfObject in objectDictionary])

if __name__ == "__main__":
    asyncio.run(asyncDownload())

Traceback (the line numbers will not match the code snippet above):

Traceback (most recent call last):
  File "C:\Code\salesforce.py", line 252, in <module>
    asyncio.run(asyncDownload())
  File "C:\Program Files\Python37\lib\asyncio\runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "C:\Program Files\Python37\lib\asyncio\base_events.py", line 584, in run_until_complete
    return future.result()
  File "C:\Code\salesforce.py", line 241, in asyncDownload
    await asyncio.gather(*[retrieveResults(objectDictionary[sfObject]["job"]["id"], objectDictionary[sfObject]["job"]["batch"]["id"], sfObject) for sfObject in objectDictionary])
  File "C:\Code\salesforce.py", line 183, in retrieveResults
    chunk = await r.content.read(81920)
  File "C:\Program Files\Python37\lib\site-packages\aiohttp\streams.py", line 369, in read
    await self._wait('read')
  File "C:\Program Files\Python37\lib\site-packages\aiohttp\streams.py", line 297, in _wait
    await waiter
aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed

The root of the problem seems to begin with r.content.read(81920), which should be streaming the data in 81920-byte chunks, but that is about as far as I can get.
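For reference, here is a minimal sketch of how this kind of chunked read loop behaves in isolation. It uses the standard library's asyncio.StreamReader as a stand-in for r.content (aiohttp's stream is a different class, so this is an analogy, not aiohttp's implementation), and ConnectionError as a stand-in for ClientPayloadError. The helper name read_in_chunks is hypothetical. It shows that a clean end of stream arrives as an empty bytes object, while an interrupted stream surfaces as an exception raised from the read call itself, which is consistent with where the traceback points.

```python
import asyncio


async def read_in_chunks(reader, chunk_size):
    """Read until EOF and return the chunks in arrival order."""
    chunks = []
    while True:
        chunk = await reader.read(chunk_size)  # returns at most chunk_size bytes
        if not chunk:  # b"" signals a clean end of stream
            break
        chunks.append(chunk)
    return chunks


async def demo_complete():
    # Stand-in for a complete response body: 200 bytes, then a clean EOF.
    reader = asyncio.StreamReader()
    reader.feed_data(b"x" * 200)
    reader.feed_eof()
    return [len(c) for c in await read_in_chunks(reader, 64)]


async def demo_interrupted():
    # Stand-in for a body that is cut short: the stream is given an
    # exception instead of an EOF, and the next read() call raises it.
    reader = asyncio.StreamReader()
    reader.feed_data(b"x" * 100)
    reader.set_exception(ConnectionError("stream cut short"))
    try:
        await read_in_chunks(reader, 64)
    except ConnectionError as exc:
        return str(exc)
    return "no error"


chunk_sizes = asyncio.run(demo_complete())
interrupted = asyncio.run(demo_interrupted())
print(chunk_sizes)   # [64, 64, 64, 8]
print(interrupted)   # stream cut short
```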

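I don't think this is a network issue on my end, because there are other small jobs on this server that connect to external sources and finish without issue while this job is running. Does anyone have any idea what is going on here?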

Thank you!

Edit:

I've tried iter_any() instead of read() and still get the same error...

async for data in r.content.iter_any():
    await outfile.write(data)

I've tried readline() and still get the same error...

async for line in r.content.readline():
    await outfile.write(line)

I have since worked in some retry functionality in the error handling piece of the code (not included in the original problem), which ultimately allows the jobs to complete. The payload errors are still happening, and that is still the main issue, but retrying the downloads has been a successful workaround. The problem still persists if anyone is able to provide further information.
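The retry code itself is not shown in the post, so the following is only a hypothetical sketch of what such a workaround could look like. The helper name retry_async and its parameters are assumptions, and ConnectionError stands in for aiohttp.client_exceptions.ClientPayloadError so that the example runs with the standard library alone.

```python
import asyncio


async def retry_async(make_attempt, retry_on, attempts=3, base_delay=0.5):
    """Await make_attempt() up to `attempts` times with exponential backoff.

    make_attempt: zero-argument callable that returns a fresh coroutine.
    retry_on: exception class (or tuple of classes) that triggers a retry.
    """
    for attempt in range(1, attempts + 1):
        try:
            return await make_attempt()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the error
            # Wait base_delay, then 2x, then 4x, ... before the next try.
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


# Usage with a fake download that fails twice before succeeding.
calls = {"count": 0}


async def flaky_download():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("payload not completed")  # stand-in error
    return "ok"


result = asyncio.run(retry_async(flaky_download, ConnectionError, base_delay=0.01))
print(result, calls["count"])  # ok 3
```

In the script above, each attempt would need to reissue the request and reopen the output file in "wb" mode, so that a partially written file from a failed attempt is overwritten and not appended to.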

Answers

The same loop as in the question, with await asyncio.sleep(0) added after each read:

                    ...
                    while True:
                        chunk = await r.content.read(81920)
                        await asyncio.sleep(0)
                        if not chunk:
                            break
                        await outfile.write(chunk)
                    ...
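As a rough illustration of what that extra line does (not evidence that it fixes the payload error): await asyncio.sleep(0) suspends the current task for one pass of the event loop without any real delay, so other tasks get a turn. An await on a read that already has buffered data can return without suspending, which means a tight read loop may otherwise keep control for a long stretch. The worker function and the events list below are hypothetical names used only for this sketch.

```python
import asyncio

events = []


async def worker(name, steps):
    for i in range(steps):
        events.append(f"{name}{i}")
        await asyncio.sleep(0)  # yield to the event loop; no actual delay


async def main():
    # Both workers run concurrently and take turns at every sleep(0).
    await asyncio.gather(worker("a", 2), worker("b", 2))


asyncio.run(main())
print(events)  # ['a0', 'b0', 'a1', 'b1']
```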

I had this error on AWS Lambda (it was thrown on requests).

The solution was to fix the build environment:

FROM amazonlinux:2 AS 

to

FROM lambci/lambda:build-python3.8 

I guess the problem is that the .so files, or something at a lower level used internally by the library to manage coroutines, are not compatible with the Lambda environment. Building on the right Docker base image solves the issue.

"The event loop is already running" is a common issue when asyncio.run is called from a script that is already running inside an event loop. To resolve this, you can use asyncio.create_task to create and run your asynchronous tasks.

  • Modify async def retrieveResults:
    Added session as a parameter and used the shared session instead of creating a new ClientSession inside the function.
    async def retrieveResults(session, jobId, batchId, sfObject):
        headers = {"X-SFDC-Session": sf.session_id, "Content-Encoding": "gzip"}
        
        async with session.get(url=f"{sfAPIURL}{jobId}/batch/{batchId}/result", headers=headers) as r:
            data = await r.text()
            batchResults = ElementTree.fromstring(data)  # list of batch results
    
            for resultID in batchResults:
                async with session.get(
                    url=f"{sfAPIURL}{jobId}/batch/{batchId}/result/{resultID.text}",
                    headers=headers,
                    timeout=None
                ) as r:
                    async with aiofiles.open(
                        f"{sfDataPath}{sfObject}_TEMP_JOB_{jobId}_BATCH_{batchId}_RESULT_{resultID.text}.csv",
                        "wb"
                    ) as outfile:
                        while True:
                            chunk = await r.content.read(81920)
                            if not chunk:
                                break
                            await outfile.write(chunk)
  • Modify async def asyncDownload:
    Used async with aiohttp.ClientSession() as session: to create one session within the function, and passed that session to retrieveResults.
    async def asyncDownload():
        async with aiohttp.ClientSession() as session:
            tasks = [
                retrieveResults(session, objectDictionary[sfObject]["job"]["id"], objectDictionary[sfObject]["job"]["batch"]["id"], sfObject)
                for sfObject in objectDictionary
            ]
            await asyncio.gather(*tasks)
    
    if __name__ == "__main__":
        asyncio.run(asyncDownload())



