Commit 10e27752 authored by Dave Kleikamp's avatar Dave Kleikamp

JFS: Implement multiple commit threads

For scalability, jfs now allows specifying the number of commit
threads with the module parameter commit_threads=<num>.  It defaults
to the number of processors.

You can also change the number of txBlocks and txLocks allocated with
the nTxBlock and nTxLock module parameters.  Run "modinfo jfs.ko" for
the details of each parameter.
parent 25c30df1
...@@ -152,6 +152,11 @@ struct jfs_sb_info { ...@@ -152,6 +152,11 @@ struct jfs_sb_info {
pxd_t ait2; /* pxd describing AIT copy */ pxd_t ait2; /* pxd describing AIT copy */
char uuid[16]; /* 128-bit uuid for volume */ char uuid[16]; /* 128-bit uuid for volume */
char loguuid[16]; /* 128-bit uuid for log */ char loguuid[16]; /* 128-bit uuid for log */
/*
* commit_state is used for synchronization of the jfs_commit
* threads. It is protected by LAZY_LOCK().
*/
int commit_state; /* commit state */
/* Formerly in ipimap */ /* Formerly in ipimap */
uint gengen; /* inode generation generator*/ uint gengen; /* inode generation generator*/
uint inostamp; /* shows inode belongs to fileset*/ uint inostamp; /* shows inode belongs to fileset*/
...@@ -164,6 +169,9 @@ struct jfs_sb_info { ...@@ -164,6 +169,9 @@ struct jfs_sb_info {
uint p_state; /* state prior to going no integrity */ uint p_state; /* state prior to going no integrity */
}; };
/* jfs_sb_info commit_state */
#define IN_LAZYCOMMIT 1
static inline struct jfs_inode_info *JFS_IP(struct inode *inode) static inline struct jfs_inode_info *JFS_IP(struct inode *inode)
{ {
return list_entry(inode, struct jfs_inode_info, vfs_inode); return list_entry(inode, struct jfs_inode_info, vfs_inode);
......
...@@ -171,6 +171,7 @@ DECLARE_MUTEX(jfs_log_sem); ...@@ -171,6 +171,7 @@ DECLARE_MUTEX(jfs_log_sem);
extern void txLazyUnlock(struct tblock * tblk); extern void txLazyUnlock(struct tblock * tblk);
extern int jfs_stop_threads; extern int jfs_stop_threads;
extern struct completion jfsIOwait; extern struct completion jfsIOwait;
extern int jfs_tlocks_low;
/* /*
* forward references * forward references
...@@ -524,12 +525,7 @@ lmWriteRecord(struct jfs_log * log, struct tblock * tblk, struct lrd * lrd, ...@@ -524,12 +525,7 @@ lmWriteRecord(struct jfs_log * log, struct tblock * tblk, struct lrd * lrd,
tblk->eor = log->eor; tblk->eor = log->eor;
/* enqueue transaction to commit queue */ /* enqueue transaction to commit queue */
tblk->cqnext = NULL; list_add_tail(&tblk->cqueue, &log->cqueue);
if (log->cqueue.head) {
log->cqueue.tail->cqnext = tblk;
log->cqueue.tail = tblk;
} else
log->cqueue.head = log->cqueue.tail = tblk;
LOGGC_UNLOCK(log); LOGGC_UNLOCK(log);
} }
...@@ -587,7 +583,10 @@ static int lmNextPage(struct jfs_log * log) ...@@ -587,7 +583,10 @@ static int lmNextPage(struct jfs_log * log)
* write or queue the full page at the tail of write queue * write or queue the full page at the tail of write queue
*/ */
/* get the tail tblk on commit queue */ /* get the tail tblk on commit queue */
tblk = log->cqueue.tail; if (list_empty(&log->cqueue))
tblk = NULL;
else
tblk = list_entry(log->cqueue.prev, struct tblock, cqueue);
/* every tblk who has COMMIT record on the current page, /* every tblk who has COMMIT record on the current page,
* and has not been committed, must be on commit queue * and has not been committed, must be on commit queue
...@@ -688,8 +687,9 @@ int lmGroupCommit(struct jfs_log * log, struct tblock * tblk) ...@@ -688,8 +687,9 @@ int lmGroupCommit(struct jfs_log * log, struct tblock * tblk)
if (tblk->xflag & COMMIT_LAZY) if (tblk->xflag & COMMIT_LAZY)
tblk->flag |= tblkGC_LAZY; tblk->flag |= tblkGC_LAZY;
if ((!(log->cflag & logGC_PAGEOUT)) && log->cqueue.head && if ((!(log->cflag & logGC_PAGEOUT)) && (!list_empty(&log->cqueue)) &&
(!(tblk->xflag & COMMIT_LAZY) || test_bit(log_FLUSH, &log->flag))) { (!(tblk->xflag & COMMIT_LAZY) || test_bit(log_FLUSH, &log->flag)
|| jfs_tlocks_low)) {
/* /*
* No pageout in progress * No pageout in progress
* *
...@@ -753,7 +753,7 @@ static void lmGCwrite(struct jfs_log * log, int cant_write) ...@@ -753,7 +753,7 @@ static void lmGCwrite(struct jfs_log * log, int cant_write)
struct logpage *lp; struct logpage *lp;
int gcpn; /* group commit page number */ int gcpn; /* group commit page number */
struct tblock *tblk; struct tblock *tblk;
struct tblock *xtblk; struct tblock *xtblk = NULL;
/* /*
* build the commit group of a log page * build the commit group of a log page
...@@ -762,15 +762,16 @@ static void lmGCwrite(struct jfs_log * log, int cant_write) ...@@ -762,15 +762,16 @@ static void lmGCwrite(struct jfs_log * log, int cant_write)
* transactions with COMMIT records on the same log page. * transactions with COMMIT records on the same log page.
*/ */
/* get the head tblk on the commit queue */ /* get the head tblk on the commit queue */
tblk = xtblk = log->cqueue.head; gcpn = list_entry(log->cqueue.next, struct tblock, cqueue)->pn;
gcpn = tblk->pn;
list_for_each_entry(tblk, &log->cqueue, cqueue) {
if (tblk->pn != gcpn)
break;
while (tblk && tblk->pn == gcpn) {
xtblk = tblk; xtblk = tblk;
/* state transition: (QUEUE, READY) -> COMMIT */ /* state transition: (QUEUE, READY) -> COMMIT */
tblk->flag |= tblkGC_COMMIT; tblk->flag |= tblkGC_COMMIT;
tblk = tblk->cqnext;
} }
tblk = xtblk; /* last tblk of the page */ tblk = xtblk; /* last tblk of the page */
...@@ -816,7 +817,7 @@ static void lmPostGC(struct lbuf * bp) ...@@ -816,7 +817,7 @@ static void lmPostGC(struct lbuf * bp)
unsigned long flags; unsigned long flags;
struct jfs_log *log = bp->l_log; struct jfs_log *log = bp->l_log;
struct logpage *lp; struct logpage *lp;
struct tblock *tblk; struct tblock *tblk, *temp;
//LOGGC_LOCK(log); //LOGGC_LOCK(log);
spin_lock_irqsave(&log->gclock, flags); spin_lock_irqsave(&log->gclock, flags);
...@@ -826,7 +827,9 @@ static void lmPostGC(struct lbuf * bp) ...@@ -826,7 +827,9 @@ static void lmPostGC(struct lbuf * bp)
* remove/wakeup transactions from commit queue who were * remove/wakeup transactions from commit queue who were
* group committed with the current log page * group committed with the current log page
*/ */
while ((tblk = log->cqueue.head) && (tblk->flag & tblkGC_COMMIT)) { list_for_each_entry_safe(tblk, temp, &log->cqueue, cqueue) {
if (!(tblk->flag & tblkGC_COMMIT))
break;
/* if transaction was marked GC_COMMIT then /* if transaction was marked GC_COMMIT then
* it has been shipped in the current pageout * it has been shipped in the current pageout
* and made it to disk - it is committed. * and made it to disk - it is committed.
...@@ -836,11 +839,8 @@ static void lmPostGC(struct lbuf * bp) ...@@ -836,11 +839,8 @@ static void lmPostGC(struct lbuf * bp)
tblk->flag |= tblkGC_ERROR; tblk->flag |= tblkGC_ERROR;
/* remove it from the commit queue */ /* remove it from the commit queue */
log->cqueue.head = tblk->cqnext; list_del(&tblk->cqueue);
if (log->cqueue.head == NULL)
log->cqueue.tail = NULL;
tblk->flag &= ~tblkGC_QUEUE; tblk->flag &= ~tblkGC_QUEUE;
tblk->cqnext = 0;
if (tblk == log->flush_tblk) { if (tblk == log->flush_tblk) {
/* we can stop flushing the log now */ /* we can stop flushing the log now */
...@@ -893,9 +893,9 @@ static void lmPostGC(struct lbuf * bp) ...@@ -893,9 +893,9 @@ static void lmPostGC(struct lbuf * bp)
* select the latest ready transaction as new group leader and * select the latest ready transaction as new group leader and
* wake her up to lead her group. * wake her up to lead her group.
*/ */
if ((tblk = log->cqueue.head) && if ((!list_empty(&log->cqueue)) &&
((log->gcrtc > 0) || (tblk->bp->l_wqnext != NULL) || ((log->gcrtc > 0) || (tblk->bp->l_wqnext != NULL) ||
test_bit(log_FLUSH, &log->flag))) test_bit(log_FLUSH, &log->flag) || jfs_tlocks_low))
/* /*
* Call lmGCwrite with new group leader * Call lmGCwrite with new group leader
*/ */
...@@ -1288,7 +1288,7 @@ int lmLogInit(struct jfs_log * log) ...@@ -1288,7 +1288,7 @@ int lmLogInit(struct jfs_log * log)
init_waitqueue_head(&log->syncwait); init_waitqueue_head(&log->syncwait);
log->cqueue.head = log->cqueue.tail = NULL; INIT_LIST_HEAD(&log->cqueue);
log->flush_tblk = NULL; log->flush_tblk = NULL;
log->count = 0; log->count = 0;
...@@ -1535,7 +1535,7 @@ int lmLogClose(struct super_block *sb) ...@@ -1535,7 +1535,7 @@ int lmLogClose(struct super_block *sb)
void jfs_flush_journal(struct jfs_log *log, int wait) void jfs_flush_journal(struct jfs_log *log, int wait)
{ {
int i; int i;
struct tblock *target; struct tblock *target = NULL;
/* jfs_write_inode may call us during read-only mount */ /* jfs_write_inode may call us during read-only mount */
if (!log) if (!log)
...@@ -1545,13 +1545,12 @@ void jfs_flush_journal(struct jfs_log *log, int wait) ...@@ -1545,13 +1545,12 @@ void jfs_flush_journal(struct jfs_log *log, int wait)
LOGGC_LOCK(log); LOGGC_LOCK(log);
target = log->cqueue.head; if (!list_empty(&log->cqueue)) {
if (target) {
/* /*
* This ensures that we will keep writing to the journal as long * This ensures that we will keep writing to the journal as long
* as there are unwritten commit records * as there are unwritten commit records
*/ */
target = list_entry(log->cqueue.prev, struct tblock, cqueue);
if (test_bit(log_FLUSH, &log->flag)) { if (test_bit(log_FLUSH, &log->flag)) {
/* /*
...@@ -1602,16 +1601,16 @@ void jfs_flush_journal(struct jfs_log *log, int wait) ...@@ -1602,16 +1601,16 @@ void jfs_flush_journal(struct jfs_log *log, int wait)
* If there was recent activity, we may need to wait * If there was recent activity, we may need to wait
* for the lazycommit thread to catch up * for the lazycommit thread to catch up
*/ */
if (log->cqueue.head || !list_empty(&log->synclist)) { if ((!list_empty(&log->cqueue)) || !list_empty(&log->synclist)) {
for (i = 0; i < 800; i++) { /* Too much? */ for (i = 0; i < 800; i++) { /* Too much? */
current->state = TASK_INTERRUPTIBLE; current->state = TASK_INTERRUPTIBLE;
schedule_timeout(HZ / 4); schedule_timeout(HZ / 4);
if ((log->cqueue.head == NULL) && if (list_empty(&log->cqueue) &&
list_empty(&log->synclist)) list_empty(&log->synclist))
break; break;
} }
} }
assert(log->cqueue.head == NULL); assert(list_empty(&log->cqueue));
assert(list_empty(&log->synclist)); assert(list_empty(&log->synclist));
clear_bit(log_FLUSH, &log->flag); clear_bit(log_FLUSH, &log->flag);
} }
......
...@@ -398,10 +398,7 @@ struct jfs_log { ...@@ -398,10 +398,7 @@ struct jfs_log {
/* commit */ /* commit */
uint cflag; /* 4: */ uint cflag; /* 4: */
struct { /* 8: FIFO commit queue header */ struct list_head cqueue; /* FIFO commit queue */
struct tblock *head;
struct tblock *tail;
} cqueue;
struct tblock *flush_tblk; /* tblk we're waiting on for flush */ struct tblock *flush_tblk; /* tblk we're waiting on for flush */
int gcrtc; /* 4: GC_READY transaction count */ int gcrtc; /* 4: GC_READY transaction count */
struct tblock *gclrt; /* 4: latest GC_READY transaction */ struct tblock *gclrt; /* 4: latest GC_READY transaction */
......
...@@ -48,6 +48,8 @@ ...@@ -48,6 +48,8 @@
#include <linux/smp_lock.h> #include <linux/smp_lock.h>
#include <linux/completion.h> #include <linux/completion.h>
#include <linux/suspend.h> #include <linux/suspend.h>
#include <linux/module.h>
#include <linux/moduleparam.h>
#include "jfs_incore.h" #include "jfs_incore.h"
#include "jfs_filsys.h" #include "jfs_filsys.h"
#include "jfs_metapage.h" #include "jfs_metapage.h"
...@@ -61,25 +63,22 @@ ...@@ -61,25 +63,22 @@
* transaction management structures * transaction management structures
*/ */
static struct { static struct {
/* tblock */
int freetid; /* index of a free tid structure */ int freetid; /* index of a free tid structure */
wait_queue_head_t freewait; /* eventlist of free tblock */
/* tlock */
int freelock; /* index first free lock word */ int freelock; /* index first free lock word */
wait_queue_head_t freewait; /* eventlist of free tblock */
wait_queue_head_t freelockwait; /* eventlist of free tlock */ wait_queue_head_t freelockwait; /* eventlist of free tlock */
wait_queue_head_t lowlockwait; /* eventlist of ample tlocks */ wait_queue_head_t lowlockwait; /* eventlist of ample tlocks */
int tlocksInUse; /* Number of tlocks in use */ int tlocksInUse; /* Number of tlocks in use */
int TlocksLow; /* Indicates low number of available tlocks */
spinlock_t LazyLock; /* synchronize sync_queue & unlock_queue */ spinlock_t LazyLock; /* synchronize sync_queue & unlock_queue */
/* struct tblock *sync_queue; * Transactions waiting for data sync */ /* struct tblock *sync_queue; * Transactions waiting for data sync */
struct tblock *unlock_queue; /* Txns waiting to be released */ struct list_head unlock_queue; /* Txns waiting to be released */
struct tblock *unlock_tail; /* Tail of unlock_queue */
struct list_head anon_list; /* inodes having anonymous txns */ struct list_head anon_list; /* inodes having anonymous txns */
struct list_head anon_list2; /* inodes having anonymous txns struct list_head anon_list2; /* inodes having anonymous txns
that couldn't be sync'ed */ that couldn't be sync'ed */
} TxAnchor; } TxAnchor;
int jfs_tlocks_low; /* Indicates low number of available tlocks */
#ifdef CONFIG_JFS_STATISTICS #ifdef CONFIG_JFS_STATISTICS
struct { struct {
uint txBegin; uint txBegin;
...@@ -95,11 +94,19 @@ struct { ...@@ -95,11 +94,19 @@ struct {
#endif #endif
static int nTxBlock = 512; /* number of transaction blocks */ static int nTxBlock = 512; /* number of transaction blocks */
struct tblock *TxBlock; /* transaction block table */ module_param(nTxBlock, int, 0);
MODULE_PARM_DESC(nTxBlock,
"Number of transaction blocks (default:512, max:65536)");
static int nTxLock = 4096; /* number of transaction locks */ static int nTxLock = 4096; /* number of transaction locks */
static int TxLockLWM = 4096*.4; /* Low water mark for number of txLocks used */ module_param(nTxLock, int, 0);
static int TxLockHWM = 4096*.8; /* High water mark for number of txLocks used */ MODULE_PARM_DESC(nTxLock,
"Number of transaction locks (default:4096, max:65536)");
struct tblock *TxBlock; /* transaction block table */
static int TxLockLWM; /* Low water mark for number of txLocks used */
static int TxLockHWM; /* High water mark for number of txLocks used */
static int TxLockVHWM; /* Very High water mark */
struct tlock *TxLock; /* transaction lock table */ struct tlock *TxLock; /* transaction lock table */
...@@ -162,7 +169,6 @@ extern void lmSync(struct jfs_log *); ...@@ -162,7 +169,6 @@ extern void lmSync(struct jfs_log *);
extern int jfs_commit_inode(struct inode *, int); extern int jfs_commit_inode(struct inode *, int);
extern int jfs_stop_threads; extern int jfs_stop_threads;
struct task_struct *jfsCommitTask;
extern struct completion jfsIOwait; extern struct completion jfsIOwait;
/* /*
...@@ -210,9 +216,9 @@ static lid_t txLockAlloc(void) ...@@ -210,9 +216,9 @@ static lid_t txLockAlloc(void)
TXN_SLEEP(&TxAnchor.freelockwait); TXN_SLEEP(&TxAnchor.freelockwait);
TxAnchor.freelock = TxLock[lid].next; TxAnchor.freelock = TxLock[lid].next;
HIGHWATERMARK(stattx.maxlid, lid); HIGHWATERMARK(stattx.maxlid, lid);
if ((++TxAnchor.tlocksInUse > TxLockHWM) && (TxAnchor.TlocksLow == 0)) { if ((++TxAnchor.tlocksInUse > TxLockHWM) && (jfs_tlocks_low == 0)) {
jfs_info("txLockAlloc TlocksLow"); jfs_info("txLockAlloc tlocks low");
TxAnchor.TlocksLow = 1; jfs_tlocks_low = 1;
wake_up(&jfs_sync_thread_wait); wake_up(&jfs_sync_thread_wait);
} }
...@@ -224,9 +230,9 @@ static void txLockFree(lid_t lid) ...@@ -224,9 +230,9 @@ static void txLockFree(lid_t lid)
TxLock[lid].next = TxAnchor.freelock; TxLock[lid].next = TxAnchor.freelock;
TxAnchor.freelock = lid; TxAnchor.freelock = lid;
TxAnchor.tlocksInUse--; TxAnchor.tlocksInUse--;
if (TxAnchor.TlocksLow && (TxAnchor.tlocksInUse < TxLockLWM)) { if (jfs_tlocks_low && (TxAnchor.tlocksInUse < TxLockLWM)) {
jfs_info("txLockFree TlocksLow no more"); jfs_info("txLockFree jfs_tlocks_low no more");
TxAnchor.TlocksLow = 0; jfs_tlocks_low = 0;
TXN_WAKEUP(&TxAnchor.lowlockwait); TXN_WAKEUP(&TxAnchor.lowlockwait);
} }
TXN_WAKEUP(&TxAnchor.freelockwait); TXN_WAKEUP(&TxAnchor.freelockwait);
...@@ -245,12 +251,25 @@ int txInit(void) ...@@ -245,12 +251,25 @@ int txInit(void)
{ {
int k, size; int k, size;
/* Verify tunable parameters */
if (nTxBlock < 16)
nTxBlock = 16; /* No one should set it this low */
if (nTxBlock > 65536)
nTxBlock = 65536;
if (nTxLock < 256)
nTxLock = 256; /* No one should set it this low */
if (nTxLock > 65536)
nTxLock = 65536;
/* /*
* initialize transaction block (tblock) table * initialize transaction block (tblock) table
* *
* transaction id (tid) = tblock index * transaction id (tid) = tblock index
* tid = 0 is reserved. * tid = 0 is reserved.
*/ */
TxLockLWM = (nTxLock * 4) / 10;
TxLockHWM = (nTxLock * 8) / 10;
TxLockVHWM = (nTxLock * 9) / 10;
size = sizeof(struct tblock) * nTxBlock; size = sizeof(struct tblock) * nTxBlock;
TxBlock = (struct tblock *) vmalloc(size); TxBlock = (struct tblock *) vmalloc(size);
if (TxBlock == NULL) if (TxBlock == NULL)
...@@ -295,6 +314,9 @@ int txInit(void) ...@@ -295,6 +314,9 @@ int txInit(void)
INIT_LIST_HEAD(&TxAnchor.anon_list); INIT_LIST_HEAD(&TxAnchor.anon_list);
INIT_LIST_HEAD(&TxAnchor.anon_list2); INIT_LIST_HEAD(&TxAnchor.anon_list2);
LAZY_LOCK_INIT();
INIT_LIST_HEAD(&TxAnchor.unlock_queue);
stattx.maxlid = 1; /* statistics */ stattx.maxlid = 1; /* statistics */
return 0; return 0;
...@@ -358,7 +380,7 @@ tid_t txBegin(struct super_block *sb, int flag) ...@@ -358,7 +380,7 @@ tid_t txBegin(struct super_block *sb, int flag)
* unless COMMIT_FORCE or COMMIT_INODE (which may ultimately * unless COMMIT_FORCE or COMMIT_INODE (which may ultimately
* free tlocks) * free tlocks)
*/ */
if (TxAnchor.TlocksLow) { if (TxAnchor.tlocksInUse > TxLockVHWM) {
INCREMENT(TxStat.txBegin_lockslow); INCREMENT(TxStat.txBegin_lockslow);
TXN_SLEEP(&TxAnchor.lowlockwait); TXN_SLEEP(&TxAnchor.lowlockwait);
goto retry; goto retry;
...@@ -450,7 +472,7 @@ void txBeginAnon(struct super_block *sb) ...@@ -450,7 +472,7 @@ void txBeginAnon(struct super_block *sb)
/* /*
* Don't begin transaction if we're getting starved for tlocks * Don't begin transaction if we're getting starved for tlocks
*/ */
if (TxAnchor.TlocksLow) { if (TxAnchor.tlocksInUse > TxLockVHWM) {
INCREMENT(TxStat.txBeginAnon_lockslow); INCREMENT(TxStat.txBeginAnon_lockslow);
TXN_SLEEP(&TxAnchor.lowlockwait); TXN_SLEEP(&TxAnchor.lowlockwait);
goto retry; goto retry;
...@@ -2559,6 +2581,7 @@ void txFreelock(struct inode *ip) ...@@ -2559,6 +2581,7 @@ void txFreelock(struct inode *ip)
if (!jfs_ip->atlhead) if (!jfs_ip->atlhead)
return; return;
TXN_LOCK();
xtlck = (struct tlock *) &jfs_ip->atlhead; xtlck = (struct tlock *) &jfs_ip->atlhead;
while ((lid = xtlck->next)) { while ((lid = xtlck->next)) {
...@@ -2579,10 +2602,9 @@ void txFreelock(struct inode *ip) ...@@ -2579,10 +2602,9 @@ void txFreelock(struct inode *ip)
/* /*
* If inode was on anon_list, remove it * If inode was on anon_list, remove it
*/ */
TXN_LOCK();
list_del_init(&jfs_ip->anon_inode_list); list_del_init(&jfs_ip->anon_inode_list);
TXN_UNLOCK();
} }
TXN_UNLOCK();
} }
...@@ -2707,50 +2729,54 @@ int jfs_lazycommit(void *arg) ...@@ -2707,50 +2729,54 @@ int jfs_lazycommit(void *arg)
int WorkDone; int WorkDone;
struct tblock *tblk; struct tblock *tblk;
unsigned long flags; unsigned long flags;
struct jfs_sb_info *sbi;
daemonize("jfsCommit"); daemonize("jfsCommit");
jfsCommitTask = current;
LAZY_LOCK_INIT();
TxAnchor.unlock_queue = TxAnchor.unlock_tail = 0;
complete(&jfsIOwait); complete(&jfsIOwait);
do { do {
LAZY_LOCK(flags); LAZY_LOCK(flags);
restart: while (!list_empty(&TxAnchor.unlock_queue)) {
WorkDone = 0; WorkDone = 0;
while ((tblk = TxAnchor.unlock_queue)) { list_for_each_entry(tblk, &TxAnchor.unlock_queue,
/* cqueue) {
* We can't get ahead of user thread. Spinning is
* simpler than blocking/waking. We shouldn't spin
* very long, since user thread shouldn't be blocking
* between lmGroupCommit & txEnd.
*/
WorkDone = 1;
/* sbi = JFS_SBI(tblk->sb);
* Remove first transaction from queue /*
*/ * For each volume, the transactions must be
TxAnchor.unlock_queue = tblk->cqnext; * handled in order. If another commit thread
tblk->cqnext = 0; * is handling a tblk for this superblock,
if (TxAnchor.unlock_tail == tblk) * skip it
TxAnchor.unlock_tail = 0; */
if (sbi->commit_state & IN_LAZYCOMMIT)
continue;
LAZY_UNLOCK(flags); sbi->commit_state |= IN_LAZYCOMMIT;
txLazyCommit(tblk); WorkDone = 1;
/* /*
* We can be running indefinitely if other processors * Remove transaction from queue
* are adding transactions to this list */
*/ list_del(&tblk->cqueue);
cond_resched();
LAZY_LOCK(flags);
}
if (WorkDone) LAZY_UNLOCK(flags);
goto restart; txLazyCommit(tblk);
LAZY_LOCK(flags);
sbi->commit_state &= ~IN_LAZYCOMMIT;
/*
* Don't continue in the for loop. (We can't
* anyway, it's unsafe!) We want to go back to
* the beginning of the list.
*/
break;
}
/* If there was nothing to do, don't continue */
if (!WorkDone)
break;
}
if (current->flags & PF_FREEZE) { if (current->flags & PF_FREEZE) {
LAZY_UNLOCK(flags); LAZY_UNLOCK(flags);
...@@ -2767,7 +2793,7 @@ int jfs_lazycommit(void *arg) ...@@ -2767,7 +2793,7 @@ int jfs_lazycommit(void *arg)
} }
} while (!jfs_stop_threads); } while (!jfs_stop_threads);
if (TxAnchor.unlock_queue) if (!list_empty(&TxAnchor.unlock_queue))
jfs_err("jfs_lazycommit being killed w/pending transactions!"); jfs_err("jfs_lazycommit being killed w/pending transactions!");
else else
jfs_info("jfs_lazycommit being killed\n"); jfs_info("jfs_lazycommit being killed\n");
...@@ -2780,14 +2806,14 @@ void txLazyUnlock(struct tblock * tblk) ...@@ -2780,14 +2806,14 @@ void txLazyUnlock(struct tblock * tblk)
LAZY_LOCK(flags); LAZY_LOCK(flags);
if (TxAnchor.unlock_tail) list_add_tail(&tblk->cqueue, &TxAnchor.unlock_queue);
TxAnchor.unlock_tail->cqnext = tblk; /*
else * Don't wake up a commit thread if there is already one servicing
TxAnchor.unlock_queue = tblk; * this superblock.
TxAnchor.unlock_tail = tblk; */
tblk->cqnext = 0; if (!(JFS_SBI(tblk->sb)->commit_state & IN_LAZYCOMMIT))
wake_up(&jfs_commit_thread_wait);
LAZY_UNLOCK(flags); LAZY_UNLOCK(flags);
wake_up(&jfs_commit_thread_wait);
} }
static void LogSyncRelease(struct metapage * mp) static void LogSyncRelease(struct metapage * mp)
...@@ -2821,7 +2847,7 @@ static void LogSyncRelease(struct metapage * mp) ...@@ -2821,7 +2847,7 @@ static void LogSyncRelease(struct metapage * mp)
* completion * completion
* *
* This does almost the same thing as jfs_sync below. We don't * This does almost the same thing as jfs_sync below. We don't
* worry about deadlocking when TlocksLow is set, since we would * worry about deadlocking when jfs_tlocks_low is set, since we would
* expect jfs_sync to get us out of that jam. * expect jfs_sync to get us out of that jam.
*/ */
void txQuiesce(struct super_block *sb) void txQuiesce(struct super_block *sb)
...@@ -2912,7 +2938,7 @@ int jfs_sync(void *arg) ...@@ -2912,7 +2938,7 @@ int jfs_sync(void *arg)
* write each inode on the anonymous inode list * write each inode on the anonymous inode list
*/ */
TXN_LOCK(); TXN_LOCK();
while (TxAnchor.TlocksLow && !list_empty(&TxAnchor.anon_list)) { while (jfs_tlocks_low && !list_empty(&TxAnchor.anon_list)) {
jfs_ip = list_entry(TxAnchor.anon_list.next, jfs_ip = list_entry(TxAnchor.anon_list.next,
struct jfs_inode_info, struct jfs_inode_info,
anon_inode_list); anon_inode_list);
...@@ -3008,18 +3034,16 @@ int jfs_txanchor_read(char *buffer, char **start, off_t offset, int length, ...@@ -3008,18 +3034,16 @@ int jfs_txanchor_read(char *buffer, char **start, off_t offset, int length,
"freelockwait = %s\n" "freelockwait = %s\n"
"lowlockwait = %s\n" "lowlockwait = %s\n"
"tlocksInUse = %d\n" "tlocksInUse = %d\n"
"TlocksLow = %d\n" "jfs_tlocks_low = %d\n"
"unlock_queue = 0x%p\n" "unlock_queue is %sempty\n",
"unlock_tail = 0x%p\n",
TxAnchor.freetid, TxAnchor.freetid,
freewait, freewait,
TxAnchor.freelock, TxAnchor.freelock,
freelockwait, freelockwait,
lowlockwait, lowlockwait,
TxAnchor.tlocksInUse, TxAnchor.tlocksInUse,
TxAnchor.TlocksLow, jfs_tlocks_low,
TxAnchor.unlock_queue, list_empty(&TxAnchor.unlock_queue) ? "" : "not ");
TxAnchor.unlock_tail);
begin = offset; begin = offset;
*start = buffer + begin; *start = buffer + begin;
......
/* /*
* Copyright (c) International Business Machines Corp., 2000-2002 * Copyright (C) International Business Machines Corp., 2000-2004
* *
* This program is free software; you can redistribute it and/or modify * This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
...@@ -53,7 +53,7 @@ struct tblock { ...@@ -53,7 +53,7 @@ struct tblock {
u32 logtid; /* log transaction id */ u32 logtid; /* log transaction id */
/* commit management */ /* commit management */
struct tblock *cqnext; /* commit queue link */ struct list_head cqueue; /* commit queue list */
s32 clsn; /* commit lsn */ s32 clsn; /* commit lsn */
struct lbuf *bp; struct lbuf *bp;
s32 pn; /* commit record log page number */ s32 pn; /* commit record log page number */
...@@ -93,16 +93,16 @@ extern struct tblock *TxBlock; /* transaction block table */ ...@@ -93,16 +93,16 @@ extern struct tblock *TxBlock; /* transaction block table */
* transaction lock * transaction lock
*/ */
struct tlock { struct tlock {
lid_t next; /* index next lockword on tid locklist lid_t next; /* 2: index next lockword on tid locklist
* next lockword on freelist * next lockword on freelist
*/ */
tid_t tid; /* transaction id holding lock */ tid_t tid; /* 2: transaction id holding lock */
u16 flag; /* 2: lock control */ u16 flag; /* 2: lock control */
u16 type; /* 2: log type */ u16 type; /* 2: log type */
struct metapage *mp; /* 4: object page buffer locked */ struct metapage *mp; /* 4/8: object page buffer locked */
struct inode *ip; /* 4: object */ struct inode *ip; /* 4/8: object */
/* (16) */ /* (16) */
s16 lock[24]; /* 48: overlay area */ s16 lock[24]; /* 48: overlay area */
...@@ -167,7 +167,7 @@ struct lv { ...@@ -167,7 +167,7 @@ struct lv {
#define TLOCKLONG 28 #define TLOCKLONG 28
struct linelock { struct linelock {
u16 next; /* 2: next linelock */ lid_t next; /* 2: next linelock */
s8 maxcnt; /* 1: */ s8 maxcnt; /* 1: */
s8 index; /* 1: */ s8 index; /* 1: */
...@@ -183,7 +183,7 @@ struct linelock { ...@@ -183,7 +183,7 @@ struct linelock {
#define dt_lock linelock #define dt_lock linelock
struct xtlock { struct xtlock {
u16 next; /* 2: */ lid_t next; /* 2: */
s8 maxcnt; /* 1: */ s8 maxcnt; /* 1: */
s8 index; /* 1: */ s8 index; /* 1: */
...@@ -214,7 +214,7 @@ struct xtlock { ...@@ -214,7 +214,7 @@ struct xtlock {
* free maplock (i.e., number of maplock) in the tlock; * free maplock (i.e., number of maplock) in the tlock;
*/ */
struct maplock { struct maplock {
u16 next; /* 2: */ lid_t next; /* 2: */
u8 maxcnt; /* 2: */ u8 maxcnt; /* 2: */
u8 index; /* 2: next free maplock index */ u8 index; /* 2: next free maplock index */
...@@ -242,7 +242,7 @@ struct maplock { ...@@ -242,7 +242,7 @@ struct maplock {
#define pxd_lock maplock #define pxd_lock maplock
struct xdlistlock { struct xdlistlock {
u16 next; /* 2: */ lid_t next; /* 2: */
u8 maxcnt; /* 2: */ u8 maxcnt; /* 2: */
u8 index; /* 2: */ u8 index; /* 2: */
......
/* /*
* Copyright (c) International Business Machines Corp., 2000-2002 * Copyright (C) International Business Machines Corp., 2000-2004
* *
* This program is free software; you can redistribute it and/or modify * This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
...@@ -34,9 +34,12 @@ ...@@ -34,9 +34,12 @@
/* /*
* transaction and lock id's * transaction and lock id's
*
* Don't change these without carefully considering the impact on the
* size and alignment of all of the linelock variants
*/ */
typedef uint tid_t; typedef u16 tid_t;
typedef uint lid_t; typedef u16 lid_t;
/* /*
* Almost identical to Linux's timespec, but not quite * Almost identical to Linux's timespec, but not quite
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include <linux/parser.h> #include <linux/parser.h>
#include <linux/completion.h> #include <linux/completion.h>
#include <linux/vfs.h> #include <linux/vfs.h>
#include <linux/moduleparam.h>
#include <asm/uaccess.h> #include <asm/uaccess.h>
#include "jfs_incore.h" #include "jfs_incore.h"
...@@ -44,15 +45,20 @@ static struct super_operations jfs_super_operations; ...@@ -44,15 +45,20 @@ static struct super_operations jfs_super_operations;
static struct export_operations jfs_export_operations; static struct export_operations jfs_export_operations;
static struct file_system_type jfs_fs_type; static struct file_system_type jfs_fs_type;
#define MAX_COMMIT_THREADS 64
static int commit_threads = 0;
module_param(commit_threads, int, 0);
MODULE_PARM_DESC(commit_threads, "Number of commit threads");
int jfs_stop_threads; int jfs_stop_threads;
static pid_t jfsIOthread; static pid_t jfsIOthread;
static pid_t jfsCommitThread; static pid_t jfsCommitThread[MAX_COMMIT_THREADS];
static pid_t jfsSyncThread; static pid_t jfsSyncThread;
DECLARE_COMPLETION(jfsIOwait); DECLARE_COMPLETION(jfsIOwait);
#ifdef CONFIG_JFS_DEBUG #ifdef CONFIG_JFS_DEBUG
int jfsloglevel = JFS_LOGLEVEL_WARN; int jfsloglevel = JFS_LOGLEVEL_WARN;
MODULE_PARM(jfsloglevel, "i"); module_param(jfsloglevel, int, 644);
MODULE_PARM_DESC(jfsloglevel, "Specify JFS loglevel (0, 1 or 2)"); MODULE_PARM_DESC(jfsloglevel, "Specify JFS loglevel (0, 1 or 2)");
#endif #endif
...@@ -564,6 +570,7 @@ static void init_once(void *foo, kmem_cache_t * cachep, unsigned long flags) ...@@ -564,6 +570,7 @@ static void init_once(void *foo, kmem_cache_t * cachep, unsigned long flags)
static int __init init_jfs_fs(void) static int __init init_jfs_fs(void)
{ {
int i;
int rc; int rc;
jfs_inode_cachep = jfs_inode_cachep =
...@@ -600,12 +607,23 @@ static int __init init_jfs_fs(void) ...@@ -600,12 +607,23 @@ static int __init init_jfs_fs(void)
} }
wait_for_completion(&jfsIOwait); /* Wait until thread starts */ wait_for_completion(&jfsIOwait); /* Wait until thread starts */
jfsCommitThread = kernel_thread(jfs_lazycommit, 0, CLONE_KERNEL); if (commit_threads < 1)
if (jfsCommitThread < 0) { commit_threads = num_online_cpus();
jfs_err("init_jfs_fs: fork failed w/rc = %d", jfsCommitThread); else if (commit_threads > MAX_COMMIT_THREADS)
goto kill_iotask; commit_threads = MAX_COMMIT_THREADS;
for (i = 0; i < commit_threads; i++) {
jfsCommitThread[i] = kernel_thread(jfs_lazycommit, 0,
CLONE_KERNEL);
if (jfsCommitThread[i] < 0) {
jfs_err("init_jfs_fs: fork failed w/rc = %d",
jfsCommitThread[i]);
commit_threads = i;
goto kill_committask;
}
/* Wait until thread starts */
wait_for_completion(&jfsIOwait);
} }
wait_for_completion(&jfsIOwait); /* Wait until thread starts */
jfsSyncThread = kernel_thread(jfs_sync, 0, CLONE_KERNEL); jfsSyncThread = kernel_thread(jfs_sync, 0, CLONE_KERNEL);
if (jfsSyncThread < 0) { if (jfsSyncThread < 0) {
...@@ -622,10 +640,10 @@ static int __init init_jfs_fs(void) ...@@ -622,10 +640,10 @@ static int __init init_jfs_fs(void)
kill_committask: kill_committask:
jfs_stop_threads = 1; jfs_stop_threads = 1;
wake_up(&jfs_commit_thread_wait); wake_up_all(&jfs_commit_thread_wait);
wait_for_completion(&jfsIOwait); /* Wait for thread exit */ for (i = 0; i < commit_threads; i++)
kill_iotask: wait_for_completion(&jfsIOwait);
jfs_stop_threads = 1;
wake_up(&jfs_IO_thread_wait); wake_up(&jfs_IO_thread_wait);
wait_for_completion(&jfsIOwait); /* Wait for thread exit */ wait_for_completion(&jfsIOwait); /* Wait for thread exit */
end_txmngr: end_txmngr:
...@@ -639,6 +657,8 @@ static int __init init_jfs_fs(void) ...@@ -639,6 +657,8 @@ static int __init init_jfs_fs(void)
static void __exit exit_jfs_fs(void) static void __exit exit_jfs_fs(void)
{ {
int i;
jfs_info("exit_jfs_fs called"); jfs_info("exit_jfs_fs called");
jfs_stop_threads = 1; jfs_stop_threads = 1;
...@@ -646,8 +666,9 @@ static void __exit exit_jfs_fs(void) ...@@ -646,8 +666,9 @@ static void __exit exit_jfs_fs(void)
metapage_exit(); metapage_exit();
wake_up(&jfs_IO_thread_wait); wake_up(&jfs_IO_thread_wait);
wait_for_completion(&jfsIOwait); /* Wait until IO thread exits */ wait_for_completion(&jfsIOwait); /* Wait until IO thread exits */
wake_up(&jfs_commit_thread_wait); wake_up_all(&jfs_commit_thread_wait);
wait_for_completion(&jfsIOwait); /* Wait until Commit thread exits */ for (i = 0; i < commit_threads; i++)
wait_for_completion(&jfsIOwait);
wake_up(&jfs_sync_thread_wait); wake_up(&jfs_sync_thread_wait);
wait_for_completion(&jfsIOwait); /* Wait until Sync thread exits */ wait_for_completion(&jfsIOwait); /* Wait until Sync thread exits */
#ifdef PROC_FS_JFS #ifdef PROC_FS_JFS
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment