Code:
/* Module
======
mpmux.c
Mpath Socket Multiplex/Demultiplex Module
Copyright 1995, 1996 Mpath Interactive, Inc.
/mpath/mcap/build/rel-2.0.2/src/mpreq/SCCS/s.mpmux.c 1.99 06/21/00 17:12:57
$Header$ */
#define MPLOGMODULE "mpreq_mux"
#if defined(SCCS)
static char* sccs_id="@(#) /mpath/mcap/build/rel-2.0.2/src/mpreq/SCCS/s.mpmux.c 1.99 06/21/00 17:12:57";
#else /* defined(SCCS) */
static char* RcsId="@$Header$";
#endif /* defined(SCCS) */
#include <mptypes.h>
#include <mpdefs.h>
#include <mplib.h>
#include <mpcntrct.h>
#include <mplog.h>
#include <mperrno.h>
#include <mptrace.h>
#include <mpthread.h>
#include <mpproc.h>
#include <mpevent.h>
#include <mpnet.h>
#include <mpoption.h>
#include "mprq_sess.h"
#include "mpio_if.h"
#include "mpmux_if.h"
#include "mpopt_if.h"
#include "mphdr.h"
#include "mpmalloc.h"
/* Internal Manifest
================= */
bool_t
MPMuxPassiveEventHndlr(void* context,
MPIO_EVENTFLAGS flags);
bool_t
MPMuxActiveEventHndlr(void* context,
MPIO_EVENTFLAGS flags);
bool_t
MPMuxRxEventHndlr(MPMUX_HNDL* mux);
void
MPMuxControlHndlr(MPMUX_HNDL* mux,
MPHDR_CNTL* hdr,
MPHDR* c_hdr,
caddr_t buf,
u_int len);
void
MPMuxDestroy(MPMUX_HNDL* mux);
u_int
MPMuxOptionsSetInternal(MPMUX_HNDL* mux,
MPOPT_VEC* opt_vec,
u_int opt_vec_cnt);
MPMUX_HNDL*
MPMuxOpen(MPIO_TYPE io_type,
MPTLADDR* local_addr,
MPTLADDR* remote_addr,
MPOPT_VEC* mpopt_vec,
u_int mpopt_vec_cnt);
MPMUX_HNDL*
MPMuxClone(MPMUX_HNDL* pmux);
u_int
MPMuxGetTlAddr(MPMUX_HNDL* mux,
MPTLADDR* tl_addr,
u_int local_flag);
STATIC void
MPMuxClose(MPMUX_HNDL* mux);
/* Module Global Data
================== */
STATIC MPPROCID MPMuxProcId;
STATIC MPEVENT MPMuxCloseEvent;
/* Sub-Module
==========
MPath Mux-I/O Handle List Objects and Utilities */
/* MPMUXLISTT */
LIST_TYPEDEF(MPMUX, MPMUX_HNDL*);
STATIC MPLOCK MPMuxLockHndl;
STATIC MPMUXLIST PassiveMuxAddrList;
STATIC MPMUXLIST ActiveMuxAddrList;
STATIC MPMUXLIST MPMuxHndlFreeList;
STATIC MPMUXLIST MPMuxHndlAllocList;
STATIC MPLOCK MPMuxHndlListLock;
STATIC u_int MPMuxHndlAllocCount;
STATIC u_int MPMuxHndlFreeCount;
STATIC void
MPMuxHndlHeapInit() {
MPLockCreate(&MPMuxHndlListLock, "mpmux:MPMuxHndlListLock");
return;
}
STATIC void
MPMuxHndlHeapCleanup() {
MPMUX_HNDL* mux;
if(MPMuxHndlAllocCount > MPMuxHndlFreeCount) {
mp_log("000QL", MPLT_DEBUG, "MPMuxHndlHeapCleanup: "
"%u MPMUX handles leaked",
MPMuxHndlAllocCount - MPMuxHndlFreeCount);
}
else if(MPMuxHndlFreeCount > MPMuxHndlAllocCount) {
mp_log("000QM", MPLT_DEBUG, "MPMuxHndlHeapCleanup: "
"%u MPMUX handles redundantly freed",
MPMuxHndlFreeCount - MPMuxHndlAllocCount);
}
MPLock(&MPMuxHndlListLock); /* SMP-LOCK */
while(LIST_COUNT(&MPMuxHndlFreeList)) {
LIST_REMOVE(MPMUX_HNDL, a_forw, a_back, &MPMuxHndlFreeList, mux);
debug_free((caddr_t)mux);
}
while(LIST_COUNT(&MPMuxHndlAllocList)) {
LIST_REMOVE(MPMUX_HNDL, a_forw, a_back, &MPMuxHndlAllocList, mux);
debug_free((caddr_t)mux);
}
MPUnlock(&MPMuxHndlListLock); /* SMP-ULCK */
MPLockDestroy(&MPMuxHndlListLock);
return;
}
STATIC MPMUX_HNDL*
MPMuxHndlAlloc(MPIO_TYPE io_type) {
MPMUX_HNDL* mux;
char lockname[100];
/* Attempt to allocate handle from our free list. If none
allocate a new one. */
MPLock(&MPMuxHndlListLock); /* SMP-LOCK */
if(LIST_COUNT(&MPMuxHndlFreeList)) {
LIST_REMOVE(MPMUX_HNDL, a_forw, a_back, &MPMuxHndlFreeList, mux);
bzero((caddr_t)mux, sizeof(MPMUX_HNDL));
}
else {
if((mux = (MPMUX_HNDL*)
debug_calloc(1, sizeof(MPMUX_HNDL))) == (MPMUX_HNDL*)0) {
MPErrorSet(MPERR_NOMEM);
}
}
if(mux != (MPMUX_HNDL*)0) {
LIST_INSERT(MPMUX_HNDL, a_forw, a_back, &MPMuxHndlAllocList, mux);
}
MPUnlock(&MPMuxHndlListLock); /* SMP-ULCK */
if(mux == (MPMUX_HNDL*)0) {
return mux;
}
sprintf(lockname, "mpmux:mux %x lock", mux);
MPLockCreate(&mux->lock, lockname);
mux->io_type = io_type;
MPREF_INIT(mux);
mux->state = MPMuxBuildingSt;
MPMuxHndlAllocCount++;
mux->plain_connect_timeout = MPMUX_DEFAULT_PLAIN_CONNECT_TIMEOUT;
return mux;
}
STATIC void
MPMuxHndlFree(MPMUX_HNDL* mux) {
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(MPREF_COUNT(mux) == 0));
MPLockDestroy(&mux->lock);
MPREF_DESTROY(mux);
MPLock(&MPMuxHndlListLock); /* SMP-LOCK */
LIST_EREMOVE(MPMUX_HNDL, a_forw, a_back, &MPMuxHndlAllocList, mux);
LIST_INSERT(MPMUX_HNDL, a_forw, a_back, &MPMuxHndlFreeList, mux);
MPUnlock(&MPMuxHndlListLock); /* SMP-ULCK */
MPMuxHndlFreeCount++;
}
PRIVATE_INLINE void
MPMuxHndlLock(MPMUX_HNDL* mux) {
MPLock(&mux->lock); /* SMP-LOCK */
return;
}
PRIVATE_INLINE void
MPMuxHndlUnlock(MPMUX_HNDL* mux) {
MPUnlock(&mux->lock); /* SMP-ULCK */
return;
}
STATIC MPMUX_HNDL*
MPMuxRefInc(MPMUX_HNDL* mux) {
MPREF_INC(mux);
return mux;
}
STATIC MPMUX_HNDL*
MPMuxRefDec(MPMUX_HNDL* mux) {
u_int count;
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(MPREF_COUNT(mux) > 0));
MPREF_DEC(mux, count);
if(count > 0) {
return mux;
}
mux->state = MPMuxDestroyingSt;
MPMuxDestroy(mux);
return (MPMUX_HNDL*)0;
}
/* External Mux Handle Reference Utilities.
Unlike their internal counterparts, user (ie external) references
are allowed to pass in null handles. This is permitted to allow
caller of mux entry points to increment mux handle refrence, call
entry point, and decrement mux handle reference, without having to
check for errors at every point. */
MPMUX_HNDL*
MPMuxUserRefInc(MPMUX_HNDL* mux) {
MPPrecondition((mux == (MPMUX_HNDL*)0) ||
(MPREF_COUNT(mux) > 0));
if(mux == (MPMUX_HNDL*)0) {
MPErrorSet(MPERR_INVALID);
return mux;
}
return MPMuxRefInc(mux);
}
MPMUX_HNDL*
MPMuxUserRefDec(MPMUX_HNDL* mux) {
MPPrecondition((mux == (MPMUX_HNDL*)0) ||
(MPREF_COUNT(mux) > 0));
if(mux == (MPMUX_HNDL*)0) {
MPErrorSet(MPERR_INVALID);
return mux;
}
return MPMuxRefDec(mux);
}
PRIVATE_INLINE u_int
MPMuxIsPassive(MPMUX_HNDL* mux) {
return mux->io_type == MPIOPassive;
}
PRIVATE_INLINE u_int
MPMuxIsActive(MPMUX_HNDL* mux) {
return mux->io_type == MPIOActive;
}
PRIVATE_INLINE MPIO_TYPE
MPMuxGetType(MPMUX_HNDL* mux) {
return mux->io_type;
}
PRIVATE_INLINE void
MPMuxLock() {
MPLock(&MPMuxLockHndl); /* SMP-LOCK */
return;
}
PRIVATE_INLINE void
MPMuxUnlock() {
MPUnlock(&MPMuxLockHndl); /* SMP-ULCK */
return;
}
STATIC void
MPMuxAddrListInsert(MPMUX_HNDL* mux) {
MPMUXLIST* list;
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
!(mux->flags & MPMUXF_ON_ADDR_LIST));
MPMuxRefInc(mux);
mux->flags |= MPMUXF_ON_ADDR_LIST;
list = MPMuxIsActive(mux) ?
&ActiveMuxAddrList :
&PassiveMuxAddrList;
LIST_INSERT(MPMUX_HNDL, addr_forw, addr_back, list, mux);
return;
}
PRIVATE_INLINE void
MPMuxAddrListInsertLk(MPMUX_HNDL* mux) {
MPMuxLock(); /* SMP-LOCK */
MPMuxAddrListInsert(mux);
MPMuxUnlock(); /* SMP-ULCK */
return;
}
STATIC void
MPMuxAddrListRemove(MPMUX_HNDL* mux) {
MPMUXLIST* list;
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(mux->flags | MPMUXF_ON_ADDR_LIST));
list = MPMuxIsActive(mux) ?
&ActiveMuxAddrList :
&PassiveMuxAddrList;
LIST_EREMOVE(MPMUX_HNDL, addr_forw, addr_back, list, mux);
MPMuxHndlLock(mux); /* SMP-LOCK */
mux->flags &= ~MPMUXF_ON_ADDR_LIST;
MPMuxHndlUnlock(mux); /* SMP-LOCK */
MPMuxRefDec(mux);
return;
}
PRIVATE_INLINE void
MPMuxAddrListRemoveLk(MPMUX_HNDL* mux) {
MPMuxLock(); /* SMP-LOCK */
MPMuxAddrListRemove(mux);
MPMuxUnlock(); /* SMP-ULCK */
return;
}
PRIVATE_INLINE u_int
MPMuxIsOpen(MPMUX_HNDL* mux) {
return (mux->state >= MPMuxOpenSt);
}
PRIVATE_INLINE u_int
MPMuxIsUp(MPMUX_HNDL* mux) {
return (mux->state > MPMuxDestroyingSt);
}
/* Sub-Module
==========
Per-mux-channel Session List Utilities */
STATIC u_int MPMuxSessListAllocCount;
STATIC u_int MPMuxSessListFreeCount;
STATIC MPMUX_SESSLIST*
SessListAlloc() {
MPMUX_SESSLIST* sess_list;
if((sess_list = (MPMUX_SESSLIST*)debug_calloc(1, sizeof(MPMUX_SESSLIST))) ==
(MPMUX_SESSLIST*)0) {
MPErrorSet(MPERR_NOMEM);
}
else {
MPLockCreate(&sess_list->lock, "mpmux:sess_list->lock");
MPREF_INIT(sess_list);
MPMuxSessListAllocCount++;
}
return sess_list;
}
STATIC void
SessListFree(MPMUX_SESSLIST* sess_list) {
MPPrecondition((sess_list != (MPMUX_SESSLIST*)0) &&
(MPREF_COUNT(sess_list) == 0));
MPLockDestroy(&sess_list->lock);
MPREF_DESTROY(sess_list);
debug_free((caddr_t)sess_list);
MPMuxSessListFreeCount++;
}
PRIVATE_INLINE void
SessListLock(MPMUX_SESSLIST* sess_list) {
MPPrecondition((sess_list != (MPMUX_SESSLIST*)0) &&
(MPREF_COUNT(sess_list) > 0));
MPLock(&sess_list->lock); /* SMP-LOCK */
}
PRIVATE_INLINE void
SessListUnlock(MPMUX_SESSLIST* sess_list) {
MPPrecondition((sess_list != (MPMUX_SESSLIST*)0) &&
(MPREF_COUNT(sess_list) >= 0));
MPUnlock(&sess_list->lock); /* SMP-ULCK */
}
STATIC void
SessListInsert(MPMUX_SESSLIST* sess_list,
MPSESS_HNDL* sess) {
u_int hid_bucket;
u_int haddr_bucket = MPLOCALADDR_TO_HBUCKET(sess->local_addr);
u_int hepoch_bucket;
MPPrecondition((sess_list != (MPMUX_SESSLIST*)0) &&
(MPREF_COUNT(sess_list) > 0));
if(MPSessRefInc(sess, MPSessRefMuxInserted) == (MPSESS_HNDL*)0) {
MPModuleInvariant(false);
}
LIST_INSERT(MPSESS_HNDL, m_forw, m_back, &sess_list->list, sess);
if (sess->type == MPActiveSess) {
hid_bucket = MPLOCALID_TO_HBUCKET(sess->apun.active.local_id);
LIST_INSERT(MPSESS_HNDL,
mhid_forw, mhid_back,
&sess_list->hash_localid[hid_bucket],
sess);
hepoch_bucket = MPEPOCH_TO_HBUCKET(sess->apun.active.epoch);
LIST_INSERT(MPSESS_HNDL,
mhepoch_forw, mhepoch_back,
&sess_list->hash_epoch[hepoch_bucket],
sess);
}
LIST_INSERT(MPSESS_HNDL,
mhaddr_forw, mhaddr_back,
&sess_list->hash_localaddr[haddr_bucket],
sess);
return;
}
PRIVATE_INLINE void
SessListInsertLk(MPMUX_SESSLIST* sess_list,
MPSESS_HNDL* sess) {
SessListLock(sess_list); /* SMP-LOCK */
SessListInsert(sess_list, sess);
SessListUnlock(sess_list); /* SMP-ULCK */
return;
}
STATIC void
SessListRemove(MPMUX_SESSLIST* sess_list,
MPSESS_HNDL* sess) {
u_int hid_bucket;
u_int haddr_bucket = MPLOCALADDR_TO_HBUCKET(sess->local_addr);
u_int hepoch_bucket;
MPPrecondition((sess_list != (MPMUX_SESSLIST*)0) &&
(MPREF_COUNT(sess_list) > 0) &&
(sess != (MPSESS_HNDL*)0));
LIST_EREMOVE(MPSESS_HNDL, m_forw, m_back, &sess_list->list, sess);
if (sess->type == MPActiveSess) {
hid_bucket = MPLOCALID_TO_HBUCKET(sess->apun.active.local_id);
LIST_EREMOVE(MPSESS_HNDL,
mhid_forw, mhid_back,
&sess_list->hash_localid[hid_bucket],
sess);
hepoch_bucket = MPEPOCH_TO_HBUCKET(sess->apun.active.epoch);
LIST_EREMOVE(MPSESS_HNDL,
mhepoch_forw, mhepoch_back,
&sess_list->hash_epoch[hepoch_bucket],
sess);
}
LIST_EREMOVE(MPSESS_HNDL,
mhaddr_forw, mhaddr_back,
&sess_list->hash_localaddr[haddr_bucket],
sess);
MPSessRefDec(sess, MPSessRefMuxInserted);
return;
}
PRIVATE_INLINE void
SessListRemoveLk(MPMUX_SESSLIST* sess_list,
MPSESS_HNDL* sess) {
SessListLock(sess_list); /* SMP-LOCK */
SessListRemove(sess_list, sess);
SessListUnlock(sess_list); /* SMP-ULCK */
return;
}
STATIC u_int
SessListCount(MPMUX_SESSLIST* list) {
return LIST_COUNT(&list->list);
}
PRIVATE_INLINE u_int
SessListCountLk(MPMUX_SESSLIST* list) {
u_int retval;
SessListLock(list); /* SMP-LOCK */
retval = SessListCount(list);
SessListUnlock(list); /* SMP-ULCK */
return retval;
}
STATIC MPSESS_HNDL*
SessListHead(MPMUX_SESSLIST* list) {
return LIST_HEAD(&list->list);
}
STATIC MPMUX_SESSLIST*
SessListRefInc(MPMUX_SESSLIST* list) {
if (list)
MPREF_INC(list);
return list;
}
STATIC MPMUX_SESSLIST*
SessListRefDec(MPMUX_SESSLIST* list) {
u_int count;
MPREF_DEC(list, count);
if(count > 0) {
return list;
}
MPPostcondition(SessListCount(list) == 0);
SessListFree(list);
return (MPMUX_SESSLIST*)0;
}
STATIC u_int
SessListRefCount(MPMUX_SESSLIST* list) {
return MPREF_COUNT(list);
}
void
MPMuxSessListInsert(MPMUX_HNDL* mux,
MPSESS_HNDL* sess) {
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(MPREF_COUNT(mux) > 0));
if(mux->io_type == MPIOPassive) {
SessListInsert(mux->psess_list, sess);
}
else {
SessListInsert(mux->apun.active.asess_list, sess);
}
}
STATIC void
MPMuxSessListInsertLk(MPMUX_HNDL* mux,
MPSESS_HNDL* sess) {
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(MPREF_COUNT(mux) > 0));
MPMuxHndlLock(mux); /* SMP-LOCK */
MPMuxSessListInsert(mux, sess);
MPMuxHndlUnlock(mux); /* SMP-ULCK */
return;
}
STATIC void
MPMuxSessListRemove(MPMUX_HNDL* mux,
MPSESS_HNDL* sess) {
MPMUX_SESSLIST* list;
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(MPREF_COUNT(mux) > 0));
list = (mux->io_type == MPIOPassive) ?
mux->psess_list :
mux->apun.active.asess_list;
if(list != (MPMUX_SESSLIST*)0) {
SessListRemove(list, sess);
}
else {
MPModuleInvariant(mux->state == MPMuxClosingSt);
}
return;
}
STATIC void
MPMuxSessListRemoveLk(MPMUX_HNDL* mux,
MPSESS_HNDL* sess) {
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(MPREF_COUNT(mux) > 0));
MPMuxHndlLock(mux); /* SMP-LOCK */
MPMuxSessListRemove(mux, sess);
MPMuxHndlUnlock(mux); /* SMP-ULCK */
return;
}
/* Sub-Module
==========
MPath Request/Response Parameter List Object Utilities */
STATIC void
MPMuxRxBufHeapInit() {
return;
}
STATIC void
MPMuxRxBufHeapCleanup() {
return;
}
STATIC caddr_t
MPMuxRxBufAlloc(u_int n) {
caddr_t hndl;
hndl = debug_calloc(1, n);
return hndl;
}
STATIC void
MPMuxRxBufFree(caddr_t hndl) {
if(hndl == (caddr_t)0) {
mp_log("000QN", MPLT_CRIT, "MPMuxRxBufFree: "
"Attempt to free null parameter list buffer.");
}
debug_free((void*)hndl);
return;
}
/* Function
========
MPMuxLocalIdToSess
Find the MPath Session Handle in the Mpath Channel list
for local_addr/remote_addr. */
MPSESS_HNDL*
MPMuxLocalIdToSess(MPMUX_HNDL* mux,
u_int local_id) {
MPSESS_HNDL *head,
*current,
*sess = (MPSESS_HNDL*)0;
u_int hid_bucket = MPLOCALID_TO_HBUCKET(local_id);
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(MPREF_COUNT(mux) > 0));
MPMuxHndlLock(mux); /* SMP-LOCK */
if(mux->state == MPMuxClosingSt) {
MPMuxHndlUnlock(mux); /* SMP-ULCK */
MPErrorSet(MPERR_INVALID);
mp_log("000QP", MPLT_DEBUG, "MPMuxLocalIdToSess[0x%x]: "
"can't find session by local id on a closing mux",
mux);
return sess;
}
if (!MPMuxIsActive(mux)) {
MPMuxHndlUnlock(mux); /* SMP-ULCK */
MPErrorSet(MPERR_INVALID);
mp_log("000QQ", MPLT_CRIT, "MPMuxLocalIdToSess[0x%x]: "
"Strange, can't possibly get local sessions on a passive MUX",
mux);
return sess;
}
head =
current =
LIST_HEAD(&mux->apun.active.asess_list->hash_localid[hid_bucket]);
if(head == (MPSESS_HNDL*)0) {
MPMuxHndlUnlock(mux); /* SMP-ULCK */
MPErrorSet(MPERR_INTERNAL);
#if defined(SHOW_BOGUS_MESSAGES)
mp_log("000QR", MPLT_DEBUG, "MPMuxLocalIdToSess[0x%x]: "
"Mux is open but session list is empty, possible stale session.",
mux);
#endif /* defined(SHOW_BOGUS_MESSAGES) */
return head;
}
do {
if(current->apun.active.local_id == local_id) {
sess = MPSessRefInc(current, MPSessRefFoundByLocalId);
break;
}
current = current->mhid_forw;
} while(current != head);
MPMuxHndlUnlock(mux); /* SMP-ULCK */
return sess;
}
/* Function
========
MPMuxEpochToActiveSess
Find the Active MPath Session Handle in the Mpath Channel list
for local_addr/remote_addr and epoch. */
MPSESS_HNDL*
MPMuxEpochToActiveSess(MPMUX_SESSLIST* list,
MPADDR local_addr,
MPADDR remote_addr,
uint32_t epoch) {
MPSESS_HNDL *head,
*current,
*sess = (MPSESS_HNDL*)0;
u_int hepoch_bucket = MPEPOCH_TO_HBUCKET(epoch);
if (list == (MPMUX_SESSLIST*) 0)
return (MPSESS_HNDL*) 0;
head =
current =
LIST_HEAD(&list->hash_epoch[hepoch_bucket]);
if(head == (MPSESS_HNDL*)0) {
return head;
}
do {
if((current->local_addr == local_addr) &&
(current->remote_addr == remote_addr) &&
(current->apun.active.epoch == epoch)) {
sess = MPSessRefInc(current, MPSessRefFoundByEpoch);
break;
}
current = current->mhepoch_forw;
} while(current != head);
return sess;
}
/* Function
========
MPMuxLocalAddrToSess
Find the MPath Session Handle in the Mpath Channel list
for local_addr/remote_addr. */
MPSESS_HNDL*
MPMuxLocalAddrToSess(MPMUX_SESSLIST* list,
MPADDR local_addr) {
MPSESS_HNDL *head,
*current,
*sess = (MPSESS_HNDL*)0;
u_int haddr_bucket = MPLOCALADDR_TO_HBUCKET(local_addr);
if(list == (MPMUX_SESSLIST*)0) {
return (MPSESS_HNDL*)0;
}
head =
current =
LIST_HEAD(&list->hash_localaddr[haddr_bucket]);
if(head == (MPSESS_HNDL*)0) {
return head;
}
do {
if(current->local_addr == local_addr) {
sess = MPSessRefInc(current, MPSessRefFoundByLocalAddr);
break;
}
current = current->mhaddr_forw;
} while(current != head);
return sess;
}
void
MPMuxSessListCleanup(MPMUX_HNDL* mux) {
MPMUX_SESSLIST* list;
MPMuxHndlLock(mux); /* SMP-LOCK */
if(MPMuxIsActive(mux)) {
list = mux->apun.active.asess_list;
mux->apun.active.asess_list = (MPMUX_SESSLIST*)0;
}
else {
list = mux->psess_list;
mux->psess_list = (MPMUX_SESSLIST*)0;
}
MPMuxHndlUnlock(mux); /* SMP-LOCK */
if(list != (MPMUX_SESSLIST*)0) {
MPSESS_HNDL* sess;
SessListLock(list); /* SMP-LOCK */
while((sess = SessListHead(list)) != (MPSESS_HNDL*)0) {
MPSESS_HNDL* retval;
retval = MPSessRefInc(sess, MPSessRefMuxCleanup);
SessListRemove(list, sess);
SessListUnlock(list); /* SMP-ULCK */
MPSessRemoteClose(sess, MP_RCLOSE_TLDISC);
SessListLock(list); /* SMP-LOCK */
MPSessRefDec(sess, MPSessRefMuxCleanup);
}
SessListUnlock(list); /* SMP-ULCK */
list = SessListRefDec(list);
/* If this is an active mux, it must be the only referent
of the sess-list at this juncture. Passive sess lists may
be referenced by child active muxes. */
MPModuleInvariant((MPMuxIsActive(mux) &&
(list == (MPMUX_SESSLIST*)0)) ||
MPMuxIsPassive);
}
/* If this is an active mux, get rid of our reference to the
parent passive muxes passive session list if we have one.
Release bitmap of session ids. */
MPMuxHndlLock(mux); /* SMP-LOCK */
if(MPMuxIsActive(mux)) {
if((list = mux->psess_list) != (MPMUX_SESSLIST*)0) {
mux->psess_list = (MPMUX_SESSLIST*)0;
SessListRefDec(list);
}
if(mux->apun.active.asess_bmap != (MPBMAPLIST*)0) {
MPBMapDestroy(mux->apun.active.asess_bmap);
mux->apun.active.asess_bmap = (MPBMAPLIST*)0;
}
}
MPMuxHndlUnlock(mux); /* SMP-ULCK */
return;
}
MPIO_HNDL*
MPMuxToIOChan(MPMUX_HNDL* mux) {
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(MPREF_COUNT(mux) > 0));
return MPIOUserRefInc(mux->io_chan);
}
MPIO_HNDL*
MPMuxToIOChanLk(MPMUX_HNDL* mux) {
MPIO_HNDL* retval;
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(MPREF_COUNT(mux) > 0));
MPMuxHndlLock(mux); /* SMP-LOCK */
retval = MPMuxToIOChan(mux);
MPMuxHndlUnlock(mux); /* SMP-ULCK */
return retval;
}
/* Function
========
MPMuxDestroy
If the mux has no more referent sessions, destroy it, otherwise unlock it. */
void
MPMuxDestroy(MPMUX_HNDL* mux) {
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(mux->state == MPMuxDestroyingSt) &&
(mux->flags == 0) &&
(MPREF_COUNT(mux) == 0));
MPTRCID(mux_dest,
mux,
mux->state,
MPREF_COUNT(mux),
0,
0);
mux->state = MPMuxInvalidSt;
MPMuxHndlFree(mux);
return;
}
/* Function
========
MPMuxPlainConnectTmo()
If no session has connected within this amount of time on a MUX, then
close the MUX. */
STATIC void
MPMuxPlainConnectTmo(void* tmo_context) {
MPMUX_HNDL* mux = (MPMUX_HNDL*) tmo_context;
int do_close = false;
MPTLADDR tladdr;
char tladdrstr[256];
if (!mux->has_session_connected) {
MPMuxGetTlAddr(mux, &tladdr, false);
MPTLADDRtostr(&tladdr, tladdrstr);
mp_log("006NC", MPLT_WARN, "No connection received on "
"MUX 0x%x (%s) in %d secs, "
"closing it.",
mux,
tladdrstr,
mux->plain_connect_timeout);
MPMuxClose(mux);
}
MPMuxRefDec(mux); /* get rid of timeout reference count on mux */
}
/* Function
========
MPMuxClose()
Close the mux handle. Remove all sessions from the mux handle. */
STATIC void
MPMuxClose(MPMUX_HNDL* mux) {
MPIO_HNDL* io_chan;
MPTRCID(mux_clos,
mux,
mux->state,
MPREF_COUNT(mux),
0,
0);
MPMuxHndlLock(mux); /* SMP-LOCK */
if(mux->state == MPMuxClosingSt) {
mp_log("000QT", MPLT_DEBUG, "MPMuxClose[0x%x]: "
"mux already closed.", mux);
MPMuxHndlUnlock(mux); /* SMP-ULCK */
goto epilogue;
}
mux->state = MPMuxClosingSt;
io_chan = mux->io_chan;
mux->io_chan = (MPIO_HNDL*)0;
mux->dead_io_chan = io_chan;
MPMuxHndlUnlock(mux); /* SMP-ULCK */
/* Close the I/O channel, and remove reference given
to I/O module. */
if(io_chan != (MPIO_HNDL*)0) {
MPIOClose(io_chan);
MPIOUserRefDec(io_chan);
}
/* Remove mux from address lookup list if necessary */
if(mux->flags & MPMUXF_ON_ADDR_LIST) {
MPMuxAddrListRemoveLk(mux);
}
/* Clear session lists */
MPMuxSessListCleanup(mux);
if (MPUntimeout(MPMuxPlainConnectTmo,
(void*) mux) == 0) {
/* If we succeeded in removal, kill the reference */
MPMuxRefDec(mux);
} else {
/* otherwise it must be running now, let it go */
}
epilogue:
return;
}
#define MPMUX_TLADDR_CONVERT(tl_addr_A, \
sock_addr_A, \
failure_A) do { \
if((tl_addr_A) != (MPTLADDR*)0) { \
u_int sock_type; \
(failure_A) = MPTlToSockAddr((tl_addr_A), \
(sock_addr_A), \
&sock_type); \
} \
else { \
(failure_A) = false; \
(sock_addr_A) = (struct sockaddr*)0; \
} \
}while(0)
/* Function
========
MPMuxOpen
Create a Mux I/O Channel
Environment
============
Caller owns mux-list lock */
STATIC MPMUX_HNDL*
MPMuxOpen(MPIO_TYPE io_type,
MPTLADDR* local_addr,
MPTLADDR* remote_addr,
MPOPT_VEC* mpopt_vec,
u_int mpopt_vec_cnt) {
u_int failure = true;
struct sockaddr local_sock_addr_buf,
* local_sock_addr,
remote_sock_addr_buf,
* remote_sock_addr;
MPMUX_HNDL* mux = (MPMUX_HNDL*)0;
MPTLADDR* tl_addr;
MPIO_DATATYPE data_type;
MPIO_EVENTFLAGS event_flags;
MPIOEventHndlrCBF* event_f;
MPOPT_VEC* opt_vec00,
* opt_vec01;
u_int opt_vec_cnt00,
opt_vec_cnt01;
local_sock_addr = &local_sock_addr_buf;
MPMUX_TLADDR_CONVERT(local_addr, local_sock_addr, failure);
if(failure) {
mp_log("000QV", MPLT_ERR, "MPMuxOpen: "
"Unable to convert local transport layer "
"address to sockaddr." );
goto epilogue;
}
remote_sock_addr = &remote_sock_addr_buf;
MPMUX_TLADDR_CONVERT(remote_addr, remote_sock_addr, failure);
if(failure) {
mp_log("000QW", MPLT_ERR, "MPMuxOpen: "
"Unable to convert remote transport layer "
"address to sockaddr." );
goto epilogue;
}
failure = true;
tl_addr = (local_addr == (MPTLADDR*)0) ? remote_addr : local_addr;
data_type = MPTlAddrIsReliable(tl_addr) ?
MPIOReliableStream :
MPIOUnreliableDatagram;
if((mux = MPMuxHndlAlloc(io_type)) ==
(MPMUX_HNDL*)0) {
mp_log("000QX", MPLT_ERR, "MPMuxOpen: "
"Unable to allocate mux channel handle." );
goto epilogue;
}
MPTRCID(mux_open,
mux,
local_addr ? ((uint32_t*)local_addr)[0] : 0,
local_addr ? ((uint32_t*)local_addr)[1] : 0,
remote_addr ? ((uint32_t*)remote_addr)[0] : 0,
remote_addr ? ((uint32_t*)remote_addr)[1] : 0);
event_flags = (MPIO_EVENTFLAGS)
(MPIORxEvent | MPIOExceptionEvent | MPIORefIncEvent | MPIORefDecEvent);
event_f = (io_type == MPIOPassive) ?
MPMuxPassiveEventHndlr : MPMuxActiveEventHndlr;
/* If this is a passive channel, allocate passive MP channel
list handle. */
if(io_type == MPIOPassive) {
if((mux->psess_list = SessListAlloc()) ==
(MPMUX_SESSLIST*)0) {
mp_log("000QY", MPLT_ERR, "MPMuxOpen[0x%x]: "
"Couldn't allocate passive MP channel list handle.",
mux);
goto epilogue;
}
}
else {
if((mux->apun.active.asess_list = SessListAlloc()) ==
(MPMUX_SESSLIST*)0) {
mp_log("000R0", MPLT_ERR, "MPMuxOpen[0x%x]: "
"Couldn't allocate active MP channel list handle.",
mux);
goto epilogue;
}
if((mux->apun.active.asess_bmap = MPBMapCreate()) ==
(MPBMAPLIST*)0) {
mp_log("000R1", MPLT_ERR, "MPMuxOpen[0x%x]: "
"Couldn't allocate active MP session ID bitmap.",
mux);
goto epilogue;
}
}
mux->state = MPMuxConnectingSt;
/* Add channel to either master passive mux-channel list
or master active mux-channel list. */
MPMuxAddrListInsertLk(mux);
/* Now add any mux options */
MPOPT_VEC_SPLIT(mpopt_vec, mpopt_vec_cnt,
opt_vec00, opt_vec_cnt00,
opt_vec01, opt_vec_cnt01,
MPMUX_OPT_MIN, MPMUX_OPT_MAX,
failure);
MPMuxOptionsSetInternal(mux, opt_vec00, opt_vec_cnt00);
failure = true;
if((mux->io_chan = MPIOOpen(io_type,
data_type,
local_sock_addr,
remote_sock_addr,
event_f,
(void*)mux,
event_flags,
opt_vec01,
opt_vec_cnt01)) == (MPIO_HNDL*)0) {
mp_log("000R2", MPLT_ERR, "MPMuxOpen[0x%x]: "
"Unable to create low-level I/O channel.",
mux);
goto epilogue;
}
MPMuxHndlLock(mux); /* SMP-LOCK */
if(mux->state != MPMuxConnectingSt) {
mp_log("000R3", MPLT_WARN, "MPMuxOpen[0x%x]: "
"Unexpected state transition %u during io open, "
"attempting to shut down.",
mux,
mux->state);
}
else {
mux->state = MPMuxOpenSt;
failure = false;
}
MPMuxHndlUnlock(mux); /* SMP-ULCK */
epilogue:
if(failure) {
MPTRCID(mux_EOPN,
mux,
local_addr ? ((uint32_t*)local_addr)[0] : 0,
local_addr ? ((uint32_t*)local_addr)[1] : 0,
remote_addr ? ((uint32_t*)remote_addr)[0] : 0,
remote_addr ? ((uint32_t*)remote_addr)[1] : 0);
if(mux != (MPMUX_HNDL*)0) {
MPMuxClose(mux);
mux = MPMuxRefDec(mux);
/* We've closed the mux, we've not told anyone
about it, therefore, this must have been the
last reference. */
MPModuleInvariant(mux == (MPMUX_HNDL*)0);
}
}
return mux;
}
/* Function
========
MPMuxClone */
MPMUX_HNDL*
MPMuxClone(MPMUX_HNDL* pmux) {
MPMUX_HNDL* amux = (MPMUX_HNDL*)0;
MPIO_HNDL* close_io_chan = (MPIO_HNDL*) 0;
MPMUX_SESSLIST* psess_list;
bool_t failure = true,
did_plain_connect_timeout = true;
MPIO_EVENTFLAGS event_flags;
/* Range Check */
if((pmux == (MPMUX_HNDL*)0) ||
(!MPMuxIsPassive(pmux))) {
MPErrorSet(MPERR_INVALID);
mp_log("000R4", MPLT_ERR, "MPMuxClone: "
"Invalid parameters.");
goto epilogue;
}
if((amux = MPMuxHndlAlloc(MPIOActive)) == (MPMUX_HNDL*)0) {
MPErrorSet(MPERR_NOMEM);
mp_log("000R5", MPLT_ERR, "MPMuxClone[0x%x]: "
"Unable to allocate MPath Active Channel Handle.",
pmux);
goto epilogue;
}
/* Copy parent's plain connect timeout setting */
amux->plain_connect_timeout = pmux->plain_connect_timeout;
if((amux->apun.active.asess_list = SessListAlloc()) ==
(MPMUX_SESSLIST*)0) {
mp_log("000R6", MPLT_ERR, "MPMuxClone[0x%x]: "
"Couldn't allocate active MP channel list handle.",
pmux);
goto epilogue;
}
if((amux->apun.active.asess_bmap = MPBMapCreate()) == (MPBMAPLIST*)0) {
MPErrorSet(MPERR_NOMEM);
mp_log("000R7", MPLT_ERR, "MPMuxClone[0x%x]: "
"Couldn't allocate active MP session ID bitmap.",
pmux);
goto epilogue;
}
MPTRCID(mux_clon,
amux,
pmux,
pmux->state,
MPREF_COUNT(pmux),
0);
MPMuxHndlLock(pmux); /* SMP-LOCK */
if((psess_list = SessListRefInc(pmux->psess_list)) ==
(MPMUX_SESSLIST*)0) {
mp_log("0046A", MPLT_ERR, "MPMuxClone[0x%x]: "
"Failed to increment reference on mux's session list.",
pmux);
MPMuxHndlUnlock(pmux);
goto epilogue;
}
MPMuxAddrListInsertLk(amux);
MPMuxHndlUnlock(pmux); /* SMP-LOCK */
MPMuxHndlLock(amux); /* SMP-LOCK */
amux->psess_list = psess_list;
amux->state = MPMuxCloningSt;
amux->has_session_connected = false;
MPMuxHndlUnlock(amux); /* SMP-ULCK */
/* Clone low-level I/O channel. */
event_flags = (MPIO_EVENTFLAGS)(MPIORefIncEvent |
MPIORefDecEvent |
MPIORxEvent |
MPIOExceptionEvent);
if((amux->io_chan = MPIOClone(pmux->io_chan,
MPMuxActiveEventHndlr,
amux,
event_flags)) == (MPIO_HNDL*)0) {
MPERROR err = MPErrorGet();
if (err != MPIO_EWOULDBLOCK) {
mp_log("000R8", MPLT_ERR, "MPMuxClone[0x%x]: "
"Unable to clone low-level I/O channel.",
pmux);
} else {
/* otherwise it's just that there was no connection to get */
}
goto epilogue;
}
MPMuxHndlLock(amux); /* SMP-LOCK */
if ((amux->state == MPMuxClosingSt) &&
(amux->dead_io_chan == (MPIO_HNDL*) 0) &&
(amux->io_chan != (MPIO_HNDL*) 0)) {
mp_log(MPL_CRIT, "Kludge: "
"mux 0x%x was closed while cloning but "
"we have cloned a valid io channel, "
"resetting it.",
amux);
close_io_chan = amux->io_chan;
amux->io_chan = (MPIO_HNDL*) 0;
amux->dead_io_chan = close_io_chan;
/* Close the IO channel */
MPIOClose(close_io_chan);
MPIOUserRefDec(close_io_chan);
MPMuxHndlUnlock(amux); /* SMP-ULCK */
goto epilogue;
}
/* Start plain connect timeout. If no session appears on the MUX within
the plain connect timeout's time limit, then close the MUX. */
if (amux->plain_connect_timeout && !amux->has_session_connected) {
MPTimeout(MPMuxPlainConnectTmo,
(void*) amux,
amux->plain_connect_timeout * 1000,
false);
MPMuxRefInc(amux); /* reference count for timeout task */
did_plain_connect_timeout = true;
}
MPMuxHndlUnlock(amux); /* SMP-ULCK */
failure = false;
epilogue:
if(failure) {
if(amux != (MPMUX_HNDL*)0) {
MPMuxClose(amux);
amux = MPMuxRefDec(amux);
if (did_plain_connect_timeout) {
/* Get rid of the plain connect timeout and its reference */
if (amux != (MPMUX_HNDL*) 0) {
if (MPUntimeout(MPMuxPlainConnectTmo,
(void*) amux) == 0) {
/* If we succeeded in removal, kill the reference */
amux = MPMuxRefDec(amux);
} else {
/* otherwise it must be running now */
amux = (MPMUX_HNDL*) 0; /* timeout task will close it */
}
}
}
/* We've closed the mux, we've not told anyone
about it, therefore, this must have been the
last reference. */
MPModuleInvariant(amux == (MPMUX_HNDL*)0);
}
}
return amux;
}
/* Sub-Module
==========
Mpath Mux I/O Entry Points */
/* Function
========
MPMuxFindAddr
Find a Mux I/O Channel Handle for given channel type, I/O family,
and transport-layer addresses.
Environment
===========
Caller owns mux-list lock. */
STATIC MPMUX_HNDL*
MPMuxFindAddr(MPIO_TYPE io_type,
MPTLADDR* local_addr,
MPTLADDR* remote_addr) {
MPMUX_HNDL *head,
*current;
MPTLADDR* tl_addr;
u_int found = false,
failure = false;
MPIO_DATATYPE data_type;
MPMUXLIST* list;
struct sockaddr local_sock_addr_buf,
* local_sock_addr,
remote_sock_addr_buf,
* remote_sock_addr;
local_sock_addr = &local_sock_addr_buf;
MPMUX_TLADDR_CONVERT(local_addr, local_sock_addr, failure);
if(failure) {
mp_log("000R9", MPLT_ERR, "MPMuxFindAddr: "
"Unable to convert local transport layer "
"address to sockaddr." );
return (MPMUX_HNDL*)0;
}
remote_sock_addr = &remote_sock_addr_buf;
MPMUX_TLADDR_CONVERT(remote_addr, remote_sock_addr, failure);
if(failure) {
mp_log("000RA", MPLT_ERR, "MPMuxFindAddr: "
"Unable to convert remote transport layer "
"address to sockaddr." );
return (MPMUX_HNDL*)0;
}
tl_addr = (local_addr == (MPTLADDR*)0) ? remote_addr : local_addr;
data_type = MPTlAddrIsReliable(tl_addr) ?
MPIOReliableStream :
MPIOUnreliableDatagram;
MPMuxLock(); /* SMP-LOCK */
list = (io_type == MPIOPassive) ?
&PassiveMuxAddrList :
&ActiveMuxAddrList;
head =
current =
LIST_HEAD(list);
if(head == (MPMUX_HNDL*)0) {
MPMuxUnlock(); /* SMP-ULCK */
return head;
}
/* Search for suitable existing channel.
For active channels, search for one with
precisely the same remote address. If
found verify that the local address is
either an exact or wildcard match.
For passive channels, search for one with
precisely the same local address. Remote
addresses are not compared, since, in general,
they must be wildcards.
Note:
Since we hold the mux handle lock for the duration of
the io-*-addr-eq calls, it is not necessary to get
a reference on the io-channel. We implicitly use the
original one. */
do {
if(MPMuxIsOpen(current)) {
MPMuxHndlLock(current); /* SMP-LOCK */
if(!MPMuxIsOpen(current)) {
MPMuxHndlUnlock(current); /* SMP-ULCK */
current = current->addr_forw;
continue;
}
if(io_type == MPIOActive) {
if(MPIORemoteAddrEq(current->io_chan,
data_type,
remote_sock_addr,
false)) {
found = ((local_sock_addr == (struct sockaddr*)0) ||
MPIOLocalAddrEq(current->io_chan,
data_type,
local_sock_addr,
true));
}
}
else {
found = MPIOLocalAddrEq(current->io_chan,
data_type,
local_sock_addr,
false);
}
MPMuxHndlUnlock(current); /* SMP-ULCK */
}
if(found == true) {
/* See if this mux channel is in a good state to return
to caller. If reference increment succeeds it is,
otherwise it was on the way down, so continue searching. */
if(MPMuxRefInc(current) == (MPMUX_HNDL*)0) {
found = false;
}
else {
break;
}
}
current = current->addr_forw;
} while(current != head);
MPMuxUnlock(); /* SMP-ULCK */
return found ? current : (MPMUX_HNDL*)0;
}
/* Function
========
MPMuxSessAdd
Add an Mpath Request/Response channel to a Mux I/O Channel. */
MPMUX_HNDL*
MPMuxSessAdd(MPSESS_HNDL* sess,
MPTLADDR* local_addr,
MPTLADDR* remote_addr,
u_int create_flag,
MPOPT_VEC* mpopt_vec,
u_int mpopt_vec_cnt) {
MPMUX_HNDL* mux = (MPMUX_HNDL*)0;
MPIO_TYPE io_type;
u_int failure = true;
if(sess == (MPSESS_HNDL*)0) {
MPErrorSet(MPERR_INVALID);
mp_log("000RB", MPLT_ERR, "MPMuxSessAdd: "
"Invalid Parameters.");
goto epilogue;
}
io_type = MPSessIsActive(sess) ? MPIOActive : MPIOPassive;
if(((io_type == MPIOActive) &&
(remote_addr == (MPTLADDR*)0)) ||
((io_type == MPIOPassive) &&
(local_addr == (MPTLADDR*)0))) {
MPErrorSet(MPERR_INVALID);
mp_log(MPL_ERR, "MPMuxSessAdd[0x%x]: "
"Invalid Parameters. Missing address",
sess);
goto epilogue;
}
if((mux = MPMuxFindAddr(io_type,
local_addr,
remote_addr)) != (MPMUX_HNDL*)0) {
/* We found the MUX, great. Now we can add the session to it.
* If necessary, set any mux options that apply to this mux,
* some may have changed and we may be expected to apply those changes.
*/
MPMuxOptionsSetInternal(mux, mpopt_vec, mpopt_vec_cnt);
}
else if(create_flag) {
if((mux = MPMuxOpen(io_type,
local_addr,
remote_addr,
mpopt_vec,
mpopt_vec_cnt)) == (MPMUX_HNDL*)0) {
goto epilogue;
}
}
MPMuxHndlLock(mux); /* SMP-LOCK */
if(MPSessIsActive(sess)) {
if (mux->apun.active.asess_bmap != (MPBMAPLIST*) 0) {
sess->apun.active.local_id = MPBMapAlloc(mux->apun.active.asess_bmap);
} else {
/* Whups, our BMap is gone, we must be shutting down */
MPMuxHndlUnlock(mux);
MPErrorSet(MPERR_INTERNAL);
mp_log(MPL_ERR, "MPMuxSessAdd[0x%x]: "
"MUX has closed down, cannot add session.",
sess);
goto epilogue;
}
}
MPMuxSessListInsert(mux, sess);
mux->has_session_connected = true;
MPMuxHndlUnlock(mux); /* SMP-ULCK */
MPTRCID(mux_sadd,
mux,
sess,
0,
0,
0);
failure = false;
epilogue:
if(failure) {
if(mux != (MPMUX_HNDL*)0) {
MPMuxRefDec(mux);
}
mux = (MPMUX_HNDL*)0;
}
return mux;
}
/* Function
========
MPMuxSessRemove
Remove an Mpath Request/Response channel to a Mux I/O Channel. */
void
MPMuxSessRemove(MPMUX_HNDL* mux,
MPSESS_HNDL* sess) {
MPMUX_SESSLIST* list;
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(MPREF_COUNT(mux) > 0) &&
(sess != (MPSESS_HNDL*)0));
MPTRCID(mux_srem,
mux,
sess,
mux->state,
MPREF_COUNT(mux),
0);
/* NEEDSWORK:
We'd like to assert that session is in this mux, but it
it difficult to do so. */
MPMuxHndlLock(mux); /* SMP-LOCK */
MPMuxSessListRemove(mux, sess);
list = MPMuxIsActive(mux) ?
mux->apun.active.asess_list :
mux->psess_list;
if(mux->apun.active.asess_bmap != (MPBMAPLIST*)0) {
/* Only free the bmap entry if we haven't previously cleaned out the MUX--
the way to tell is to see if the bmap is there or not. */
if(MPSessIsActive(sess)) {
MPBMapFree(mux->apun.active.asess_bmap,
sess->apun.active.local_id);
}
}
if(list != (MPMUX_SESSLIST*)0) {
/* If this session was the last one on this mux-channel,
close the channel */
if(SessListCountLk(list) == 0) {
MPMuxHndlUnlock(mux); /* SMP-ULCK */
MPMuxClose(mux);
MPMuxHndlLock(mux); /* SMP-LOCK */
}
}
MPMuxHndlUnlock(mux); /* SMP-ULCK */
return;
}
/* Function
========
MPMuxSessSetNewKeys
When epoch and local ID are to be changed for a session, remove it from its
old hash tables in the MUX and insert it into new ones. */
void
MPMuxSessSetNewKeys(MPMUX_HNDL* mux,
MPSESS_HNDL* sess,
u_int local_id,
uint32_t epoch,
MPADDR local_addr) {
MPMUX_SESSLIST* list;
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(MPREF_COUNT(mux) > 0) &&
(sess != (MPSESS_HNDL*)0));
MPTRCID(mux_sset,
mux,
sess,
mux->state,
MPREF_COUNT(mux),
0);
MPMuxHndlLock(mux); /* SMP-LOCK */
MPMuxSessListRemove(mux, sess);
if (sess->type == MPActiveSess) {
sess->apun.active.local_id = local_id;
sess->apun.active.epoch = epoch;
}
sess->local_addr = local_addr;
MPMuxSessListInsert(mux, sess);
MPMuxHndlUnlock(mux); /* SMP-ULCK */
return;
}
/* Sub-Module
==========
Mpath I/O asychronous event entry points */
/* Function
========
MPMuxRxEventHndlr */
bool_t
MPMuxRxEventHndlr(MPMUX_HNDL* mux) {
MPIO_HNDL* io_chan;
MPSESS_HNDL* sess = (MPSESS_HNDL*)0;
MPHDR_DATA* hdr;
MPINFLIGHT_MSG *rx_msg;
caddr_t rx_buf;
u_int failure = true;
char garbage[128];
int pt_buf_len;
u_int rx_buf_len;
int retval;
u_int done = false,
close_flag = false;
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(MPMuxIsActive(mux)));
/* Get local copy of io-channel.
Note:
Since this function can only be called out of the context
of the io-layer, the io-channel must have a reference for
this thread, and thus we don't need to increment the io-chan
reference everytime we call back into the io-layer. It is
possible that another thread closed the io-channel just as
this was being called, so we record the io-channel under
mux-handle lock protection, and only proceed if it is valid. */
MPMuxHndlLock(mux); /* SMP-LOCK */
if((io_chan = mux->io_chan) == (MPIO_HNDL*)0) {
MPMuxHndlUnlock(mux); /* SMP-ULCK */
mp_log("000RC", MPLT_DEBUG, "MPMuxRxEventHndlr[0x%x]: "
"Recv event on closed io-channel, aborting recv.",
mux);
return failure;
}
/* Read Mpath Request/Response Message Header. */
rx_msg = &mux->apun.active.rx_msg;
hdr = &rx_msg->hdr.data;
MPMuxHndlUnlock(mux); /* SMP-ULCK */
MPTRCID(mux_rxev, mux,
mux->state, MPREF_COUNT(mux), mux->flags, rx_msg->state);
do {
switch(rx_msg->state) {
case MPInflightPTHdrStartSt:
/* Read minimum number of header bytes */
rx_msg->buf = (caddr_t)&rx_msg->c_hdr;
rx_msg->buf_len = MPHDR_MIN_LEN;
rx_msg->resid = MPHDR_MIN_LEN;
rx_msg->state = MPInflightPTHdrMinSt;
/* FALL THROUGH */
case MPInflightPTHdrMinSt:
rx_buf = &rx_msg->buf[rx_msg->buf_len - rx_msg->resid];
if((retval = MPIORx(io_chan,
rx_buf,
rx_msg->resid)) < 0) {
/* Note:
We don't log this failure because it is the
expected case when the remote side closes after
sending a message.
We do log closes in the middle of a message at
debug level because even though we don't mind them
we find them somewhat impolite. */
close_flag = true;
goto epilogue;
}
rx_msg->resid -= retval;
if(rx_msg->resid != 0) {
done = true;
break;
}
/* If we have completed reading the minimum number of
header bytes, figure out if we are looking at a
valid header and how much more we have to read. */
if(((pt_buf_len = MPPTHdrPackedLen(&rx_msg->c_hdr)) < 0) ||
(pt_buf_len < MPHDR_MIN_LEN)) {
mp_log("000RE", MPLT_ERR, "MPMuxRxEventHndlr[0x%x]: "
"Incorrect header length 0x%x, state 0x%x resid 0x%x..."
"closing mux.",
mux,
pt_buf_len,
rx_msg->state,
rx_msg->resid);
close_flag = true;
goto epilogue;
}
/* Since we're still reading into the same buffer (c_hdr),
reset length and residual to reflect new total
header length. */
rx_msg->resid = (u_int) pt_buf_len - rx_msg->buf_len;
rx_msg->buf_len = (u_int) pt_buf_len;
rx_msg->state = (rx_msg->resid == 0) ?
MPInflightPTHdrCompleteSt : MPInflightPTHdrResidSt;
break;
case MPInflightPTHdrResidSt:
rx_buf = &rx_msg->buf[rx_msg->buf_len - rx_msg->resid];
if((retval = MPIORx(io_chan,
rx_buf,
rx_msg->resid)) < 0) {
mp_log("000RF", MPLT_ERR, "MPMuxRxEventHndlr[0x%x]: "
"MPIORx of header failed 0x%x, "
"state 0x%x resid 0x%x...closing mux.",
mux,
retval,
rx_msg->state,
rx_msg->resid);
close_flag = true;
goto epilogue;
}
rx_msg->resid -= retval;
if(rx_msg->resid != 0) {
done = true;
break;
}
rx_msg->state = MPInflightPTHdrCompleteSt;
/* FALL THROUGH */
case MPInflightPTHdrCompleteSt:
if(MPPTHdrUnpack(&rx_msg->c_hdr, &rx_msg->hdr)) {
mp_log("000RG", MPLT_ERR, "MPMuxRxEventHndlr[0x%x]: "
"Unpack of header failed 0x%x, state 0x%x resid 0x%x..."
"closing mux.",
mux,
retval,
rx_msg->state,
rx_msg->resid);
close_flag = true;
goto epilogue;
}
/* Allocate a buffer into which to receive the remaining
cipher-text portion of the header, the cipher-text
request/response parameter data and any pad bytes, and
the final enciphered sequence number */
rx_msg->buf_len =
rx_msg->resid =
hdr->dlen +
hdr->plen +
MPCTHdrPackedLen(&rx_msg->c_hdr) +
MPHDR_SEQ01_LEN;
if((rx_msg->buf = MPMuxRxBufAlloc(rx_msg->buf_len)) == (caddr_t)0) {
mp_log("000RH", MPLT_ERR, "MPMuxRxEventHndlr[0x%x]: "
"Unable to allocate 0x%x bytes "
"for rx-buf, discarding message",
mux,
rx_msg->buf_len);
rx_msg->state = MPInflightTruncateSt;
}
else {
rx_msg->state = MPInflightCipherDataSt;
}
break;
case MPInflightCipherDataSt:
rx_buf = (caddr_t)(&rx_msg->buf[rx_msg->buf_len] - rx_msg->resid);
if((retval = MPIORx(io_chan,
rx_buf,
rx_msg->resid)) < 0) {
mp_log("000RJ", MPLT_ERR, "MPMuxRxEventHndlr[0x%x]: "
"MPIORx of cipher data failed 0x%x, "
"state 0x%x resid 0x%x...closing mux.",
mux,
retval,
rx_msg->state,
rx_msg->resid);
close_flag = true;
goto epilogue;
}
rx_msg->resid-= retval;
if(rx_msg->resid != 0) {
done = true;
break;
}
rx_msg->state = MPInflightDoneSt;
break;
case MPInflightTruncateSt:
MPTRCID(mux_RXTR,
mux, 0, 0, rx_msg, rx_msg->resid);
while(rx_msg->resid) {
rx_buf_len = min(rx_msg->resid, sizeof(garbage));
if((retval = MPIORx(io_chan,
&garbage[0],
rx_buf_len)) < 0) {
mp_log("000RK", MPLT_ERR, "MPMuxRxEventHndlr[0x%x]: "
"MPIORx of truncated data failed 0x%x, "
"state 0x%x resid 0x%x...closing mux.",
mux,
retval,
rx_msg->state,
rx_msg->resid);
close_flag = true;
goto epilogue;
}
rx_msg->resid -= retval;
if(retval == 0) {
break;
}
}
if(rx_msg->resid != 0) {
done = true;
break;
}
rx_msg->state = MPInflightDoneSt;
/* FALL THROUGH */
case MPInflightDoneSt:
rx_buf = rx_msg->buf;
rx_buf_len = rx_msg->buf_len;
rx_msg->state = MPInflightPTHdrStartSt;
rx_msg->buf = (caddr_t)0;
rx_msg->buf_len = 0;
if(rx_msg->hdr.generic.type == MPHDR_DATA_TYPE) {
if((sess = MPMuxLocalIdToSess(mux, hdr->sid)) ==
(MPSESS_HNDL*)0) {
#if defined(SHOW_BOGUS_MESSAGES)
mp_log("000RL", MPLT_ERR, "MPMuxRxEventHndlr[0x%x]: "
"Failed to find session handle "
"for id %x",
mux,
hdr->sid);
#endif /* defined(SHOW_BOGUS_MESSAGES) */
}
else {
MPSessRxEventHndlr(sess,
&rx_msg->hdr.data,
&rx_msg->c_hdr,
rx_buf,
rx_buf_len);
MPSessRefDec(sess, MPSessRefFoundByLocalId);
}
}
else {
MPMuxControlHndlr(mux,
&rx_msg->hdr.cntl,
&rx_msg->c_hdr,
rx_buf,
rx_buf_len);
}
if(rx_buf != (caddr_t)0) {
MPMuxRxBufFree(rx_buf);
}
/* If this was a new mux handle, but the first
message we got was a bogus data message for
an unknown session, close the mux know since
we started it up for a dead session.
Note:
We don't need to get lock when looking at
state, because if we are in the cloning state, we
can only get out of it in this handler in
the context of the io thread. */
if(mux->state == MPMuxCloningSt) {
close_flag = true;
/* We do not want to log this message because we get it
way too often when a server has just restarted and
is dealing with the messages from just-cut-off UDP
sessions. It doesn't really buy us hacking protection
because it isn't an ALERT. */
#if defined(SHOW_BOGUS_MESSAGES)
mp_log("000RM", MPLT_DEBUG, "MPMuxRxEventHndlr[0x%x]: "
"Bogus message on new mux, closing.",
mux);
#endif /* defined(SHOW_BOGUS_MESSAGES) */
}
done = true;
break;
default:
mp_log("000RN", MPLT_ERR, "MPMuxRxEventHndlr[0x%x]: "
"Illegal state.",
mux);
done = true;
break;
}
} while(!done);
failure = false;
epilogue:
if(close_flag) {
failure = true;
MPTRCID(mux_RXCL,
mux, mux->state, mux->flags, 0, 0);
MPMuxClose(mux);
}
return failure;
}
bool_t
MPMuxActiveEventHndlr(void* context,
MPIO_EVENTFLAGS flags) {
MPMUX_HNDL* mux = (MPMUX_HNDL*)context;
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(MPREF_COUNT(mux) > 0));
if(flags & MPIORefIncEvent) {
MPMuxRefInc(mux); return false;
}
else if(flags & MPIORefDecEvent) {
MPMuxRefDec(mux); return false;
}
else if(flags & MPIORxEvent) {
MPMuxRxEventHndlr(mux); return false;
}
mp_log(MPL_WARN, "MPMuxActiveEventHndlr(0x%x): did not handle flag 0x%x",
mux,
flags);
return true;
}
bool_t
MPMuxPassiveEventHndlr(void* context,
MPIO_EVENTFLAGS flags) {
MPMUX_HNDL* pmux = (MPMUX_HNDL*)context;
MPMUX_HNDL* amux;
bool_t failure = true;
MPERRNO error;
MPPrecondition((pmux != (MPMUX_HNDL*)0) &&
(MPREF_COUNT(pmux) > 0));
if(flags & MPIORefIncEvent) {
MPMuxRefInc(pmux); return false;
}
else if(flags & MPIORefDecEvent) {
MPMuxRefDec(pmux); return false;
}
else if(flags & MPIORxEvent) {
if((amux = MPMuxClone(pmux)) == (MPMUX_HNDL*)0) {
error = MPErrorGet();
if (error != MPIO_EWOULDBLOCK) {
mp_log("000RP", MPLT_ERR, "MPMuxAcceptEventHndlr[0x%x]: "
"Unable to clone passive channel.",
pmux);
}
}
else {
/* Hack:
MPMuxClone gives us a reference, but we don't need
it since we aren't calling up to the session
layer yet, so just get rid of it. */
MPMuxRefDec(amux);
failure = false;
}
}
else {
mp_log_abort("0046B", MPLT_CRIT, "MPMuxPassiveEventHndlr[0x%x]: "
"Unexpected or illegal event on passive channel, "
"flags=%x.",
pmux,
flags);
}
return failure;
}
STATIC void
MPMuxConnectHndlr(MPMUX_HNDL* mux,
MPHDR_CNTL* hdr,
MPHDR* c_hdr,
caddr_t data,
u_int data_len) {
MPSESS_HNDL *a_sess,
*p_sess;
MPCONN_PARAMS* params = (MPCONN_PARAMS*)data;
MPADDR local_addr,
remote_addr;
uint32_t epoch;
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(MPMuxIsActive(mux)) &&
(hdr != (MPHDR_CNTL*)0) &&
(((data != (caddr_t)0) &&
(data_len > 0)) ||
((data == (caddr_t)0) &&
(data_len == 0))));
/* Find passive session handle specified by connector's
destination address. */
epoch = ntohl(params->epoch);
local_addr = ntohl(params->dst_addr);
remote_addr = ntohl(params->src_addr);
MPMuxHndlLock(mux); /* SMP-LOCK */
/* See if we've already handled this connection, if so, let
session handler handle the ack */
if((a_sess = MPMuxEpochToActiveSess(mux->apun.active.asess_list,
local_addr,
remote_addr,
epoch)) != (MPSESS_HNDL*)0) {
MPMuxHndlUnlock(mux); /* SMP-ULCK */
MPSessConnectHndlr(a_sess,
hdr,
c_hdr,
data,
data_len);
MPSessRefDec(a_sess, MPSessRefFoundByEpoch);
return;
}
if((p_sess = MPMuxLocalAddrToSess(mux->psess_list, local_addr))
== (MPSESS_HNDL*)0) {
mp_log("000RQ", MPLT_INFO, "MPMuxConnectHndlr[0x%x]: "
"Request to connect to unbound server address "
"0x%08x from 0x%08x.",
mux,
local_addr,
remote_addr);
MPMuxHndlUnlock(mux); /* SMP-ULCK */
return;
}
if (!MPMuxIsUp(mux)) {
mp_log("006BY", MPLT_ERR, "Unable to add session to mux 0x%x "
"which is in state %d",
mux,
mux->state);
MPMuxHndlUnlock(mux);
return;
}
MPMuxHndlUnlock(mux); /* SMP-ULCK */
a_sess = MPSessClone(p_sess);
/* Done with passive session, so get rid of reference. */
MPSessRefDec(p_sess, MPSessRefFoundByLocalAddr);
if(a_sess == (MPSESS_HNDL*)0) {
mp_log("000RR", MPLT_WARN, "MPMuxConnectHndlr[0x%x]: "
"Unable to clone active MPath channel from passive"
"one for 0x%08x/0x%08x",
mux,
local_addr,
remote_addr);
return;
}
MPMuxHndlLock(mux); /* SMP-LOCK */
/* Increment reference to mux for a_sess's pointer. */
MPMuxRefInc(mux);
if (mux->apun.active.asess_bmap != (MPBMAPLIST*) 0) {
a_sess->apun.active.local_id = MPBMapAlloc(mux->apun.active.asess_bmap);
} else {
MPMuxHndlUnlock(mux);
mp_log("000RR", MPLT_WARN, "MPMuxConnectHndlr[0x%x]: "
"MUX is closing, can't add session to MPBMap",
mux);
return;
}
a_sess->mux = mux;
a_sess->local_addr = local_addr;
a_sess->remote_addr = remote_addr;
a_sess->apun.active.remote_id = hdr->sid;
MPMuxSessListInsert(mux, a_sess);
mux->has_session_connected = true;
if(mux->state == MPMuxCloningSt) {
mux->state = MPMuxOpenSt;
}
MPMuxHndlUnlock(mux); /* SMP-ULCK */
/* Now that session is hooked up to mux, call session connect handler
to handle ack */
MPSessConnectHndlr(a_sess,
hdr,
c_hdr,
data,
data_len);
MPSessRefDec(a_sess, MPSessRefCreation);
return;
}
void
MPMuxConnectAckHndlr(MPMUX_HNDL* mux,
MPHDR_CNTL* hdr,
MPHDR* c_hdr,
caddr_t data,
u_int data_len) {
MPCONN_PARAMS* params = (MPCONN_PARAMS*)data;
MPSESS_HNDL *a_sess;
MPADDR local_addr,
remote_addr;
uint32_t epoch;
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(MPMuxIsActive(mux)) &&
(hdr != (MPHDR_CNTL*)0) &&
(((data != (caddr_t)0) &&
(data_len > 0)) ||
((data == (caddr_t)0) &&
(data_len == 0))));
if(data_len < sizeof(MPCONN_PARAMS)) {
mp_log("000RT", MPLT_WARN, "MPMuxConnectAckHndlr[0x%x]: "
"Truncated connect request, got %d bytes, expected %d.",
mux,
data_len,
sizeof(MPCONN_PARAMS));
return;
}
epoch = ntohl(params->epoch);
local_addr = ntohl(params->dst_addr);
remote_addr = ntohl(params->src_addr);
/* Find passive session handle specified by connector's
destination address. */
MPMuxHndlLock(mux); /* SMP-LOCK */
if((a_sess = MPMuxEpochToActiveSess(mux->apun.active.asess_list,
local_addr,
remote_addr,
epoch))
== (MPSESS_HNDL*)0) {
mp_log("000RV", MPLT_INFO, "MPMuxConnectAckHndlr[0x%x]: "
"Connect ack destination unknown, "
"local=0x%08x remote=0x%08x.",
mux,
local_addr,
remote_addr);
MPMuxHndlUnlock(mux); /* SMP-ULCK */
return;
}
MPMuxHndlUnlock(mux); /* SMP-ULCK */
a_sess->local_addr = local_addr;
a_sess->apun.active.remote_id = hdr->sid;
MPSessConnectAckHndlr(a_sess,
hdr,
c_hdr,
data,
data_len);
MPSessRefDec(a_sess, MPSessRefFoundByEpoch);
return;
}
void
MPMuxControlHndlr(MPMUX_HNDL* mux,
MPHDR_CNTL* hdr,
MPHDR* c_hdr,
caddr_t buf,
u_int len) {
MPSESS_HNDL* sess;
MPHDR tmp_hdr;
u_int ct_len;
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(hdr != (MPHDR_CNTL*)0) &&
(c_hdr != (MPHDR*)0));
/* All control message headers are entirely plaintext, so
unpack the complete header, and read in optional
parameter data if any, then call appropriate control
message handler. */
MPCTHdrUnpack(c_hdr, buf, &tmp_hdr);
ct_len = MPCTHdrPackedLen(c_hdr);
if((hdr->dlen + ct_len + hdr->plen + MPHDR_CSUM_LEN) != len) {
mp_log("000RW", MPLT_WARN, "MPMuxControlHndlr[0x%x]: "
"Malformed control message: "
"dlen 0x%x, ct_len 0x%x, plen 0x%x, len 0x%x",
mux,
hdr->dlen,
ct_len,
hdr->plen,
len);
return;
}
switch(tmp_hdr.cntl.cntl) {
case MPCNTL_CONN:
MPMuxConnectHndlr(mux,
&tmp_hdr.cntl,
c_hdr,
buf,
len);
break;
case MPCNTL_CACK:
MPMuxConnectAckHndlr(mux,
&tmp_hdr.cntl,
c_hdr,
buf,
len);
break;
default:
if((sess = MPMuxLocalIdToSess(mux, tmp_hdr.cntl.sid)) !=
(MPSESS_HNDL*)0) {
MPSessCntlEventHndlr(sess, &tmp_hdr.cntl, c_hdr,
buf,
len);
MPSessRefDec(sess, MPSessRefFoundByLocalId);
}
break;
}
return;
}
/* Sub-Module
==========
Mpath Request Response Protocol Message Transmit */
/* Function
========
MPMuxTxV()
Transmit data in iovec. */
int
MPMuxTxV(MPMUX_HNDL* mux,
MPIO_VEC* iovec,
u_int iovec_cnt) {
MPIO_HNDL* io_chan = (MPIO_HNDL*)0;
int retval = -1;
if((mux == (MPMUX_HNDL*)0) ||
((iovec == (MPIO_VEC*)0) &&
(iovec_cnt > 0)) ||
((iovec != (MPIO_VEC*)0) &&
(iovec_cnt == 0))) {
MPErrorSet(MPERR_INVALID);
mp_log("000RX", MPLT_ERR, "MPMuxTxV(%x %x %u): "
"Invalid parameters.",
mux, iovec, iovec_cnt);
return retval;
}
MPTRCID(mux_txv_,
mux, mux->state, MPREF_COUNT(mux), iovec, iovec_cnt);
if((io_chan = MPMuxToIOChanLk(mux)) != (MPIO_HNDL*)0) {
retval = MPIOTxV(io_chan, iovec, iovec_cnt);
MPIOUserRefDec(io_chan);
}
#if defined(NEEDSWORK)
/* NEEDSWORK:
Should we do a mux-close if tx fails?
Problem with mux-close is that it wants to call
remote-close for all the mux'ed sessions. This
can lead to confusing race conditions for the session
that was doing the transmit, and for that session
it really is weird to have a tx request cause an
asynch remote close event. If the remote-side truly
has closed, isn't it best to let either the receive
thread detect it, or in the case of UDP or really
hosed TCP/IP let the upper-level session-heartbeats
detect it.? */
if(retval < 0) {
MPTRCID(mux_TXCL,
mux, mux->state, mux->flags, 0, 0);
MPMuxClose(mux);
}
#endif /* defined(NEEDSWORK) */
return retval;
}
int
MPMuxTx(struct mpmux_hndl_s* mux,
caddr_t buf,
u_int len) {
MPIO_VEC iovec;
iovec.addr = buf;
iovec.length = len;
return MPMuxTxV(mux, &iovec, 1);
}
/* Function
======== */
u_int
MPMuxGetTlAddr(MPMUX_HNDL* mux,
MPTLADDR* tl_addr,
u_int local_flag) {
MPIO_HNDL* io_chan = (MPIO_HNDL*)0;
u_int failure = true;
if((mux == (MPMUX_HNDL*)0) ||
(tl_addr == (MPTLADDR*)0)) {
MPErrorSet(MPERR_INVALID);
mp_log("000RY", MPLT_ERR, "MPMuxGetTlAddr(%x %x %u): "
"Invalid parameters.",
mux, tl_addr,local_flag);
return failure;
}
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(tl_addr != (MPTLADDR*)0));
io_chan = MPMuxToIOChanLk(mux);
failure = MPIOGetTlAddr(io_chan, tl_addr, local_flag);
if(io_chan != (MPIO_HNDL*)0) {
MPIOUserRefDec(io_chan);
}
return failure;
}
/*
Sub-Module
==========
MuxOptions */
/* Function
--------
MPMuxOptionsSetInternal */
u_int
MPMuxOptionsSetInternal(MPMUX_HNDL* mux,
MPOPT_VEC* opt_vec,
u_int opt_vec_cnt) {
u_int failure = false;
MPOPT_VEC* opt_p;
MPPrecondition((mux != (MPMUX_HNDL*)0) &&
(((opt_vec == (MPOPT_VEC*)0) &&
(opt_vec_cnt == 0)) ||
((opt_vec != (MPOPT_VEC*)0) &&
(opt_vec_cnt > 0))));
for(opt_p = &opt_vec[0]; opt_p < &opt_vec[opt_vec_cnt]; opt_p++) {
switch(opt_p->opt) {
case MPMUX_OPT_PLAIN_CONNECT_TIMEOUT:
mux->plain_connect_timeout = (u_int)opt_p->data;
break;
default:
break;
}
}
return failure;
}
/* Function
--------
MPMuxOptionsSetV */
u_int
MPMuxOptionsSetV(MPMUX_HNDL* mux,
MPOPT_VEC* opt_vec,
u_int opt_vec_cnt) {
u_int failure = true;
MPIO_HNDL* io_chan = (MPIO_HNDL*)0;
if((mux == (MPMUX_HNDL*)0) ||
(opt_vec == (MPOPT_VEC*)0) ||
(opt_vec_cnt == 0)) {
MPErrorSet(MPERR_INVALID);
return failure;
}
if(opt_vec_cnt) {
io_chan = MPMuxToIOChanLk(mux);
failure = MPIOHndlOptionsSetV(io_chan, opt_vec, opt_vec_cnt);
}
else {
failure = false;
}
if(io_chan != (MPIO_HNDL*)0) {
MPIOUserRefDec(io_chan);
}
return failure;
}
/* Function
--------
MPMuxOptionsSet */
u_int
MPMuxOptionsSet(MPMUX_HNDL* mux,
u_int option,
caddr_t option_data) {
u_int failure = true;
MPOPT_VEC opt_vec[1];
opt_vec[0].opt = option;
opt_vec[0].data = option_data;
failure = MPMuxOptionsSetV(mux, &opt_vec[0], 1);
return failure;
}
/* Function
--------
MPMuxOptionsGet */
u_int
MPMuxOptionsGet(MPMUX_HNDL* mux,
u_int opt,
caddr_t opt_data) {
u_int failure = true;
MPIO_HNDL* io_chan = (MPIO_HNDL*)0;
if(mux == (MPMUX_HNDL*)0) {
MPErrorSet(MPERR_INVALID);
return failure;
}
io_chan = MPMuxToIOChanLk(mux);
failure = MPIOHndlOptionsGet(io_chan, opt, opt_data);
if(io_chan != (MPIO_HNDL*)0) {
MPIOUserRefDec(io_chan);
}
return failure;
}
/* Sub-Module
==========
Mux statistics */
/* Function
--------
MPMuxIOStatisticsGet
Returns statistics for a mux's io-channel */
u_int
MPMuxIOStatisticsGet(MPMUX_HNDL* mux,
MPIO_STAT *stat,
u_int *len) {
u_int failure = true;
MPIO_HNDL* io_chan = (MPIO_HNDL*)0;
if((mux == (MPMUX_HNDL*)0) ||
(stat == (MPIO_STAT*)0) ||
(len == (u_int*)0)) {
MPErrorSet(MPERR_INVALID);
return failure;
}
/* No, mux specific statistics, just fall through to
io-layer. */
io_chan = MPMuxToIOChanLk(mux);
failure = MPIOStatisticsGet(io_chan, stat, len);
if(io_chan != (MPIO_HNDL*)0) {
MPIOUserRefDec(io_chan);
}
return failure;
}
/* Sub-Module
==========
Module initialization/de-initialization entry points */
STATIC u_int MPMuxModuleInitCount = 0;
void
MPMuxModuleInit(MPOPT_VEC* opt_vec, u_int opt_vec_cnt) {
MPMuxModuleInitCount++;
if(MPMuxModuleInitCount > 1) {
return;
}
/* Scan option-vector for global options. */
/* Initialize the MPath I/O Module */
MPIOModuleInit(opt_vec, opt_vec_cnt);
MPTRC(mux_mini, 0, 0, 0, 0);
MPMuxProcId = MPGetProcId();
MPLockCreate(&MPMuxLockHndl, "mpmux:MPMuxLockHndl");
MPEventCreate(&MPMuxCloseEvent, (char*)0);
MPMuxHndlHeapInit();
MPMuxRxBufHeapInit();
return;
}
void
MPMuxModuleCleanup() {
if(MPMuxModuleInitCount != 0) {
MPMuxModuleInitCount--;
}
if(MPMuxModuleInitCount != 0) {
return;
}
MPIOModuleCleanup();
MPTRC(mux_mcle, 0, 0, 0, 0);
MPMuxRxBufHeapCleanup();
MPMuxHndlHeapCleanup();
MPLockDestroy(&MPMuxLockHndl);
MPEventDestroy(&MPMuxCloseEvent);
return;
}