replication 6.69 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
#! /usr/bin/env python2.4

import sys
import time
import traceback
import transaction
from persistent import Persistent
from ZODB.tests.StorageTestBase import zodb_pickle

from neo.util import p64
from neo.protocol import CellStates
from neo.tests.benchmark import BenchmarkRunner
from neo.tests.functional import NEOCluster

# Default benchmark parameters, used by ReplicationBenchmark.load_options
# when the matching command line option is not given.

# Number of partitions of the test cluster (--partitions).
PARTITIONS = 16
# Total number of transactions committed while populating (--transactions).
TRANSACTIONS = 1000
# Total number of objects (--objects).
OBJECTS = 10000
# Number of revisions per object (--revisions).
REVISIONS = 5
# Size of an object revision (--object-size); reported as a byte count.
OBJECT_SIZE = 100
# Percentage of the population up to which the destination storage is
# populated before it is stopped (--cut-at).
CUT_AT = 0

def humanize(size):
    """Format a byte count as a human-readable string.

    The value is divided by 1024 until it drops below 1024 or the largest
    known unit (GB) has been reached.  Values below 1024 are rendered as an
    integer number of bytes, larger ones with two decimals:

    >>> humanize(512)
    '512 bytes'
    >>> humanize(1536)
    '1.50 KB'
    """
    # Remaining units, smallest first; every entry uses two decimals.
    units = ['%.2f KB', '%.2f MB', '%.2f GB']
    unit = '%d bytes'
    while size >= 1024 and units:
        size /= 1024.0
        unit = units.pop(0)
    return unit % size

class DummyObject(Persistent):
    """Minimal persistent object carrying an opaque payload."""

    def __init__(self, data):
        # Keep the payload: dropping it would make every pickled instance
        # the same size regardless of the data handed to the constructor.
        self._data = data

class ReplicationBenchmark(BenchmarkRunner):
    """ Test replication process """

    def add_options(self, parser):
        """Declare the command line options of this benchmark on `parser`.

        Every option is registered with a long name only and a help text;
        no type or default is given here.
        """
        option_help = (
            ('--transactions', "Total number of transactions"),
            ('--objects', "Total number of objects"),
            ('--revisions', "Number of revisions per object"),
            ('--partitions', "Number of partition"),
            ('--object-size', "Size of an object revision"),
            ('--cut-at', "Populate the destination up to this %"),
        )
        for flag, text in option_help:
            # The empty string fills the short-option slot.
            parser.add_option('', flag, help=text)

    def load_options(self, options, args):
        """Build the benchmark configuration from the parsed options.

        options -- object exposing the values declared by add_options;
                   an unset (false) value falls back to the module default
        args -- positional arguments (unused)

        Returns a dict with the keys partitions, transactions, objects,
        revisions, object_size and cut_at, all converted to int.
        Exits through sys.exit() when the values cannot describe a run.
        """
        transactions = int(options.transactions or TRANSACTIONS)
        objects = int(options.objects or OBJECTS)
        revisions = int(options.revisions or REVISIONS)
        cut_at = int(options.cut_at or CUT_AT)
        # Checked first: the modulus below would otherwise fail with
        # ZeroDivisionError instead of a readable message.
        if transactions <= 0:
            sys.exit('Invalid parameters (need at least one transaction)')
        # Every transaction must store the same whole number of revisions.
        if (objects * revisions) % transactions != 0:
            sys.exit('Invalid parameters (need multiples)')
        # cut_at is a percentage of the population.
        if not 0 <= cut_at <= 100:
            sys.exit('Invalid parameters (cut-at must be between 0 and 100)')
        return dict(
            partitions = int(options.partitions or PARTITIONS),
            transactions = transactions,
            objects = objects,
            revisions = revisions,
            object_size = int(options.object_size or OBJECT_SIZE),
            cut_at = cut_at,
        )

    def time_it(self, method, *args, **kw):
        """Call `method(*args, **kw)` and return the elapsed wall-clock
        time in seconds.  The value returned by `method` is discarded."""
        began = time.time()
        method(*args, **kw)
        finished = time.time()
        return finished - began

    def start(self):
        """Run the whole benchmark on a freshly built two-database cluster.

        The cluster is populated, the last storage process is started again
        and the time needed for the out-of-date cells to disappear is
        measured.  The cluster is always stopped before returning.

        Returns a (summary, content) tuple: `summary` comes from
        buildReport() and `content` is the formatted traceback of a failed
        run, or an empty string.
        """
        config = self._config
        # build a neo
        neo = NEOCluster(
            db_list=['neot_replication_%d' % i for i in xrange(2)],
            clear_databases=True,
            partitions=config.partitions,
            replicas=1,
            master_node_count=1,
            verbose=False,
        )
        neo.start()
        # A timing left at None tells buildReport() which step failed.
        p_time = r_time = None
        content = ''
        # try/except is nested in try/finally rather than merged so that
        # the file stays valid for the Python 2.4 named in the shebang.
        try:
            try:
                p_time = self.time_it(self.populate, neo)
                neo.expectOudatedCells(config.partitions)
                # Bring back the last storage process so that it can
                # catch up with the populated data.
                storage = neo.getStorageProcessList()[-1]
                storage.start()
                neo.expectRunning(storage, delay=0.1)
                print("Source storage populated in %.3f secs" % p_time)
                # NOTE(review): the extra 0.1 presumably accounts for the
                # expectRunning delay above -- confirm.
                r_time = self.time_it(self.replicate, neo) + 0.1
            except Exception:
                # Report the failure instead of propagating it, so that a
                # (partial) report is still produced.
                content = traceback.format_exc()
        finally:
            neo.stop()
        return self.buildReport(p_time, r_time), content

    def replicate(self, neo, timeout=3600, poll_interval=1):
        """Block until the partition table has no out-of-date cell left.

        neo -- cluster whose `neoctl.getPartitionRowList()` is polled
        timeout -- maximum time to wait, in seconds
        poll_interval -- pause between two polls, in seconds

        Raises Exception('Replication takes too long') when out-of-date
        cells remain once `timeout` has elapsed.
        """
        def number_of_oudated_cell():
            # Item 1 of the answer is the row list; each row keeps its
            # cells in row[1] and each cell its state in cell[1].
            row_list = neo.neoctl.getPartitionRowList()[1]
            return sum(1 for row in row_list for cell in row[1]
                       if cell[1] == CellStates.OUT_OF_DATE)
        deadline = time.time() + timeout
        # The table is queried once per iteration and never again after
        # it has been seen clean.
        while number_of_oudated_cell() > 0:
            if time.time() > deadline:
                raise Exception('Replication takes too long')
            time.sleep(poll_interval)

    def buildReport(self, p_time, r_time):
        """Publish the run parameters and timings, return a summary line.

        p_time -- population time in seconds, or None if it failed
        r_time -- replication time in seconds, or None if it failed

        The parameters are always published through add_status(); timings
        are published only for the steps that completed.  Returns
        'Populate failed', 'Replication failed' or the bandwidth summary.
        """
        config = self._config
        status = self.add_status
        total_revisions = config.revisions * config.objects
        total_bytes = total_revisions * config.object_size
        parameters = (
            ('Partitions', config.partitions),
            ('Transactions', config.transactions),
            ('Objects', config.objects),
            ('Revisions', config.revisions),
            ('Cut at', '%d%%' % config.cut_at),
            ('Object size', humanize(config.object_size)),
            ('Objects space', humanize(total_bytes)),
        )
        for label, value in parameters:
            status(label, value)
        if p_time is None:
            return 'Populate failed'
        status('Population time', '%.3f secs' % p_time)
        if r_time is None:
            return 'Replication failed'
        # Bandwidth is computed over the whole object space.
        rate = humanize(total_bytes / r_time)
        status('Replication time', '%.3f secs' % r_time)
        status('Time per partition',
               '%.3f secs' % (r_time / config.partitions))
        status('Time per object', '%.3f secs' % (r_time / total_revisions))
        status('Global bandwidth', '%s/sec' % rate)
        return "%d%% of %s replicated at %s/sec" % (
            100 - config.cut_at, humanize(total_bytes), rate)

    def populate(self, neo):
        """Commit the test transactions and stop the last storage process.

        Every transaction stores (objects * revisions) / transactions
        copies of the same pickled DummyObject.  The last storage process
        is stopped once the share of stored revisions reaches the
        configured cut_at percentage, or after the last transaction if
        that share is never reached earlier.
        """
        print("Start populate")
        db, conn = neo.getZODBConnection(compress=False)
        storage = conn._storage
        cut_at = self._config.cut_at
        objects = self._config.objects
        transactions = self._config.transactions
        revisions = self._config.revisions
        # Explicit floor divisions: these values feed xrange() and '%'.
        objects_turn = objects // transactions
        objects_per_transaction = (objects * revisions) // transactions

        objects_revisions = objects * revisions
        base_oid = 1
        data = zodb_pickle(DummyObject("-" * self._config.object_size))
        # Serial handed to store(): zero first, then whatever the previous
        # tpc_finish() returned.
        prev = p64(0)
        progress = 0
        cutted = False
        for tidx in xrange(transactions):
            # '>=' rather than '==': progress only moves by whole
            # transactions, so a cut_at lying between two boundaries must
            # trigger at the first boundary past it instead of never.
            if not cutted and \
                    (100 * progress) // objects_revisions >= cut_at:
                print("Cut at %d%%" % (cut_at, ))
                neo.getStorageProcessList()[-1].stop()
                cutted = True
            txn = transaction.Transaction()
            txn.description = "Transaction %s" % tidx
            storage.tpc_begin(txn)
            for oidx in xrange(objects_per_transaction):
                progress += 1
                oid = base_oid + oidx
                storage.store(p64(oid), prev, data, '', txn)
            storage.tpc_vote(txn)
            prev = storage.tpc_finish(txn)
            # NOTE(review): this raises ZeroDivisionError when
            # objects < transactions and never advances base_oid when
            # objects == transactions -- confirm the intended OID rotation.
            if tidx % objects_turn == 1:
                base_oid += objects_per_transaction
        if not cutted:
            # The threshold was not reached before the last transaction:
            # the destination holds everything, stop it now.
            neo.getStorageProcessList()[-1].stop()

# Script entry point: instantiate the benchmark and call its run() method
# (not defined in this file; presumably inherited from BenchmarkRunner).
if __name__ == "__main__":
    ReplicationBenchmark().run()