cregit-Linux how code gets into the kernel

Release 4.10 fs/dlm/lock.c

Directory: fs/dlm
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

       dlm_lock          = request_lock
       dlm_lock+CONVERT  = convert_lock
       dlm_unlock        = unlock_lock
       dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
#include <linux/types.h>
#include <linux/rbtree.h>
#include <linux/slab.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

/*
 * Forward declarations of file-local (static) helpers.  Some of them are
 * called before their definitions appear, e.g. del_timeout() from
 * queue_cast(), send_bast() from queue_bast() and toss_rsb() from put_rsb().
 */
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);
static void toss_rsb(struct kref *kref);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */


/*
 * Mode compatibility table, indexed [grmode + 1][rqmode + 1].
 * Row/column order is UN, NL, CR, CW, PR, PW, EX, PD; a 1 marks the pair
 * as compatible.
 */
static const int __dlm_compat_matrix[8][8] = {
	/*        UN NL CR CW PR PW EX PD */
	[0] = {   1, 1, 1, 1, 1, 1, 1, 0 },	/* UN */
	[1] = {   1, 1, 1, 1, 1, 1, 1, 0 },	/* NL */
	[2] = {   1, 1, 1, 1, 1, 1, 0, 0 },	/* CR */
	[3] = {   1, 1, 1, 1, 0, 0, 0, 0 },	/* CW */
	[4] = {   1, 1, 1, 0, 1, 0, 0, 0 },	/* PR */
	[5] = {   1, 1, 1, 0, 0, 0, 0, 0 },	/* PW */
	[6] = {   1, 1, 0, 0, 0, 0, 0, 0 },	/* EX */
	[7] = {   0, 0, 0, 0, 0, 0, 0, 0 },	/* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */


/*
 * LVB transfer direction, indexed [grmode + 1][rqmode + 1]:
 *    1  LVB is returned to the caller
 *    0  LVB is written to the resource
 *   -1  nothing happens to the LVB
 */
const int dlm_lvb_operations[8][8] = {
	/*        UN  NL  CR  CW  PR  PW  EX  PD */
	[0] = {   -1,  1,  1,  1,  1,  1,  1, -1 },	/* UN */
	[1] = {   -1,  1,  1,  1,  1,  1,  1,  0 },	/* NL */
	[2] = {   -1, -1,  1,  1,  1,  1,  1,  0 },	/* CR */
	[3] = {   -1, -1, -1,  1,  1,  1,  1,  0 },	/* CW */
	[4] = {   -1, -1, -1, -1,  1,  1,  1,  0 },	/* PR */
	[5] = {   -1,  0,  0,  0,  0,  0,  1,  0 },	/* PW */
	[6] = {   -1,  0,  0,  0,  0,  0,  0,  0 },	/* EX */
	[7] = {   -1,  0,  0,  0,  0,  0,  0,  0 },	/* PD */
};


/*
 * Compatibility lookup for two lkbs: the row comes from the first lkb's
 * granted mode and the column from the second lkb's requested mode.
 */
#define modes_compat(granted, requesting) \
	(__dlm_compat_matrix[(granted)->lkb_grmode + 1] \
			    [(requesting)->lkb_rqmode + 1])


/*
 * Non-static compatibility lookup for two raw lock modes.  Each mode is
 * offset by one before indexing __dlm_compat_matrix; no range check is done
 * here.
 */
int dlm_modes_compat(int mode1, int mode2)
{
	const int *row = __dlm_compat_matrix[mode1 + 1];

	return row[mode2 + 1];
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland24100.00%1100.00%
Total24100.00%1100.00%

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */
static const int __quecvt_compat_matrix[8][8] = {
	/*        UN NL CR CW PR PW EX PD */
	[0] = {   0, 0, 0, 0, 0, 0, 0, 0 },	/* UN */
	[1] = {   0, 0, 1, 1, 1, 1, 1, 0 },	/* NL */
	[2] = {   0, 0, 0, 1, 1, 1, 1, 0 },	/* CR */
	[3] = {   0, 0, 0, 0, 1, 1, 1, 0 },	/* CW */
	[4] = {   0, 0, 0, 1, 0, 1, 1, 0 },	/* PR */
	[5] = {   0, 0, 0, 0, 0, 0, 1, 0 },	/* PW */
	[6] = {   0, 0, 0, 0, 0, 0, 0, 0 },	/* EX */
	[7] = {   0, 0, 0, 0, 0, 0, 0, 0 },	/* PD */
};
/*
 * Log the identifying fields and current state of one lkb at KERN_ERR
 * level, on a single line.
 */
void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR
	       "lkb: nodeid %d id %x remid %x exflags %x flags %x "
	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
	       lkb->lkb_nodeid,
	       lkb->lkb_id,
	       lkb->lkb_remid,
	       lkb->lkb_exflags,
	       lkb->lkb_flags,
	       lkb->lkb_status,
	       lkb->lkb_rqmode,
	       lkb->lkb_grmode,
	       lkb->lkb_wait_type,
	       lkb->lkb_wait_nodeid,
	       (unsigned long long)lkb->lkb_recover_seq);
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland66100.00%3100.00%
Total66100.00%3100.00%


/*
 * Log the node ids, flags, first lkid, recover-locks count and name of one
 * rsb at KERN_ERR level, on a single line.
 */
static void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR
	       "rsb: nodeid %d master %d dir %d flags %lx first %x "
	       "rlc %d name %s\n",
	       r->res_nodeid,
	       r->res_master_nodeid,
	       r->res_dir_nodeid,
	       r->res_flags,
	       r->res_first_lkid,
	       r->res_recover_locks_count,
	       r->res_name);
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland4597.83%266.67%
adrian bunkadrian bunk12.17%133.33%
Total46100.00%3100.00%


void dlm_dump_rsb(struct dlm_rsb *r) { struct dlm_lkb *lkb; dlm_print_rsb(r); printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n", list_empty(&r->res_root_list), list_empty(&r->res_recover_list)); printk(KERN_ERR "rsb lookup list\n"); list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup) dlm_print_lkb(lkb); printk(KERN_ERR "rsb grant queue:\n"); list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) dlm_print_lkb(lkb); printk(KERN_ERR "rsb convert queue:\n"); list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) dlm_print_lkb(lkb); printk(KERN_ERR "rsb wait queue:\n"); list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue) dlm_print_lkb(lkb); }

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland118100.00%1100.00%
Total118100.00%1100.00%

/*
 * Threads cannot use the lockspace while it's being recovered.
 *
 * Takes ls_in_recovery for reading; pair with dlm_unlock_recovery().
 */
static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland20100.00%2100.00%
Total20100.00%2100.00%


/*
 * Drop the read hold on ls_in_recovery.
 */
void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland18100.00%2100.00%
Total18100.00%2100.00%


/*
 * Non-blocking attempt to take ls_in_recovery for reading.  Returns the
 * result of down_read_trylock() unchanged.
 */
int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	int got_it = down_read_trylock(&ls->ls_in_recovery);

	return got_it;
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland19100.00%2100.00%
Total19100.00%2100.00%


/*
 * Returns 1 unless the lkb's external flags contain DLM_LKF_NOQUEUE.
 */
static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUE) == 0;
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland22100.00%1100.00%
Total22100.00%1100.00%


/*
 * Nonzero when DLM_LKF_NOQUEUEBAST is set in the lkb's external flags.
 * The masked flag value itself is returned, not a normalised 0/1.
 */
static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST;
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland21100.00%1100.00%
Total21100.00%1100.00%


/*
 * Nonzero when DLM_SBF_DEMOTED is set in the lkb's status-block flags.
 * The masked flag value itself is returned.
 */
static inline int is_demoted(struct dlm_lkb *lkb)
{
	return lkb->lkb_sbflags & DLM_SBF_DEMOTED;
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland21100.00%1100.00%
Total21100.00%1100.00%


/*
 * Nonzero when DLM_SBF_ALTMODE is set in the lkb's status-block flags.
 * The masked flag value itself is returned.
 */
static inline int is_altmode(struct dlm_lkb *lkb)
{
	return lkb->lkb_sbflags & DLM_SBF_ALTMODE;
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland21100.00%1100.00%
Total21100.00%1100.00%


/*
 * Returns 1 when the lkb's status is DLM_LKSTS_GRANTED, otherwise 0.
 */
static inline int is_granted(struct dlm_lkb *lkb)
{
	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
		return 0;
	return 1;
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland21100.00%1100.00%
Total21100.00%1100.00%


/*
 * Returns 1 when res_nodeid is nonzero, 0 when it is zero.  A negative
 * res_nodeid trips DLM_ASSERT, which is given dlm_print_rsb(r) as its
 * action.
 */
static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););

	return r->res_nodeid != 0;
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland27100.00%1100.00%
Total27100.00%1100.00%


/*
 * Returns 1 when the lkb has a nonzero lkb_nodeid and does not carry the
 * DLM_IFL_MSTCPY flag; 0 otherwise.
 */
static inline int is_process_copy(struct dlm_lkb *lkb)
{
	if (!lkb->lkb_nodeid)
		return 0;

	return !(lkb->lkb_flags & DLM_IFL_MSTCPY);
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland28100.00%1100.00%
Total28100.00%1100.00%


/*
 * Returns 1 when DLM_IFL_MSTCPY is set in the lkb's internal flags, else 0.
 */
static inline int is_master_copy(struct dlm_lkb *lkb)
{
	return !!(lkb->lkb_flags & DLM_IFL_MSTCPY);
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland25100.00%2100.00%
Total25100.00%2100.00%


/*
 * Returns 1 for a PR -> CW or CW -> PR conversion (granted mode -> requested
 * mode), 0 for anything else.
 */
static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode == DLM_LOCK_PR)
		return lkb->lkb_rqmode == DLM_LOCK_CW;

	if (lkb->lkb_grmode == DLM_LOCK_CW)
		return lkb->lkb_rqmode == DLM_LOCK_PR;

	return 0;
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland48100.00%2100.00%
Total48100.00%2100.00%


/*
 * Returns 1 when the requested mode is numerically below the granted mode
 * and the conversion is not a PR/CW "middle" conversion; 0 otherwise.
 */
static inline int down_conversion(struct dlm_lkb *lkb)
{
	if (middle_conversion(lkb))
		return 0;

	return lkb->lkb_rqmode < lkb->lkb_grmode;
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland29100.00%1100.00%
Total29100.00%1100.00%


/*
 * Nonzero when DLM_IFL_OVERLAP_UNLOCK is set in the lkb's internal flags.
 * The masked flag value itself is returned.
 */
static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK);
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland19100.00%1100.00%
Total19100.00%1100.00%


/*
 * Nonzero when DLM_IFL_OVERLAP_CANCEL is set in the lkb's internal flags.
 * The masked flag value itself is returned.
 */
static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL);
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland19100.00%1100.00%
Total19100.00%1100.00%


/*
 * Nonzero when either overlap flag (DLM_IFL_OVERLAP_UNLOCK or
 * DLM_IFL_OVERLAP_CANCEL) is set on the lkb.  The masked flag bits are
 * returned as-is.
 */
static inline int is_overlap(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags &
	       (DLM_IFL_OVERLAP_UNLOCK | DLM_IFL_OVERLAP_CANCEL);
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland25100.00%1100.00%
Total25100.00%1100.00%


static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) { if (is_master_copy(lkb)) return; del_timeout(lkb); DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb);); /* if the operation was a cancel, then return -DLM_ECANCEL, if a timeout caused the cancel then return -ETIMEDOUT */ if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) { lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL; rv = -ETIMEDOUT; } if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) { lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL; rv = -EDEADLK; } dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags); }

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland115100.00%7100.00%
Total115100.00%7100.00%


/*
 * Queue a completion callback for an overlapped operation: the result is
 * -DLM_EUNLOCK when an overlapping unlock is flagged on the lkb, and
 * -DLM_ECANCEL otherwise.
 */
static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	if (is_overlap_unlock(lkb))
		rv = -DLM_EUNLOCK;
	else
		rv = -DLM_ECANCEL;

	queue_cast(r, lkb, rv);
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland35100.00%1100.00%
Total35100.00%1100.00%


/*
 * Deliver a blocking notification for rqmode: a master-copy lkb gets it via
 * send_bast(), any other lkb gets a DLM_CB_BAST callback queued locally.
 * The return value of send_bast() is not checked.
 */
static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (!is_master_copy(lkb)) {
		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
		return;
	}

	send_bast(r, lkb, rqmode);
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland53100.00%5100.00%
Total53100.00%5100.00%

/*
 * Basic operations on rsb's and lkb's
 */

/*
 * Take an additional reference on an rsb.  Only called when the caller
 * already holds a valid reference, so no locking is needed here.
 */
static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland20100.00%1100.00%
Total20100.00%1100.00%


/*
 * Non-static wrapper around hold_rsb().
 */
void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland15100.00%1100.00%
Total15100.00%1100.00%

/*
 * Drop one reference on an rsb.  When all references to the rsb are gone
 * it's transferred to the tossed list for later disposal (toss_rsb is the
 * kref release callback).  The drop is done under the lock of the rsb's
 * hash bucket.
 */
static void put_rsb(struct dlm_rsb *r)
{
	/* Read both before the put; they are needed again for the unlock. */
	uint32_t b = r->res_bucket;
	struct dlm_ls *lockspace = r->res_ls;

	spin_lock(&lockspace->ls_rsbtbl[b].lock);
	kref_put(&r->res_ref, toss_rsb);
	spin_unlock(&lockspace->ls_rsbtbl[b].lock);
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland63100.00%1100.00%
Total63100.00%1100.00%


/*
 * Non-static wrapper around put_rsb().
 */
void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland15100.00%1100.00%
Total15100.00%1100.00%


static int pre_rsb_struct(struct dlm_ls *ls) { struct dlm_rsb *r1, *r2; int count = 0; spin_lock(&ls->ls_new_rsb_spin); if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) { spin_unlock(&ls->ls_new_rsb_spin); return 0; } spin_unlock(&ls->ls_new_rsb_spin); r1 = dlm_allocate_rsb(ls); r2 = dlm_allocate_rsb(ls); spin_lock(&ls->ls_new_rsb_spin); if (r1) { list_add(&r1->res_hashchain, &ls->ls_new_rsb); ls->ls_new_rsb_count++; } if (r2) { list_add(&r2->res_hashchain, &ls->ls_new_rsb); ls->ls_new_rsb_count++; } count = ls->ls_new_rsb_count; spin_unlock(&ls->ls_new_rsb_spin); if (!count) return -ENOMEM; return 0; }

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland161100.00%2100.00%
Total161100.00%2100.00%

/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can unlock any spinlocks, go back and call pre_rsb_struct again. Otherwise, take an rsb off the list and return it. */
static int get_rsb_struct(struct dlm_ls *ls, char *name, int len, struct dlm_rsb **r_ret) { struct dlm_rsb *r; int count; spin_lock(&ls->ls_new_rsb_spin); if (list_empty(&ls->ls_new_rsb)) { count = ls->ls_new_rsb_count; spin_unlock(&ls->ls_new_rsb_spin); log_debug(ls, "find_rsb retry %d %d %s", count, dlm_config.ci_new_rsb_count, name); return -EAGAIN; } r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); list_del(&r->res_hashchain); /* Convert the empty list_head to a NULL rb_node for tree usage: */ memset(&r->res_hashnode, 0, sizeof(struct rb_node)); ls->ls_new_rsb_count--; spin_unlock(&ls->ls_new_rsb_spin); r->res_ls = ls; r->res_length = len; memcpy(r->res_name, name, len); mutex_init(&r->res_mutex); INIT_LIST_HEAD(&r->res_lookup); INIT_LIST_HEAD(&r->res_grantqueue); INIT_LIST_HEAD(&r->res_convertqueue); INIT_LIST_HEAD(&r->res_waitqueue); INIT_LIST_HEAD(&r->res_root_list); INIT_LIST_HEAD(&r->res_recover_list); *r_ret = r; return 0; }

Contributors

PersonTokensPropCommitsCommitProp
david teiglanddavid teigland20892.44%375.00%
robert s. petersonrobert s. peterson177.56%125.00%
Total225100.00%4100.00%


/*
 * Compare an rsb's name with (name, nlen).  The candidate name is copied
 * into a zero-filled DLM_RESNAME_MAXLEN buffer first, and the comparison
 * always covers the full DLM_RESNAME_MAXLEN bytes.  Returns the memcmp()
 * result with the rsb's name as the left operand.  nlen is not checked
 * against the buffer size here.
 */
static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
{
	char padded[DLM_RESNAME_MAXLEN] = { 0 };

	memcpy(padded, name, nlen);

	return memcmp(r->res_name, padded, sizeof(padded));
}

Contributors

PersonTokensPropCommitsCommitProp
robert s. petersonrobert s. peterson5090.91%150.00%
david teiglanddavid teigland59.09%150.00%
Total55100.00%2100.00%


/*
 * Look up (name, len) in an rb-tree of rsbs ordered by rsb_cmp().
 *
 * On a match, *r_ret is set to the rsb and 0 is returned.  Otherwise *r_ret
 * is set to NULL and -EBADR is returned.  A negative rsb_cmp() result
 * descends left, a positive one descends right.
 */
int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
			struct dlm_rsb **r_ret)
{
	struct rb_node *node = tree->rb_node;

	while (node) {
		struct dlm_rsb *r = rb_entry(node, struct dlm_rsb,
					     res_hashnode);
		int rc = rsb_cmp(r, name, len);

		if (rc == 0) {
			*r_ret = r;
			return 0;
		}

		node = (rc < 0) ? node->rb_left : node->rb_right;
	}

	*r_ret = NULL;
	return -EBADR;
}

Contributors

PersonTokensPropCommitsCommitProp
robert s. petersonrobert s. peterson6152.14%116.67%
david teiglanddavid teigland5143.59%466.67%
benny halevybenny halevy54.27%116.67%
Total117100.00%6100.00%


static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) { struct rb_node **newn = &tree->rb_node; struct rb_node *parent = NULL; int rc; while (*newn) { struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb, res_hashnode); parent = *newn; rc = rsb_cmp(cur, rsb->res_name, rsb->res_length); if (rc < 0) newn = &parent->rb_left; else if (rc > 0) newn = &parent->rb_right; else { log_print("rsb_insert match"); dlm_dump_rsb(rsb); dlm_dump_rsb(cur); return -EEXIST; } } rb_link_node(&rsb->res_hashnode, parent, newn); rb_insert_color(&rsb->res_hashnode, tree); return 0; }

Contributors

PersonTokensPropCommitsCommitProp
robert s. petersonrobert s. peterson154100.00%1100.00%
Total154100.00%1100.00%

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 *
 * rsb's on the keep list are being used locally and refcounted.
 * rsb's on the toss list are not being used locally, and are not refcounted.
 *
 * The toss list rsb's were either
 * - previously used locally but not any more (were on keep list, then
 *   moved to toss list when last refcount dropped)
 * - created and put on toss list as a directory record for a lookup
 *   (we are the dir node for the res, but are not using the res right now,
 *   but some other node is)
 *
 * The purpose of find_rsb() is to return a refcounted rsb for local use.
 * So, if the given rsb is on the toss list, it is moved to the keep list
 * before being returned.
 *
 * toss_rsb() happens when all local usage of the rsb is done, i.e. no
 * more refcounts exist, so the rsb is moved from the keep list to the
 * toss list.
 *
 * rsb's on both keep and toss lists are used for doing name to master
 * lookups.  rsb's that are in use locally (and being refcounted) are on
 * the keep list, rsb's that are not in use locally (not refcounted) and
 * only exist for name/master lookups are on the toss list.
 *
 * rsb's on the toss list whose dir_nodeid is not local can have stale
 * name/master mappings.  So, remote requests on such rsb's can potentially
 * return with an error, which means the mapping is stale and needs to
 * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
 * first_lkid is to keep only a single outstanding request on an rsb
 * while that rsb has a potentially stale master.)
 */
static int find_rsb_dir(struct dlm_ls *ls, char *name, int len, uint32_t hash, uint32_t b, int dir_nodeid, int from_nodeid, unsigned int flags, struct dlm_rsb **r_ret) { struct dlm_rsb *r = NULL; int our_nodeid = dlm_our_nodeid(); int from_local = 0; int from_other = 0; int from_dir = 0; int create = 0; int error; if (flags & R_RECEIVE_REQUEST) { if (from_nodeid == dir_nodeid) from_dir = 1; else from_other = 1; } else if (flags & R_REQUEST) { from_local = 1; } /* * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so * from_nodeid has sent us a lock in dlm_recover_locks, believing * we're the new master. Our local recovery may not have set * res_master_nodeid to our_nodeid yet, so allow either. Don't * create the rsb; dlm_recover_process_copy() will handle EBADR * by resending. * * If someone sends us a request, we are the dir node, and we do * not find the rsb anywhere, then recreate it. This happens if * someone sends us a request after we have removed/freed an rsb * from our toss list. (They sent a request instead of lookup * because they are using an rsb from their toss list.) */ if (from_local || from_dir || (from_other && (dir_nodeid == our_nodeid))) { create = 1; } retry: if (create) { error = pre_rsb_struct(ls); if (error < 0) goto out; } spin_lock(&ls->ls_rsbtbl[b].lock); error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); if (error) goto do_toss; /* * rsb is active, so we can't check master_nodeid without lock_rsb. */ kref_get(&r->res_ref); error = 0; goto out_unlock; do_toss: error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); if (error) goto do_new; /* * rsb found inactive (master_nodeid may be out of date unless * we are the dir_nodeid or were the master) No other thread * is using this rsb because it's on the toss list, so we can * look at or update res_master_nodeid without lock_rsb. 
*/ if ((r->res_master_nodeid != our_nodeid) && from_other) { /* our rsb was not master, and another node (not the dir node) has sent us a request */ log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s", from_nodeid, r->res_master_nodeid, dir_nodeid, r->res_name); error = -ENOTBLK; goto out_unlock; } if ((r->res_master_nodeid != our_nodeid) && from_dir) { /* don't think this should ever happen */ log_error(ls, "find_rsb toss from_dir %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); /* fix it and go on */ r->res_master_nodeid = our_nodeid; r->res_nodeid = 0; rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); r->res_first_lkid = 0; } if (from_local && (r->res_master_nodeid != our_nodeid)) { /* Because we have held no locks on this rsb, res_master_nodeid could have become stale. */ rsb_set_flag(r, RSB_MASTER_UNCERTAIN); r->res_first_lkid = 0; } rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); goto out_unlock; do_new: /* * rsb not found */ if (error == -EBADR && !create) goto out_unlock; error = get_rsb_struct(ls, name, len, &r); if (error == -EAGAIN) { spin_unlock(&ls->ls_rsbtbl[b].lock); goto retry; } if (error) goto out_unlock; r->res_hash = hash; r->res_bucket = b; r->res_dir_nodeid = dir_nodeid; kref_init(&r->res_ref); if (from_dir) {