diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2016-08-03 02:39:09 +0300 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2016-08-03 02:39:09 +0300 |
commit | 72b5ac54d620b29cae23d25f0405f2765b466f72 (patch) | |
tree | 276e6313a16f0821cb3da7061372f37b0feb8ace /fs/ceph/mds_client.c | |
parent | c7fac299672ee98a1da90ea2e473180fc75d2c53 (diff) | |
parent | a0f2b65275413b3438e9f55b1427273cd893c3b2 (diff) | |
download | linux-72b5ac54d620b29cae23d25f0405f2765b466f72.tar.xz |
Merge tag 'ceph-for-4.8-rc1' of git://github.com/ceph/ceph-client
Pull Ceph updates from Ilya Dryomov:
"The highlights are:
- RADOS namespace support in libceph and CephFS (Zheng Yan and
myself). The stopgaps added in 4.5 to deny access to inodes in
namespaces are removed and CEPH_FEATURE_FS_FILE_LAYOUT_V2 feature
bit is now fully supported
- A large rework of the MDS cap flushing code (Zheng Yan)
- Handle some of ->d_revalidate() in RCU mode (Jeff Layton). We were
overly pessimistic before, bailing at the first sight of LOOKUP_RCU
On top of that we've got a few CephFS bug fixes, a couple of cleanups
and Arnd's workaround for a weird genksyms issue"
* tag 'ceph-for-4.8-rc1' of git://github.com/ceph/ceph-client: (34 commits)
ceph: fix symbol versioning for ceph_monc_do_statfs
ceph: Correctly return NXIO errors from ceph_llseek
ceph: Mark the file cache as unreclaimable
ceph: optimize cap flush waiting
ceph: cleanup ceph_flush_snaps()
ceph: kick cap flushes before sending other cap message
ceph: introduce an inode flag to indicates if snapflush is needed
ceph: avoid sending duplicated cap flush message
ceph: unify cap flush and snapcap flush
ceph: use list instead of rbtree to track cap flushes
ceph: update types of some local varibles
ceph: include 'follows' of pending snapflush in cap reconnect message
ceph: update cap reconnect message to version 3
ceph: mount non-default filesystem by name
libceph: fsmap.user subscription support
ceph: handle LOOKUP_RCU in ceph_d_revalidate
ceph: allow dentry_lease_is_valid to work under RCU walk
ceph: clear d_fsinfo pointer under d_lock
ceph: remove ceph_mdsc_lease_release
ceph: don't use ->d_time
...
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r-- | fs/ceph/mds_client.c | 358 |
1 files changed, 189 insertions, 169 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 4e8678a612b6..fa59a85226b2 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -48,7 +48,7 @@ struct ceph_reconnect_state { int nr_caps; struct ceph_pagelist *pagelist; - bool flock; + unsigned msg_version; }; static void __wake_requests(struct ceph_mds_client *mdsc, @@ -100,12 +100,15 @@ static int parse_reply_info_in(void **p, void *end, } else info->inline_version = CEPH_INLINE_NONE; + info->pool_ns_len = 0; + info->pool_ns_data = NULL; if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { ceph_decode_32_safe(p, end, info->pool_ns_len, bad); - ceph_decode_need(p, end, info->pool_ns_len, bad); - *p += info->pool_ns_len; - } else { - info->pool_ns_len = 0; + if (info->pool_ns_len > 0) { + ceph_decode_need(p, end, info->pool_ns_len, bad); + info->pool_ns_data = *p; + *p += info->pool_ns_len; + } } return 0; @@ -469,7 +472,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, s->s_cap_iterator = NULL; INIT_LIST_HEAD(&s->s_cap_releases); INIT_LIST_HEAD(&s->s_cap_flushing); - INIT_LIST_HEAD(&s->s_cap_snaps_flushing); dout("register_session mds%d\n", mds); if (mds >= mdsc->max_sessions) { @@ -1145,19 +1147,17 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) invalidate = true; - while (true) { - struct rb_node *n = rb_first(&ci->i_cap_flush_tree); - if (!n) - break; - cf = rb_entry(n, struct ceph_cap_flush, i_node); - rb_erase(&cf->i_node, &ci->i_cap_flush_tree); - list_add(&cf->list, &to_remove); + while (!list_empty(&ci->i_cap_flush_list)) { + cf = list_first_entry(&ci->i_cap_flush_list, + struct ceph_cap_flush, i_list); + list_del(&cf->i_list); + list_add(&cf->i_list, &to_remove); } spin_lock(&mdsc->cap_dirty_lock); - list_for_each_entry(cf, &to_remove, list) - rb_erase(&cf->g_node, &mdsc->cap_flush_tree); + list_for_each_entry(cf, &to_remove, i_list) + list_del(&cf->g_list); if (!list_empty(&ci->i_dirty_item)) { pr_warn_ratelimited( @@ -1181,7 +1181,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, spin_unlock(&mdsc->cap_dirty_lock); if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { - list_add(&ci->i_prealloc_cap_flush->list, &to_remove); + list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); ci->i_prealloc_cap_flush = NULL; } } @@ -1189,8 +1189,8 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, while (!list_empty(&to_remove)) { struct ceph_cap_flush *cf; cf = list_first_entry(&to_remove, - struct ceph_cap_flush, list); - list_del(&cf->list); + struct ceph_cap_flush, i_list); + list_del(&cf->i_list); ceph_free_cap_flush(cf); } @@ -1212,6 +1212,8 @@ static void remove_session_caps(struct ceph_mds_session *session) dout("remove_session_caps on %p\n", session); iterate_session_caps(session, remove_session_caps_cb, fsc); + wake_up_all(&fsc->mdsc->cap_flushing_wq); + spin_lock(&session->s_cap_lock); if (session->s_nr_caps > 0) { struct inode *inode; @@ -1478,35 +1480,21 @@ static int trim_caps(struct ceph_mds_client *mdsc, return 0; } -static int check_capsnap_flush(struct ceph_inode_info *ci, - u64 want_snap_seq) -{ - int ret = 1; - spin_lock(&ci->i_ceph_lock); - if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) { - struct ceph_cap_snap *capsnap = - list_first_entry(&ci->i_cap_snaps, - struct ceph_cap_snap, ci_item); - ret = capsnap->follows >= want_snap_seq; - } - spin_unlock(&ci->i_ceph_lock); - return ret; -} - static int check_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_tid) { - struct rb_node *n; - struct ceph_cap_flush *cf; int ret = 1; spin_lock(&mdsc->cap_dirty_lock); - n = rb_first(&mdsc->cap_flush_tree); - cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL; - if (cf && cf->tid <= want_flush_tid) { - dout("check_caps_flush still flushing tid %llu <= %llu\n", - cf->tid, want_flush_tid); - ret = 0; + if (!list_empty(&mdsc->cap_flush_list)) { + struct ceph_cap_flush *cf = + list_first_entry(&mdsc->cap_flush_list, + struct ceph_cap_flush, g_list); + if (cf->tid <= want_flush_tid) { + dout("check_caps_flush still flushing tid " + "%llu <= %llu\n", cf->tid, want_flush_tid); + ret = 0; + } } spin_unlock(&mdsc->cap_dirty_lock); return ret; @@ -1518,54 +1506,9 @@ static int check_caps_flush(struct ceph_mds_client *mdsc, * returns true if we've flushed through want_flush_tid */ static void wait_caps_flush(struct ceph_mds_client *mdsc, - u64 want_flush_tid, u64 want_snap_seq) + u64 want_flush_tid) { - int mds; - - dout("check_caps_flush want %llu snap want %llu\n", - want_flush_tid, want_snap_seq); - mutex_lock(&mdsc->mutex); - for (mds = 0; mds < mdsc->max_sessions; ) { - struct ceph_mds_session *session = mdsc->sessions[mds]; - struct inode *inode = NULL; - - if (!session) { - mds++; - continue; - } - get_session(session); - mutex_unlock(&mdsc->mutex); - - mutex_lock(&session->s_mutex); - if (!list_empty(&session->s_cap_snaps_flushing)) { - struct ceph_cap_snap *capsnap = - list_first_entry(&session->s_cap_snaps_flushing, - struct ceph_cap_snap, - flushing_item); - struct ceph_inode_info *ci = capsnap->ci; - if (!check_capsnap_flush(ci, want_snap_seq)) { - dout("check_cap_flush still flushing snap %p " - "follows %lld <= %lld to mds%d\n", - &ci->vfs_inode, capsnap->follows, - want_snap_seq, mds); - inode = igrab(&ci->vfs_inode); - } - } - mutex_unlock(&session->s_mutex); - ceph_put_mds_session(session); - - if (inode) { - wait_event(mdsc->cap_flushing_wq, - check_capsnap_flush(ceph_inode(inode), - want_snap_seq)); - iput(inode); - } else { - mds++; - } - - mutex_lock(&mdsc->mutex); - } - mutex_unlock(&mdsc->mutex); + dout("check_caps_flush want %llu\n", want_flush_tid); wait_event(mdsc->cap_flushing_wq, check_caps_flush(mdsc, want_flush_tid)); @@ -2163,6 +2106,11 @@ static int __do_request(struct ceph_mds_client *mdsc, mds = __choose_mds(mdsc, req); if (mds < 0 || ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { + if (mdsc->mdsmap_err) { + err = mdsc->mdsmap_err; + dout("do_request mdsmap err %d\n", err); + goto finish; + } dout("do_request no mds or not active, waiting for map\n"); list_add(&req->r_wait, &mdsc->waiting_for_map); goto out; @@ -2292,14 +2240,6 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), CEPH_CAP_PIN); - /* deny access to directories with pool_ns layouts */ - if (req->r_inode && S_ISDIR(req->r_inode->i_mode) && - ceph_inode(req->r_inode)->i_pool_ns_len) - return -EIO; - if (req->r_locked_dir && - ceph_inode(req->r_locked_dir)->i_pool_ns_len) - return -EIO; - /* issue */ mutex_lock(&mdsc->mutex); __register_request(mdsc, req, dir); @@ -2791,13 +2731,13 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, struct ceph_mds_cap_reconnect v2; struct ceph_mds_cap_reconnect_v1 v1; } rec; - size_t reclen; struct ceph_inode_info *ci; struct ceph_reconnect_state *recon_state = arg; struct ceph_pagelist *pagelist = recon_state->pagelist; char *path; int pathlen, err; u64 pathbase; + u64 snap_follows; struct dentry *dentry; ci = cap->ci; @@ -2820,9 +2760,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, path = NULL; pathlen = 0; } - err = ceph_pagelist_encode_string(pagelist, path, pathlen); - if (err) - goto out_free; spin_lock(&ci->i_ceph_lock); cap->seq = 0; /* reset cap seq */ @@ -2830,14 +2767,13 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, cap->mseq = 0; /* and migrate_seq */ cap->cap_gen = cap->session->s_cap_gen; - if (recon_state->flock) { + if (recon_state->msg_version >= 2) { rec.v2.cap_id = cpu_to_le64(cap->cap_id); rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); rec.v2.issued = cpu_to_le32(cap->issued); rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); rec.v2.pathbase = cpu_to_le64(pathbase); rec.v2.flock_len = 0; - reclen = sizeof(rec.v2); } else { rec.v1.cap_id = cpu_to_le64(cap->cap_id); rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); @@ -2847,13 +2783,23 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, ceph_encode_timespec(&rec.v1.atime, &inode->i_atime); rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); rec.v1.pathbase = cpu_to_le64(pathbase); - reclen = sizeof(rec.v1); + } + + if (list_empty(&ci->i_cap_snaps)) { + snap_follows = 0; + } else { + struct ceph_cap_snap *capsnap = + list_first_entry(&ci->i_cap_snaps, + struct ceph_cap_snap, ci_item); + snap_follows = capsnap->follows; } spin_unlock(&ci->i_ceph_lock); - if (recon_state->flock) { + if (recon_state->msg_version >= 2) { int num_fcntl_locks, num_flock_locks; struct ceph_filelock *flocks; + size_t struct_len, total_len = 0; + u8 struct_v = 0; encode_again: ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); @@ -2872,20 +2818,51 @@ encode_again: goto encode_again; goto out_free; } + + if (recon_state->msg_version >= 3) { + /* version, compat_version and struct_len */ + total_len = 2 * sizeof(u8) + sizeof(u32); + struct_v = 2; + } /* * number of encoded locks is stable, so copy to pagelist */ - rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) + - (num_fcntl_locks+num_flock_locks) * - sizeof(struct ceph_filelock)); - err = ceph_pagelist_append(pagelist, &rec, reclen); - if (!err) - err = ceph_locks_to_pagelist(flocks, pagelist, - num_fcntl_locks, - num_flock_locks); + struct_len = 2 * sizeof(u32) + + (num_fcntl_locks + num_flock_locks) * + sizeof(struct ceph_filelock); + rec.v2.flock_len = cpu_to_le32(struct_len); + + struct_len += sizeof(rec.v2); + struct_len += sizeof(u32) + pathlen; + + if (struct_v >= 2) + struct_len += sizeof(u64); /* snap_follows */ + + total_len += struct_len; + err = ceph_pagelist_reserve(pagelist, total_len); + + if (!err) { + if (recon_state->msg_version >= 3) { + ceph_pagelist_encode_8(pagelist, struct_v); + ceph_pagelist_encode_8(pagelist, 1); + ceph_pagelist_encode_32(pagelist, struct_len); + } + ceph_pagelist_encode_string(pagelist, path, pathlen); + ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); + ceph_locks_to_pagelist(flocks, pagelist, + num_fcntl_locks, + num_flock_locks); + if (struct_v >= 2) + ceph_pagelist_encode_64(pagelist, snap_follows); + } kfree(flocks); } else { - err = ceph_pagelist_append(pagelist, &rec, reclen); + size_t size = sizeof(u32) + pathlen + sizeof(rec.v1); + err = ceph_pagelist_reserve(pagelist, size); + if (!err) { + ceph_pagelist_encode_string(pagelist, path, pathlen); + ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); + } } recon_state->nr_caps++; @@ -2976,7 +2953,12 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, recon_state.nr_caps = 0; recon_state.pagelist = pagelist; - recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK; + if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) + recon_state.msg_version = 3; + else if (session->s_con.peer_features & CEPH_FEATURE_FLOCK) + recon_state.msg_version = 2; + else + recon_state.msg_version = 1; err = iterate_session_caps(session, encode_caps_cb, &recon_state); if (err < 0) goto fail; @@ -3005,8 +2987,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, goto fail; } - if (recon_state.flock) - reply->hdr.version = cpu_to_le16(2); + reply->hdr.version = cpu_to_le16(recon_state.msg_version); /* raced with cap release? */ if (s_nr_caps != recon_state.nr_caps) { @@ -3231,7 +3212,7 @@ static void handle_lease(struct ceph_mds_client *mdsc, msecs_to_jiffies(le32_to_cpu(h->duration_ms)); di->lease_seq = seq; - dentry->d_time = di->lease_renew_from + duration; + di->time = di->lease_renew_from + duration; di->lease_renew_after = di->lease_renew_from + (duration >> 1); di->lease_renew_from = 0; @@ -3297,47 +3278,6 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, } /* - * Preemptively release a lease we expect to invalidate anyway. - * Pass @inode always, @dentry is optional. - */ -void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, - struct dentry *dentry) -{ - struct ceph_dentry_info *di; - struct ceph_mds_session *session; - u32 seq; - - BUG_ON(inode == NULL); - BUG_ON(dentry == NULL); - - /* is dentry lease valid? */ - spin_lock(&dentry->d_lock); - di = ceph_dentry(dentry); - if (!di || !di->lease_session || - di->lease_session->s_mds < 0 || - di->lease_gen != di->lease_session->s_cap_gen || - !time_before(jiffies, dentry->d_time)) { - dout("lease_release inode %p dentry %p -- " - "no lease\n", - inode, dentry); - spin_unlock(&dentry->d_lock); - return; - } - - /* we do have a lease on this dentry; note mds and seq */ - session = ceph_get_mds_session(di->lease_session); - seq = di->lease_seq; - __ceph_mdsc_drop_dentry_lease(dentry); - spin_unlock(&dentry->d_lock); - - dout("lease_release inode %p dentry %p to mds%d\n", - inode, dentry, session->s_mds); - ceph_mdsc_lease_send_msg(session, inode, dentry, - CEPH_MDS_LEASE_RELEASE, seq); - ceph_put_mds_session(session); -} - -/* * drop all leases (and dentry refs) in preparation for umount */ static void drop_leases(struct ceph_mds_client *mdsc) @@ -3470,7 +3410,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) INIT_LIST_HEAD(&mdsc->snap_flush_list); spin_lock_init(&mdsc->snap_flush_lock); mdsc->last_cap_flush_tid = 1; - mdsc->cap_flush_tree = RB_ROOT; + INIT_LIST_HEAD(&mdsc->cap_flush_list); INIT_LIST_HEAD(&mdsc->cap_dirty); INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); mdsc->num_cap_flushing = 0; @@ -3585,7 +3525,7 @@ restart: void ceph_mdsc_sync(struct ceph_mds_client *mdsc) { - u64 want_tid, want_flush, want_snap; + u64 want_tid, want_flush; if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) return; @@ -3598,17 +3538,19 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) ceph_flush_dirty_caps(mdsc); spin_lock(&mdsc->cap_dirty_lock); want_flush = mdsc->last_cap_flush_tid; + if (!list_empty(&mdsc->cap_flush_list)) { + struct ceph_cap_flush *cf = + list_last_entry(&mdsc->cap_flush_list, + struct ceph_cap_flush, g_list); + cf->wake = true; + } spin_unlock(&mdsc->cap_dirty_lock); - down_read(&mdsc->snap_rwsem); - want_snap = mdsc->last_snap_seq; - up_read(&mdsc->snap_rwsem); - - dout("sync want tid %lld flush_seq %lld snap_seq %lld\n", - want_tid, want_flush, want_snap); + dout("sync want tid %lld flush_seq %lld\n", + want_tid, want_flush); wait_unsafe_requests(mdsc, want_tid); - wait_caps_flush(mdsc, want_flush, want_snap); + wait_caps_flush(mdsc, want_flush); } /* @@ -3729,11 +3671,86 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc) dout("mdsc_destroy %p done\n", mdsc); } +void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) +{ + struct ceph_fs_client *fsc = mdsc->fsc; + const char *mds_namespace = fsc->mount_options->mds_namespace; + void *p = msg->front.iov_base; + void *end = p + msg->front.iov_len; + u32 epoch; + u32 map_len; + u32 num_fs; + u32 mount_fscid = (u32)-1; + u8 struct_v, struct_cv; + int err = -EINVAL; + + ceph_decode_need(&p, end, sizeof(u32), bad); + epoch = ceph_decode_32(&p); + + dout("handle_fsmap epoch %u\n", epoch); + + ceph_decode_need(&p, end, 2 + sizeof(u32), bad); + struct_v = ceph_decode_8(&p); + struct_cv = ceph_decode_8(&p); + map_len = ceph_decode_32(&p); + + ceph_decode_need(&p, end, sizeof(u32) * 3, bad); + p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */ + + num_fs = ceph_decode_32(&p); + while (num_fs-- > 0) { + void *info_p, *info_end; + u32 info_len; + u8 info_v, info_cv; + u32 fscid, namelen; + + ceph_decode_need(&p, end, 2 + sizeof(u32), bad); + info_v = ceph_decode_8(&p); + info_cv = ceph_decode_8(&p); + info_len = ceph_decode_32(&p); + ceph_decode_need(&p, end, info_len, bad); + info_p = p; + info_end = p + info_len; + p = info_end; + + ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); + fscid = ceph_decode_32(&info_p); + namelen = ceph_decode_32(&info_p); + ceph_decode_need(&info_p, info_end, namelen, bad); + + if (mds_namespace && + strlen(mds_namespace) == namelen && + !strncmp(mds_namespace, (char *)info_p, namelen)) { + mount_fscid = fscid; + break; + } + } + + ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); + if (mount_fscid != (u32)-1) { + fsc->client->monc.fs_cluster_id = mount_fscid; + ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, + 0, true); + ceph_monc_renew_subs(&fsc->client->monc); + } else { + err = -ENOENT; + goto err_out; + } + return; +bad: + pr_err("error decoding fsmap\n"); +err_out: + mutex_lock(&mdsc->mutex); + mdsc->mdsmap_err = -ENOENT; + __wake_requests(mdsc, &mdsc->waiting_for_map); + mutex_unlock(&mdsc->mutex); + return; +} /* * handle mds map update. */ -void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg) +void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) { u32 epoch; u32 maplen; @@ -3840,7 +3857,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) switch (type) { case CEPH_MSG_MDS_MAP: - ceph_mdsc_handle_map(mdsc, msg); + ceph_mdsc_handle_mdsmap(mdsc, msg); + break; + case CEPH_MSG_FS_MAP_USER: + ceph_mdsc_handle_fsmap(mdsc, msg); break; case CEPH_MSG_CLIENT_SESSION: handle_session(s, msg); |