Commit 9202e523 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Add one more parameter to Ask TIDs, to specify a wanted partition explicitly.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@188 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 330e087f
...@@ -9,7 +9,7 @@ from random import shuffle ...@@ -9,7 +9,7 @@ from random import shuffle
from neo.client.mq import MQ from neo.client.mq import MQ
from neo.node import NodeManager, MasterNode, StorageNode from neo.node import NodeManager, MasterNode, StorageNode
from neo.connection import MTClientConnection from neo.connection import MTClientConnection
from neo.protocol import Packet, INVALID_UUID, INVALID_TID, \ from neo.protocol import Packet, INVALID_UUID, INVALID_TID, INVALID_PARTITION, \
STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
TEMPORARILY_DOWN_STATE, \ TEMPORARILY_DOWN_STATE, \
UP_TO_DATE_STATE, FEEDING_STATE, INVALID_SERIAL UP_TO_DATE_STATE, FEEDING_STATE, INVALID_SERIAL
...@@ -698,7 +698,7 @@ class Application(object): ...@@ -698,7 +698,7 @@ class Application(object):
try: try:
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.askTIDs(msg_id, first, last) p.askTIDs(msg_id, first, last, INVALID_PARTITION)
conn.addPacket(p) conn.addPacket(p)
finally: finally:
conn.unlock() conn.unlock()
......
...@@ -234,7 +234,7 @@ class EventHandler(object): ...@@ -234,7 +234,7 @@ class EventHandler(object):
serial_end, compression, checksum, data): serial_end, compression, checksum, data):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
def handleAskTIDs(self, conn, packet, first, last): def handleAskTIDs(self, conn, packet, first, last, partition):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
def handleAnswerTIDs(self, conn, packet, tid_list): def handleAnswerTIDs(self, conn, packet, tid_list):
......
...@@ -211,6 +211,7 @@ INVALID_TID = '\0\0\0\0\0\0\0\0' ...@@ -211,6 +211,7 @@ INVALID_TID = '\0\0\0\0\0\0\0\0'
INVALID_SERIAL = '\0\0\0\0\0\0\0\0' INVALID_SERIAL = '\0\0\0\0\0\0\0\0'
INVALID_OID = '\0\0\0\0\0\0\0\0' INVALID_OID = '\0\0\0\0\0\0\0\0'
INVALID_PTID = '\0\0\0\0\0\0\0\0' INVALID_PTID = '\0\0\0\0\0\0\0\0'
INVALID_PARTITION = 0xffffffff
class ProtocolError(Exception): pass class ProtocolError(Exception): pass
...@@ -575,10 +576,10 @@ class Packet(object): ...@@ -575,10 +576,10 @@ class Packet(object):
compression, checksum, len(data)) + data compression, checksum, len(data)) + data
return self return self
def askTIDs(self, msg_id, first, last): def askTIDs(self, msg_id, first, last, partition):
self._id = msg_id self._id = msg_id
self._type = ASK_TIDS self._type = ASK_TIDS
self._body = pack('!QQ', first, last) self._body = pack('!QQL', first, last, partition)
return self return self
def answerTIDs(self, msg_id, tid_list): def answerTIDs(self, msg_id, tid_list):
...@@ -1025,10 +1026,10 @@ class Packet(object): ...@@ -1025,10 +1026,10 @@ class Packet(object):
def _decodeAskTIDs(self): def _decodeAskTIDs(self):
try: try:
first, last = unpack('!QQ', self._body) first, last, partition = unpack('!QQL', self._body)
except: except:
raise ProtocolError(self, 'invalid ask tids') raise ProtocolError(self, 'invalid ask tids')
return first, last return first, last, partition
decode_table[ASK_TIDS] = _decodeAskTIDs decode_table[ASK_TIDS] = _decodeAskTIDs
def _decodeAnswerTIDs(self): def _decodeAnswerTIDs(self):
......
...@@ -192,7 +192,7 @@ class StorageEventHandler(EventHandler): ...@@ -192,7 +192,7 @@ class StorageEventHandler(EventHandler):
def handleAskObject(self, conn, packet, oid, serial, tid): def handleAskObject(self, conn, packet, oid, serial, tid):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
def handleAskTIDs(self, conn, packet, first, last): def handleAskTIDs(self, conn, packet, first, last, partition):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
def handleAskObjectHistory(self, conn, packet, oid, first, last): def handleAskObjectHistory(self, conn, packet, oid, first, last):
......
...@@ -2,6 +2,7 @@ import logging ...@@ -2,6 +2,7 @@ import logging
from neo.storage.handler import StorageEventHandler from neo.storage.handler import StorageEventHandler
from neo.protocol import INVALID_UUID, INVALID_SERIAL, INVALID_TID, \ from neo.protocol import INVALID_UUID, INVALID_SERIAL, INVALID_TID, \
INVALID_PARTITION, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \ RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE
from neo.util import dump from neo.util import dump
...@@ -313,7 +314,7 @@ class OperationEventHandler(StorageEventHandler): ...@@ -313,7 +314,7 @@ class OperationEventHandler(StorageEventHandler):
p.oidNotFound(packet.getId(), '%s does not exist' % dump(oid)) p.oidNotFound(packet.getId(), '%s does not exist' % dump(oid))
conn.addPacket(p) conn.addPacket(p)
def handleAskTIDs(self, conn, packet, first, last): def handleAskTIDs(self, conn, packet, first, last, partition):
# This method is complicated, because I must return TIDs only # This method is complicated, because I must return TIDs only
# about usable partitions assigned to me. # about usable partitions assigned to me.
if first >= last: if first >= last:
...@@ -323,14 +324,17 @@ class OperationEventHandler(StorageEventHandler): ...@@ -323,14 +324,17 @@ class OperationEventHandler(StorageEventHandler):
app = self.app app = self.app
# Collect all usable partitions for me. if partition == INVALID_PARTITION:
getCellList = app.pt.getCellList # Collect all usable partitions for me.
partition_list = [] getCellList = app.pt.getCellList
for offset in xrange(app.num_partitions): partition_list = []
for cell in getCellList(offset, True): for offset in xrange(app.num_partitions):
if cell.getUUID() == app.uuid: for cell in getCellList(offset, True):
partition_list.append(offset) if cell.getUUID() == app.uuid:
break partition_list.append(offset)
break
else:
partition_list = [partition]
tid_list = app.dm.getTIDList(first, last - first, tid_list = app.dm.getTIDList(first, last - first,
app.num_partitions, partition_list) app.num_partitions, partition_list)
......
...@@ -183,7 +183,7 @@ class Replicator(object): ...@@ -183,7 +183,7 @@ class Replicator(object):
msg_id = self.current_connection.getNextId() msg_id = self.current_connection.getNextId()
p = Packet() p = Packet()
p.askTIDs(msg_id, 0, 1000) p.askTIDs(msg_id, 0, 1000, self.current_partition.getRID())
self.current_connection.addPacket(p) self.current_connection.addPacket(p)
self.current_connection.expectMessage(timeout = 300) self.current_connection.expectMessage(timeout = 300)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment