|
|
@ -152,7 +152,36 @@ class Client: |
|
|
|
return buffer |
|
|
|
return buffer |
|
|
|
|
|
|
|
|
|
|
|
def bulk_write(self, cs, cmd, bulk, id=None, pld=None): |
|
|
|
def bulk_write(self, cs, cmd, bulk, id=None, pld=None): |
|
|
|
""" Perform a bulk write. If cs is None, cmd is used as message type """ |
|
|
|
""" |
|
|
|
|
|
|
|
Perform a bulk write. If cs is None, cmd is used as message type. |
|
|
|
|
|
|
|
bulk is the data to write. |
|
|
|
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
offer = self.query(cs=cs, cmd=cmd, id=id, pld=pld) |
|
|
|
|
|
|
|
if offer.type != gex.MSG_BULK_WRITE_OFFER: |
|
|
|
|
|
|
|
raise Exception("Bulk write rejected! %s" % offer.data.decode('utf-8')) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# parse the offer |
|
|
|
|
|
|
|
pp = PayloadParser(offer.data) |
|
|
|
|
|
|
|
max_size = pp.u32() |
|
|
|
|
|
|
|
max_chunk = pp.u32() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
total = len(bulk) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if max_size < total: |
|
|
|
|
|
|
|
# announce we changed our mind and won't write anything |
|
|
|
|
|
|
|
self.send_raw(type=gex.MSG_BULK_ABORT, id=offer.id) |
|
|
|
|
|
|
|
raise Exception("Bulk write not possible, not enough space (needed %d bytes, max %d)" % (total, max_size)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
at = 0 |
|
|
|
|
|
|
|
while at < total: |
|
|
|
|
|
|
|
chunklen = min(max_chunk, total - at) |
|
|
|
|
|
|
|
# Send data |
|
|
|
|
|
|
|
rv = self.query_raw(type=gex.MSG_BULK_DATA if chunklen == max_chunk else gex.MSG_BULK_END, |
|
|
|
|
|
|
|
id=offer.id, |
|
|
|
|
|
|
|
pld=bulk[at:at+chunklen]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if rv.type != gex.MSG_SUCCESS: |
|
|
|
|
|
|
|
raise Exception("Unexpected bulk frame type %d" % rv.type) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
at += chunklen |
|
|
|